]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
Set socket option RESUEADDR for management communication
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2 #include "fwp_msgb.h"
3
4 #include <pthread.h>
5
6 typedef unsigned int fwp_endpoint_id_t;
7
8 typedef enum {
9         FWP_SEND_EPOINT = 0,
10         FWP_RECV_EPOINT = 1,
11 } fwp_endpoint_type_t;
12
13 typedef enum {
14         FWP_EPOINT_FREE         = 0,
15         FWP_EPOINT_INACTIVE     = 1,
16         FWP_EPOINT_UNBOUND      = 2,
17         FWP_EPOINT_BOUND        = 3,
18 } fwp_endpoint_status_t;
19
20 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
21         .reliability = FWP_EPOINT_BESTEFFORT, 
22         .max_connections = 20,
23 };
24
25 /**
26  * Structure of FWP endpoint.
27  */
28 typedef
29 struct fwp_endpoint{
30         fwp_endpoint_type_t     type;
31         /** endpoint attributes */
32         fwp_endpoint_attr_t     attr;
33         /** the vres descriptor the send endpoint is bound to */        
34         fwp_vres_d_t            vresd;
35         /** for send enpoint it contains destination address for
36          * receive endpoint it is filled with the msg source address
37          */
38         struct fwp_sockaddr     peer;   
39         /** source/destination port */
40         unsigned int            port;   
41         /** destination node */
42         int                     node;
43         /** Socket descriptor.
44          * In case of rebliable epoint it is a listen socket.
45          * */
46         int                     sockd; 
47         fd_set                  fdset;
48         fd_set                  testfds;
49         int                     *c_sockd;
50         unsigned int            nr_connections;
51         /** specific operation options*/
52         int                     flags;  
53         fwp_endpoint_status_t   status;
54 } fwp_endpoint_t;
55
56 typedef
57 struct fwp_endpoint_table {
58         unsigned int                    max_endpoints;
59         fwp_endpoint_t                  *entry;
60         pthread_mutex_t                 lock;
61 } fwp_endpoint_table_t;
62
63 /* Global variable - endpoint table */
64 static fwp_endpoint_table_t  fwp_endpoint_table = {
65         .max_endpoints = 0,
66         .entry = NULL,
67         .lock = PTHREAD_MUTEX_INITIALIZER,
68 };
69
70 static inline int fwp_endpoint_is_valid(fwp_endpoint_d_t epointd)
71 {
72         int id  = epointd - fwp_endpoint_table.entry;
73
74         if ((id < 0) || (id > fwp_endpoint_table.max_endpoints - 1)||
75                 (epointd->status == FWP_EPOINT_FREE))
76                 return 0;
77                 
78         return 1; 
79 }
80
81 int fwp_endpoint_is_bound(fwp_endpoint_d_t epointd)
82 {
83         return (epointd->status == FWP_EPOINT_BOUND);
84 }
85
86 int fwp_endpoint_table_init(unsigned int max_endpoints)
87 {
88         int table_size = max_endpoints * sizeof(fwp_endpoint_t);
89
90         fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
91         if (!fwp_endpoint_table.entry)
92                 return -ENOMEM;
93         memset(fwp_endpoint_table.entry, 0, table_size);
94         fwp_endpoint_table.max_endpoints = max_endpoints;
95         return 0;
96 }
97
98 /**
99  * Allocates endpoint
100  *
101  * \return On success returns endpoint structure. 
102  * On error, NULL is returned. 
103  *
104  */
105 static fwp_endpoint_t* fwp_endpoint_alloc()
106 {
107         int i, max_endpoints;
108
109         /* find free vres id */
110         pthread_mutex_lock(&fwp_endpoint_table.lock);
111         i = 0;
112         max_endpoints = fwp_endpoint_table.max_endpoints;
113         while ((i < max_endpoints) && 
114                 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
115                 i++;
116         
117         if (i == max_endpoints) {
118                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
119                 return NULL;
120         }
121
122         fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
123         pthread_mutex_unlock(&fwp_endpoint_table.lock);
124         return (&fwp_endpoint_table.entry[i]);
125 }
126
127 /**
128  * Destroy endpoint
129  *
130  * \param[in] epointd Endpoint descriptor
131  * \return On success 0 is returned. 
132  * On error, negative error value is returned. 
133  */
134 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
135 {
136         fwp_endpoint_t *epoint = epointd;
137         
138         if (!fwp_endpoint_is_valid(epointd))
139                 return -EINVAL;
140         
141         epoint->status = FWP_EPOINT_FREE;
142         if (epoint->sockd > 0) 
143                 close(epoint->sockd);
144         return 0;
145 }
146
147 /**
148  * Get endpoint parameters
149  *
150  * \param[in] epointd Endpoint descriptor
151  * \param[out] node Node identifier
152  * \param[out] port Port
153  * \param[out] attr Endpoint`s attributes
154  * \return On success 0 is returned. 
155  * On error, negative error value is returned. 
156  */
157 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
158                                 unsigned int *port, fwp_endpoint_attr_t *attr)
159 {
160         fwp_endpoint_t *epoint = epointd;
161
162         *node = epoint->node;
163         *port = epoint->port;
164         *attr = epoint->attr;
165         
166         return 0;
167 }
168
169 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
170 {
171         bzero(attr, sizeof(fwp_endpoint_attr_t));
172         *attr = fwp_epoint_attr_default;
173
174         return 0;
175 }
176
177 /**
178  * Creates send endpoint
179  *
180  * \param[in] node IP address of destination node
181  * \param[in] port UDP port
182  * \param[in] attr Endpoint attributes
183  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
184  *
185  * \return On success returns descriptor of endpoint. 
186  * On error, negative error code is returned. 
187  *
188  */
189 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
190                                 fwp_endpoint_attr_t *attr, 
191                                 fwp_endpoint_d_t *epointdp)
192 {       
193         struct sockaddr_in *addr;
194         fwp_endpoint_t *epoint;
195
196         epoint = fwp_endpoint_alloc();  
197         if (!epoint) {
198                 return -ENOMEM;
199         }
200         
201         epoint->type = FWP_SEND_EPOINT;
202         epoint->status = FWP_EPOINT_UNBOUND;
203         epoint->node = node;
204         epoint->port = port;
205         if (attr) 
206                 epoint->attr  = *attr;
207         else 
208                 epoint->attr = fwp_epoint_attr_default;
209                 
210         addr = (struct sockaddr_in *)&(epoint->peer.addr);
211         bzero((char*) addr, sizeof(*addr));
212         addr->sin_family = AF_INET;
213         addr->sin_addr.s_addr = node;
214         addr->sin_port = htons(port);
215         epoint->peer.addrlen = sizeof(struct sockaddr_in);
216         
217         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
218                 epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
219
220                 int yes = 1;
221                 if (setsockopt(epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
222                                &yes, sizeof(yes)) == -1) {
223                         perror("setsockopt(SO_REUSEADDR)");
224                         return (-errno);
225                 }
226         } else {
227                 epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
228
229                 /* Enable broadcasts */
230                 unsigned int yes = 1;
231                 if (setsockopt(epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
232                                &yes, sizeof(yes)) == -1) {
233                         perror("setsockopt(SO_BROADCAST)");
234                         return (-errno);
235                 }
236         
237         }
238         
239         if (epoint->sockd < 0) {
240                         perror("Unable to open socket");
241                         goto err;
242         }
243         
244         if (connect(epoint->sockd,(struct sockaddr*) &epoint->peer.addr, 
245                         epoint->peer.addrlen)) {
246                 perror("Connect error");
247                 goto err;
248         }
249         
250         FWP_DEBUG("Send endpoint created.\n"); 
251         *epointdp = epoint;
252         return 0;               
253 err:
254         fwp_endpoint_destroy(epoint);
255         return (-errno);        
256 }
257
258 /**
259  * Creates receive endpoint
260  *
261  * \param[in] port UDP port
262  * \param[in] attr Endpoint attributes
263  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
264  *
265  * \return On success returns descriptor of endpoint. 
266  * On error, negative error code is returned. 
267  */
268 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
269                                 fwp_endpoint_attr_t *attr, 
270                                 fwp_endpoint_d_t *epointdp)
271 {
272         fwp_endpoint_t *epoint;
273         struct sockaddr_in *addr;
274         //int rcvbuf_size = 3000;
275
276         epoint = fwp_endpoint_alloc();  
277         if (!epoint) {
278                 return -ENOMEM;
279         }
280         
281         epoint->type = FWP_RECV_EPOINT;
282         epoint->status = FWP_EPOINT_UNBOUND;
283         if (attr) 
284                 epoint->attr  = *attr;
285         else 
286                 epoint->attr = fwp_epoint_attr_default;
287         
288         addr = (struct sockaddr_in *)&(epoint->peer.addr);
289         addr->sin_family = AF_INET;
290         /* TODO: set listen interface, maybe through config struct*/
291         addr->sin_addr.s_addr = INADDR_ANY;
292         addr->sin_port = htons(port);
293         epoint->peer.addrlen = sizeof(struct sockaddr_in);
294         
295         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
296                 if ((epoint->sockd = socket(PF_INET, SOCK_STREAM, 
297                                                 IPPROTO_TCP)) < 0) {
298                         perror("Unable to open socket");
299                         goto err;
300                 }       
301                 
302                 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr, 
303                                 epoint->peer.addrlen) == -1) {
304                         perror("Bind error");
305                         /* TODO: remove all error messages from all libraries */
306                         goto err;
307                 }
308                 
309                 listen(epoint->sockd, epoint->attr.max_connections); 
310                 
311                 FD_ZERO(&epoint->fdset);
312                 FD_SET(epoint->sockd, &epoint->fdset); /*add listen socket */
313                 epoint->testfds = epoint->fdset;
314                 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
315                 bzero(epoint->c_sockd, epoint->attr.max_connections);
316                 epoint->nr_connections = 0;
317
318                 FWP_DEBUG("Receive endpoint\n");
319
320         } else {
321                 if ((epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
322                                                 IPPROTO_UDP)) < 0) {
323                         perror("Unable to open socket");
324                         goto err;
325                 }
326                 
327                 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr, 
328                                 epoint->peer.addrlen) == -1) {
329                         perror("Bind error");
330                         goto err;
331                 }
332         }
333                 
334         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
335                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
336                 
337                 perror("Unable to set socket buffer size");
338                 return -1;
339         }else {
340                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
341         }
342         */
343         
344         getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
345                         &epoint->peer.addrlen);
346         
347         epoint->port = ntohs(addr->sin_port);
348         /*TODO: set node*/
349         epoint->node =  ntohl(addr->sin_addr.s_addr);
350         FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port); 
351         
352         *epointdp = epoint; 
353         return 0;
354
355 err:    
356         fwp_endpoint_destroy(epoint);
357         return (-errno);        
358 }
359
360 /**
361  * Binds send endpoint to vres
362  *
363  * \param[in] vres_id identifier of vres
364  * \param[in] epoint_id send endpoint identifier
365  *
366  * \return On success returns 0. On error, negative error code is returned 
367  */
368 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
369 {
370         fwp_endpoint_t *epoint = epointd;
371         int rv = 0;
372                 
373         /* link epoint-vres mutually */
374         pthread_mutex_lock(&fwp_endpoint_table.lock);
375         if ((!fwp_endpoint_is_valid(epointd))|| 
376                 (epoint->type != FWP_SEND_EPOINT)) {    
377                 rv = -EINVAL;
378                 goto err;
379         }
380         if (epoint->status == FWP_EPOINT_BOUND) { /* already bound*/
381                 rv = -EPERM;
382                 goto err;
383         }
384         if ((rv = _fwp_vres_bind(vresd, epoint->sockd)) < 0) { 
385                 goto err;
386         }
387
388         /* if send endpoint is already bound 
389         if (epoint->type == FWP_EPOINT_BOUND) {  
390                 fwp_send_endpoint_unbind(epoint);
391         }*/
392         epoint->vresd = vresd;
393         epoint->status = FWP_EPOINT_BOUND;
394         
395 err:
396         pthread_mutex_unlock(&fwp_endpoint_table.lock);
397         return rv;
398 }
399
400 /**
401  * Unbinds send endpoint from vres
402  *
403  * \param[in] epointd Send endpoint descriptor 
404  * \return On success returns 0. On error, negative error code is returned 
405  *
406  */
407 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
408 {
409         fwp_endpoint_t *epoint = epointd;
410         int rv = 0;
411
412         if (!fwp_endpoint_is_valid(epointd))
413                 return -EINVAL;
414         if (epoint->status != FWP_EPOINT_BOUND)
415                 return -EPERM;
416
417         /* unlink epoint-vres mutually */
418         if ((rv = _fwp_vres_unbind(epoint->vresd)) < 0) 
419                 return rv;
420         epoint->status = FWP_EPOINT_UNBOUND;
421
422         return 0;
423 }
424
425 /**
426  * Receives message
427  *
428  * \param[in] epointd Descriptor of endpoint
429  * \param[in] buffer Buffer to store message
430  * \param[in] buffer_size Size of buffer
431  *
432  * \return
433  * On success, it returns number of received bytes.  
434  * On error, negative error code is returned,
435  *
436  */
437 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size, 
438                         int flags)
439 {
440         fwp_endpoint_t *epoint = epointd;
441         fwp_sockaddr_t *peer = &epoint->peer;
442         ssize_t len;
443         fd_set fdset;
444         int csockd, i;
445         
446         if (!fwp_endpoint_is_valid(epointd))
447                 return -EINVAL;
448         
449         if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {        
450                 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
451                         peer->addr, &peer->addrlen);
452                 return len;
453         }
454         
455         while (1) {
456         /* FIXME: What about using a loop here and continue instead of goto???? */
457         /* FWP_EPOINT_RELIABLE */
458         fdset = epoint->fdset;
459         FWP_DEBUG("Before select\n");
460         if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
461                 (fd_set *)0, NULL) < 0) {
462                 
463                 perror("Error in select");
464                 return (-errno);
465         }
466         FWP_DEBUG("After select\n");
467         
468         if (FD_ISSET(epoint->sockd, &fdset)) { /* is it listen socket? */ 
469                 if (epoint->nr_connections == epoint->attr.max_connections)
470                         continue;
471
472                 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
473                                  &peer->addrlen);
474                 
475                 FWP_DEBUG("New connection accepted\n");
476                 /* find free place */           
477                 i = 0;
478                 while ((epoint->c_sockd[i])&& 
479                         (i < epoint->nr_connections)) 
480                         i++;
481                 epoint->c_sockd[i] = csockd; 
482                 FWP_DEBUG("Index = %d\n", i);
483                 epoint->nr_connections++;
484                 
485                 FD_SET(csockd, &epoint->fdset);
486                 continue;
487         }
488
489         /* Check client TCP sockets */
490         for (i = 0; i < epoint->nr_connections; i++) {
491                 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
492                         _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
493                                         0, peer->addr, &peer->addrlen);
494                         
495                         FWP_DEBUG("Received tcp data\n");
496                         if (len)
497                                 return len;
498                         /* tcp connection closed */
499                         FWP_DEBUG("Connection closed\n");
500                         FD_CLR(epoint->c_sockd[i], &epoint->fdset);
501                         memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1, 
502                                 sizeof(int)*(epoint->nr_connections -i-1));
503                         epoint->nr_connections--;
504                         continue;
505                 }
506         }
507         
508         }
509 }
510
511 /**
512  * Sends message through vres
513  *
514  * \param[in] epointd Endpoint descriptor
515  * \param[in] msg Message to sent
516  * \param[in] size Message size
517  *
518  * \return
519  * On success, it returns zero.  
520  * On error, negative error code is returned,
521  *
522  */
523 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
524 {
525         struct fwp_endpoint *epoint = epointd;
526         struct fwp_msgb *msgb;
527
528         if (!fwp_endpoint_is_valid(epointd)){
529                 return -EINVAL;
530         }
531         if (!fwp_endpoint_is_bound(epointd)){
532                 return -EPERM;
533         }
534
535         /*if (flags && MSG_DONTWAIT) 
536                 msgb = fwp_msgb_alloc(buffer_size);
537         else {*/
538                 if (!(msgb = fwp_msgb_alloc(size)))
539                         return -ENOMEM;
540
541                 msgb->peer = &epoint->peer;
542                 /*msgb->data = msg;*/
543                 /*msgb->flags = epoint->flags;*/
544                 
545                 /* data must be copied since msg may change while
546                  * message is waiting in transmission queue 
547                  * */
548                 memcpy(msgb->data, msg, size);
549                 fwp_msgb_put(msgb, size);
550                 /*msgb->tail = msgb->data + size;
551                 msgb->len = size;*/
552
553         /*}*/
554
555         /* TODO: test whether _fwp_vres_send is successful */
556         return _fwp_vres_send(epoint->vresd, msgb);
557 }
558