rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/core/fwp_endpoint.c
Fixing bugs related to TCP transfer. Still not working.
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
index 08a4a44b08975bc553595a8eb882443ff42858b0..79c53f7a977267f65575b29373e89104d4c84cff 100644 (file)
@@ -1,22 +1,11 @@
 #include "fwp_endpoint.h"
 #include "fwp_msgb.h"
+#include <errno.h>
 
 #include <pthread.h>
 
 typedef unsigned int fwp_endpoint_id_t;
 
-typedef enum {
-       FWP_SEND_EPOINT = 0,
-       FWP_RECV_EPOINT = 1,
-} fwp_endpoint_type_t;
-
-typedef enum {
-       FWP_EPOINT_FREE         = 0,
-       FWP_EPOINT_INACTIVE     = 1,
-       FWP_EPOINT_UNBOUND      = 2,
-       FWP_EPOINT_BOUND        = 3,
-} fwp_endpoint_status_t;
-
 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
        .reliability = FWP_EPOINT_BESTEFFORT, 
        .max_connections = 20,
@@ -25,16 +14,13 @@ static fwp_endpoint_attr_t fwp_epoint_attr_default ={
 /**
  * Structure of FWP endpoint.
  */
-typedef
 struct fwp_endpoint{
-       fwp_endpoint_type_t     type;
        /** endpoint attributes */
        fwp_endpoint_attr_t     attr;
-       /** the vres descriptor the send endpoint is bound to */        
-       fwp_vres_d_t            vresd;
        /** for send enpoint it contains destination address for
         * receive endpoint it is filled with the msg source address
         */
+       fwp_vres_d_t            vresd;
        struct fwp_sockaddr     peer;   
        /** source/destination port */
        unsigned int            port;   
@@ -50,49 +36,18 @@ struct fwp_endpoint{
        unsigned int            nr_connections;
        /** specific operation options*/
        int                     flags;  
-       fwp_endpoint_status_t   status;
-} fwp_endpoint_t;
-
-typedef
-struct fwp_endpoint_table {
-       unsigned int                    max_endpoints;
-       fwp_endpoint_t                  *entry;
-       pthread_mutex_t                 lock;
-} fwp_endpoint_table_t;
-
-/* Global variable - endpoint table */
-static fwp_endpoint_table_t  fwp_endpoint_table = {
-       .max_endpoints = 0,
-       .entry = NULL,
-       .lock = PTHREAD_MUTEX_INITIALIZER,
 };
 
-static inline int fwp_endpoint_is_valid(fwp_endpoint_d_t epointd)
-{
-       int id  = epointd - fwp_endpoint_table.entry;
-
-       if ((id < 0) || (id > fwp_endpoint_table.max_endpoints - 1)||
-               (epointd->status == FWP_EPOINT_FREE))
-               return 0;
-               
-       return 1; 
-}
-
-int fwp_endpoint_is_bound(fwp_endpoint_d_t epointd)
-{
-       return (epointd->status == FWP_EPOINT_BOUND);
-}
-
-int fwp_endpoint_table_init(unsigned int max_endpoints)
+/**
+ * Allocates endpoint
+ *
+ * \return On success returns endpoint structure. 
+ * On error, NULL is returned. 
+ *
+ */
+static fwp_endpoint_t* fwp_endpoint_alloc()
 {
-       int table_size = max_endpoints * sizeof(fwp_endpoint_t);
-
-       fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
-       if (!fwp_endpoint_table.entry)
-               return -ENOMEM;
-       memset(fwp_endpoint_table.entry, 0, table_size);
-       fwp_endpoint_table.max_endpoints = max_endpoints;
-       return 0;
+       return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
 }
 
 /**
@@ -102,26 +57,9 @@ int fwp_endpoint_table_init(unsigned int max_endpoints)
  * On error, NULL is returned. 
  *
  */
-static fwp_endpoint_t* fwp_endpoint_alloc()
+static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
 {
-       int i, max_endpoints;
-
-       /* find free vres id */
-       pthread_mutex_lock(&fwp_endpoint_table.lock);
-       i = 0;
-       max_endpoints = fwp_endpoint_table.max_endpoints;
-       while ((i < max_endpoints) && 
-               (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
-               i++;
-       
-       if (i == max_endpoints) {
-               pthread_mutex_unlock(&fwp_endpoint_table.lock);
-               return NULL;
-       }
-
-       fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
-       pthread_mutex_unlock(&fwp_endpoint_table.lock);
-       return (&fwp_endpoint_table.entry[i]);
+       free(endpoint);
 }
 
 /**
@@ -129,18 +67,14 @@ static fwp_endpoint_t* fwp_endpoint_alloc()
  *
  * \param[in] epointd Endpoint descriptor
  * \return On success 0 is returned. 
- * On error, negative error value is returned. 
+ * On error, negative error value is returned and errno is set appropriately
  */
 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
 {
-       fwp_endpoint_t *epoint = epointd;
+       if (epointd->sockd > 0) 
+               close(epointd->sockd);
        
-       if (!fwp_endpoint_is_valid(epointd))
-               return -EINVAL;
-       
-       epoint->status = FWP_EPOINT_FREE;
-       if (epoint->sockd > 0) 
-               close(epoint->sockd);
+       fwp_endpoint_free(epointd);     
        return 0;
 }
 
@@ -182,77 +116,74 @@ int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
  * \param[in] attr Endpoint attributes
  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
  *
- * \return On success returns descriptor of endpoint. 
- * On error, negative error code is returned. 
+ * \return Zero on success, -1 on error and sets errno appropriately. 
  *
  */
-int fwp_send_endpoint_create(unsigned int node, unsigned int port,
-                               fwp_endpoint_attr_t *attr, 
-                               fwp_endpoint_d_t *epointdp)
+int fwp_send_endpoint_create(unsigned int node,
+                               unsigned int port, 
+                               fwp_endpoint_attr_t *attr,
+                               fwp_endpoint_t **epoint)
 {      
        struct sockaddr_in *addr;
-       fwp_endpoint_t *epoint;
+       fwp_endpoint_t *fwp_epoint;
 
-       epoint = fwp_endpoint_alloc();  
-        if (!epoint) {
-               return -ENOMEM;
+       fwp_epoint = fwp_endpoint_alloc();      
+        if (!fwp_epoint) {
+               errno = ENOMEM;
+               return -1;
        }
        
-       epoint->type = FWP_SEND_EPOINT;
+       /*epoint->type = FWP_SEND_EPOINT;
        epoint->status = FWP_EPOINT_UNBOUND;
        epoint->node = node;
        epoint->port = port;
-       if (attr) 
-               epoint->attr  = *attr;
-       else 
-               epoint->attr = fwp_epoint_attr_default;
+       */
+       if (attr)
+               fwp_epoint->attr  = *attr;
+       else
+               fwp_epoint->attr = fwp_epoint_attr_default;
                
-       addr = (struct sockaddr_in *)&(epoint->peer.addr);
+       addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
        bzero((char*) addr, sizeof(*addr));
        addr->sin_family = AF_INET;
        addr->sin_addr.s_addr = node;
        addr->sin_port = htons(port);
-        epoint->peer.addrlen = sizeof(struct sockaddr_in);
+        fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
        
-       if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
-               epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-
-               int yes = 1;
-               if (setsockopt(epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
-                              &yes, sizeof(yes)) == -1) {
-                       perror("setsockopt(SO_REUSEADDR)");
-                       return (-errno);
+       if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
+               fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+               if (fwp_epoint->sockd < 0) {
+                       goto err;
                }
        } else {
-               epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
-
-               /* Enable broadcasts */
-               unsigned int yes = 1;
-               if (setsockopt(epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
-                              &yes, sizeof(yes)) == -1) {
-                       perror("setsockopt(SO_BROADCAST)");
-                       return (-errno);
+               fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+               if (fwp_epoint->sockd < 0) {
+                       goto err;
                }
        
-       }
-       
-       if (epoint->sockd < 0) {
-                       perror("Unable to open socket");
+               /* Enable broadcasts */
+               /*unsigned int yes = 1;
+               if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
+                              &yes, sizeof(yes)) == -1) {
+                       FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
                        goto err;
+               }*/
+       
        }
        
-       if (connect(epoint->sockd,(struct sockaddr*) &epoint->peer.addr, 
-                       epoint->peer.addrlen)) {
-               perror("Connect error");
+       if (connect(fwp_epoint->sockd,
+                       (struct sockaddr*) &fwp_epoint->peer.addr, 
+                       fwp_epoint->peer.addrlen)) {
+               FWP_DEBUG("FWp connect error\n"); 
                goto err;
        }
        
-       FWP_DEBUG("Send endpoint created.\n"); 
-       *epointdp = epoint;
+       FWP_DEBUG("FWP Send endpoint created.\n"); 
+       *epoint = fwp_epoint;
        return 0;               
 err:
-       fwp_endpoint_destroy(epoint);
-       return (-errno);        
+       fwp_endpoint_destroy(fwp_epoint);
+       return -1;      
 }
 
 /**
@@ -262,99 +193,109 @@ err:
  * \param[in] attr Endpoint attributes
  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
  *
- * \return On success returns descriptor of endpoint. 
- * On error, negative error code is returned. 
+ * \return Zero on success, -1 on error and errno is set.
  */
-int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
-                               fwp_endpoint_attr_t *attr, 
-                               fwp_endpoint_d_t *epointdp)
+int fwp_receive_endpoint_create(unsigned int port,
+                               fwp_endpoint_attr_t *attr,
+                               fwp_endpoint_t **epoint)
 {
-       fwp_endpoint_t *epoint;
        struct sockaddr_in *addr;
-       //int rcvbuf_size = 3000;
+       fwp_endpoint_t *fwp_epoint;
 
-       epoint = fwp_endpoint_alloc();  
-        if (!epoint) {
-               return -ENOMEM;
+       fwp_epoint = fwp_endpoint_alloc();      
+        if (!fwp_epoint) {
+               errno = ENOMEM;
+               return -1;
        }
        
-       epoint->type = FWP_RECV_EPOINT;
-       epoint->status = FWP_EPOINT_UNBOUND;
-       if (attr) 
-               epoint->attr  = *attr;
-       else 
-               epoint->attr = fwp_epoint_attr_default;
+       /*epoint->type = FWP_RECV_EPOINT;
+       epoint->status = FWP_EPOINT_UNBOUND;*/
        
-       addr = (struct sockaddr_in *)&(epoint->peer.addr);
+       if (attr)
+               fwp_epoint->attr  = *attr;
+       else
+               fwp_epoint->attr = fwp_epoint_attr_default;
+
+       addr = (struct sockaddr_in *) &(fwp_epoint->peer.addr);
        addr->sin_family = AF_INET;
        /* TODO: set listen interface, maybe through config struct*/
        addr->sin_addr.s_addr = INADDR_ANY;
        addr->sin_port = htons(port);
-        epoint->peer.addrlen = sizeof(struct sockaddr_in);
+        fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
        
-       if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
-               if ((epoint->sockd = socket(PF_INET, SOCK_STREAM, 
+       if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
+               if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, 
                                                IPPROTO_TCP)) < 0) {
-                       perror("Unable to open socket");
+                       FWP_ERROR("Unable to open socket: %s", strerror(errno));
                        goto err;
                }       
                
-               if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr, 
-                               epoint->peer.addrlen) == -1) {
-                       perror("Bind error");
+               int yes = 1;
+               if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
+                              &yes, sizeof(yes)) == -1) {
+                       FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
+                       goto err;
+               }
+
+               if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr, 
+                               fwp_epoint->peer.addrlen) == -1) {
+                       FWP_ERROR("Bind error: %s", strerror(errno));
                        /* TODO: remove all error messages from all libraries */
                        goto err;
                }
                
-               listen(epoint->sockd, epoint->attr.max_connections); 
+               if (listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections)){
+                       perror("Error on listen call\n");
+                       goto err;
+               }
                
-               FD_ZERO(&epoint->fdset);
-               FD_SET(epoint->sockd, &epoint->fdset); /*add listen socket */
-               epoint->testfds = epoint->fdset;
-               epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
-               bzero(epoint->c_sockd, epoint->attr.max_connections);
-               epoint->nr_connections = 0;
-
-               FWP_DEBUG("Receive endpoint\n");
-
+               FD_ZERO(&fwp_epoint->fdset);
+               /*add listen socket */
+               FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset); 
+               fwp_epoint->testfds = fwp_epoint->fdset;
+               fwp_epoint->c_sockd = 
+                               (int*)malloc(fwp_epoint->attr.max_connections);
+               bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
+               fwp_epoint->nr_connections = 0;
+
+               FWP_DEBUG("Reliable receive endpoint port=%d created.\n", 
+                               fwp_epoint->port); 
        } else {
-               if ((epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
+               if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
                                                IPPROTO_UDP)) < 0) {
-                       perror("Unable to open socket");
+                       FWP_ERROR("Unable to open socket: %s", strerror(errno));
                        goto err;
                }
                
