]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
c7874c3e2ecfc0786ce7e5f4cd141908422a547e
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61
62 /**
63  * @file   proto_inet.c
64  * @author Michal Sojka <sojkam1@fel.cvut.cz>
65  * @date   Sun Oct 12 16:10:23 2008
66  * 
67  * @brief  FORB transport protocol based on INET family sockets.
68  * 
69  * UDP is used for broadcasts and TCP for requests/replies. There
70  * exist two uni-drectional connections between any two communicating
71  * peers.
72  */
73
74 extern UL_LOG_CUST(ulogd_forb_proto_inet);
75
76 #define MCAST_PORT 15514        /**< Port used for multicasts */
77 #define MCAST_ADDR "225.15.5.14"
78
79 /** Address used by inet protocol. All values are stored in network
80  * byte order. */
81 struct inet_addr {
82         struct in_addr addr;
83         uint16_t port;          /**< TCP listening port */
84 };
85
86 /** INET protocol data for ports. */
87 struct inet_port {
88         int udp_socket;         /**< Socket for sending and receiving broadcasts */
89         int listen_socket;      /*  */
90         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
91         struct inet_addr addr;  /**< Address of this port */
92         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
93         struct in_addr multicast_addr;
94         ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
95 };
96
97 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
98                  struct inet_port,   /* cust_head_t */
99                  forb_peer_t,        /* cust_item_t */
100                  new_peers,          /* cust_head_field */
101                  lnode)              /* cust_node_field */
102
103
104 /** INET protocol data associated with every peer */
105 struct inet_peer {
106         int socket;             /**< Connected socket to the peer */
107 };
108
109 /* static struct inet_port* */
110 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
111
112 static CORBA_boolean
113 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
114 {
115         const struct inet_addr *a = addr;
116         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
117         CORBA_unsigned_short hport = ntohs(a->port);
118         CORBA_boolean ret;
119         ret = CORBA_unsigned_long_serialize(codec, &haddr);
120         if (!ret)
121                 return ret;
122         return CORBA_unsigned_short_serialize(codec, &hport);
123 }
124
125 static CORBA_boolean
126 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
127 {
128         struct inet_addr *a;
129         CORBA_unsigned_long s_addr;
130         CORBA_unsigned_short port;
131         CORBA_boolean ret;
132
133         a = forb_malloc(sizeof(*a));
134         if (!a)
135                 return CORBA_FALSE;
136         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
137         if (!ret)
138                 return ret;
139         ret = CORBA_unsigned_short_deserialize(codec, &port);
140         a->addr.s_addr = htonl(s_addr);
141         a->port = htons(port);
142         *addr = a;
143         return ret;
144 }
145
146 static struct inet_peer *
147 inet_connect(forb_peer_t *peer)
148 {
149         struct inet_peer *ipeer;
150         struct sockaddr_in sa;
151         struct inet_addr *addr = peer->addr;
152         int ret;
153
154         if (!addr) {
155                 ul_logerr("No address to connect\n");
156                 return NULL;
157         }
158         ipeer = forb_malloc(sizeof(*ipeer));
159         if (!ipeer)
160                 goto err;
161         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
162         if (!ipeer->socket) {
163                 ul_logerr("socket(): %s\n", strerror(errno));
164                 goto err_free;
165         }
166         sa.sin_family = AF_INET;
167         sa.sin_port = addr->port;
168         sa.sin_addr = addr->addr;
169         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
170         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
171         if (ret) {
172                 ul_logerr("connect error: %s\n", strerror(errno));
173                 goto err_close;
174         }
175
176         struct epoll_event ev;
177         struct inet_port *p = peer->port->desc.proto_priv;
178         memset(&ev, 0, sizeof(ev));
179         ev.events = EPOLLIN | EPOLLET;
180         ev.data.fd = ipeer->socket;
181         ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
182         if (ret) {
183                 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
184                 goto err_close;
185         }
186
187
188         return ipeer;
189 err_close:
190         close(ipeer->socket);
191 err_free:
192         forb_free(ipeer);
193 err:
194         return NULL;
195         
196 }
197
198 static size_t
199 inet_send(forb_peer_t *peer, const void *buf, size_t len)
200 {
201         struct inet_peer *ipeer = peer->proto_priv;
202         size_t ret, sent;
203
204         if (!ipeer) {
205                 ipeer = inet_connect(peer);
206                 if (!ipeer) {
207                         return -1;
208                 }
209                 peer->proto_priv = ipeer;
210                 
211         }
212
213         sent = 0;
214         ul_logdeb("send fd=%d\n", ipeer->socket);
215         do {
216                 ret = send(ipeer->socket, buf, len, 0);
217                 if (ret < 0) {
218                         ul_logerr("send error: %s\n", strerror(errno));
219                         return ret;
220                 }
221                 sent += ret;
222                 buf += ret;
223                 len -= ret;
224         } while (len > 0);
225
226         return sent;
227 }
228
229 /*----------------------------------------------------------------------
230  Portable function to set a socket into nonblocking mode.
231  Calling this on a socket causes all future read() and write() calls on
232  that socket to do only as much as they can immediately, and return 
233  without waiting.
234  If no data can be read or written, they return -1 and set errno
235  to EAGAIN (or EWOULDBLOCK).
236  Thanks to Bjorn Reese for this code.
237 ----------------------------------------------------------------------*/
238 int setnonblocking(int fd)
239 {
240     int flags;
241
242     /* If they have O_NONBLOCK, use the Posix way to do it */
243 #if defined(O_NONBLOCK)
244     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
245     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
246         flags = 0;
247     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
248 #else
249     /* Otherwise, use the old way of doing it */
250     flags = 1;
251     return ioctl(fd, FIOBIO, &flags);
252 #endif
253 }     
254
255 static int
256 inet_accept_connection(forb_port_t *port)
257 {
258         struct inet_port *p = port->desc.proto_priv;
259         int client;
260         struct sockaddr_in addr;
261         socklen_t addrlen = sizeof(addr);
262         struct epoll_event ev;
263         int ret;
264         forb_peer_t *peer;
265                                 
266         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
267         if (client < 0){
268                 //perror("accept");
269                 return -1;
270         }
271         ret = setnonblocking(client);
272         if (ret) {
273                 close(client);
274                 return -1;
275         }
276
277         peer = forb_peer_new();
278         if (peer) {
279                 struct inet_peer *ipeer;
280
281                 ipeer = forb_malloc(sizeof(*ipeer));
282                 if (ipeer) {
283                         ipeer->socket = client;
284                         peer->proto_priv = ipeer;
285                         peer->port = port;
286                         peer->state = FORB_PEER_DISCOVERED;
287                         inet_port_new_peer_insert(p, peer);
288                         //printf("New connection d=%d\n", client);
289                 } else {
290                         forb_peer_put(peer);
291                 }
292         }
293         
294         memset(&ev, 0, sizeof(ev));
295         ev.events = EPOLLIN | EPOLLET;
296         ev.data.fd = client;
297         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
298 }
299
300 static size_t
301 inet_recv(forb_port_t *port, void *buf, size_t len)
302 {
303         struct inet_port *iport = port->desc.proto_priv;
304 #if 1
305         struct epoll_event ev;
306         int ret, nfds;
307         forb_peer_t *peer;
308         bool exported_new_peer = false;
309         
310         for (;;) {
311                 if (iport->last_recv_fd == -1) {
312                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
313                         if (nfds < 1)
314                                 return -1;
315                         if (ev.data.fd == iport->listen_socket) {
316                                 ret = inet_accept_connection(port);
317                                 if (ret)
318                                         return -1;
319                                 else
320                                         continue;
321                         } else {
322                                 iport->last_recv_fd = ev.data.fd;
323                         }
324                 }
325                 /* Check for first reception form a just connected peer */
326                 ul_list_for_each(inet_port_new_peer, iport, peer) {
327                         struct inet_peer *ipeer = peer->proto_priv;
328                         //printf("checking new peer with fd=%d\n", ipeer->socket);
329                         if (ipeer->socket == iport->last_recv_fd) {
330                                 inet_port_new_peer_delete(iport, peer);
331
332                                 if (port->new_peer) forb_peer_put(peer);
333
334                                 /* Let the upper layer assign forb ID
335                                  * to this peer according to the request*/
336                                 port->new_peer = peer;
337                                 exported_new_peer = true;
338                                 break;
339                         }
340                 }
341
342                 //printf("recv fd=%d\n", iport->last_recv_fd);
343                 ret = recv(iport->last_recv_fd, buf, len, 0);
344                 if (ret == -1) {
345                         if (exported_new_peer) {
346                                 forb_peer_put(peer);
347                                 port->new_peer = NULL;
348                         }
349                         if (errno == EAGAIN) {
350                                 iport->last_recv_fd = -1;
351                                 continue;
352                         }
353                 }
354                 if (ret == 0) {
355                         if (exported_new_peer) {
356                                 forb_peer_put(peer);
357                                 port->new_peer = NULL;
358                         }
359                         close(iport->last_recv_fd);
360                         /* TODO: Notify FORB about peer disconnect */
361                         iport->last_recv_fd = -1;
362                         continue;
363                 }
364                 return ret;
365         }
366 #else
367         return recv(iport->udp_socket, buf, len, 0);    
368 #endif
369 }
370
371 static int
372 inet_port_destroy(forb_port_t * port)
373 {
374         struct inet_port *pd = port->desc.proto_priv;
375         close(pd->epoll_fd);
376         close(pd->udp_socket);
377         close(pd->listen_socket);
378         forb_free(pd);
379         return 0;
380 }
381
382 static size_t
383 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
384 {
385         struct inet_port *p = port->desc.proto_priv;
386         struct sockaddr_in addr;
387         int ret;
388         
389         addr.sin_family = AF_INET;
390         addr.sin_port = htons(MCAST_PORT);
391         addr.sin_addr = p->multicast_addr;
392         
393         ret = sendto(p->udp_socket, buf, len, 0,
394                      (struct sockaddr*)&addr, sizeof(addr));
395         return ret;
396 }
397
398 static void
399 inet_peer_destroy(forb_peer_t *peer)
400 {
401         struct inet_peer *ipeer = peer->proto_priv;
402         if (ipeer) {
403                 peer->proto_priv = NULL;
404                 close(ipeer->socket);
405                 free(ipeer);
406         }
407 }
408
409 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
410 {
411         const struct inet_addr *a = addr;
412         size_t ret = 0;
413         if (addr) {
414                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
415         }
416         return ret;
417 }
418
419
420 static const forb_proto_t proto_inet = {
421         .hello_interval = 40 /* seconds */,
422         .port_destroy = inet_port_destroy,
423         .peer_destroy = inet_peer_destroy,
424         .send = inet_send,
425         .recv = inet_recv,
426         .broadcast = inet_broadcast,
427         .serialize_addr = inet_serialize_addr,
428         .deserialize_addr = inet_deserialize_addr,
429         .addr2str = inet_addr2str,
430 };
431
432 #define MAX_INTERFACES 10
433 int get_local_address(int sock, struct in_addr *addr)
434 {
435         struct ifconf  ifc;
436         struct ifreq   *ifr, req[MAX_INTERFACES];
437         
438
439         ifc.ifc_len = sizeof(req);
440         ifc.ifc_req = req;
441
442         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
443                 return -1;
444         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
445                 struct sockaddr_in ia;
446                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
447                 ioctl(sock, SIOCGIFFLAGS, ifr);
448                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
449                         *addr = ia.sin_addr;
450                         return 0;
451                 }
452         }
453         return -1;
454 }
455
456 /** 
457  * Initializes INET protocol port.
458  * 
459  * @param port_desc Port description to initialize.
460  * @return Zero on success, -1 on error and errno is set
461  * appropriately.
462  */
463 int
464 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
465                     uint16_t port)
466 {
467         int ret;       
468         struct inet_port *port_priv;
469         struct sockaddr_in addr;
470         socklen_t len;
471         struct epoll_event ev;
472         
473         port_priv = forb_malloc(sizeof(*port_priv));
474         if (!port_priv)
475                 return -1;
476
477         memset(port_priv, 0, sizeof(*port_priv));
478         port_priv->last_recv_fd = -1;
479         inet_port_new_peer_init_head(port_priv);
480         
481         /* Initialize UDP multicast socket */
482         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
483         if (port_priv->udp_socket == -1) goto err_free;
484         
485         int yes = 1;
486         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
487         if (ret)
488                 goto err_close_udp;
489
490         int reuse = 1;
491         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
492         if (ret)
493                 goto err_close_udp;
494
495         setnonblocking(port_priv->udp_socket);
496
497         struct ip_mreq mreq;
498         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
499         mreq.imr_multiaddr = port_priv->multicast_addr;
500         mreq.imr_interface.s_addr = INADDR_ANY;
501         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
502                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
503         if (ret)
504                 goto err_close_udp;
505
506         addr.sin_family = AF_INET;
507         addr.sin_port = htons(MCAST_PORT);
508         addr.sin_addr = port_priv->multicast_addr;
509
510         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
511         if (ret != 0) goto err_close_udp;
512
513         char loop = 1;
514         unsigned loop_size = sizeof(loop);
515         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
516                          IP_MULTICAST_LOOP, &loop, loop_size);
517         if (ret)
518                 goto err_close_udp;
519         
520
521         /* Initialize TCP socket */
522         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
523         if (port_priv->listen_socket == -1) goto err_close_udp;
524
525         addr.sin_family = AF_INET;
526         addr.sin_port = htons(port);
527         addr.sin_addr = listen_on;
528         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
529         if (ret != 0) goto err_close_listen;
530         if (setnonblocking(port_priv->listen_socket))
531                 goto err_close_listen;
532
533         ret = listen(port_priv->listen_socket, 10);
534         if (ret)
535                 goto err_close_listen;
536
537         /* Determine our address and port*/
538         len = sizeof(addr);
539         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
540         if (ret) {
541                 ul_logerr("Non-loopback inet address not found\n");
542                 goto err_close_listen;
543         }
544
545         port_priv->addr.port = addr.sin_port;
546         if (listen_on.s_addr == INADDR_ANY)
547                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
548         else
549                 port_priv->addr.addr = listen_on;
550
551         /* Initialize epoll descriptor */
552         port_priv->epoll_fd = epoll_create(10);
553         if (port_priv->epoll_fd == -1)
554                 goto err_close_listen;
555
556         memset(&ev, 0, sizeof(ev));
557         ev.events = EPOLLIN | EPOLLET;
558         ev.data.fd = port_priv->listen_socket;
559         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
560                         port_priv->listen_socket, &ev);
561         if (ret)
562                 goto err_close_epoll;
563         
564         ev.events = EPOLLIN | EPOLLET;
565         ev.data.fd = port_priv->udp_socket;
566         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
567                         port_priv->udp_socket, &ev);
568         if (ret)
569                 goto err_close_epoll;
570
571         port_desc->proto = &proto_inet;
572         port_desc->proto_priv = port_priv;
573         port_desc->addr = &port_priv->addr;
574         return 0;
575 err_close_epoll:
576         close(port_priv->epoll_fd);
577 err_close_listen:
578         ret = errno;
579         close(port_priv->listen_socket);
580         errno = ret;
581 err_close_udp:
582         ret = errno;
583         close(port_priv->udp_socket);
584         errno = ret;
585 err_free:
586         ret = errno;
587         forb_free(port_priv);
588         errno = ret;
589         return -1;
590 }