]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_endpoint.c
7eb7d0b7fb70086f3e71b25c44580bb6b5f3da84
[frescor/fwp.git] / fwp / lib / fwp / fwp_endpoint.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FWP (Frescor WLAN Protocol)                      */
26 /*                                                                        */
27 /* FWP is free software; you can redistribute it and/or modify it         */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FWP is distributed in the hope that it will be         */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FWP; see file        */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FWP header files in a file,          */
39 /* instantiating FWP generics or templates, or linking other files        */
40 /* with FWP objects to produce an executable application, does not        */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46 #include "fwp_endpoint.h"
47 #include "fwp_msgb.h"
48 #include <errno.h>
49 #include <stdlib.h>
50 #include <unistd.h>
51 #include <netinet/in.h>
52 #include "fwp_utils.h"
53 #include "fwp_vres.h"
54 #include <frsh_error.h>
55
56 #include <pthread.h>
57 #include "fwp_debug.h"
58 #include "fwp_msgq.h"
59
60 typedef unsigned int fwp_endpoint_id_t;
61
62 /**
63  * Default fwp endpoint attributes
64  */
65 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
66         .reliability = FWP_EPOINT_BESTEFFORT, 
67         .max_connections = 20,
68 };
69
70 /**
71  * FWP endpoint structure
72  */
73 struct fwp_endpoint{
74         /** Fwp endpoint attributes */
75         fwp_endpoint_attr_t     attr;
76         /* Vres this fwp endpoint is bound to */
77         fwp_vres_t              *vres;
78         /** For send enpoint it contains destination address for
79          * receive endpoint it is filled with the msg source address
80          */
81         struct fwp_sockaddr     peer;   
82         /** Source/destination port */
83         unsigned int            port;   
84         /** Destination node */
85         int                     node;
86         /** Socket descriptor.
87          * In case of rebliable epoint it is a listen tcp socket.
88          */
89         int                     sockd; 
90         /** File descriptor array of peers connected 
91          * to this fwp receive endpoint.*/
92         int                     *c_sockd;
93         /**
94          * Number of connections 
95          */
96         unsigned int            nr_connections;
97         /** client fdset */
98         fd_set                  fdset;
99         /** specific operation options*/
100         int                     flags;  
101         /** Forced source address. If non-zero, packets are sent over
102          * the specified interface. */
103         struct in_addr  src;
104 };
105
106 /**
107  * Allocates endpoint
108  *
109  * \return On success returns fwp endpoint structure. 
110  * On error, NULL is returned. 
111  *
112  */
113 static struct fwp_endpoint* fwp_endpoint_alloc()
114 {
115         return (struct fwp_endpoint*) calloc(1,sizeof(struct fwp_endpoint));
116 }
117
118 /**
119  * Allocates endpoint
120  *
121  * \return On success returns endpoint structure. 
122  * On error, NULL is returned. 
123  *
124  */
125 static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint)
126 {
127         free(endpoint);
128 }
129
130 /**
131  * Destroy endpoint
132  *
133  * \param[in] epd Endpoint descriptor
134  * \return On success 0 is returned. 
135  * On error, negative error value is returned and errno is set appropriately. 
136  */
137 int fwp_endpoint_destroy(struct fwp_endpoint *ep)
138 {
139         if (ep->sockd > 0) 
140                 close(ep->sockd);
141
142         fwp_endpoint_free(ep);  
143         return 0;
144 }
145
146 /**
147  * Get endpoint parameters
148  *
149  * \param[in] ep Endpoint descriptor
150  * \param[out] node Node identifier
151  * \param[out] port Port
152  * \param[out] attr Endpoint`s attributes
153  * \return On success 0 is returned. 
154  * On error, negative error value is returned. 
155  */
156 int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node, 
157                                 unsigned int *port, fwp_endpoint_attr_t *attr)
158 {
159         if (node) *node = ep->node;
160         if (port) *port = ep->port;
161         if (attr) *attr = ep->attr;
162         
163         return 0;
164 }
165
166 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
167 {
168         bzero(attr, sizeof(fwp_endpoint_attr_t));
169         *attr = fwp_epoint_attr_default;
170
171         return 0;
172 }
173
174 /**
175  * Creates send endpoint
176  *
177  * \param[in] node IP address of destination node
178  * \param[in] port UDP port
179  * \param[in] attr Endpoint attributes
180  * \param[out] epp  Pointer to the descriptor of newly created endpoint
181  *
182  * \return Zero on success, -1 on error and sets errno appropriately. 
183  *
184  */
185 int fwp_send_endpoint_create(unsigned int node,
186                                 unsigned int port, 
187                                 fwp_endpoint_attr_t *attr,
188                                 struct fwp_endpoint **epoint)
189 {       
190         struct sockaddr_in *addr;
191         struct fwp_endpoint *fwp_epoint;
192
193         fwp_epoint = fwp_endpoint_alloc();      
194         if (!fwp_epoint) {
195                 errno = ENOMEM;
196                 return -1;
197         }
198         
199         /*epoint->type = FWP_SEND_EPOINT;
200         epoint->status = FWP_EPOINT_UNBOUND;
201         epoint->node = node;
202         epoint->port = port;
203         */
204         if (attr)
205                 fwp_epoint->attr  = *attr;
206         else
207                 fwp_epoint->attr = fwp_epoint_attr_default;
208                 
209         addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
210         bzero((char*) addr, sizeof(*addr));
211         addr->sin_family = AF_INET;
212         addr->sin_addr.s_addr = node;
213         addr->sin_port = htons(port);
214         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
215         
216         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
217                 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
218                 if (fwp_epoint->sockd < 0) {
219                         goto err;
220                 }
221         } else {
222                 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
223                 if (fwp_epoint->sockd < 0) {
224                         goto err;
225                 }
226         
227                 /* Enable broadcasts */
228                 /*unsigned int yes = 1;
229                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
230                                &yes, sizeof(yes)) == -1) {
231                         FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
232                         goto err;
233                 }*/
234         
235         }
236         
237         unsigned int yes = 1;
238         if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR, 
239                                &yes, sizeof(yes)) == -1) {
240                         FWP_DEBUG("setsockopt(SO_REUSEADDR): %s", strerror(errno));
241                         goto err;
242         }
243
244         if (connect(fwp_epoint->sockd,
245                         (struct sockaddr*) &fwp_epoint->peer.addr, 
246                         fwp_epoint->peer.addrlen)) {
247                 FWP_DEBUG("FWp connect error\n"); 
248                 goto err;
249         }
250         
251         FWP_DEBUG("FWP Send endpoint created.\n"); 
252
253 #ifdef FWP_WITHOUT_CONTNEGT
254         /* Create vres with default parameters */
255         FWP_DEBUG("Creating default vres\n");
256         if (fwp_vres_create(&fwp_vres_params_default, &fwp_epoint->vresd)) {
257                 goto err;
258         }
259         
260         fwp_send_endpoint_bind(fwp_epoint, fwp_epoint->vresd);
261 #endif
262         
263         *epoint = fwp_epoint;
264         return fwp_epoint->sockd;               
265 err:
266         fwp_endpoint_destroy(fwp_epoint);
267         return -1;      
268 }
269
270 /**
271  * Creates receive endpoint
272  *
273  * \param[in] port UDP port
274  * \param[in] attr Endpoint attributes
275  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
276  *
277  * \return Zero on success, -1 on error and errno is set.
278  */
279 int fwp_receive_endpoint_create(unsigned int port,
280                                 fwp_endpoint_attr_t *attr,
281                                 struct fwp_endpoint **epp)
282 {
283         struct sockaddr_in *addr;
284         struct fwp_endpoint *fwp_epoint;
285
286         fwp_epoint = fwp_endpoint_alloc();      
287         if (!fwp_epoint) {
288                 errno = ENOMEM;
289                 return -1;
290         }
291         
292         /*epoint->type = FWP_RECV_EPOINT;
293         epoint->status = FWP_EPOINT_UNBOUND;*/
294         
295         if (attr)
296                 fwp_epoint->attr  = *attr;
297         else
298                 fwp_epoint->attr = fwp_epoint_attr_default;
299
300         addr = (struct sockaddr_in *) &(fwp_epoint->peer.addr);
301         addr->sin_family = AF_INET;
302         /* TODO: set listen interface, maybe through config struct*/
303         addr->sin_addr.s_addr = FWP_ANY_NODE;
304         fwp_epoint->port  = addr->sin_port = htons(port);
305         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
306         
307         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
308                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, 
309                                                 IPPROTO_TCP)) < 0) {
310                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
311                         goto err;
312                 }       
313                 
314                 int yes = 1;
315                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
316                                &yes, sizeof(yes)) == -1) {
317                         FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
318                         goto err;
319                 }
320
321                 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr, 
322                                 fwp_epoint->peer.addrlen) == -1) {
323                         FWP_ERROR("Bind error: %s", strerror(errno));
324                         /* TODO: remove all error messages from all libraries */
325                         goto err;
326                 }
327                 
328                 if (listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections)){
329                         FWP_ERROR("Error on listen call: %s\n", strerror(errno));
330                         goto err;
331                 }
332                 
333                 FD_ZERO(&fwp_epoint->fdset);
334                 /*add listen socket */
335                 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset); 
336                 fwp_epoint->c_sockd = 
337                                 (int*)malloc(fwp_epoint->attr.max_connections);
338                 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
339                 fwp_epoint->nr_connections = 0;
340
341                 FWP_DEBUG("Reliable receive endpoint port=%d created.\n", 
342                                 fwp_epoint->port); 
343         } else {
344                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
345                                                 IPPROTO_UDP)) < 0) {
346                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
347                         goto err;
348                 }
349                 
350                 if (bind(fwp_epoint->sockd, 
351                         (struct sockaddr*) &fwp_epoint->peer.addr, 
352                         fwp_epoint->peer.addrlen) == -1) {
353                         
354                         FWP_ERROR("Bind error: %s", strerror(errno));
355                         goto err;
356                 }
357                 FWP_DEBUG("Best-Effort receive endpoint port=%d created.\n", 
358                                 fwp_epoint->port); 
359         }
360                 
361         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
362                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
363                 
364                 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
365                 return -1;
366         }else {
367                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
368         }
369         */
370         
371         getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr, 
372                         &fwp_epoint->peer.addrlen);
373
374         addr = (struct sockaddr_in*) fwp_epoint->peer.addr;
375         fwp_epoint->port = ntohs(addr->sin_port);
376         FWP_DEBUG("Recv port= %d\n",ntohs(addr->sin_port));     
377         *epp = fwp_epoint;
378         return 0;
379 err:
380         fwp_endpoint_destroy(fwp_epoint);
381         return errno;
382 }
383
384 /**
385  * Binds send endpoint to vres
386  *
387  * \param[in] vres identifier of vres
388  * \param[in] ep send endpoint identifier
389  *
390  * \return On success returns 0. On error, -1 and errno is set appropriately.
391  */
392 int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres)
393 {
394         int rv = 0;
395
396         if (ep->vres)
397                 return FRSH_ERR_ALREADY_BOUND;
398         
399         ep->vres = vres;
400         rv = fwp_vres_bind(vres, ep, ep->sockd);
401
402         return rv;
403 }
404
405 /**
406  * Unbinds send endpoint from vres
407  *
408  * \param[in] epointd Send endpoint descriptor 
409  * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
410  *
411  */
412 int fwp_send_endpoint_unbind(struct fwp_endpoint *ep)
413 {
414         int rv = 0;
415
416         /* unlink epoint-vres mutually */
417         if ((rv = fwp_vres_unbind(ep->vres)) < 0) 
418                 return rv;
419
420         return 0;
421 }
422
423 /**
424  * Accepts (TCP) client connection to receive endpoint
425  *
426  * \param[in] epointd Pointer to fwp endpoint
427  * \return
428  * On success, it returns zero.  
429  *
430  */
431 static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint)
432 {
433         int csockd;
434 //      struct fwp_endpoint *fwp_epoint = epointd;
435         fwp_sockaddr_t  peer;
436         int i;
437
438         if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
439                 return -1;
440
441         peer.addrlen = sizeof(struct sockaddr_in);
442         csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
443                         &peer.addrlen);
444         
445         if (csockd < 0) {
446                 FWP_ERROR("Error on accept: %s\n", strerror(errno));
447                 return errno;   
448         }               
449
450         FWP_DEBUG("New connection accepted\n");
451         /* find free place */           
452         i = 0;
453         while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections)) 
454                                 i++;
455         fwp_epoint->c_sockd[i] = csockd; 
456         fwp_epoint->nr_connections++;
457                 
458         FD_SET(csockd, &fwp_epoint->fdset);
459         return 0;       
460
461
462 /**
463  * Receives message from stream (TCP)
464  *
465  * \param[in] epointd Descriptor of endpoint
466  * \param[in] buffer Buffer to store message
467  * \param[in] buffer_size Size of buffer
468  *
469  * \return
470  * On success, it returns number of received bytes.  
471  * On error, -1 is returned and errno is set appropriately.
472  *
473  */
474 int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer, 
475                         size_t buffer_size)
476 {
477         fwp_sockaddr_t *peer = &ep->peer;
478         fd_set fdset = ep->fdset;
479         ssize_t len;
480         int i;
481
482         FWP_DEBUG("Checking for tcp data\n");
483         for (i = 0; i < ep->nr_connections; i++) {
484                 if (!FD_ISSET(ep->c_sockd[i], &fdset)) {
485                         continue;       
486                 }       
487                         
488                 FWP_DEBUG("Prepare to receive tcp data\n");
489                 peer->addrlen = sizeof(struct sockaddr_in);
490                 len = _fwp_recvfrom(ep->c_sockd[i], buffer, 
491                                         buffer_size,0, peer);
492
493                 if (len < 0) /* Error */
494                         return len;
495                 
496                 FWP_DEBUG("Received tcp data\n");
497                 if (len)
498                         return len;
499         
500                 /* tcp connection closed */
501                 FWP_DEBUG("Connection closed\n");
502                 FD_CLR(ep->c_sockd[i], &ep->fdset);
503                 memcpy(ep->c_sockd+i, ep->c_sockd+i+1, 
504                         sizeof(int)*(ep->nr_connections -i-1));
505                 ep->nr_connections--;
506                 return 0;
507         }
508         return 0;
509 }
510
511 /**
512  * Receives message
513  *
514  * \param[in] epointd Descriptor of endpoint
515  * \param[in] buffer Buffer to store message
516  * \param[in] buffer_size Size of buffer
517  *
518  * \return
519  * On success, it returns number of received bytes.  
520  * On error, -1 is returned and errno is set appropriately.
521  *
522  */
523 ssize_t fwp_recv(struct fwp_endpoint *ep,
524                         void *buffer, const size_t buffer_size,
525                         unsigned int *from, int flags)
526 {
527         fwp_sockaddr_t *peer = &ep->peer;
528         struct sockaddr_in *addr = (struct sockaddr_in*) ep->peer.addr;
529         ssize_t len;
530         fd_set fdset;
531         
532         /*if (!fwp_endpoint_is_valid(epointd)) {
533                 errno = EINVAL;
534                 return -1;
535         }*/
536         
537         if (ep->attr.reliability == FWP_EPOINT_BESTEFFORT) {    
538                 len = _fwp_recvfrom(ep->sockd, buffer, 
539                                         buffer_size, 0, peer);
540                 
541                 *from = addr->sin_addr.s_addr;
542                 return len;
543         }
544         
545         while (1){
546                 /* FWP_EPOINT_RELIABLE */
547                 fdset = ep->fdset;
548                 if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
549                            (fd_set *)0, NULL) < 0) {
550                 
551                         FWP_ERROR("Error in select: %s", strerror(errno));
552                         return -1;
553                 }
554         
555                 if (FD_ISSET(ep->sockd, &fdset)) { /* is it listen socket? */
556                         fwp_receive_endpoint_accept(ep);
557                         continue;
558                 }
559
560                 /* Check client TCP sockets */
561                 len = fwp_recv_conn(ep, buffer, buffer_size);
562                 if (len) {
563                         *from = addr->sin_addr.s_addr;
564                         return len;
565                 }
566         }
567 }
568 /** 
569  * Physically send the message.
570  *
571  * This function should be called either by fwp_send_sync()/async() of
572  * by VRES to send delayed messaged.
573  * 
574  * @param ep 
575  * @param data 
576  * @param size 
577  * 
578  * @return 
579  */
580 ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
581                              const void *data, const size_t size)
582 {
583         struct iovec  iov;
584         struct msghdr msg = {0};
585         ssize_t ret;
586         char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
587
588         iov.iov_base = (void*)data;
589         iov.iov_len = size;
590
591         msg.msg_iov = &iov;
592         msg.msg_iovlen = 1;
593
594         if (ep->src.s_addr != 0) {
595                 struct cmsghdr *cmsg;
596                 struct in_pktinfo *ipi;
597
598                 memset(cmsg_buf, 0, sizeof(cmsg_buf));
599
600                 msg.msg_control = cmsg_buf;
601                 msg.msg_controllen = sizeof(cmsg_buf);
602
603                 cmsg = CMSG_FIRSTHDR(&msg);
604
605                 cmsg->cmsg_level = SOL_IP;
606                 cmsg->cmsg_type = IP_PKTINFO;
607                 cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
608
609                 ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
610                 ipi->ipi_spec_dst = ep->src;
611         }
612         ret = sendmsg(ep->sockd, &msg, 0);
613         return ret;
614 }
615
616 /**
617  * Sends message through vres
618  *
619  * \param[in] epointd Endpoint descriptor
620  * \param[in] msg Message to sent
621  * \param[in] size Message size
622  *
623  * \return
624  * On success, it returns zero.  
625  * On error, -1 is returned and errno is set appropriately.
626  *
627  */
628 int fwp_send_async(struct fwp_endpoint *ep, const void *msg, size_t size)
629 {
630         int ret;
631
632         if (!ep->vres)
633                 return FRSH_ERR_NOT_BOUND;
634
635         if (fwp_vres_consume_budget(ep->vres, size, false) == 0)
636                 ret = fwp_endpoint_do_send(ep, msg, size);
637         else
638                 ret = fwp_vres_enqueue(ep->vres, ep, msg, size);
639         return ret;
640 }
641
642 int fwp_send_sync(struct fwp_endpoint *ep, const void *msg, size_t size)
643 {
644         int ret;
645
646         if (!ep->vres)
647                 return FRSH_ERR_NOT_BOUND;
648
649         ret = fwp_vres_consume_budget(ep->vres, size, true);
650         if (ret)
651                 return ret;
652         ret = fwp_endpoint_do_send(ep, msg, size);
653         return ret;
654 }
655