-               if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr, 
-                               epoint->peer.addrlen) == -1) {
-                       perror("Bind error");
+               if (bind(fwp_epoint->sockd, 
+                       (struct sockaddr*) &fwp_epoint->peer.addr, 
+                       fwp_epoint->peer.addrlen) == -1) {
+                       
+                       FWP_ERROR("Bind error: %s", strerror(errno));
                        goto err;
                }
+               FWP_DEBUG("Best-Effort receive endpoint port=%d created.\n", 
+                               fwp_epoint->port); 
        }
                
        /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
                        &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
                
-               perror("Unable to set socket buffer size");
+               FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
                return -1;
        }else {
                FWP_DEBUG("Receive endpoint buffer size is set.\n");
        }
        */
        
-       getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
-                       &epoint->peer.addrlen);
+       getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr, 
+                       &fwp_epoint->peer.addrlen);
        
-       epoint->port = ntohs(addr->sin_port);
-       /*TODO: set node*/
-       epoint->node =  ntohl(addr->sin_addr.s_addr);
-       FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port); 
-       
-       *epointdp = epoint; 
+       *epoint = fwp_epoint;   
        return 0;
-
-err:   
-       fwp_endpoint_destroy(epoint);
-       return (-errno);        
+err:
+       fwp_endpoint_destroy(fwp_epoint);
+       return errno;
 }
 
 /**
@@ -363,37 +304,20 @@ err:
  * \param[in] vres_id identifier of vres
  * \param[in] epoint_id send endpoint identifier
  *
- * \return On success returns 0. On error, negative error code is returned 
+ * \return On success returns 0. On error, -1 and errno is set appropriately.
  */
