]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/core/fwp_endpoint.c
Fixing bugs related to tcp tranfer. Still not working
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
index e640f8a2adf2efe58647a0e5b2ca871b2f56d4e2..79c53f7a977267f65575b29373e89104d4c84cff 100644 (file)
 #include "fwp_endpoint.h"
+#include "fwp_msgb.h"
+#include <errno.h>
 
-typedef enum {
-       FWP_SEND_EPOINT = 0,
-       FWP_RECV_EPOINT = 1,
-} fwp_endpoint_type_t;
+#include <pthread.h>
 
-typedef enum {
-       FWP_EPOINT_FREE         = 0,
-       FWP_EPOINT_INACTIVE     = 1,
-       FWP_EPOINT_UNBOUND      = 2,
-       FWP_EPOINT_BOUND        = 3,
-} fwp_endpoint_status_t;
+typedef unsigned int fwp_endpoint_id_t;
+
+static fwp_endpoint_attr_t fwp_epoint_attr_default ={
+       .reliability = FWP_EPOINT_BESTEFFORT, 
+       .max_connections = 20,
+};
 
 /**
  * Structure of FWP endpoint.
- *
- * 
  */
 struct fwp_endpoint{
-       fwp_endpoint_type_t     type;
-       /**< the vres descriptor the send endpoint is bound to */       
-       fwp_vres_d_t            vresd;
-       /**< for send enpoint it contains destination address
-        * for receive endpoint it is filled with the msg source address
+       /** endpoint attributes */
+       fwp_endpoint_attr_t     attr;
+       /** for send enpoint it contains destination address for
+        * receive endpoint it is filled with the msg source address
         */
+       fwp_vres_d_t            vresd;
        struct fwp_sockaddr     peer;   
-       /**< source port */
-       unsigned int            sport;  
-       /**< dest port */
-       unsigned int            dport;  
-       /**< dest node */
+       /** source/destination port */
+       unsigned int            port;   
+       /** destination node */
        int                     node;
-       /**< socket descriptor*/
-       int                     sockd;  
-       /**< specific operation options*/
+       /** Socket descriptor.
+        * In case of rebliable epoint it is a listen socket.
+        * */
+       int                     sockd; 
+       fd_set                  fdset;
+       fd_set                  testfds;
+       int                     *c_sockd;
+       unsigned int            nr_connections;
+       /** specific operation options*/
        int                     flags;  
-       fwp_endpoint_status_t   status;
-};
-
-typedef
-struct fwp_endpoint_table {
-       unsigned int                    nr_endpoints;
-       fwp_endpoint_t                  *entry;
-       pthread_mutex_t                 lock;
-} fwp_endpoint_table_t;
-
-/* Global variable - endpoint table */
-static fwp_endpoint_table_t  fwp_endpoint_table = {
-       .nr_endpoints = 0,
-       .entry = NULL,
-       .lock = PTHREAD_MUTEX_INITIALIZER,
 };
 
-int fwp_endpoint_table_init(unsigned int nr_endpoints)
+/**
+ * Allocates endpoint
+ *
+ * \return On success returns endpoint structure. 
+ * On error, NULL is returned. 
+ *
+ */
+static fwp_endpoint_t* fwp_endpoint_alloc()
 {
-       int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
-
-       fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
-       if (!fwp_endpoint_table.entry)
-               return -ENOMEM;
-       memset(fwp_endpoint_table.entry, 0, table_size);
-       fwp_endpoint_table.nr_endpoints = nr_endpoints;
-       return 0;
+       return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
 }
 
