]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
Handle EINTR in epoll
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61 #include <forb/config.h>
62 #include "discovery.h"
63
64 /**
65  * @file   proto_inet.c
66  * @author Michal Sojka <sojkam1@fel.cvut.cz>
67  * @date   Sun Oct 12 16:10:23 2008
68  * 
69  * @brief  FORB transport protocol based on INET family sockets.
70  * 
71  * UDP is used for broadcasts and TCP for requests/replies. There
72  * exist two uni-drectional connections between any two communicating
73  * peers.
74  */
75
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
77
78 #define MCAST_PORT 15514        /**< Port used for multicasts */
79 #define MCAST_ADDR "225.15.5.14"
80
81 /** Address used by inet protocol. All values are stored in network
82  * byte order. */
83 struct inet_addr {
84         struct in_addr addr;
85         uint16_t port;          /**< TCP listening port */
86 };
87
88 /** INET protocol data for ports. */
89 struct inet_port {
90         int udp_socket;         /**< Socket for sending and receiving broadcasts */
91         int listen_socket;      /*  */
92         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
93         struct inet_addr addr;  /**< Address of this port */
94         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
95         struct in_addr multicast_addr;
96         ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
97 };
98
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100                  struct inet_port,   /* cust_head_t */
101                  forb_peer_t,        /* cust_item_t */
102                  new_peers,          /* cust_head_field */
103                  lnode)              /* cust_node_field */
104
105
106 /** INET protocol data associated with every peer */
107 struct inet_peer {
108         int socket;             /**< Connected socket to the peer */
109 };
110
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
113
114 static CORBA_boolean
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
116 {
117         const struct inet_addr *a = addr;
118         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119         CORBA_unsigned_short hport = ntohs(a->port);
120         CORBA_boolean ret;
121         ret = CORBA_unsigned_long_serialize(codec, &haddr);
122         if (!ret)
123                 return ret;
124         return CORBA_unsigned_short_serialize(codec, &hport);
125 }
126
127 static CORBA_boolean
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
129 {
130         struct inet_addr *a;
131         CORBA_unsigned_long s_addr;
132         CORBA_unsigned_short port;
133         CORBA_boolean ret;
134
135         a = forb_malloc(sizeof(*a));
136         if (!a)
137                 return CORBA_FALSE;
138         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
139         if (!ret)
140                 return ret;
141         ret = CORBA_unsigned_short_deserialize(codec, &port);
142         a->addr.s_addr = htonl(s_addr);
143         a->port = htons(port);
144         *addr = a;
145         return ret;
146 }
147
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
150 {
151         struct inet_peer *ipeer;
152         struct sockaddr_in sa;
153         struct inet_addr *addr = peer->addr;
154         int ret;
155
156         if (!addr) {
157                 ul_logerr("No address to connect\n");
158                 return NULL;
159         }
160         ipeer = forb_malloc(sizeof(*ipeer));
161         if (!ipeer)
162                 goto err;
163         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164         if (!ipeer->socket) {
165                 ul_logerr("socket(): %s\n", strerror(errno));
166                 goto err_free;
167         }
168         sa.sin_family = AF_INET;
169         sa.sin_port = addr->port;
170         sa.sin_addr = addr->addr;
171         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
173         if (ret) {
174                 ul_logerr("connect error: %s\n", strerror(errno));
175                 goto err_close;
176         }
177
178         struct epoll_event ev;
179         struct inet_port *p = peer->port->desc.proto_priv;
180         memset(&ev, 0, sizeof(ev));
181         ev.events = EPOLLIN | EPOLLET;
182         ev.data.fd = ipeer->socket;
183         ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
184         if (ret) {
185                 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
186                 goto err_close;
187         }
188
189
190         return ipeer;
191 err_close:
192         close(ipeer->socket);
193 err_free:
194         forb_free(ipeer);
195 err:
196         return NULL;
197         
198 }
199
200 static ssize_t
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
202 {
203         struct inet_peer *ipeer = peer->proto_priv;
204         ssize_t ret, sent;
205
206         if (!ipeer) {
207                 ipeer = inet_connect(peer);
208                 if (!ipeer) {
209                         return -1;
210                 }
211                 peer->proto_priv = ipeer;
212                 
213         }
214
215         sent = 0;
216         ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
217         do {
218                 ret = send(ipeer->socket, buf, len, 0);
219                 if (ret < 0) {
220                         ul_logerr("send error: %s\n", strerror(errno));
221                         return ret;
222                 }
223                 sent += ret;
224                 buf += ret;
225                 len -= ret;
226         } while (len > 0);
227
228         return sent;
229 }
230
231 /*----------------------------------------------------------------------
232  Portable function to set a socket into nonblocking mode.
233  Calling this on a socket causes all future read() and write() calls on
234  that socket to do only as much as they can immediately, and return 
235  without waiting.
236  If no data can be read or written, they return -1 and set errno
237  to EAGAIN (or EWOULDBLOCK).
238  Thanks to Bjorn Reese for this code.
239 ----------------------------------------------------------------------*/
240 int setnonblocking(int fd)
241 {
242     int flags;
243
244     /* If they have O_NONBLOCK, use the Posix way to do it */
245 #if defined(O_NONBLOCK)
246     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
247     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
248         flags = 0;
249     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
250 #else
251     /* Otherwise, use the old way of doing it */
252     flags = 1;
253     return ioctl(fd, FIOBIO, &flags);
254 #endif
255 }     
256
257 static int
258 inet_accept_connection(forb_port_t *port)
259 {
260         struct inet_port *p = port->desc.proto_priv;
261         int client;
262         struct sockaddr_in addr;
263         socklen_t addrlen = sizeof(addr);
264         struct epoll_event ev;
265         int ret;
266         forb_peer_t *peer;
267                                 
268         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
269         if (client < 0){
270                 //perror("accept");
271                 return -1;
272         }
273         ret = setnonblocking(client);
274         if (ret) {
275                 close(client);
276                 return -1;
277         }
278
279         peer = forb_peer_new();
280         if (peer) {
281                 struct inet_peer *ipeer;
282
283                 ipeer = forb_malloc(sizeof(*ipeer));
284                 if (ipeer) {
285                         ipeer->socket = client;
286                         peer->proto_priv = ipeer;
287                         peer->port = port;
288                         peer->state = FORB_PEER_DISCOVERED;
289                         inet_port_new_peer_insert(p, peer);
290                         //printf("New connection d=%d\n", client);
291                 } else {
292                         forb_peer_put(peer);
293                 }
294         }
295         
296         memset(&ev, 0, sizeof(ev));
297         ev.events = EPOLLIN | EPOLLET;
298         ev.data.fd = client;
299         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
300 }
301
302 static ssize_t
303 inet_recv(forb_port_t *port, void *buf, size_t len)
304 {
305         struct inet_port *iport = port->desc.proto_priv;
306 #if 1
307         struct epoll_event ev;
308         ssize_t ret;
309         int nfds;
310         forb_peer_t *peer;
311         bool exported_new_peer = false;
312         
313         for (;;) {
314                 if (iport->last_recv_fd == -1) {
315                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
316                         if (nfds == -1 && errno == EINTR)
317                                 continue;
318                         if (nfds < 1)
319                                 return -1;
320                         if (ev.data.fd == iport->listen_socket) {
321                                 ret = inet_accept_connection(port);
322                                 if (ret) {
323                                         ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
324                                         return -1;
325                                 } else
326                                         continue;
327                         } else {
328                                 iport->last_recv_fd = ev.data.fd;
329                         }
330                 }
331                 /* Check for first reception form a just connected peer */
332                 ul_list_for_each(inet_port_new_peer, iport, peer) {
333                         struct inet_peer *ipeer = peer->proto_priv;
334                         //printf("checking new peer with fd=%d\n", ipeer->socket);
335                         if (ipeer->socket == iport->last_recv_fd) {
336                                 inet_port_new_peer_delete(iport, peer);
337
338                                 if (port->new_peer) forb_peer_put(peer);
339
340                                 /* Let the upper layer assign forb ID
341                                  * to this peer according to the request*/
342                                 port->new_peer = peer;
343                                 exported_new_peer = true;
344                                 break;
345                         }
346                 }
347
348                 //printf("recv fd=%d\n", iport->last_recv_fd);
349                 ret = recv(iport->last_recv_fd, buf, len, 0);
350                 if (ret == -1) {
351                         if (exported_new_peer) {
352                                 forb_peer_put(peer);
353                                 port->new_peer = NULL;
354                         }
355                         if (errno != EAGAIN) {
356                                 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
357                         }
358                         iport->last_recv_fd = -1;
359                         continue;
360                 }
361                 if (ret == 0) {
362                         if (exported_new_peer) {
363                                 forb_peer_put(peer);
364                                 port->new_peer = NULL;
365                         }
366                         ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
367                         ul_list_for_each(forb_port_peer, port, peer) {
368                                 struct inet_peer *ipeer = peer->proto_priv;
369                                 if (ipeer && ipeer->socket == iport->last_recv_fd) {
370                                         forb_peer_disconnected(peer);
371                                         break;
372                                 }
373                         }
374                         iport->last_recv_fd = -1;
375                         continue;
376                 }
377                 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
378                 return ret;
379         }
380 #else
381         return recv(iport->udp_socket, buf, len, 0);    
382 #endif
383 }
384
385 static int
386 inet_port_destroy(forb_port_t * port)
387 {
388         struct inet_port *pd = port->desc.proto_priv;
389         close(pd->epoll_fd);
390         close(pd->udp_socket);
391         close(pd->listen_socket);
392         forb_free(pd);
393         return 0;
394 }
395
396 static ssize_t
397 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
398 {
399         struct inet_port *p = port->desc.proto_priv;
400         struct sockaddr_in addr;
401         ssize_t ret;
402         
403         addr.sin_family = AF_INET;
404         addr.sin_port = htons(MCAST_PORT);
405         addr.sin_addr = p->multicast_addr;
406         
407         ret = sendto(p->udp_socket, buf, len, 0,
408                      (struct sockaddr*)&addr, sizeof(addr));
409         return ret;
410 }
411
412 static void
413 inet_peer_destroy(forb_peer_t *peer)
414 {
415         struct inet_peer *ipeer = peer->proto_priv;
416         if (ipeer) {
417                 peer->proto_priv = NULL;
418                 ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
419                 close(ipeer->socket);
420                 free(ipeer);
421         }
422 }
423
424 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
425 {
426         const struct inet_addr *a = addr;
427         size_t ret = 0;
428         if (addr) {
429                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
430         }
431         return ret;
432 }
433
434 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
435
436 #include <fcb.h>
437 #include <fcb_contact_info.h>
438 #include <stdlib.h>
439
440 static void inet_register_cb(forb_port_t *port)
441 {
442         struct inet_addr *ia;
443         
444         ia = malloc(sizeof(*ia));
445         if (!ia) return;
446
447         char *fcb_addr = getenv("FCB_ADDR");
448         if (!fcb_addr) fcb_addr = "127.0.0.1";
449         ia->addr.s_addr = inet_addr(fcb_addr);
450         ia->port = htons(FCB_TCP_PORT);
451         forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
452 }
453 #else
454 #define inet_register_cb NULL
455 #endif
456
457 static const forb_proto_t proto_inet = {
458         .hello_interval = 40 /* seconds */,
459         .port_destroy = inet_port_destroy,
460         .peer_destroy = inet_peer_destroy,
461         .send = inet_send,
462         .recv = inet_recv,
463         .broadcast = inet_broadcast,
464         .serialize_addr = inet_serialize_addr,
465         .deserialize_addr = inet_deserialize_addr,
466         .addr2str = inet_addr2str,
467         .register_cb = inet_register_cb,
468 };
469
470 #define MAX_INTERFACES 10
471 int get_local_address(int sock, struct in_addr *addr)
472 {
473         struct ifconf  ifc;
474         struct ifreq   *ifr, req[MAX_INTERFACES];
475         
476
477         ifc.ifc_len = sizeof(req);
478         ifc.ifc_req = req;
479
480         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
481                 return -1;
482         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
483                 struct sockaddr_in ia;
484                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
485                 ioctl(sock, SIOCGIFFLAGS, ifr);
486                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
487                         *addr = ia.sin_addr;
488                         return 0;
489                 }
490         }
491         return -1;
492 }
493
494 /** 
495  * Initializes INET protocol port.
496  * 
497  * @param port_desc Port description to initialize.
498  * @return Zero on success, -1 on error and errno is set
499  * appropriately.
500  */
501 int
502 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
503                     uint16_t port)
504 {
505         int ret;       
506         struct inet_port *port_priv;
507         struct sockaddr_in addr;
508         socklen_t len;
509         struct epoll_event ev;
510         
511         port_priv = forb_malloc(sizeof(*port_priv));
512         if (!port_priv)
513                 return -1;
514
515         memset(port_priv, 0, sizeof(*port_priv));
516         port_priv->last_recv_fd = -1;
517         inet_port_new_peer_init_head(port_priv);
518         
519         /* Initialize UDP multicast socket */
520         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
521         if (port_priv->udp_socket == -1) goto err_free;
522         
523         int yes = 1;
524         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
525         if (ret)
526                 goto err_close_udp;
527
528         int reuse = 1;
529         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
530         if (ret)
531                 goto err_close_udp;
532
533         setnonblocking(port_priv->udp_socket);
534
535         struct ip_mreq mreq;
536         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
537         mreq.imr_multiaddr = port_priv->multicast_addr;
538         mreq.imr_interface.s_addr = INADDR_ANY;
539         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
540                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
541         if (ret)
542                 goto err_close_udp;
543
544         addr.sin_family = AF_INET;
545         addr.sin_port = htons(MCAST_PORT);
546         addr.sin_addr = port_priv->multicast_addr;
547
548         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
549         if (ret != 0) goto err_close_udp;
550
551         char loop = 1;
552         unsigned loop_size = sizeof(loop);
553         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
554                          IP_MULTICAST_LOOP, &loop, loop_size);
555         if (ret)
556                 goto err_close_udp;
557         
558
559         /* Initialize TCP socket */
560         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
561         if (port_priv->listen_socket == -1) goto err_close_udp;
562
563         reuse = 1;
564         setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
565         if (ret)
566                 goto err_close_listen;
567
568         addr.sin_family = AF_INET;
569         addr.sin_port = htons(port);
570         addr.sin_addr = listen_on;
571         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
572         if (ret != 0) goto err_close_listen;
573         if (setnonblocking(port_priv->listen_socket))
574                 goto err_close_listen;
575
576         ret = listen(port_priv->listen_socket, 10);
577         if (ret)
578                 goto err_close_listen;
579
580         /* Determine our address and port*/
581         len = sizeof(addr);
582         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
583         if (ret) {
584                 ul_logerr("Non-loopback inet address not found\n");
585                 goto err_close_listen;
586         }
587
588         port_priv->addr.port = addr.sin_port;
589         if (listen_on.s_addr == INADDR_ANY)
590                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
591         else
592                 port_priv->addr.addr = listen_on;
593
594         /* Initialize epoll descriptor */
595         port_priv->epoll_fd = epoll_create(10);
596         if (port_priv->epoll_fd == -1)
597                 goto err_close_listen;
598
599         memset(&ev, 0, sizeof(ev));
600         ev.events = EPOLLIN | EPOLLET;
601         ev.data.fd = port_priv->listen_socket;
602         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
603                         port_priv->listen_socket, &ev);
604         if (ret)
605                 goto err_close_epoll;
606
607 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT  
608         ev.events = EPOLLIN | EPOLLET;
609         ev.data.fd = port_priv->udp_socket;
610         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
611                         port_priv->udp_socket, &ev);
612         if (ret)
613                 goto err_close_epoll;
614 #endif
615
616         port_desc->proto = &proto_inet;
617         port_desc->proto_priv = port_priv;
618         port_desc->addr = &port_priv->addr;
619         return 0;
620 err_close_epoll:
621         close(port_priv->epoll_fd);
622 err_close_listen:
623         ret = errno;
624         close(port_priv->listen_socket);
625         errno = ret;
626 err_close_udp:
627         ret = errno;
628         close(port_priv->udp_socket);
629         errno = ret;
630 err_free:
631         ret = errno;
632         forb_free(port_priv);
633         errno = ret;
634         return -1;
635 }