]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_endpoint.c
Rename lib/core to lib/fwp. Clean-ups
[frescor/fwp.git] / fwp / lib / fwp / fwp_endpoint.c
#include "fwp_endpoint.h"
#include "fwp_msgb.h"

#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
6
7 typedef unsigned int fwp_endpoint_id_t;
8
9 /**
10  * Default fwp endpoint attributes
11  */
12 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
13         .reliability = FWP_EPOINT_BESTEFFORT, 
14         .max_connections = 20,
15 };
16
17 /**
18  * FWP endpoint structure
19  */
20 struct fwp_endpoint{
21         /** Fwp endpoint attributes */
22         fwp_endpoint_attr_t     attr;
23         /* Vres this fwp endpoint is bound to */
24         fwp_vres_d_t            vresd;
25         /** For send enpoint it contains destination address for
26          * receive endpoint it is filled with the msg source address
27          */
28         struct fwp_sockaddr     peer;   
29         /** Source/destination port */
30         unsigned int            port;   
31         /** Destination node */
32         int                     node;
33         /** Socket descriptor.
34          * In case of rebliable epoint it is a listen tcp socket.
35          */
36         int                     sockd; 
37         /** File descriptor array of peers connected 
38          * to this fwp receive endpoint.*/
39         int                     *c_sockd;
40         /**
41          * Number of connections 
42          */
43         unsigned int            nr_connections;
44         /** client fdset */
45         fd_set                  fdset;
46         fd_set                  testfds;
47         /** specific operation options*/
48         int                     flags;  
49 };
50
51 /**
52  * Allocates endpoint
53  *
54  * \return On success returns fwp endpoint structure. 
55  * On error, NULL is returned. 
56  *
57  */
58 static fwp_endpoint_t* fwp_endpoint_alloc()
59 {
60         return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
61 }
62
63 /**
64  * Allocates endpoint
65  *
66  * \return On success returns endpoint structure. 
67  * On error, NULL is returned. 
68  *
69  */
70 static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
71 {
72         free(endpoint);
73 }
74
75 /**
76  * Destroy endpoint
77  *
78  * \param[in] epointd Endpoint descriptor
79  * \return On success 0 is returned. 
80  * On error, negative error value is returned and errno is set appropriately. 
81  */
82 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
83 {
84         if (epointd->sockd > 0) 
85                 close(epointd->sockd);
86         
87         fwp_endpoint_free(epointd);     
88         return 0;
89 }
90
91 /**
92  * Get endpoint parameters
93  *
94  * \param[in] epointd Endpoint descriptor
95  * \param[out] node Node identifier
96  * \param[out] port Port
97  * \param[out] attr Endpoint`s attributes
98  * \return On success 0 is returned. 
99  * On error, negative error value is returned. 
100  */
101 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
102                                 unsigned int *port, fwp_endpoint_attr_t *attr)
103 {
104         fwp_endpoint_t *epoint = epointd;
105
106         if (node) *node = epoint->node;
107         if (port) *port = epoint->port;
108         if (attr) *attr = epoint->attr;
109         
110         return 0;
111 }
112
113 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
114 {
115         bzero(attr, sizeof(fwp_endpoint_attr_t));
116         *attr = fwp_epoint_attr_default;
117
118         return 0;
119 }
120
121 /**
122  * Creates send endpoint
123  *
124  * \param[in] node IP address of destination node
125  * \param[in] port UDP port
126  * \param[in] attr Endpoint attributes
127  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
128  *
129  * \return Zero on success, -1 on error and sets errno appropriately. 
130  *
131  */
132 int fwp_send_endpoint_create(unsigned int node,
133                                 unsigned int port, 
134                                 fwp_endpoint_attr_t *attr,
135                                 fwp_endpoint_d_t *epointd)
136 {       
137         struct sockaddr_in *addr;
138         fwp_endpoint_t *fwp_epoint;
139
140         fwp_epoint = fwp_endpoint_alloc();      
141         if (!fwp_epoint) {
142                 errno = ENOMEM;
143                 return -1;
144         }
145         
146         /*epoint->type = FWP_SEND_EPOINT;
147         epoint->status = FWP_EPOINT_UNBOUND;
148         epoint->node = node;
149         epoint->port = port;
150         */
151         if (attr)
152                 fwp_epoint->attr  = *attr;
153         else
154                 fwp_epoint->attr = fwp_epoint_attr_default;
155                 
156         addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
157         bzero((char*) addr, sizeof(*addr));
158         addr->sin_family = AF_INET;
159         addr->sin_addr.s_addr = node;
160         addr->sin_port = htons(port);
161         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
162         
163         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
164                 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
165                 if (fwp_epoint->sockd < 0) {
166                         goto err;
167                 }
168         } else {
169                 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
170                 if (fwp_epoint->sockd < 0) {
171                         goto err;
172                 }
173         
174                 /* Enable broadcasts */
175                 /*unsigned int yes = 1;
176                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
177                                &yes, sizeof(yes)) == -1) {
178                         FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
179                         goto err;
180                 }*/
181         
182         }
183         
184         unsigned int yes = 1;
185         if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR, 
186                                &yes, sizeof(yes)) == -1) {
187                         FWP_DEBUG("setsockopt(SO_REUSEADDR): %s", strerror(errno));
188                         goto err;
189         }
190
191         if (connect(fwp_epoint->sockd,
192                         (struct sockaddr*) &fwp_epoint->peer.addr, 
193                         fwp_epoint->peer.addrlen)) {
194                 FWP_DEBUG("FWp connect error\n"); 
195                 goto err;
196         }
197         
198         FWP_DEBUG("FWP Send endpoint created.\n"); 
199
200 #ifndef FWP_WITH_CONTNEGT
201         /* Create vres with default parameters */
202         FWP_DEBUG("Creating default vres\n");
203         if (fwp_vres_create(&fwp_vres_params_default, &fwp_epoint->vresd)) {
204                 goto err;
205         }
206         
207         fwp_send_endpoint_bind(fwp_epoint, fwp_epoint->vresd);
208 #endif
209         
210         *epointd = fwp_epoint;
211         return 0;               
212 err:
213         fwp_endpoint_destroy(fwp_epoint);
214         return -1;      
215 }
216
217 /**
218  * Creates receive endpoint
219  *
220  * \param[in] port UDP port
221  * \param[in] attr Endpoint attributes
222  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
223  *
224  * \return Zero on success, -1 on error and errno is set.
225  */
226 int fwp_receive_endpoint_create(unsigned int port,
227                                 fwp_endpoint_attr_t *attr,
228                                 fwp_endpoint_d_t *epointd)
229 {
230         struct sockaddr_in *addr;
231         fwp_endpoint_t *fwp_epoint;
232
233         fwp_epoint = fwp_endpoint_alloc();      
234         if (!fwp_epoint) {
235                 errno = ENOMEM;
236                 return -1;
237         }
238         
239         /*epoint->type = FWP_RECV_EPOINT;
240         epoint->status = FWP_EPOINT_UNBOUND;*/
241         
242         if (attr)
243                 fwp_epoint->attr  = *attr;
244         else
245                 fwp_epoint->attr = fwp_epoint_attr_default;
246
247         addr = (struct sockaddr_in *) &(fwp_epoint->peer.addr);
248         addr->sin_family = AF_INET;
249         /* TODO: set listen interface, maybe through config struct*/
250         addr->sin_addr.s_addr = FWP_ANY_NODE;
251         fwp_epoint->port  = addr->sin_port = htons(port);
252         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
253         
254         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
255                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, 
256                                                 IPPROTO_TCP)) < 0) {
257                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
258                         goto err;
259                 }       
260                 
261                 int yes = 1;
262                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
263                                &yes, sizeof(yes)) == -1) {
264                         FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
265                         goto err;
266                 }
267
268                 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr, 
269                                 fwp_epoint->peer.addrlen) == -1) {
270                         FWP_ERROR("Bind error: %s", strerror(errno));
271                         /* TODO: remove all error messages from all libraries */
272                         goto err;
273                 }
274                 
275                 if (listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections)){
276                         perror("Error on listen call\n");
277                         goto err;
278                 }
279                 
280                 FD_ZERO(&fwp_epoint->fdset);
281                 /*add listen socket */
282                 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset); 
283                 fwp_epoint->testfds = fwp_epoint->fdset;
284                 fwp_epoint->c_sockd = 
285                                 (int*)malloc(fwp_epoint->attr.max_connections);
286                 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
287                 fwp_epoint->nr_connections = 0;
288
289                 FWP_DEBUG("Reliable receive endpoint port=%d created.\n", 
290                                 fwp_epoint->port); 
291         } else {
292                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
293                                                 IPPROTO_UDP)) < 0) {
294                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
295                         goto err;
296                 }
297                 
298                 if (bind(fwp_epoint->sockd, 
299                         (struct sockaddr*) &fwp_epoint->peer.addr, 
300                         fwp_epoint->peer.addrlen) == -1) {
301                         
302                         FWP_ERROR("Bind error: %s", strerror(errno));
303                         goto err;
304                 }
305                 FWP_DEBUG("Best-Effort receive endpoint port=%d created.\n", 
306                                 fwp_epoint->port); 
307         }
308                 
309         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
310                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
311                 
312                 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
313                 return -1;
314         }else {
315                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
316         }
317         */
318         
319         getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr, 
320                         &fwp_epoint->peer.addrlen);
321
322         addr = (struct sockaddr_in*) fwp_epoint->peer.addr;
323         FWP_DEBUG("Recv port= %d\n",ntohs(addr->sin_port));     
324         *epointd = fwp_epoint;  
325         return 0;
326 err:
327         fwp_endpoint_destroy(fwp_epoint);
328         return errno;
329 }
330
331 /**
332  * Binds send endpoint to vres
333  *
334  * \param[in] vres_id identifier of vres
335  * \param[in] epoint_id send endpoint identifier
336  *
337  * \return On success returns 0. On error, -1 and errno is set appropriately.
338  */
339 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
340 {
341         int rv = 0;
342 #ifdef FWP_WITH_CONTNEGT
343         fwp_endpoint_t *fwp_epoint = epointd;
344         
345         fwp_epoint->vresd = vresd;      
346         rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
347 #endif
348         /* if send endpoint is already bound 
349         if (epoint->type == FWP_EPOINT_BOUND) {  
350                 fwp_send_endpoint_unbind(epoint);
351         }*/
352         return rv;
353 }
354
355 /**
356  * Unbinds send endpoint from vres
357  *
358  * \param[in] epointd Send endpoint descriptor 
359  * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
360  *
361  */
362 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
363 {
364         int rv = 0;
365         fwp_endpoint_t *fwp_epoint = epointd;
366
367         /* unlink epoint-vres mutually */
368         if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0) 
369                 return rv;
370
371         return 0;
372 }
373
374 /**
375  * Accepts (TCP) client connection to receive endpoint
376  *
377  * \param[in] epointd Pointer to fwp endpoint
378  * \return
379  * On success, it returns zero.  
380  *
381  */
382 static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
383 {
384         int csockd;
385 //      fwp_endpoint_t *fwp_epoint = epointd;
386         fwp_sockaddr_t  peer;
387         int i;
388
389         if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
390                 return -1;
391
392         peer.addrlen = sizeof(struct sockaddr_in);
393         csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
394                         &peer.addrlen);
395         
396         if (csockd < 0) {
397                 perror("Error on accept\n");
398                 return errno;   
399         }               
400
401         FWP_DEBUG("New connection accepted\n");
402         /* find free place */           
403         i = 0;
404         while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections)) 
405                                 i++;
406         fwp_epoint->c_sockd[i] = csockd; 
407         fwp_epoint->nr_connections++;
408                 
409         FD_SET(csockd, &fwp_epoint->fdset);
410         return 0;       
411
412
413 /**
414  * Receives message from stream (TCP)
415  *
416  * \param[in] epointd Descriptor of endpoint
417  * \param[in] buffer Buffer to store message
418  * \param[in] buffer_size Size of buffer
419  *
420  * \return
421  * On success, it returns number of received bytes.  
422  * On error, -1 is returned and errno is set appropriately.
423  *
424  */
425 int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer, 
426                                 size_t buffer_size)
427 {
428         fwp_endpoint_t *fwp_epoint = epointd;
429         fwp_sockaddr_t *peer = &fwp_epoint->peer;
430         fd_set fdset = fwp_epoint->fdset;
431         ssize_t len;
432         int i;
433
434         FWP_DEBUG("Checking for tcp data\n");
435         for (i = 0; i < fwp_epoint->nr_connections; i++) {
436                 if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
437                         continue;       
438                 }       
439                         
440                 FWP_DEBUG("Prepare to receive tcp data\n");
441                 peer->addrlen = sizeof(struct sockaddr_in);
442                 len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer, 
443                                         buffer_size,0, peer);
444
445                 if (len < 0) /* Error */
446                         return len;
447                 
448                 FWP_DEBUG("Received tcp data\n");
449                 if (len)
450                         return len;
451         
452                 /* tcp connection closed */
453                 FWP_DEBUG("Connection closed\n");
454                 FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
455                 memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1, 
456                         sizeof(int)*(fwp_epoint->nr_connections -i-1));
457                 fwp_epoint->nr_connections--;
458                 return 0;
459         }
460         return 0;
461 }
462
463 /**
464  * Receives message
465  *
466  * \param[in] epointd Descriptor of endpoint
467  * \param[in] buffer Buffer to store message
468  * \param[in] buffer_size Size of buffer
469  *
470  * \return
471  * On success, it returns number of received bytes.  
472  * On error, -1 is returned and errno is set appropriately.
473  *
474  */
475 ssize_t fwp_recv(fwp_endpoint_d_t epointd,
476                         void *buffer, const size_t buffer_size,
477                         unsigned int *from, int flags)
478 {
479         fwp_endpoint_t *fwp_epoint = epointd;
480         fwp_sockaddr_t *peer = &fwp_epoint->peer;
481         ssize_t len;
482         fd_set fdset;
483         
484         /*if (!fwp_endpoint_is_valid(epointd)) {
485                 errno = EINVAL;
486                 return -1;
487         }*/
488         
489         if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {    
490                 len = _fwp_recvfrom(fwp_epoint->sockd, buffer, 
491                                         buffer_size, 0, peer);
492                 return len;
493         }
494         
495         while (1){
496         /* FIXME: What about using a loop here and continue instead of goto???? */
497                 /* FWP_EPOINT_RELIABLE */
498                 fdset = fwp_epoint->fdset;
499                 if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
500                            (fd_set *)0, NULL) < 0) {
501                 
502                         FWP_ERROR("Error in select: %s", strerror(errno));
503                         return -1;
504                 }
505         
506                 if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
507                         fwp_receive_endpoint_accept(fwp_epoint);
508                         continue;
509                 }
510
511                 /* Check client TCP sockets */
512                 len = fwp_recv_conn(fwp_epoint, buffer, buffer_size);
513                 if (len)
514                         return len;
515         }
516 }
517
518 /**
519  * Sends message through vres
520  *
521  * \param[in] epointd Endpoint descriptor
522  * \param[in] msg Message to sent
523  * \param[in] size Message size
524  *
525  * \return
526  * On success, it returns zero.  
527  * On error, -1 is returned and errno is set appropriately.
528  *
529  */
530 int fwp_send(fwp_endpoint_d_t epointd,const void *msg, const size_t size, 
531                 int flags)
532 {
533         fwp_endpoint_t *fwp_epoint = epointd;
534         struct fwp_msgb *msgb;
535         
536 /*      if (!fwp_endpoint_is_valid(epointd)){
537                 errno = EINVAL;
538                 return -1;
539         }
540         if (!fwp_endpoint_is_bound(epointd)){
541                 errno = EPERM;
542                 return -1;
543         }*/
544
545         /*if (flags && MSG_DONTWAIT) 
546                 msgb = fwp_msgb_alloc(buffer_size);
547         else {*/
548                 if (!(msgb = fwp_msgb_alloc(size))) {
549                         errno = ENOMEM;
550                         return -1;
551                 }
552
553                 /*msgb->peer = &fwp_epoint->peer;*/
554                 /*msgb->data = msg;*/
555                 /*msgb->flags = epoint->flags;*/
556                 
557                 /* data must be copied since msg may change while
558                  * message is waiting in transmission queue 
559                  * */
560                 memcpy(msgb->data, msg, size);
561                 fwp_msgb_put(msgb, size);
562                 /*msgb->tail = msgb->data + size;
563                 msgb->len = size;*/
564
565         /*}*/
566
567         /* TODO: test whether _fwp_vres_send is successful */
568         return fwp_vres_send(fwp_epoint->vresd, msgb);
569 }
570