]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
Added more debugging messages to protocols
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61 #include <forb/config.h>
62 #include "discovery.h"
63
64 /**
65  * @file   proto_inet.c
66  * @author Michal Sojka <sojkam1@fel.cvut.cz>
67  * @date   Sun Oct 12 16:10:23 2008
68  * 
69  * @brief  FORB transport protocol based on INET family sockets.
70  * 
71  * UDP is used for broadcasts and TCP for requests/replies. There
72  * exist two uni-drectional connections between any two communicating
73  * peers.
74  */
75
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
77
78 #define MCAST_PORT 15514        /**< Port used for multicasts */
79 #define MCAST_ADDR "225.15.5.14"
80
81 /** Address used by inet protocol. All values are stored in network
82  * byte order. */
83 struct inet_addr {
84         struct in_addr addr;
85         uint16_t port;          /**< TCP listening port */
86 };
87
88 /** INET protocol data for ports. */
89 struct inet_port {
90         int udp_socket;         /**< Socket for sending and receiving broadcasts */
91         int listen_socket;      /*  */
92         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
93         struct inet_addr addr;  /**< Address of this port */
94         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
95         struct in_addr multicast_addr;
96         ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
97 };
98
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100                  struct inet_port,   /* cust_head_t */
101                  forb_peer_t,        /* cust_item_t */
102                  new_peers,          /* cust_head_field */
103                  lnode)              /* cust_node_field */
104
105
106 /** INET protocol data associated with every peer */
107 struct inet_peer {
108         int socket;             /**< Connected socket to the peer */
109 };
110
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
113
114 static CORBA_boolean
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
116 {
117         const struct inet_addr *a = addr;
118         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119         CORBA_unsigned_short hport = ntohs(a->port);
120         CORBA_boolean ret;
121         ret = CORBA_unsigned_long_serialize(codec, &haddr);
122         if (!ret)
123                 return ret;
124         return CORBA_unsigned_short_serialize(codec, &hport);
125 }
126
127 static CORBA_boolean
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
129 {
130         struct inet_addr *a;
131         CORBA_unsigned_long s_addr;
132         CORBA_unsigned_short port;
133         CORBA_boolean ret;
134
135         a = forb_malloc(sizeof(*a));
136         if (!a)
137                 return CORBA_FALSE;
138         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
139         if (!ret)
140                 return ret;
141         ret = CORBA_unsigned_short_deserialize(codec, &port);
142         a->addr.s_addr = htonl(s_addr);
143         a->port = htons(port);
144         *addr = a;
145         return ret;
146 }
147
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
150 {
151         struct inet_peer *ipeer;
152         struct sockaddr_in sa;
153         struct inet_addr *addr = peer->addr;
154         int ret;
155
156         if (!addr) {
157                 ul_logerr("No address to connect\n");
158                 return NULL;
159         }
160         ipeer = forb_malloc(sizeof(*ipeer));
161         if (!ipeer)
162                 goto err;
163         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164         if (!ipeer->socket) {
165                 ul_logerr("socket(): %s\n", strerror(errno));
166                 goto err_free;
167         }
168         sa.sin_family = AF_INET;
169         sa.sin_port = addr->port;
170         sa.sin_addr = addr->addr;
171         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
173         if (ret) {
174                 ul_logerr("connect error: %s\n", strerror(errno));
175                 goto err_close;
176         }
177
178         struct epoll_event ev;
179         struct inet_port *p = peer->port->desc.proto_priv;
180         memset(&ev, 0, sizeof(ev));
181         ev.events = EPOLLIN | EPOLLET;
182         ev.data.fd = ipeer->socket;
183         ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
184         if (ret) {
185                 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
186                 goto err_close;
187         }
188
189
190         return ipeer;
191 err_close:
192         close(ipeer->socket);
193 err_free:
194         forb_free(ipeer);
195 err:
196         return NULL;
197         
198 }
199
200 static ssize_t
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
202 {
203         struct inet_peer *ipeer = peer->proto_priv;
204         ssize_t ret, sent;
205
206         if (!ipeer) {
207                 ipeer = inet_connect(peer);
208                 if (!ipeer) {
209                         return -1;
210                 }
211                 peer->proto_priv = ipeer;
212                 
213         }
214
215         sent = 0;
216         ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
217         do {
218                 ret = send(ipeer->socket, buf, len, 0);
219                 if (ret < 0) {
220                         ul_logerr("send error: %s\n", strerror(errno));
221                         return ret;
222                 }
223                 sent += ret;
224                 buf += ret;
225                 len -= ret;
226         } while (len > 0);
227
228         return sent;
229 }
230
231 /*----------------------------------------------------------------------
232  Portable function to set a socket into nonblocking mode.
233  Calling this on a socket causes all future read() and write() calls on
234  that socket to do only as much as they can immediately, and return 
235  without waiting.
236  If no data can be read or written, they return -1 and set errno
237  to EAGAIN (or EWOULDBLOCK).
238  Thanks to Bjorn Reese for this code.
239 ----------------------------------------------------------------------*/
240 int setnonblocking(int fd)
241 {
242     int flags;
243
244     /* If they have O_NONBLOCK, use the Posix way to do it */
245 #if defined(O_NONBLOCK)
246     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
247     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
248         flags = 0;
249     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
250 #else
251     /* Otherwise, use the old way of doing it */
252     flags = 1;
253     return ioctl(fd, FIOBIO, &flags);
254 #endif
255 }     
256
257 static int
258 inet_accept_connection(forb_port_t *port)
259 {
260         struct inet_port *p = port->desc.proto_priv;
261         int client;
262         struct sockaddr_in addr;
263         socklen_t addrlen = sizeof(addr);
264         struct epoll_event ev;
265         int ret;
266         forb_peer_t *peer;
267                                 
268         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
269         if (client < 0){
270                 //perror("accept");
271                 return -1;
272         }
273         ret = setnonblocking(client);
274         if (ret) {
275                 close(client);
276                 return -1;
277         }
278
279         peer = forb_peer_new();
280         if (peer) {
281                 struct inet_peer *ipeer;
282
283                 ipeer = forb_malloc(sizeof(*ipeer));
284                 if (ipeer) {
285                         ipeer->socket = client;
286                         peer->proto_priv = ipeer;
287                         peer->port = port;
288                         peer->state = FORB_PEER_DISCOVERED;
289                         inet_port_new_peer_insert(p, peer);
290                         //printf("New connection d=%d\n", client);
291                 } else {
292                         forb_peer_put(peer);
293                 }
294         }
295         
296         memset(&ev, 0, sizeof(ev));
297         ev.events = EPOLLIN | EPOLLET;
298         ev.data.fd = client;
299         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
300 }
301
302 static ssize_t
303 inet_recv(forb_port_t *port, void *buf, size_t len)
304 {
305         struct inet_port *iport = port->desc.proto_priv;
306 #if 1
307         struct epoll_event ev;
308         ssize_t ret;
309         int nfds;
310         forb_peer_t *peer;
311         bool exported_new_peer = false;
312         
313         for (;;) {
314                 if (iport->last_recv_fd == -1) {
315                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
316                         if (nfds < 1)
317                                 return -1;
318                         if (ev.data.fd == iport->listen_socket) {
319                                 ret = inet_accept_connection(port);
320                                 if (ret) {
321                                         ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
322                                         return -1;
323                                 } else
324                                         continue;
325                         } else {
326                                 iport->last_recv_fd = ev.data.fd;
327                         }
328                 }
329                 /* Check for first reception form a just connected peer */
330                 ul_list_for_each(inet_port_new_peer, iport, peer) {
331                         struct inet_peer *ipeer = peer->proto_priv;
332                         //printf("checking new peer with fd=%d\n", ipeer->socket);
333                         if (ipeer->socket == iport->last_recv_fd) {
334                                 inet_port_new_peer_delete(iport, peer);
335
336                                 if (port->new_peer) forb_peer_put(peer);
337
338                                 /* Let the upper layer assign forb ID
339                                  * to this peer according to the request*/
340                                 port->new_peer = peer;
341                                 exported_new_peer = true;
342                                 break;
343                         }
344                 }
345
346                 //printf("recv fd=%d\n", iport->last_recv_fd);
347                 ret = recv(iport->last_recv_fd, buf, len, 0);
348                 if (ret == -1) {
349                         if (exported_new_peer) {
350                                 forb_peer_put(peer);
351                                 port->new_peer = NULL;
352                         }
353                         if (errno != EAGAIN) {
354                                 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
355                         }
356                         iport->last_recv_fd = -1;
357                         continue;
358                 }
359                 if (ret == 0) {
360                         if (exported_new_peer) {
361                                 forb_peer_put(peer);
362                                 port->new_peer = NULL;
363                         }
364                         ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
365                         close(iport->last_recv_fd);
366                         /* TODO: Notify FORB about peer disconnect */
367                         iport->last_recv_fd = -1;
368                         continue;
369                 }
370                 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
371                 return ret;
372         }
373 #else
374         return recv(iport->udp_socket, buf, len, 0);    
375 #endif
376 }
377
378 static int
379 inet_port_destroy(forb_port_t * port)
380 {
381         struct inet_port *pd = port->desc.proto_priv;
382         close(pd->epoll_fd);
383         close(pd->udp_socket);
384         close(pd->listen_socket);
385         forb_free(pd);
386         return 0;
387 }
388
389 static ssize_t
390 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
391 {
392         struct inet_port *p = port->desc.proto_priv;
393         struct sockaddr_in addr;
394         ssize_t ret;
395         
396         addr.sin_family = AF_INET;
397         addr.sin_port = htons(MCAST_PORT);
398         addr.sin_addr = p->multicast_addr;
399         
400         ret = sendto(p->udp_socket, buf, len, 0,
401                      (struct sockaddr*)&addr, sizeof(addr));
402         return ret;
403 }
404
405 static void
406 inet_peer_destroy(forb_peer_t *peer)
407 {
408         struct inet_peer *ipeer = peer->proto_priv;
409         if (ipeer) {
410                 peer->proto_priv = NULL;
411                 close(ipeer->socket);
412                 free(ipeer);
413         }
414 }
415
416 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
417 {
418         const struct inet_addr *a = addr;
419         size_t ret = 0;
420         if (addr) {
421                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
422         }
423         return ret;
424 }
425
426 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
427
428 #include <fcb.h>
429 #include <fcb_contact_info.h>
430 #include <stdlib.h>
431
432 static void inet_register_cb(forb_port_t *port)
433 {
434         struct inet_addr *ia;
435         
436         ia = malloc(sizeof(*ia));
437         if (!ia) return;
438
439         char *fcb_addr = getenv("FCB_ADDR");
440         if (!fcb_addr) fcb_addr = "127.0.0.1";
441         ia->addr.s_addr = inet_addr(fcb_addr);
442         ia->port = htons(FCB_TCP_PORT);
443         forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
444 }
445 #else
446 #define inet_register_cb NULL
447 #endif
448
449 static const forb_proto_t proto_inet = {
450         .hello_interval = 40 /* seconds */,
451         .port_destroy = inet_port_destroy,
452         .peer_destroy = inet_peer_destroy,
453         .send = inet_send,
454         .recv = inet_recv,
455         .broadcast = inet_broadcast,
456         .serialize_addr = inet_serialize_addr,
457         .deserialize_addr = inet_deserialize_addr,
458         .addr2str = inet_addr2str,
459         .register_cb = inet_register_cb,
460 };
461
462 #define MAX_INTERFACES 10
463 int get_local_address(int sock, struct in_addr *addr)
464 {
465         struct ifconf  ifc;
466         struct ifreq   *ifr, req[MAX_INTERFACES];
467         
468
469         ifc.ifc_len = sizeof(req);
470         ifc.ifc_req = req;
471
472         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
473                 return -1;
474         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
475                 struct sockaddr_in ia;
476                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
477                 ioctl(sock, SIOCGIFFLAGS, ifr);
478                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
479                         *addr = ia.sin_addr;
480                         return 0;
481                 }
482         }
483         return -1;
484 }
485
486 /** 
487  * Initializes INET protocol port.
488  * 
489  * @param port_desc Port description to initialize.
490  * @return Zero on success, -1 on error and errno is set
491  * appropriately.
492  */
493 int
494 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
495                     uint16_t port)
496 {
497         int ret;       
498         struct inet_port *port_priv;
499         struct sockaddr_in addr;
500         socklen_t len;
501         struct epoll_event ev;
502         
503         port_priv = forb_malloc(sizeof(*port_priv));
504         if (!port_priv)
505                 return -1;
506
507         memset(port_priv, 0, sizeof(*port_priv));
508         port_priv->last_recv_fd = -1;
509         inet_port_new_peer_init_head(port_priv);
510         
511         /* Initialize UDP multicast socket */
512         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
513         if (port_priv->udp_socket == -1) goto err_free;
514         
515         int yes = 1;
516         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
517         if (ret)
518                 goto err_close_udp;
519
520         int reuse = 1;
521         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
522         if (ret)
523                 goto err_close_udp;
524
525         setnonblocking(port_priv->udp_socket);
526
527         struct ip_mreq mreq;
528         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
529         mreq.imr_multiaddr = port_priv->multicast_addr;
530         mreq.imr_interface.s_addr = INADDR_ANY;
531         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
532                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
533         if (ret)
534                 goto err_close_udp;
535
536         addr.sin_family = AF_INET;
537         addr.sin_port = htons(MCAST_PORT);
538         addr.sin_addr = port_priv->multicast_addr;
539
540         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
541         if (ret != 0) goto err_close_udp;
542
543         char loop = 1;
544         unsigned loop_size = sizeof(loop);
545         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
546                          IP_MULTICAST_LOOP, &loop, loop_size);
547         if (ret)
548                 goto err_close_udp;
549         
550
551         /* Initialize TCP socket */
552         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
553         if (port_priv->listen_socket == -1) goto err_close_udp;
554
555         reuse = 1;
556         setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
557         if (ret)
558                 goto err_close_listen;
559
560         addr.sin_family = AF_INET;
561         addr.sin_port = htons(port);
562         addr.sin_addr = listen_on;
563         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
564         if (ret != 0) goto err_close_listen;
565         if (setnonblocking(port_priv->listen_socket))
566                 goto err_close_listen;
567
568         ret = listen(port_priv->listen_socket, 10);
569         if (ret)
570                 goto err_close_listen;
571
572         /* Determine our address and port*/
573         len = sizeof(addr);
574         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
575         if (ret) {
576                 ul_logerr("Non-loopback inet address not found\n");
577                 goto err_close_listen;
578         }
579
580         port_priv->addr.port = addr.sin_port;
581         if (listen_on.s_addr == INADDR_ANY)
582                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
583         else
584                 port_priv->addr.addr = listen_on;
585
586         /* Initialize epoll descriptor */
587         port_priv->epoll_fd = epoll_create(10);
588         if (port_priv->epoll_fd == -1)
589                 goto err_close_listen;
590
591         memset(&ev, 0, sizeof(ev));
592         ev.events = EPOLLIN | EPOLLET;
593         ev.data.fd = port_priv->listen_socket;
594         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
595                         port_priv->listen_socket, &ev);
596         if (ret)
597                 goto err_close_epoll;
598
599 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT  
600         ev.events = EPOLLIN | EPOLLET;
601         ev.data.fd = port_priv->udp_socket;
602         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
603                         port_priv->udp_socket, &ev);
604         if (ret)
605                 goto err_close_epoll;
606 #endif
607
608         port_desc->proto = &proto_inet;
609         port_desc->proto_priv = port_priv;
610         port_desc->addr = &port_priv->addr;
611         return 0;
612 err_close_epoll:
613         close(port_priv->epoll_fd);
614 err_close_listen:
615         ret = errno;
616         close(port_priv->listen_socket);
617         errno = ret;
618 err_close_udp:
619         ret = errno;
620         close(port_priv->udp_socket);
621         errno = ret;
622 err_free:
623         ret = errno;
624         forb_free(port_priv);
625         errno = ret;
626         return -1;
627 }