]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
1c356794219f2f04c5b4ac5815035731f1757cbb
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61 #include <forb/config.h>
62 #include "discovery.h"
63
64 /**
65  * @file   proto_inet.c
66  * @author Michal Sojka <sojkam1@fel.cvut.cz>
67  * @date   Sun Oct 12 16:10:23 2008
68  * 
69  * @brief  FORB transport protocol based on INET family sockets.
70  * 
71  * UDP is used for broadcasts and TCP for requests/replies. There
72  * exist two uni-drectional connections between any two communicating
73  * peers.
74  */
75
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
77
78 #define MCAST_PORT 15514        /**< Port used for multicasts */
79 #define MCAST_ADDR "225.15.5.14"
80
81 /** Address used by inet protocol. All values are stored in network
82  * byte order. */
83 struct inet_addr {
84         struct in_addr addr;
85         uint16_t port;          /**< TCP listening port */
86 };
87
88 /** INET protocol data for ports. */
89 struct inet_port {
90         int udp_socket;         /**< Socket for sending and receiving broadcasts */
91         int listen_socket;      /*  */
92         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
93         struct inet_addr addr;  /**< Address of this port */
94         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
95         struct in_addr multicast_addr;
96         ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
97 };
98
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100                  struct inet_port,   /* cust_head_t */
101                  forb_peer_t,        /* cust_item_t */
102                  new_peers,          /* cust_head_field */
103                  lnode)              /* cust_node_field */
104
105
106 /** INET protocol data associated with every peer */
107 struct inet_peer {
108         int socket;             /**< Connected socket to the peer */
109 };
110
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
113
114 static CORBA_boolean
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
116 {
117         const struct inet_addr *a = addr;
118         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119         CORBA_unsigned_short hport = ntohs(a->port);
120         CORBA_boolean ret;
121         ret = CORBA_unsigned_long_serialize(codec, &haddr);
122         if (!ret)
123                 return ret;
124         return CORBA_unsigned_short_serialize(codec, &hport);
125 }
126
127 static CORBA_boolean
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
129 {
130         struct inet_addr *a;
131         CORBA_unsigned_long s_addr;
132         CORBA_unsigned_short port;
133         CORBA_boolean ret;
134
135         a = forb_malloc(sizeof(*a));
136         if (!a)
137                 return CORBA_FALSE;
138         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
139         if (!ret)
140                 return ret;
141         ret = CORBA_unsigned_short_deserialize(codec, &port);
142         a->addr.s_addr = htonl(s_addr);
143         a->port = htons(port);
144         *addr = a;
145         return ret;
146 }
147
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
150 {
151         struct inet_peer *ipeer;
152         struct sockaddr_in sa;
153         struct inet_addr *addr = peer->addr;
154         int ret;
155
156         if (!addr) {
157                 ul_logerr("No address to connect\n");
158                 return NULL;
159         }
160         ipeer = forb_malloc(sizeof(*ipeer));
161         if (!ipeer)
162                 goto err;
163         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164         if (!ipeer->socket) {
165                 ul_logerr("socket(): %s\n", strerror(errno));
166                 goto err_free;
167         }
168         sa.sin_family = AF_INET;
169         sa.sin_port = addr->port;
170         sa.sin_addr = addr->addr;
171         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
173         if (ret) {
174                 ul_logerr("connect error: %s\n", strerror(errno));
175                 goto err_close;
176         }
177
178         struct epoll_event ev;
179         struct inet_port *p = peer->port->desc.proto_priv;
180         memset(&ev, 0, sizeof(ev));
181         ev.events = EPOLLIN | EPOLLET;
182         ev.data.fd = ipeer->socket;
183         ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
184         if (ret) {
185                 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
186                 goto err_close;
187         }
188
189
190         return ipeer;
191 err_close:
192         close(ipeer->socket);
193 err_free:
194         forb_free(ipeer);
195 err:
196         return NULL;
197         
198 }
199
200 static size_t
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
202 {
203         struct inet_peer *ipeer = peer->proto_priv;
204         size_t ret, sent;
205
206         if (!ipeer) {
207                 ipeer = inet_connect(peer);
208                 if (!ipeer) {
209                         return -1;
210                 }
211                 peer->proto_priv = ipeer;
212                 
213         }
214
215         sent = 0;
216         ul_logdeb("send fd=%d\n", ipeer->socket);
217         do {
218                 ret = send(ipeer->socket, buf, len, 0);
219                 if (ret < 0) {
220                         ul_logerr("send error: %s\n", strerror(errno));
221                         return ret;
222                 }
223                 sent += ret;
224                 buf += ret;
225                 len -= ret;
226         } while (len > 0);
227
228         return sent;
229 }
230
231 /*----------------------------------------------------------------------
232  Portable function to set a socket into nonblocking mode.
233  Calling this on a socket causes all future read() and write() calls on
234  that socket to do only as much as they can immediately, and return 
235  without waiting.
236  If no data can be read or written, they return -1 and set errno
237  to EAGAIN (or EWOULDBLOCK).
238  Thanks to Bjorn Reese for this code.
239 ----------------------------------------------------------------------*/
240 int setnonblocking(int fd)
241 {
242     int flags;
243
244     /* If they have O_NONBLOCK, use the Posix way to do it */
245 #if defined(O_NONBLOCK)
246     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
247     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
248         flags = 0;
249     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
250 #else
251     /* Otherwise, use the old way of doing it */
252     flags = 1;
253     return ioctl(fd, FIOBIO, &flags);
254 #endif
255 }     
256
257 static int
258 inet_accept_connection(forb_port_t *port)
259 {
260         struct inet_port *p = port->desc.proto_priv;
261         int client;
262         struct sockaddr_in addr;
263         socklen_t addrlen = sizeof(addr);
264         struct epoll_event ev;
265         int ret;
266         forb_peer_t *peer;
267                                 
268         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
269         if (client < 0){
270                 //perror("accept");
271                 return -1;
272         }
273         ret = setnonblocking(client);
274         if (ret) {
275                 close(client);
276                 return -1;
277         }
278
279         peer = forb_peer_new();
280         if (peer) {
281                 struct inet_peer *ipeer;
282
283                 ipeer = forb_malloc(sizeof(*ipeer));
284                 if (ipeer) {
285                         ipeer->socket = client;
286                         peer->proto_priv = ipeer;
287                         peer->port = port;
288                         peer->state = FORB_PEER_DISCOVERED;
289                         inet_port_new_peer_insert(p, peer);
290                         //printf("New connection d=%d\n", client);
291                 } else {
292                         forb_peer_put(peer);
293                 }
294         }
295         
296         memset(&ev, 0, sizeof(ev));
297         ev.events = EPOLLIN | EPOLLET;
298         ev.data.fd = client;
299         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
300 }
301
302 static size_t
303 inet_recv(forb_port_t *port, void *buf, size_t len)
304 {
305         struct inet_port *iport = port->desc.proto_priv;
306 #if 1
307         struct epoll_event ev;
308         int ret, nfds;
309         forb_peer_t *peer;
310         bool exported_new_peer = false;
311         
312         for (;;) {
313                 if (iport->last_recv_fd == -1) {
314                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
315                         if (nfds < 1)
316                                 return -1;
317                         if (ev.data.fd == iport->listen_socket) {
318                                 ret = inet_accept_connection(port);
319                                 if (ret)
320                                         return -1;
321                                 else
322                                         continue;
323                         } else {
324                                 iport->last_recv_fd = ev.data.fd;
325                         }
326                 }
327                 /* Check for first reception form a just connected peer */
328                 ul_list_for_each(inet_port_new_peer, iport, peer) {
329                         struct inet_peer *ipeer = peer->proto_priv;
330                         //printf("checking new peer with fd=%d\n", ipeer->socket);
331                         if (ipeer->socket == iport->last_recv_fd) {
332                                 inet_port_new_peer_delete(iport, peer);
333
334                                 if (port->new_peer) forb_peer_put(peer);
335
336                                 /* Let the upper layer assign forb ID
337                                  * to this peer according to the request*/
338                                 port->new_peer = peer;
339                                 exported_new_peer = true;
340                                 break;
341                         }
342                 }
343
344                 //printf("recv fd=%d\n", iport->last_recv_fd);
345                 ret = recv(iport->last_recv_fd, buf, len, 0);
346                 if (ret == -1) {
347                         if (exported_new_peer) {
348                                 forb_peer_put(peer);
349                                 port->new_peer = NULL;
350                         }
351                         if (errno == EAGAIN) {
352                                 iport->last_recv_fd = -1;
353                                 continue;
354                         }
355                 }
356                 if (ret == 0) {
357                         if (exported_new_peer) {
358                                 forb_peer_put(peer);
359                                 port->new_peer = NULL;
360                         }
361                         close(iport->last_recv_fd);
362                         /* TODO: Notify FORB about peer disconnect */
363                         iport->last_recv_fd = -1;
364                         continue;
365                 }
366                 return ret;
367         }
368 #else
369         return recv(iport->udp_socket, buf, len, 0);    
370 #endif
371 }
372
373 static int
374 inet_port_destroy(forb_port_t * port)
375 {
376         struct inet_port *pd = port->desc.proto_priv;
377         close(pd->epoll_fd);
378         close(pd->udp_socket);
379         close(pd->listen_socket);
380         forb_free(pd);
381         return 0;
382 }
383
384 static size_t
385 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
386 {
387         struct inet_port *p = port->desc.proto_priv;
388         struct sockaddr_in addr;
389         int ret;
390         
391         addr.sin_family = AF_INET;
392         addr.sin_port = htons(MCAST_PORT);
393         addr.sin_addr = p->multicast_addr;
394         
395         ret = sendto(p->udp_socket, buf, len, 0,
396                      (struct sockaddr*)&addr, sizeof(addr));
397         return ret;
398 }
399
400 static void
401 inet_peer_destroy(forb_peer_t *peer)
402 {
403         struct inet_peer *ipeer = peer->proto_priv;
404         if (ipeer) {
405                 peer->proto_priv = NULL;
406                 close(ipeer->socket);
407                 free(ipeer);
408         }
409 }
410
411 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
412 {
413         const struct inet_addr *a = addr;
414         size_t ret = 0;
415         if (addr) {
416                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
417         }
418         return ret;
419 }
420
421 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
422
423 #include <fcb.h>
424 #include <fcb_contact_info.h>
425 #include <stdlib.h>
426
427 static void inet_register_cb(forb_port_t *port)
428 {
429         struct inet_addr *ia;
430         
431         ia = malloc(sizeof(*ia));
432         if (!ia) return;
433
434         char *fcb_addr = getenv("FCB_ADDR");
435         if (!fcb_addr) fcb_addr = "127.0.0.1";
436         ia->addr.s_addr = inet_addr(fcb_addr);
437         ia->port = htons(FCB_TCP_PORT);
438         forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
439 }
440 #else
441 #define inet_register_cb NULL
442 #endif
443
444 static const forb_proto_t proto_inet = {
445         .hello_interval = 40 /* seconds */,
446         .port_destroy = inet_port_destroy,
447         .peer_destroy = inet_peer_destroy,
448         .send = inet_send,
449         .recv = inet_recv,
450         .broadcast = inet_broadcast,
451         .serialize_addr = inet_serialize_addr,
452         .deserialize_addr = inet_deserialize_addr,
453         .addr2str = inet_addr2str,
454         .register_cb = inet_register_cb,
455 };
456
457 #define MAX_INTERFACES 10
458 int get_local_address(int sock, struct in_addr *addr)
459 {
460         struct ifconf  ifc;
461         struct ifreq   *ifr, req[MAX_INTERFACES];
462         
463
464         ifc.ifc_len = sizeof(req);
465         ifc.ifc_req = req;
466
467         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
468                 return -1;
469         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
470                 struct sockaddr_in ia;
471                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
472                 ioctl(sock, SIOCGIFFLAGS, ifr);
473                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
474                         *addr = ia.sin_addr;
475                         return 0;
476                 }
477         }
478         return -1;
479 }
480
481 /** 
482  * Initializes INET protocol port.
483  * 
484  * @param port_desc Port description to initialize.
485  * @return Zero on success, -1 on error and errno is set
486  * appropriately.
487  */
488 int
489 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
490                     uint16_t port)
491 {
492         int ret;       
493         struct inet_port *port_priv;
494         struct sockaddr_in addr;
495         socklen_t len;
496         struct epoll_event ev;
497         
498         port_priv = forb_malloc(sizeof(*port_priv));
499         if (!port_priv)
500                 return -1;
501
502         memset(port_priv, 0, sizeof(*port_priv));
503         port_priv->last_recv_fd = -1;
504         inet_port_new_peer_init_head(port_priv);
505         
506         /* Initialize UDP multicast socket */
507         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
508         if (port_priv->udp_socket == -1) goto err_free;
509         
510         int yes = 1;
511         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
512         if (ret)
513                 goto err_close_udp;
514
515         int reuse = 1;
516         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
517         if (ret)
518                 goto err_close_udp;
519
520         setnonblocking(port_priv->udp_socket);
521
522         struct ip_mreq mreq;
523         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
524         mreq.imr_multiaddr = port_priv->multicast_addr;
525         mreq.imr_interface.s_addr = INADDR_ANY;
526         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
527                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
528         if (ret)
529                 goto err_close_udp;
530
531         addr.sin_family = AF_INET;
532         addr.sin_port = htons(MCAST_PORT);
533         addr.sin_addr = port_priv->multicast_addr;
534
535         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
536         if (ret != 0) goto err_close_udp;
537
538         char loop = 1;
539         unsigned loop_size = sizeof(loop);
540         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
541                          IP_MULTICAST_LOOP, &loop, loop_size);
542         if (ret)
543                 goto err_close_udp;
544         
545
546         /* Initialize TCP socket */
547         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
548         if (port_priv->listen_socket == -1) goto err_close_udp;
549
550         addr.sin_family = AF_INET;
551         addr.sin_port = htons(port);
552         addr.sin_addr = listen_on;
553         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
554         if (ret != 0) goto err_close_listen;
555         if (setnonblocking(port_priv->listen_socket))
556                 goto err_close_listen;
557
558         ret = listen(port_priv->listen_socket, 10);
559         if (ret)
560                 goto err_close_listen;
561
562         /* Determine our address and port*/
563         len = sizeof(addr);
564         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
565         if (ret) {
566                 ul_logerr("Non-loopback inet address not found\n");
567                 goto err_close_listen;
568         }
569
570         port_priv->addr.port = addr.sin_port;
571         if (listen_on.s_addr == INADDR_ANY)
572                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
573         else
574                 port_priv->addr.addr = listen_on;
575
576         /* Initialize epoll descriptor */
577         port_priv->epoll_fd = epoll_create(10);
578         if (port_priv->epoll_fd == -1)
579                 goto err_close_listen;
580
581         memset(&ev, 0, sizeof(ev));
582         ev.events = EPOLLIN | EPOLLET;
583         ev.data.fd = port_priv->listen_socket;
584         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
585                         port_priv->listen_socket, &ev);
586         if (ret)
587                 goto err_close_epoll;
588
589 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT  
590         ev.events = EPOLLIN | EPOLLET;
591         ev.data.fd = port_priv->udp_socket;
592         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
593                         port_priv->udp_socket, &ev);
594         if (ret)
595                 goto err_close_epoll;
596 #endif
597
598         port_desc->proto = &proto_inet;
599         port_desc->proto_priv = port_priv;
600         port_desc->addr = &port_priv->addr;
601         return 0;
602 err_close_epoll:
603         close(port_priv->epoll_fd);
604 err_close_listen:
605         ret = errno;
606         close(port_priv->listen_socket);
607         errno = ret;
608 err_close_udp:
609         ret = errno;
610         close(port_priv->udp_socket);
611         errno = ret;
612 err_free:
613         ret = errno;
614         forb_free(port_priv);
615         errno = ret;
616         return -1;
617 }