]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
8442dfc2d484bf4f713b20cdc8af06a3bc412610
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61 #include <forb/config.h>
62
63 /**
64  * @file   proto_inet.c
65  * @author Michal Sojka <sojkam1@fel.cvut.cz>
66  * @date   Sun Oct 12 16:10:23 2008
67  * 
68  * @brief  FORB transport protocol based on INET family sockets.
69  * 
70  * UDP is used for broadcasts and TCP for requests/replies. There
71  * exist two uni-drectional connections between any two communicating
72  * peers.
73  */
74
75 extern UL_LOG_CUST(ulogd_forb_proto_inet);
76
77 #define MCAST_PORT 15514        /**< Port used for multicasts */
78 #define MCAST_ADDR "225.15.5.14"
79
80 /** Address used by inet protocol. All values are stored in network
81  * byte order. */
82 struct inet_addr {
83         struct in_addr addr;
84         uint16_t port;          /**< TCP listening port */
85 };
86
87 /** INET protocol data for ports. */
88 struct inet_port {
89         int udp_socket;         /**< Socket for sending and receiving broadcasts */
90         int listen_socket;      /*  */
91         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
92         struct inet_addr addr;  /**< Address of this port */
93         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
94         struct in_addr multicast_addr;
95         ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
96 };
97
98 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
99                  struct inet_port,   /* cust_head_t */
100                  forb_peer_t,        /* cust_item_t */
101                  new_peers,          /* cust_head_field */
102                  lnode)              /* cust_node_field */
103
104
105 /** INET protocol data associated with every peer */
106 struct inet_peer {
107         int socket;             /**< Connected socket to the peer */
108 };
109
110 /* static struct inet_port* */
111 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
112
113 static CORBA_boolean
114 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
115 {
116         const struct inet_addr *a = addr;
117         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
118         CORBA_unsigned_short hport = ntohs(a->port);
119         CORBA_boolean ret;
120         ret = CORBA_unsigned_long_serialize(codec, &haddr);
121         if (!ret)
122                 return ret;
123         return CORBA_unsigned_short_serialize(codec, &hport);
124 }
125
126 static CORBA_boolean
127 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
128 {
129         struct inet_addr *a;
130         CORBA_unsigned_long s_addr;
131         CORBA_unsigned_short port;
132         CORBA_boolean ret;
133
134         a = forb_malloc(sizeof(*a));
135         if (!a)
136                 return CORBA_FALSE;
137         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
138         if (!ret)
139                 return ret;
140         ret = CORBA_unsigned_short_deserialize(codec, &port);
141         a->addr.s_addr = htonl(s_addr);
142         a->port = htons(port);
143         *addr = a;
144         return ret;
145 }
146
147 static struct inet_peer *
148 inet_connect(forb_peer_t *peer)
149 {
150         struct inet_peer *ipeer;
151         struct sockaddr_in sa;
152         struct inet_addr *addr = peer->addr;
153         int ret;
154
155         if (!addr) {
156                 ul_logerr("No address to connect\n");
157                 return NULL;
158         }
159         ipeer = forb_malloc(sizeof(*ipeer));
160         if (!ipeer)
161                 goto err;
162         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
163         if (!ipeer->socket) {
164                 ul_logerr("socket(): %s\n", strerror(errno));
165                 goto err_free;
166         }
167         sa.sin_family = AF_INET;
168         sa.sin_port = addr->port;
169         sa.sin_addr = addr->addr;
170         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
171         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
172         if (ret) {
173                 ul_logerr("connect error: %s\n", strerror(errno));
174                 goto err_close;
175         }
176
177         struct epoll_event ev;
178         struct inet_port *p = peer->port->desc.proto_priv;
179         memset(&ev, 0, sizeof(ev));
180         ev.events = EPOLLIN | EPOLLET;
181         ev.data.fd = ipeer->socket;
182         ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
183         if (ret) {
184                 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
185                 goto err_close;
186         }
187
188
189         return ipeer;
190 err_close:
191         close(ipeer->socket);
192 err_free:
193         forb_free(ipeer);
194 err:
195         return NULL;
196         
197 }
198
199 static size_t
200 inet_send(forb_peer_t *peer, const void *buf, size_t len)
201 {
202         struct inet_peer *ipeer = peer->proto_priv;
203         size_t ret, sent;
204
205         if (!ipeer) {
206                 ipeer = inet_connect(peer);
207                 if (!ipeer) {
208                         return -1;
209                 }
210                 peer->proto_priv = ipeer;
211                 
212         }
213
214         sent = 0;
215         ul_logdeb("send fd=%d\n", ipeer->socket);
216         do {
217                 ret = send(ipeer->socket, buf, len, 0);
218                 if (ret < 0) {
219                         ul_logerr("send error: %s\n", strerror(errno));
220                         return ret;
221                 }
222                 sent += ret;
223                 buf += ret;
224                 len -= ret;
225         } while (len > 0);
226
227         return sent;
228 }
229
230 /*----------------------------------------------------------------------
231  Portable function to set a socket into nonblocking mode.
232  Calling this on a socket causes all future read() and write() calls on
233  that socket to do only as much as they can immediately, and return 
234  without waiting.
235  If no data can be read or written, they return -1 and set errno
236  to EAGAIN (or EWOULDBLOCK).
237  Thanks to Bjorn Reese for this code.
238 ----------------------------------------------------------------------*/
239 int setnonblocking(int fd)
240 {
241     int flags;
242
243     /* If they have O_NONBLOCK, use the Posix way to do it */
244 #if defined(O_NONBLOCK)
245     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
246     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
247         flags = 0;
248     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
249 #else
250     /* Otherwise, use the old way of doing it */
251     flags = 1;
252     return ioctl(fd, FIOBIO, &flags);
253 #endif
254 }     
255
256 static int
257 inet_accept_connection(forb_port_t *port)
258 {
259         struct inet_port *p = port->desc.proto_priv;
260         int client;
261         struct sockaddr_in addr;
262         socklen_t addrlen = sizeof(addr);
263         struct epoll_event ev;
264         int ret;
265         forb_peer_t *peer;
266                                 
267         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
268         if (client < 0){
269                 //perror("accept");
270                 return -1;
271         }
272         ret = setnonblocking(client);
273         if (ret) {
274                 close(client);
275                 return -1;
276         }
277
278         peer = forb_peer_new();
279         if (peer) {
280                 struct inet_peer *ipeer;
281
282                 ipeer = forb_malloc(sizeof(*ipeer));
283                 if (ipeer) {
284                         ipeer->socket = client;
285                         peer->proto_priv = ipeer;
286                         peer->port = port;
287                         peer->state = FORB_PEER_DISCOVERED;
288                         inet_port_new_peer_insert(p, peer);
289                         //printf("New connection d=%d\n", client);
290                 } else {
291                         forb_peer_put(peer);
292                 }
293         }
294         
295         memset(&ev, 0, sizeof(ev));
296         ev.events = EPOLLIN | EPOLLET;
297         ev.data.fd = client;
298         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
299 }
300
301 static size_t
302 inet_recv(forb_port_t *port, void *buf, size_t len)
303 {
304         struct inet_port *iport = port->desc.proto_priv;
305 #if 1
306         struct epoll_event ev;
307         int ret, nfds;
308         forb_peer_t *peer;
309         bool exported_new_peer = false;
310         
311         for (;;) {
312                 if (iport->last_recv_fd == -1) {
313                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
314                         if (nfds < 1)
315                                 return -1;
316                         if (ev.data.fd == iport->listen_socket) {
317                                 ret = inet_accept_connection(port);
318                                 if (ret)
319                                         return -1;
320                                 else
321                                         continue;
322                         } else {
323                                 iport->last_recv_fd = ev.data.fd;
324                         }
325                 }
326                 /* Check for first reception form a just connected peer */
327                 ul_list_for_each(inet_port_new_peer, iport, peer) {
328                         struct inet_peer *ipeer = peer->proto_priv;
329                         //printf("checking new peer with fd=%d\n", ipeer->socket);
330                         if (ipeer->socket == iport->last_recv_fd) {
331                                 inet_port_new_peer_delete(iport, peer);
332
333                                 if (port->new_peer) forb_peer_put(peer);
334
335                                 /* Let the upper layer assign forb ID
336                                  * to this peer according to the request*/
337                                 port->new_peer = peer;
338                                 exported_new_peer = true;
339                                 break;
340                         }
341                 }
342
343                 //printf("recv fd=%d\n", iport->last_recv_fd);
344                 ret = recv(iport->last_recv_fd, buf, len, 0);
345                 if (ret == -1) {
346                         if (exported_new_peer) {
347                                 forb_peer_put(peer);
348                                 port->new_peer = NULL;
349                         }
350                         if (errno == EAGAIN) {
351                                 iport->last_recv_fd = -1;
352                                 continue;
353                         }
354                 }
355                 if (ret == 0) {
356                         if (exported_new_peer) {
357                                 forb_peer_put(peer);
358                                 port->new_peer = NULL;
359                         }
360                         close(iport->last_recv_fd);
361                         /* TODO: Notify FORB about peer disconnect */
362                         iport->last_recv_fd = -1;
363                         continue;
364                 }
365                 return ret;
366         }
367 #else
368         return recv(iport->udp_socket, buf, len, 0);    
369 #endif
370 }
371
372 static int
373 inet_port_destroy(forb_port_t * port)
374 {
375         struct inet_port *pd = port->desc.proto_priv;
376         close(pd->epoll_fd);
377         close(pd->udp_socket);
378         close(pd->listen_socket);
379         forb_free(pd);
380         return 0;
381 }
382
383 static size_t
384 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
385 {
386         struct inet_port *p = port->desc.proto_priv;
387         struct sockaddr_in addr;
388         int ret;
389         
390         addr.sin_family = AF_INET;
391         addr.sin_port = htons(MCAST_PORT);
392         addr.sin_addr = p->multicast_addr;
393         
394         ret = sendto(p->udp_socket, buf, len, 0,
395                      (struct sockaddr*)&addr, sizeof(addr));
396         return ret;
397 }
398
399 static void
400 inet_peer_destroy(forb_peer_t *peer)
401 {
402         struct inet_peer *ipeer = peer->proto_priv;
403         if (ipeer) {
404                 peer->proto_priv = NULL;
405                 close(ipeer->socket);
406                 free(ipeer);
407         }
408 }
409
410 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
411 {
412         const struct inet_addr *a = addr;
413         size_t ret = 0;
414         if (addr) {
415                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
416         }
417         return ret;
418 }
419
420
421 static const forb_proto_t proto_inet = {
422         .hello_interval = 40 /* seconds */,
423         .port_destroy = inet_port_destroy,
424         .peer_destroy = inet_peer_destroy,
425         .send = inet_send,
426         .recv = inet_recv,
427         .broadcast = inet_broadcast,
428         .serialize_addr = inet_serialize_addr,
429         .deserialize_addr = inet_deserialize_addr,
430         .addr2str = inet_addr2str,
431 };
432
433 #define MAX_INTERFACES 10
434 int get_local_address(int sock, struct in_addr *addr)
435 {
436         struct ifconf  ifc;
437         struct ifreq   *ifr, req[MAX_INTERFACES];
438         
439
440         ifc.ifc_len = sizeof(req);
441         ifc.ifc_req = req;
442
443         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
444                 return -1;
445         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
446                 struct sockaddr_in ia;
447                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
448                 ioctl(sock, SIOCGIFFLAGS, ifr);
449                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
450                         *addr = ia.sin_addr;
451                         return 0;
452                 }
453         }
454         return -1;
455 }
456
457 /** 
458  * Initializes INET protocol port.
459  * 
460  * @param port_desc Port description to initialize.
461  * @return Zero on success, -1 on error and errno is set
462  * appropriately.
463  */
464 int
465 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
466                     uint16_t port)
467 {
468         int ret;       
469         struct inet_port *port_priv;
470         struct sockaddr_in addr;
471         socklen_t len;
472         struct epoll_event ev;
473         
474         port_priv = forb_malloc(sizeof(*port_priv));
475         if (!port_priv)
476                 return -1;
477
478         memset(port_priv, 0, sizeof(*port_priv));
479         port_priv->last_recv_fd = -1;
480         inet_port_new_peer_init_head(port_priv);
481         
482         /* Initialize UDP multicast socket */
483         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
484         if (port_priv->udp_socket == -1) goto err_free;
485         
486         int yes = 1;
487         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
488         if (ret)
489                 goto err_close_udp;
490
491         int reuse = 1;
492         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
493         if (ret)
494                 goto err_close_udp;
495
496         setnonblocking(port_priv->udp_socket);
497
498         struct ip_mreq mreq;
499         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
500         mreq.imr_multiaddr = port_priv->multicast_addr;
501         mreq.imr_interface.s_addr = INADDR_ANY;
502         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
503                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
504         if (ret)
505                 goto err_close_udp;
506
507         addr.sin_family = AF_INET;
508         addr.sin_port = htons(MCAST_PORT);
509         addr.sin_addr = port_priv->multicast_addr;
510
511         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
512         if (ret != 0) goto err_close_udp;
513
514         char loop = 1;
515         unsigned loop_size = sizeof(loop);
516         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
517                          IP_MULTICAST_LOOP, &loop, loop_size);
518         if (ret)
519                 goto err_close_udp;
520         
521
522         /* Initialize TCP socket */
523         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
524         if (port_priv->listen_socket == -1) goto err_close_udp;
525
526         addr.sin_family = AF_INET;
527         addr.sin_port = htons(port);
528         addr.sin_addr = listen_on;
529         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
530         if (ret != 0) goto err_close_listen;
531         if (setnonblocking(port_priv->listen_socket))
532                 goto err_close_listen;
533
534         ret = listen(port_priv->listen_socket, 10);
535         if (ret)
536                 goto err_close_listen;
537
538         /* Determine our address and port*/
539         len = sizeof(addr);
540         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
541         if (ret) {
542                 ul_logerr("Non-loopback inet address not found\n");
543                 goto err_close_listen;
544         }
545
546         port_priv->addr.port = addr.sin_port;
547         if (listen_on.s_addr == INADDR_ANY)
548                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
549         else
550                 port_priv->addr.addr = listen_on;
551
552         /* Initialize epoll descriptor */
553         port_priv->epoll_fd = epoll_create(10);
554         if (port_priv->epoll_fd == -1)
555                 goto err_close_listen;
556
557         memset(&ev, 0, sizeof(ev));
558         ev.events = EPOLLIN | EPOLLET;
559         ev.data.fd = port_priv->listen_socket;
560         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
561                         port_priv->listen_socket, &ev);
562         if (ret)
563                 goto err_close_epoll;
564
565 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT  
566         ev.events = EPOLLIN | EPOLLET;
567         ev.data.fd = port_priv->udp_socket;
568         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
569                         port_priv->udp_socket, &ev);
570         if (ret)
571                 goto err_close_epoll;
572 #endif
573
574         port_desc->proto = &proto_inet;
575         port_desc->proto_priv = port_priv;
576         port_desc->addr = &port_priv->addr;
577         return 0;
578 err_close_epoll:
579         close(port_priv->epoll_fd);
580 err_close_listen:
581         ret = errno;
582         close(port_priv->listen_socket);
583         errno = ret;
584 err_close_udp:
585         ret = errno;
586         close(port_priv->udp_socket);
587         errno = ret;
588 err_free:
589         ret = errno;
590         forb_free(port_priv);
591         errno = ret;
592         return -1;
593 }