]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
1da3605d0987a9abdc47703db5b98b5be0c62f19
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61 #include <forb/config.h>
62 #include "discovery.h"
63
64 /**
65  * @file   proto_inet.c
66  * @author Michal Sojka <sojkam1@fel.cvut.cz>
67  * @date   Sun Oct 12 16:10:23 2008
68  * 
69  * @brief  FORB transport protocol based on INET family sockets.
70  * 
71  * UDP is used for broadcasts and TCP for requests/replies. There
72  * exist two uni-drectional connections between any two communicating
73  * peers.
74  */
75
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
77
78 #define MCAST_PORT 15514        /**< Port used for multicasts */
79 #define MCAST_ADDR "225.15.5.14"
80
81 /** Address used by inet protocol. All values are stored in network
82  * byte order. */
83 struct inet_addr {
84         struct in_addr addr;
85         uint16_t port;          /**< TCP listening port */
86 };
87
88 /** INET protocol data for ports. */
89 struct inet_port {
90         int udp_socket;         /**< Socket for sending and receiving broadcasts */
91         int listen_socket;      /*  */
92         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
93         struct inet_addr addr;  /**< Address of this port */
94         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
95         struct in_addr multicast_addr;
96         ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
97 };
98
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100                  struct inet_port,   /* cust_head_t */
101                  forb_peer_t,        /* cust_item_t */
102                  new_peers,          /* cust_head_field */
103                  lnode)              /* cust_node_field */
104
105
106 /** INET protocol data associated with every peer */
107 struct inet_peer {
108         int socket;             /**< Connected socket to the peer */
109 };
110
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
113
114 static CORBA_boolean
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
116 {
117         const struct inet_addr *a = addr;
118         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119         CORBA_unsigned_short hport = ntohs(a->port);
120         CORBA_boolean ret;
121         ret = CORBA_unsigned_long_serialize(codec, &haddr);
122         if (!ret)
123                 return ret;
124         return CORBA_unsigned_short_serialize(codec, &hport);
125 }
126
127 static CORBA_boolean
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
129 {
130         struct inet_addr *a;
131         CORBA_unsigned_long s_addr;
132         CORBA_unsigned_short port;
133         CORBA_boolean ret;
134
135         a = forb_malloc(sizeof(*a));
136         if (!a)
137                 return CORBA_FALSE;
138         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
139         if (!ret)
140                 return ret;
141         ret = CORBA_unsigned_short_deserialize(codec, &port);
142         a->addr.s_addr = htonl(s_addr);
143         a->port = htons(port);
144         *addr = a;
145         return ret;
146 }
147
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
150 {
151         struct inet_peer *ipeer;
152         struct sockaddr_in sa;
153         struct inet_addr *addr = peer->addr;
154         int ret;
155
156         if (!addr) {
157                 ul_logerr("No address to connect\n");
158                 return NULL;
159         }
160         ipeer = forb_malloc(sizeof(*ipeer));
161         if (!ipeer)
162                 goto err;
163         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164         if (!ipeer->socket) {
165                 ul_logerr("socket(): %s\n", strerror(errno));
166                 goto err_free;
167         }
168         sa.sin_family = AF_INET;
169         sa.sin_port = addr->port;
170         sa.sin_addr = addr->addr;
171         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
173         if (ret) {
174                 ul_logerr("connect error: %s\n", strerror(errno));
175                 goto err_close;
176         }
177
178         struct epoll_event ev;
179         struct inet_port *p = peer->port->desc.proto_priv;
180         memset(&ev, 0, sizeof(ev));
181         ev.events = EPOLLIN | EPOLLET;
182         ev.data.fd = ipeer->socket;
183         ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
184         if (ret) {
185                 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
186                 goto err_close;
187         }
188
189
190         return ipeer;
191 err_close:
192         close(ipeer->socket);
193 err_free:
194         forb_free(ipeer);
195 err:
196         return NULL;
197         
198 }
199
200 static ssize_t
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
202 {
203         struct inet_peer *ipeer = peer->proto_priv;
204         ssize_t ret, sent;
205
206         if (!ipeer) {
207                 ipeer = inet_connect(peer);
208                 if (!ipeer) {
209                         return -1;
210                 }
211                 peer->proto_priv = ipeer;
212                 
213         }
214
215         sent = 0;
216         ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
217         do {
218                 ret = send(ipeer->socket, buf, len, 0);
219                 if (ret < 0) {
220                         ul_logerr("send error: %s\n", strerror(errno));
221                         return ret;
222                 }
223                 sent += ret;
224                 buf += ret;
225                 len -= ret;
226         } while (len > 0);
227
228         return sent;
229 }
230
231 /*----------------------------------------------------------------------
232  Portable function to set a socket into nonblocking mode.
233  Calling this on a socket causes all future read() and write() calls on
234  that socket to do only as much as they can immediately, and return 
235  without waiting.
236  If no data can be read or written, they return -1 and set errno
237  to EAGAIN (or EWOULDBLOCK).
238  Thanks to Bjorn Reese for this code.
239 ----------------------------------------------------------------------*/
240 int setnonblocking(int fd)
241 {
242     int flags;
243
244     /* If they have O_NONBLOCK, use the Posix way to do it */
245 #if defined(O_NONBLOCK)
246     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
247     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
248         flags = 0;
249     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
250 #else
251     /* Otherwise, use the old way of doing it */
252     flags = 1;
253     return ioctl(fd, FIOBIO, &flags);
254 #endif
255 }     
256
257 static int
258 inet_accept_connection(forb_port_t *port)
259 {
260         struct inet_port *p = port->desc.proto_priv;
261         int client;
262         struct sockaddr_in addr;
263         socklen_t addrlen = sizeof(addr);
264         struct epoll_event ev;
265         int ret;
266         forb_peer_t *peer;
267                                 
268         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
269         if (client < 0){
270                 //perror("accept");
271                 return -1;
272         }
273         ret = setnonblocking(client);
274         if (ret) {
275                 close(client);
276                 return -1;
277         }
278
279         peer = forb_peer_new();
280         if (peer) {
281                 struct inet_peer *ipeer;
282
283                 ipeer = forb_malloc(sizeof(*ipeer));
284                 if (ipeer) {
285                         ipeer->socket = client;
286                         peer->proto_priv = ipeer;
287                         peer->port = port;
288                         peer->state = FORB_PEER_DISCOVERED;
289                         inet_port_new_peer_insert(p, peer);
290                         //printf("New connection d=%d\n", client);
291                 } else {
292                         forb_peer_put(peer);
293                 }
294         }
295         
296         memset(&ev, 0, sizeof(ev));
297         ev.events = EPOLLIN | EPOLLET;
298         ev.data.fd = client;
299         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
300 }
301
302 static ssize_t
303 inet_recv(forb_port_t *port, void *buf, size_t len)
304 {
305         struct inet_port *iport = port->desc.proto_priv;
306 #if 1
307         struct epoll_event ev;
308         ssize_t ret;
309         int nfds;
310         forb_peer_t *peer;
311         bool exported_new_peer = false;
312         
313         for (;;) {
314                 if (iport->last_recv_fd == -1) {
315                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
316                         if (nfds < 1)
317                                 return -1;
318                         if (ev.data.fd == iport->listen_socket) {
319                                 ret = inet_accept_connection(port);
320                                 if (ret) {
321                                         ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
322                                         return -1;
323                                 } else
324                                         continue;
325                         } else {
326                                 iport->last_recv_fd = ev.data.fd;
327                         }
328                 }
329                 /* Check for first reception form a just connected peer */
330                 ul_list_for_each(inet_port_new_peer, iport, peer) {
331                         struct inet_peer *ipeer = peer->proto_priv;
332                         //printf("checking new peer with fd=%d\n", ipeer->socket);
333                         if (ipeer->socket == iport->last_recv_fd) {
334                                 inet_port_new_peer_delete(iport, peer);
335
336                                 if (port->new_peer) forb_peer_put(peer);
337
338                                 /* Let the upper layer assign forb ID
339                                  * to this peer according to the request*/
340                                 port->new_peer = peer;
341                                 exported_new_peer = true;
342                                 break;
343                         }
344                 }
345
346                 //printf("recv fd=%d\n", iport->last_recv_fd);
347                 ret = recv(iport->last_recv_fd, buf, len, 0);
348                 if (ret == -1) {
349                         if (exported_new_peer) {
350                                 forb_peer_put(peer);
351                                 port->new_peer = NULL;
352                         }
353                         if (errno != EAGAIN) {
354                                 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
355                         }
356                         iport->last_recv_fd = -1;
357                         continue;
358                 }
359                 if (ret == 0) {
360                         if (exported_new_peer) {
361                                 forb_peer_put(peer);
362                                 port->new_peer = NULL;
363                         }
364                         ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
365                         ul_list_for_each(forb_port_peer, port, peer) {
366                                 struct inet_peer *ipeer = peer->proto_priv;
367                                 if (ipeer && ipeer->socket == iport->last_recv_fd) {
368                                         forb_peer_disconnected(peer);
369                                         break;
370                                 }
371                         }
372                         iport->last_recv_fd = -1;
373                         continue;
374                 }
375                 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
376                 return ret;
377         }
378 #else
379         return recv(iport->udp_socket, buf, len, 0);    
380 #endif
381 }
382
383 static int
384 inet_port_destroy(forb_port_t * port)
385 {
386         struct inet_port *pd = port->desc.proto_priv;
387         close(pd->epoll_fd);
388         close(pd->udp_socket);
389         close(pd->listen_socket);
390         forb_free(pd);
391         return 0;
392 }
393
394 static ssize_t
395 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
396 {
397         struct inet_port *p = port->desc.proto_priv;
398         struct sockaddr_in addr;
399         ssize_t ret;
400         
401         addr.sin_family = AF_INET;
402         addr.sin_port = htons(MCAST_PORT);
403         addr.sin_addr = p->multicast_addr;
404         
405         ret = sendto(p->udp_socket, buf, len, 0,
406                      (struct sockaddr*)&addr, sizeof(addr));
407         return ret;
408 }
409
410 static void
411 inet_peer_destroy(forb_peer_t *peer)
412 {
413         struct inet_peer *ipeer = peer->proto_priv;
414         if (ipeer) {
415                 peer->proto_priv = NULL;
416                 ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
417                 close(ipeer->socket);
418                 free(ipeer);
419         }
420 }
421
422 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
423 {
424         const struct inet_addr *a = addr;
425         size_t ret = 0;
426         if (addr) {
427                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
428         }
429         return ret;
430 }
431
432 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
433
434 #include <fcb.h>
435 #include <fcb_contact_info.h>
436 #include <stdlib.h>
437
438 static void inet_register_cb(forb_port_t *port)
439 {
440         struct inet_addr *ia;
441         
442         ia = malloc(sizeof(*ia));
443         if (!ia) return;
444
445         char *fcb_addr = getenv("FCB_ADDR");
446         if (!fcb_addr) fcb_addr = "127.0.0.1";
447         ia->addr.s_addr = inet_addr(fcb_addr);
448         ia->port = htons(FCB_TCP_PORT);
449         forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
450 }
451 #else
452 #define inet_register_cb NULL
453 #endif
454
455 static const forb_proto_t proto_inet = {
456         .hello_interval = 40 /* seconds */,
457         .port_destroy = inet_port_destroy,
458         .peer_destroy = inet_peer_destroy,
459         .send = inet_send,
460         .recv = inet_recv,
461         .broadcast = inet_broadcast,
462         .serialize_addr = inet_serialize_addr,
463         .deserialize_addr = inet_deserialize_addr,
464         .addr2str = inet_addr2str,
465         .register_cb = inet_register_cb,
466 };
467
468 #define MAX_INTERFACES 10
469 int get_local_address(int sock, struct in_addr *addr)
470 {
471         struct ifconf  ifc;
472         struct ifreq   *ifr, req[MAX_INTERFACES];
473         
474
475         ifc.ifc_len = sizeof(req);
476         ifc.ifc_req = req;
477
478         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
479                 return -1;
480         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
481                 struct sockaddr_in ia;
482                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
483                 ioctl(sock, SIOCGIFFLAGS, ifr);
484                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
485                         *addr = ia.sin_addr;
486                         return 0;
487                 }
488         }
489         return -1;
490 }
491
492 /** 
493  * Initializes INET protocol port.
494  * 
495  * @param port_desc Port description to initialize.
496  * @return Zero on success, -1 on error and errno is set
497  * appropriately.
498  */
499 int
500 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
501                     uint16_t port)
502 {
503         int ret;       
504         struct inet_port *port_priv;
505         struct sockaddr_in addr;
506         socklen_t len;
507         struct epoll_event ev;
508         
509         port_priv = forb_malloc(sizeof(*port_priv));
510         if (!port_priv)
511                 return -1;
512
513         memset(port_priv, 0, sizeof(*port_priv));
514         port_priv->last_recv_fd = -1;
515         inet_port_new_peer_init_head(port_priv);
516         
517         /* Initialize UDP multicast socket */
518         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
519         if (port_priv->udp_socket == -1) goto err_free;
520         
521         int yes = 1;
522         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
523         if (ret)
524                 goto err_close_udp;
525
526         int reuse = 1;
527         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
528         if (ret)
529                 goto err_close_udp;
530
531         setnonblocking(port_priv->udp_socket);
532
533         struct ip_mreq mreq;
534         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
535         mreq.imr_multiaddr = port_priv->multicast_addr;
536         mreq.imr_interface.s_addr = INADDR_ANY;
537         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
538                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
539         if (ret)
540                 goto err_close_udp;
541
542         addr.sin_family = AF_INET;
543         addr.sin_port = htons(MCAST_PORT);
544         addr.sin_addr = port_priv->multicast_addr;
545
546         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
547         if (ret != 0) goto err_close_udp;
548
549         char loop = 1;
550         unsigned loop_size = sizeof(loop);
551         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
552                          IP_MULTICAST_LOOP, &loop, loop_size);
553         if (ret)
554                 goto err_close_udp;
555         
556
557         /* Initialize TCP socket */
558         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
559         if (port_priv->listen_socket == -1) goto err_close_udp;
560
561         reuse = 1;
562         setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
563         if (ret)
564                 goto err_close_listen;
565
566         addr.sin_family = AF_INET;
567         addr.sin_port = htons(port);
568         addr.sin_addr = listen_on;
569         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
570         if (ret != 0) goto err_close_listen;
571         if (setnonblocking(port_priv->listen_socket))
572                 goto err_close_listen;
573
574         ret = listen(port_priv->listen_socket, 10);
575         if (ret)
576                 goto err_close_listen;
577
578         /* Determine our address and port*/
579         len = sizeof(addr);
580         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
581         if (ret) {
582                 ul_logerr("Non-loopback inet address not found\n");
583                 goto err_close_listen;
584         }
585
586         port_priv->addr.port = addr.sin_port;
587         if (listen_on.s_addr == INADDR_ANY)
588                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
589         else
590                 port_priv->addr.addr = listen_on;
591
592         /* Initialize epoll descriptor */
593         port_priv->epoll_fd = epoll_create(10);
594         if (port_priv->epoll_fd == -1)
595                 goto err_close_listen;
596
597         memset(&ev, 0, sizeof(ev));
598         ev.events = EPOLLIN | EPOLLET;
599         ev.data.fd = port_priv->listen_socket;
600         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
601                         port_priv->listen_socket, &ev);
602         if (ret)
603                 goto err_close_epoll;
604
605 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT  
606         ev.events = EPOLLIN | EPOLLET;
607         ev.data.fd = port_priv->udp_socket;
608         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
609                         port_priv->udp_socket, &ev);
610         if (ret)
611                 goto err_close_epoll;
612 #endif
613
614         port_desc->proto = &proto_inet;
615         port_desc->proto_priv = port_priv;
616         port_desc->addr = &port_priv->addr;
617         return 0;
618 err_close_epoll:
619         close(port_priv->epoll_fd);
620 err_close_listen:
621         ret = errno;
622         close(port_priv->listen_socket);
623         errno = ret;
624 err_close_udp:
625         ret = errno;
626         close(port_priv->udp_socket);
627         errno = ret;
628 err_free:
629         ret = errno;
630         forb_free(port_priv);
631         errno = ret;
632         return -1;
633 }