-static fwp_endpoint_t* fwp_endpoint_alloc()
+/**
+ * Allocates endpoint
+ *
+ * \return On success returns endpoint structure. 
+ * On error, NULL is returned. 
+ *
+ */
+static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
 {
-       int i, nr_endpoints;
-
-       /* find free vres id */
-       pthread_mutex_lock(&fwp_endpoint_table.lock);
-       i = 0;
-       nr_endpoints = fwp_endpoint_table.nr_endpoints;
-       while ((i < nr_endpoints) && 
-               (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
-               i++;
-       
-       if (i == nr_endpoints) {
-               pthread_mutex_unlock(&fwp_endpoint_table.lock);
-               return NULL;
-       }
-
-       fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
-       pthread_mutex_unlock(&fwp_endpoint_table.lock);
-       return (&fwp_endpoint_table.entry[i]);
+       free(endpoint);
 }
 
-static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
+/**
+ * Destroy endpoint
+ *
+ * \param[in] epointd Endpoint descriptor
+ * \return On success 0 is returned. 
+ * On error, negative error value is returned and errno is set appropriately. 
+ */
+int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
 {
-       epoint->status = FWP_EPOINT_FREE;
-       close(epoint->sockd);
+       if (epointd->sockd > 0) 
+               close(epointd->sockd);
+       
+       fwp_endpoint_free(epointd);     
+       return 0;
 }
 
 /**
+ * Get endpoint parameters
  *
- *
+ * \param[in] epointd Endpoint descriptor
+ * \param[out] node Node identifier
+ * \param[out] port Port
+ * \param[out] attr Endpoint`s attributes
+ * \return On success 0 is returned. 
+ * On error, negative error value is returned. 
  */
-int fwp_endpoint_get_params(unsigned int *node, unsigned int *port, int *flags
-                               fwp_endpoint_d_t epointd)
+int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node
+                               unsigned int *port, fwp_endpoint_attr_t *attr)
 {
        fwp_endpoint_t *epoint = epointd;
 
-       *node = epoint->node;
-       *port = epoint->sport;
-       *flags = epoint->flags;
+       if (node) *node = epoint->node;
+       if (port) *port = epoint->port;
+       if (attr) *attr = epoint->attr;
        
        return 0;
 }
 
+int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
+{
+       bzero(attr, sizeof(fwp_endpoint_attr_t));
+       *attr = fwp_epoint_attr_default;
+
+       return 0;
+}
+
 /**
  * Creates send endpoint
  *
  * \param[in] node IP address of destination node
  * \param[in] port UDP port
+ * \param[in] attr Endpoint attributes
+ * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
  *
- * \return On success returns identifier of endpoint. 
- * On error, negative error code is returned. 
+ * \return Zero on success, -1 on error and sets errno appropriately. 
  *
  */
-
-int fwp_send_endpoint_create(unsigned int node, unsigned int port, int flags, 
-                               fwp_endpoint_d_t *epointdp)
+int fwp_send_endpoint_create(unsigned int node,
+                               unsigned int port, 
+                               fwp_endpoint_attr_t *attr,
+                               fwp_endpoint_t **epoint)
 {      
        struct sockaddr_in *addr;
-       fwp_endpoint_t *epoint;
-       int sockd;
-       socklen_t addrlen;
+       fwp_endpoint_t *fwp_epoint;
 
-       epoint = fwp_endpoint_alloc();  
-        if (!epoint) {
-               return -ENOMEM;
+       fwp_epoint = fwp_endpoint_alloc();      
+        if (!fwp_epoint) {
+               errno = ENOMEM;
+               return -1;
        }
        
-       epoint->type = FWP_SEND_EPOINT;
+       /*epoint->type = FWP_SEND_EPOINT;
        epoint->status = FWP_EPOINT_UNBOUND;
        epoint->node = node;
-       epoint->dport = port;
-       epoint->flags = flags;
+       epoint->port = port;
+       */
+       if (attr)
+               fwp_epoint->attr  = *attr;
+       else
+               fwp_epoint->attr = fwp_epoint_attr_default;
                
-       addr = (struct sockaddr_in *)&(epoint->peer.addr);
+       addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
        bzero((char*) addr, sizeof(*addr));
        addr->sin_family = AF_INET;
        addr->sin_addr.s_addr = node;
        addr->sin_port = htons(port);
-        epoint->peer.addrlen = sizeof(struct sockaddr_in);
+        fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
        
-       if (flags && FWP_EPOINT_MNGT) {
-               if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
-                       perror("Unable to open socket");
-                       goto err;
-               }
-
-               if (connect(sockd,(struct sockaddr*) &epoint->peer.addr, 
-                               epoint->peer.addrlen)) {
-                       
-                       perror("Connect error");
+       if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
+               fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+               if (fwp_epoint->sockd < 0) {
                        goto err;
                }
-
        } else {
-               struct sockaddr_in myaddr;
-       
-               bzero((char*) &myaddr, sizeof(myaddr));
-               myaddr.sin_family = AF_INET;
-               myaddr.sin_addr.s_addr = INADDR_ANY;
-               myaddr.sin_port = 0;
-
-               addrlen = sizeof(myaddr);
-
-               if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
-                       perror("Unable to open socket");
+               fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+               if (fwp_epoint->sockd < 0) {
                        goto err;
                }
-               
-               if (bind(sockd, (struct sockaddr*) &myaddr, 
-                       sizeof(myaddr)) == -1) {
-                       
-                       perror("Bind error");
+       
+               /* Enable broadcasts */
+               /*unsigned int yes = 1;
+               if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
+                              &yes, sizeof(yes)) == -1) {
+                       FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
                        goto err;
-               }
-               
-               getsockname(sockd, (struct sockaddr*) &myaddr, 
-                               &addrlen);
-               epoint->sport = ntohs(myaddr.sin_port);
-  
+               }*/
+       
        }
-
-       epoint->sockd = sockd;
-       FWP_DEBUG("Send endpoint created.\n"); 
-       *epointdp = epoint;
-
+       
+       if (connect(fwp_epoint->sockd,
+                       (struct sockaddr*) &fwp_epoint->peer.addr, 
+                       fwp_epoint->peer.addrlen)) {
+               FWP_DEBUG("FWp connect error\n"); 
+               goto err;
+       }
+       
+       FWP_DEBUG("FWP Send endpoint created.\n"); 
+       *epoint = fwp_epoint;
        return 0;               
 err:
-       fwp_endpoint_free(epoint);
-       return (-errno);        
+       fwp_endpoint_destroy(fwp_epoint);
+       return -1;      
 }
 
 /**
  * Creates receive endpoint
  *
  * \param[in] port UDP port
+ * \param[in] attr Endpoint attributes
+ * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
  *
- * \return On success returns identifier of endpoint. 
- * On error, negative error code is returned.
- *
+ * \return Zero on success, -1 on error and errno is set.
  */
-int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
-                               int flags, fwp_endpoint_d_t *epointdp)
+int fwp_receive_endpoint_create(unsigned int port,
+                               fwp_endpoint_attr_t *attr,
+                               fwp_endpoint_t **epoint)
 {
-       fwp_endpoint_t *epoint;
-       int sockd;
        struct sockaddr_in *addr;
-       //int rcvbuf_size = 3000;
+       fwp_endpoint_t *fwp_epoint;
 
-       epoint = fwp_endpoint_alloc();  
-        if (!epoint) {
-               return -ENOMEM;
+       fwp_epoint = fwp_endpoint_alloc();      
+        if (!fwp_epoint) {
+               errno = ENOMEM;
+               return -1;
        }
        
-       epoint->type = FWP_RECV_EPOINT;
-       epoint->status = FWP_EPOINT_UNBOUND;
-       epoint->flags  = flags;
+       /*epoint->type = FWP_RECV_EPOINT;
+       epoint->status = FWP_EPOINT_UNBOUND;*/
        
-       addr = (struct sockaddr_in *)&(epoint->peer.addr);
+       if (attr)
+               fwp_epoint->attr  = *attr;
+       else
+               fwp_epoint->attr = fwp_epoint_attr_default;
+
+       addr = (struct sockaddr_in *) &(fwp_epoint->peer.addr);
        addr->sin_family = AF_INET;
        /* TODO: set listen interface, maybe through config struct*/
        addr->sin_addr.s_addr = INADDR_ANY;
        addr->sin_port = htons(port);
-        epoint->peer.addrlen = sizeof(struct sockaddr_in);
+        fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
        
-       if (flags && FWP_EPOINT_MNGT) {
-               if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
-                       perror("Unable to open socket");
+       if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
+               if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, 
+                                               IPPROTO_TCP)) < 0) {
+                       FWP_ERROR("Unable to open socket: %s", strerror(errno));
                        goto err;
-               }
+               }       
                
-               if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
-                       epoint->peer.addrlen) == -1) {
-                       
-                       perror("Bind error");
+               int yes = 1;
+               if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
+                              &yes, sizeof(yes)) == -1) {
+                       FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
                        goto err;
                }
 