-int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
+int fwp_send_endpoint_bind(fwp_endpoint_t *epoint, fwp_vres_d_t vresd)
 {
-       fwp_endpoint_t *epoint = epointd;
-       int rv = 0;
-               
-       /* link epoint-vres mutually */
-       pthread_mutex_lock(&fwp_endpoint_table.lock);
-       if ((!fwp_endpoint_is_valid(epointd))|| 
-               (epoint->type != FWP_SEND_EPOINT)) {    
-               rv = -EINVAL;
-               goto err;
-       }
-       if (epoint->status == FWP_EPOINT_BOUND) { /* already bound*/
-               rv = -EPERM;
-               goto err;
-       }
-       if ((rv = _fwp_vres_bind(vresd, epoint->sockd)) < 0) { 
-               goto err;
-       }
-
+       int rv ;
+       fwp_endpoint_t *fwp_epoint = epoint;
+       
+       fwp_epoint->vresd = vresd;      
+       rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
        /* if send endpoint is already bound 
        if (epoint->type == FWP_EPOINT_BOUND) {  
                fwp_send_endpoint_unbind(epoint);
        }*/
-       epoint->vresd = vresd;
-       epoint->status = FWP_EPOINT_BOUND;
        
-err:
-       pthread_mutex_unlock(&fwp_endpoint_table.lock);
        return rv;
 }
 