-               if (listen(sockd,0)) {
-                       perror("Connect error");
+               if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr, 
+                               fwp_epoint->peer.addrlen) == -1) {
+                       FWP_ERROR("Bind error: %s", strerror(errno));
+                       /* TODO: remove all error messages from all libraries */
                        goto err;
                }
-
+               
+               if (listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections)){
+                       perror("Error on listen call\n");
+                       goto err;
+               }
+               
+               FD_ZERO(&fwp_epoint->fdset);
+               /*add listen socket */
+               FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset); 
+               fwp_epoint->testfds = fwp_epoint->fdset;
+               fwp_epoint->c_sockd = 
+                               (int*)malloc(fwp_epoint->attr.max_connections);
+               bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
+               fwp_epoint->nr_connections = 0;
+
+               FWP_DEBUG("Reliable receive endpoint port=%d created.\n", 
+                               fwp_epoint->port); 
        } else {
-               if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
-                       perror("Unable to open socket");
+               if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
+                                               IPPROTO_UDP)) < 0) {
+                       FWP_ERROR("Unable to open socket: %s", strerror(errno));
                        goto err;
                }
                
-               if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
-                       epoint->peer.addrlen) == -1) {
+               if (bind(fwp_epoint->sockd, 
+                       (struct sockaddr*) &fwp_epoint->peer.addr, 
+                       fwp_epoint->peer.addrlen) == -1) {
                        
-                       perror("Bind error");
+                       FWP_ERROR("Bind error: %s", strerror(errno));
                        goto err;
                }
-   
+               FWP_DEBUG("Best-Effort receive endpoint port=%d created.\n", 
+                               fwp_epoint->port); 
        }