@@ -401,27 +325,103 @@ err:
  * Unbinds send endpoint from vres
  *
  * \param[in] epointd Send endpoint descriptor 
- * \return On success returns 0. On error, negative error code is returned 
+ * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
+int fwp_send_endpoint_unbind(fwp_endpoint_t *epoint)
 {
-       fwp_endpoint_t *epoint = epointd;
        int rv = 0;
-
-       if (!fwp_endpoint_is_valid(epointd))
-               return -EINVAL;
-       if (epoint->status != FWP_EPOINT_BOUND)
-               return -EPERM;
+       fwp_endpoint_t *fwp_epoint = epoint;
 
        /* unlink epoint-vres mutually */
-       if ((rv = _fwp_vres_unbind(epoint->vresd)) < 0) 
+       if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0) 
                return rv;
-       epoint->status = FWP_EPOINT_UNBOUND;
 
        return 0;
 }
 
+static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+{
+       int csockd;
+//     fwp_endpoint_t *fwp_epoint = epointd;
+       fwp_sockaddr_t  peer;
+       int i;
+
+       if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
+               return -1;
+
+       peer.addrlen = sizeof(struct sockaddr_in);
+       csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
+                       &peer.addrlen);
+       
+       if (csockd < 0) {
+               perror("Error on accept\n");
+               return errno;   
+       }               
+
+       FWP_DEBUG("New connection accepted\n");
+       /* find free place */           
+       i = 0;
+       while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections)) 
+                               i++;
+       fwp_epoint->c_sockd[i] = csockd; 
+       FWP_DEBUG("Index = %d\n", i);
+       fwp_epoint->nr_connections++;
+               
+       FWP_DEBUG("before\n");
+       FD_SET(csockd, &fwp_epoint->fdset);
+       FWP_DEBUG("SET fdset\n");
+       return 0;       
+} 
+
+/**
+ * Receives message from stream (TCP)
+ *
+ * \param[in] epointd Descriptor of endpoint
+ * \param[in] buffer Buffer to store message
+ * \param[in] buffer_size Size of buffer
+ *
+ * \return
+ * On success, it returns number of received bytes.  
+ * On error, -1 is returned and errno is set appropriately.
+ *
+ */
+int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer, 
+                               size_t buffer_size)
+{
+       fwp_endpoint_t *fwp_epoint = epointd;
+       fwp_sockaddr_t *peer = &fwp_epoint->peer;
+       fd_set fdset;
+       ssize_t len;
+       int i;
+
+       for (i = 0; i < fwp_epoint->nr_connections; i++) {
+               if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
+                       continue;       
+               }       
+                       
+               peer->addrlen = sizeof(struct sockaddr_in);
+               len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer, 
+                                       buffer_size,0, peer);
+
+               if (len < 0) /* Error */
+                       return len;
+               
+               FWP_DEBUG("Received tcp data\n");
+               if (len)
+                       return len;
+       
+               /* tcp connection closed */
+               FWP_DEBUG("Connection closed\n");
+               FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
+               memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1, 
+                       sizeof(int)*(fwp_epoint->nr_connections -i-1));
+               fwp_epoint->nr_connections--;
+               return 0;
+       }
+       return 0;
+}
+
 /**
  * Receives message
  *
@@ -431,78 +431,50 @@ int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
  *
  * \return
  * On success, it returns number of received bytes.  
- * On error, negative error code is returned,
+ * On error, -1 is returned and errno is set appropriately.
  *
  */
-ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size, 
-                       int flags)
+ssize_t fwp_recv(fwp_endpoint_t *endpoint,
+                       void *buffer, const size_t buffer_size,
+                       unsigned int *from, int flags)
 {
-       fwp_endpoint_t *epoint = epointd;
-       fwp_sockaddr_t *peer = &epoint->peer;
+       fwp_sockaddr_t *peer = &endpoint->peer;
        ssize_t len;
        fd_set fdset;
-       int csockd, i;
+       fwp_endpoint_t *fwp_epoint = endpoint;
        
-       if (!fwp_endpoint_is_valid(epointd))
-               return -EINVAL;
+/*     if (!fwp_endpoint_is_valid(epointd)) {
+               errno = EINVAL;
+               return -1;
+       }*/
        
-       if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {        
-               _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0
-                       peer->addr, &peer->addrlen);
+       if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {    
+               len = _fwp_recvfrom(fwp_epoint->sockd, buffer
+                                       buffer_size, 0, peer);
                return len;
        }
        
-       while (1) {
+       while (1){
        /* FIXME: What about using a loop here and continue instead of goto???? */
-       /* FWP_EPOINT_RELIABLE */
-       fdset = epoint->fdset;
-       if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
-               (fd_set *)0, NULL) < 0) {
+               /* FWP_EPOINT_RELIABLE */
+               fdset = fwp_epoint->fdset;
+               if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
+                          (fd_set *)0, NULL) < 0) {
                
-               perror("Error in select");
-               return (-errno);
-       }
+                       FWP_ERROR("Error in select: %s", strerror(errno));
+                       return -1;
+               }
        
-       if (FD_ISSET(epoint->sockd, &fdset)) { /* is it listen socket? */ 
-               if (epoint->nr_connections == epoint->attr.max_connections)
-                       continue;
-
-               csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
-                                &peer->addrlen);
-               
-               FWP_DEBUG("New connection accepted\n");
-               /* find free place */           
-               i = 0;
-               while ((epoint->c_sockd[i])&& 
-                       (i < epoint->nr_connections)) 
-                       i++;
-               epoint->c_sockd[i] = csockd; 
-               FWP_DEBUG("Index = %d\n", i);
-               epoint->nr_connections++;
-               
-               FD_SET(csockd, &epoint->fdset);
-               continue;
-       }
-
-       /* Check client TCP sockets */
-       for (i = 0; i < epoint->nr_connections; i++) {
-               if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
-                       _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
-                                       0, peer->addr, &peer->addrlen);
-                       
-                       FWP_DEBUG("Received tcp data\n");
-                       if (len)
-                               return len;
-                       /* tcp connection closed */
-                       FWP_DEBUG("Connection closed\n");
-                       FD_CLR(epoint->c_sockd[i], &epoint->fdset);
-                       memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1, 
-                               sizeof(int)*(epoint->nr_connections -i-1));
-                       epoint->nr_connections--;
+               if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
+                       fwp_receive_endpoint_accept(fwp_epoint);
+                       FWP_DEBUG("After accepted\n");
                        continue;
                }
-       }
-       
+
+               /* Check client TCP sockets */
+               len = fwp_recv_conn(endpoint, buffer, buffer_size);
+               if (len)
+                       return len;
        }
 }
 
@@ -515,28 +487,32 @@ ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size,
  *
  * \return
  * On success, it returns zero.  
- * On error, negative error code is returned,
+ * On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
+int fwp_send(fwp_endpoint_t *fwp_epoint,const void *msg, const size_t size, int flags)
 {
-       struct fwp_endpoint *epoint = epointd;
        struct fwp_msgb *msgb;
-
-       if (!fwp_endpoint_is_valid(epointd)){
-               return -EINVAL;
+       /*fwp_endpoint_t *fwp_epoint;*/
+       
+/*     if (!fwp_endpoint_is_valid(epointd)){
+               errno = EINVAL;
+               return -1;
        }
        if (!fwp_endpoint_is_bound(epointd)){
-               return -EPERM;
-       }
+               errno = EPERM;
+               return -1;
+       }*/
 
        /*if (flags && MSG_DONTWAIT) 
                msgb = fwp_msgb_alloc(buffer_size);
        else {*/
-               if (!(msgb = fwp_msgb_alloc(size)))
-                       return -ENOMEM;
+               if (!(msgb = fwp_msgb_alloc(size))) {
+                       errno = ENOMEM;
+                       return -1;
+               }
 
-               msgb->peer = &epoint->peer;
+               /*msgb->peer = &fwp_epoint->peer;*/
                /*msgb->data = msg;*/
                /*msgb->flags = epoint->flags;*/
                
@@ -551,6 +527,6 @@ int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
        /*}*/
 
        /* TODO: test whether _fwp_vres_send is successful */
-       return _fwp_vres_send(epoint->vresd, msgb);
+       return fwp_vres_send(fwp_epoint->vresd, msgb);
 }