-
-       epoint->sockd = sockd;
+               
        /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
                        &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
                
-               perror("Unable to set socket buffer size");
+               FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
                return -1;
        }else {
                FWP_DEBUG("Receive endpoint buffer size is set.\n");
        }
        */
        
-       getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
-                       &epoint->peer.addrlen);
-       
-       epoint->sport = ntohs(addr->sin_port);
-       /*TODO: set node*/
-       epoint->node =  ntohl(addr->sin_addr.s_addr);
-       FWP_DEBUG("Receive endpoint port=%d created.\n", ntohs(epoint->sport)); 
-       
-       *epointdp = epoint; 
+       getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr, 
+                       &fwp_epoint->peer.addrlen);
        
+       *epoint = fwp_epoint;   
        return 0;
-
-err:   
-       fwp_endpoint_free(epoint);
-       return (-errno);        
+err:
+       fwp_endpoint_destroy(fwp_epoint);
+       return errno;
 }
 
 /**
@@ -296,105 +304,215 @@ err:
  * \param[in] vres_id identifier of vres
  * \param[in] epoint_id send endpoint identifier
  *
- * \return On success returns 0. On error, negative error code is returned 
+ * \return On success returns 0. On error, -1 and errno is set appropriately.
  */
-int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
+int fwp_send_endpoint_bind(fwp_endpoint_t *epoint, fwp_vres_d_t vresd)
 {
-       fwp_endpoint_t *epoint = epointd;
-       
-       if (epoint->type != FWP_SEND_EPOINT) {  
-               return (-EINVAL);
-       }
+       int rv ;
+       fwp_endpoint_t *fwp_epoint = epoint;
        
-       /* link epoint-vres mutually */
-       pthread_mutex_lock(&fwp_endpoint_table.lock);
-       if (_fwp_vres_bind(vresd, epoint) < 0) { 
-               pthread_mutex_unlock(&fwp_endpoint_table.lock);
-               return -EPERM;
-       }
-
-       if (epoint->type == FWP_EPOINT_BOUND) {  /* if send endpoint is already bound */
+       fwp_epoint->vresd = vresd;      
+       rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
+       /* if send endpoint is already bound 
+       if (epoint->type == FWP_EPOINT_BOUND) {  
                fwp_send_endpoint_unbind(epoint);
-       }
-
-       epoint->vresd = vresd;
-       epoint->status = FWP_EPOINT_BOUND;
+       }*/
        
-       pthread_mutex_unlock(&fwp_endpoint_table.lock);
-       return 0;
+       return rv;
 }
 
 /**
  * Unbinds send endpoint from vres
  *
- * \param[in] id send endpoint identifie
- * \return On success returns 0. On error, negative error code is returned 
+ * \param[in] epointd Send endpoint descripto
+ * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
+int fwp_send_endpoint_unbind(fwp_endpoint_t *epoint)
 {
-       fwp_endpoint_t *epoint = epointd;
-       
+       int rv = 0;
+       fwp_endpoint_t *fwp_epoint = epoint;
+
        /* unlink epoint-vres mutually */
-       _fwp_vres_unbind(epoint->vresd);
-       epoint->status = FWP_EPOINT_UNBOUND;
+       if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0) 
+               return rv;
 
        return 0;
 }
 
+static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+{
+       int csockd;
+//     fwp_endpoint_t *fwp_epoint = epointd;
+       fwp_sockaddr_t  peer;
+       int i;
+
+       if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
+               return -1;
+
+       peer.addrlen = sizeof(struct sockaddr_in);
+       csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
+                       &peer.addrlen);
+       
+       if (csockd < 0) {
+               perror("Error on accept\n");
+               return errno;   
+       }               
+
+       FWP_DEBUG("New connection accepted\n");
+       /* find free place */           
+       i = 0;
+       while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections)) 
+                               i++;
+       fwp_epoint->c_sockd[i] = csockd; 
+       FWP_DEBUG("Index = %d\n", i);
+       fwp_epoint->nr_connections++;
+               
+       FWP_DEBUG("before\n");
+       FD_SET(csockd, &fwp_epoint->fdset);
+       FWP_DEBUG("SET fdset\n");
+       return 0;       
+} 
+
+/**
+ * Receives message from stream (TCP)
+ *
+ * \param[in] epointd Descriptor of endpoint
+ * \param[in] buffer Buffer to store message
+ * \param[in] buffer_size Size of buffer
+ *
+ * \return
+ * On success, it returns number of received bytes.  
+ * On error, -1 is returned and errno is set appropriately.
+ *
+ */
+int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer, 
+                               size_t buffer_size)
+{
+       fwp_endpoint_t *fwp_epoint = epointd;
+       fwp_sockaddr_t *peer = &fwp_epoint->peer;
+       fd_set fdset;
+       ssize_t len;
+       int i;
+
+       for (i = 0; i < fwp_epoint->nr_connections; i++) {
+               if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
+                       continue;       
+               }       
+                       
+               peer->addrlen = sizeof(struct sockaddr_in);
+               len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer, 
+                                       buffer_size,0, peer);
+
+               if (len < 0) /* Error */
+                       return len;
+               
+               FWP_DEBUG("Received tcp data\n");
+               if (len)
+                       return len;
+       
+               /* tcp connection closed */
+               FWP_DEBUG("Connection closed\n");
+               FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
+               memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1, 
+                       sizeof(int)*(fwp_epoint->nr_connections -i-1));
+               fwp_epoint->nr_connections--;
+               return 0;
+       }
+       return 0;
+}
+
 /**
  * Receives message
  *
- * \param[in] epoint_id  identificator of endpoint
- * \param[in] buffer buffer to store message
- * \param[in] buffer_size size of buffer
+ * \param[in] epointd Descriptor of endpoint
+ * \param[in] buffer Buffer to store message
+ * \param[in] buffer_size Size of buffer
  *
  * \return
  * On success, it returns number of received bytes.  
- * On error, negative error code is returned,
+ * On error, -1 is returned and errno is set appropriately.
  *
  */
-ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size)
+ssize_t fwp_recv(fwp_endpoint_t *endpoint,
+                       void *buffer, const size_t buffer_size,
+                       unsigned int *from, int flags)
 {
-       fwp_endpoint_t *epoint = epointd;
-       fwp_sockaddr_t *peer = &epoint->peer;
+       fwp_sockaddr_t *peer = &endpoint->peer;
        ssize_t len;
+       fd_set fdset;
+       fwp_endpoint_t *fwp_epoint = endpoint;
+       
+/*     if (!fwp_endpoint_is_valid(epointd)) {
+               errno = EINVAL;
+               return -1;
+       }*/
+       
+       if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {    
+               len = _fwp_recvfrom(fwp_epoint->sockd, buffer, 
+                                       buffer_size, 0, peer);
+               return len;
+       }
        
-       _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
-                 peer->addr, &peer->addrlen);
+       while (1){
+       /* FIXME: What about using a loop here and continue instead of goto???? */
+               /* FWP_EPOINT_RELIABLE */
+               fdset = fwp_epoint->fdset;
+               if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
+                          (fd_set *)0, NULL) < 0) {
+               
+                       FWP_ERROR("Error in select: %s", strerror(errno));
+                       return -1;
+               }
        
-       return len;
+               if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
+                       fwp_receive_endpoint_accept(fwp_epoint);
+                       FWP_DEBUG("After accepted\n");
+                       continue;
+               }
+
+               /* Check client TCP sockets */
+               len = fwp_recv_conn(endpoint, buffer, buffer_size);
+               if (len)
+                       return len;
+       }
 }
 
 /**
  * Sends message through vres
  *
- * \param[in] epoint_id  identificator of endpoint
- * \param[in] msg message to sent
- * \param[in] size message size
+ * \param[in] epointd Endpoint descriptor
+ * \param[in] msg Message to sent
+ * \param[in] size Message size
  *
  * \return
  * On success, it returns zero.  
- * On error, negative error code is returned,
+ * On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size)
+int fwp_send(fwp_endpoint_t *fwp_epoint,const void *msg, const size_t size, int flags)
 {
-       struct fwp_endpoint *epoint = epointd;
        struct fwp_msgb *msgb;
-
-       /* TODO: Validity test of epointd */
-       if (epoint->status != FWP_EPOINT_BOUND) {
-               return -EPERM;
+       /*fwp_endpoint_t *fwp_epoint;*/
+       
+/*     if (!fwp_endpoint_is_valid(epointd)){
+               errno = EINVAL;
+               return -1;
        }
+       if (!fwp_endpoint_is_bound(epointd)){
+               errno = EPERM;
+               return -1;
+       }*/
 
        /*if (flags && MSG_DONTWAIT) 
                msgb = fwp_msgb_alloc(buffer_size);
        else {*/
-               if (!(msgb = fwp_msgb_alloc(size)))
-                       return -ENOMEM;
+               if (!(msgb = fwp_msgb_alloc(size))) {
+                       errno = ENOMEM;
+                       return -1;
+               }
 
-               msgb->peer = &epoint->peer;
+               /*msgb->peer = &fwp_epoint->peer;*/
                /*msgb->data = msg;*/
                /*msgb->flags = epoint->flags;*/
                
@@ -408,6 +526,7 @@ int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size)
 
        /*}*/
 
-       return _fwp_vres_send(epoint->vresd, msgb);
+       /* TODO: test whether _fwp_vres_send is successful */
+       return fwp_vres_send(fwp_epoint->vresd, msgb);
 }