]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
Make proto_inet usable also as local protocol
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61
62 /**
63  * @file   proto_inet.c
64  * @author Michal Sojka <sojkam1@fel.cvut.cz>
65  * @date   Sun Oct 12 16:10:23 2008
66  * 
67  * @brief  FORB transport protocol based on INET family sockets.
68  * 
69  * UDP is used for broadcasts and TCP for requests/replies. There
70  * exist two uni-drectional connections between any two communicating
71  * peers.
72  */
73
74 extern UL_LOG_CUST(ulogd_forb_proto_inet);
75
76 #define MCAST_PORT 15514        /**< Port used for multicasts */
77 #define MCAST_ADDR "225.15.5.14"
78
79 /** Address used by inet protocol. All values are stored in network
80  * byte order. */
81 struct inet_addr {
82         struct in_addr addr;
83         uint16_t port;          /**< TCP listening port */
84 };
85
86 /** INET protocol data for ports. */
87 struct inet_port {
88         int udp_socket;         /**< Socket for sending and receiving broadcasts */
89         int listen_socket;      /*  */
90         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
91         struct inet_addr addr;  /**< Address of this port */
92         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
93         struct in_addr multicast_addr;
94 };
95
96 /** INET protocol data associated with every peer */
97 struct inet_peer {
98         int socket;             /**< Connected socket to the peer */
99 };
100
101 /* static struct inet_port* */
102 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
103
104 static CORBA_boolean
105 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
106 {
107         const struct inet_addr *a = addr;
108         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
109         CORBA_unsigned_short hport = ntohs(a->port);
110         CORBA_boolean ret;
111         ret = CORBA_unsigned_long_serialize(codec, &haddr);
112         if (!ret)
113                 return ret;
114         return CORBA_unsigned_short_serialize(codec, &hport);
115 }
116
117 static CORBA_boolean
118 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
119 {
120         struct inet_addr *a;
121         CORBA_unsigned_long s_addr;
122         CORBA_unsigned_short port;
123         CORBA_boolean ret;
124
125         a = forb_malloc(sizeof(*a));
126         if (!a)
127                 return CORBA_FALSE;
128         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
129         if (!ret)
130                 return ret;
131         ret = CORBA_unsigned_short_deserialize(codec, &port);
132         a->addr.s_addr = htonl(s_addr);
133         a->port = htons(port);
134         *addr = a;
135         return ret;
136 }
137
138 static struct inet_peer *
139 inet_connect(forb_peer_t *peer)
140 {
141         struct inet_peer *ipeer;
142         struct sockaddr_in sa;
143         struct inet_addr *addr = peer->addr;
144         int ret;
145
146         if (!addr)
147                 return NULL;
148         ipeer = forb_malloc(sizeof(*ipeer));
149         if (!ipeer)
150                 goto err;
151         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
152         if (!ipeer->socket)
153                 goto err_free;
154         sa.sin_family = AF_INET;
155         sa.sin_port = addr->port;
156         sa.sin_addr = addr->addr;
157         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
158         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
159         if (ret)
160                 goto err_close;
161
162         return ipeer;
163 err_close:
164         close(ipeer->socket);
165 err_free:
166         forb_free(ipeer);
167 err:
168         return NULL;
169         
170 }
171
172 static size_t
173 inet_send(forb_peer_t *peer, const void *buf, size_t len)
174 {
175         struct inet_peer *ipeer = peer->proto_priv;
176
177         if (!ipeer) {
178                 ipeer = inet_connect(peer);
179                 if (!ipeer)
180                         return -1;
181                 peer->proto_priv = ipeer;
182                 
183         }
184         
185         return send(ipeer->socket, buf, len, 0);
186 }
187
188 /*----------------------------------------------------------------------
189  Portable function to set a socket into nonblocking mode.
190  Calling this on a socket causes all future read() and write() calls on
191  that socket to do only as much as they can immediately, and return 
192  without waiting.
193  If no data can be read or written, they return -1 and set errno
194  to EAGAIN (or EWOULDBLOCK).
195  Thanks to Bjorn Reese for this code.
196 ----------------------------------------------------------------------*/
197 int setnonblocking(int fd)
198 {
199     int flags;
200
201     /* If they have O_NONBLOCK, use the Posix way to do it */
202 #if defined(O_NONBLOCK)
203     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
204     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
205         flags = 0;
206     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
207 #else
208     /* Otherwise, use the old way of doing it */
209     flags = 1;
210     return ioctl(fd, FIOBIO, &flags);
211 #endif
212 }     
213
214 int inet_accept_connection(struct inet_port *p)
215 {
216         int client;
217         struct sockaddr_in addr;
218         socklen_t addrlen = sizeof(addr);
219         struct epoll_event ev;
220         int ret;
221                                 
222         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
223         if (client < 0){
224                 //perror("accept");
225                 return -1;
226         }
227         ret = setnonblocking(client);
228         if (ret) {
229                 close(client);
230                 return -1;
231         }
232         memset(&ev, 0, sizeof(ev));
233         ev.events = EPOLLIN | EPOLLET;
234         ev.data.fd = client;
235         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
236 }
237
238 static size_t
239 inet_recv(forb_port_t *port, void *buf, size_t len)
240 {
241         struct inet_port *iport = port->desc.proto_priv;
242 #if 1
243         struct epoll_event ev;
244         int ret, nfds;
245         for (;;) {
246                 if (iport->last_recv_fd == -1) {
247                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
248                         if (nfds < 1)
249                                 return -1;
250                         if (ev.data.fd == iport->listen_socket) {
251                                 ret = inet_accept_connection(iport);
252                                 if (ret)
253                                         return -1;
254                                 else
255                                         continue;
256                         } else {
257                                 iport->last_recv_fd = ev.data.fd;
258                         }
259                 }
260                 ret = recv(iport->last_recv_fd, buf, len, 0);
261                 if (ret == -1 && errno == EAGAIN) {
262                         iport->last_recv_fd = -1;
263                         continue;
264                 }
265                 if (ret == 0) {
266                         close(iport->last_recv_fd);
267                         /* TODO: Notify FORB about peer disconnect */
268                         iport->last_recv_fd = -1;
269                         continue;
270                 }
271                 return ret;
272         }
273 #else
274         return recv(iport->udp_socket, buf, len, 0);    
275 #endif
276 }
277
278 static int
279 inet_port_destroy(forb_port_t * port)
280 {
281         struct inet_port *pd = port->desc.proto_priv;
282         close(pd->epoll_fd);
283         close(pd->udp_socket);
284         close(pd->listen_socket);
285         forb_free(pd);
286         return 0;
287 }
288
289 static size_t
290 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
291 {
292         struct inet_port *p = port->desc.proto_priv;
293         struct sockaddr_in addr;
294         int ret;
295         
296         addr.sin_family = AF_INET;
297         addr.sin_port = htons(MCAST_PORT);
298         addr.sin_addr = p->multicast_addr;
299         
300         ret = sendto(p->udp_socket, buf, len, 0,
301                      (struct sockaddr*)&addr, sizeof(addr));
302         return ret;
303 }
304
305 static void
306 inet_peer_destroy(forb_peer_t *peer)
307 {
308         struct inet_peer *ipeer = peer->proto_priv;
309         if (ipeer) {
310                 peer->proto_priv = NULL;
311                 close(ipeer->socket);
312                 free(ipeer);
313         }
314 }
315
316 static const forb_proto_t proto_inet = {
317         .hello_interval = 40 /* seconds */,
318         .port_destroy = inet_port_destroy,
319         .peer_destroy = inet_peer_destroy,
320         .send = inet_send,
321         .recv = inet_recv,
322         .broadcast = inet_broadcast,
323         .serialize_addr = inet_serialize_addr,
324         .deserialize_addr = inet_deserialize_addr,
325 };
326
327 #define MAX_INTERFACES 10
328 int get_local_address(int sock, struct in_addr *addr)
329 {
330         struct ifconf  ifc;
331         struct ifreq   *ifr, req[MAX_INTERFACES];
332         
333
334         ifc.ifc_len = sizeof(req);
335         ifc.ifc_req = req;
336
337         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
338                 return -1;
339         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
340                 struct sockaddr_in ia;
341                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
342                 ioctl(sock, SIOCGIFFLAGS, ifr);
343                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
344                         *addr = ia.sin_addr;
345                         return 0;
346                 }
347         }
348         return -1;
349 }
350
351 /** 
352  * Initializes INET protocol port.
353  * 
354  * @param port_desc Port description to initialize.
355  * @return Zero on success, -1 on error and errno is set
356  * appropriately.
357  */
358 int
359 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on)
360 {
361         int ret;       
362         struct inet_port *port_priv;
363         struct sockaddr_in addr;
364         socklen_t len;
365         struct epoll_event ev;
366         
367         port_priv = forb_malloc(sizeof(*port_priv));
368         if (!port_priv)
369                 return -1;
370
371         memset(port_priv, 0, sizeof(*port_priv));
372         port_priv->last_recv_fd = -1;
373         
374         /* Initialize UDP multicast socket */
375         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
376         if (port_priv->udp_socket == -1) goto err_free;
377         
378         int yes = 1;
379         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
380         if (ret)
381                 goto err_close_udp;
382
383         int reuse = 1;
384         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
385         if (ret)
386                 goto err_close_udp;
387
388         setnonblocking(port_priv->udp_socket);
389
390         struct ip_mreq mreq;
391         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
392         mreq.imr_multiaddr = port_priv->multicast_addr;
393         mreq.imr_interface.s_addr = INADDR_ANY;
394         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
395                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
396         if (ret)
397                 goto err_close_udp;
398
399         addr.sin_family = AF_INET;
400         addr.sin_port = htons(MCAST_PORT);
401         addr.sin_addr = port_priv->multicast_addr;
402
403         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
404         if (ret != 0) goto err_close_udp;
405
406         char loop = 1;
407         unsigned loop_size = sizeof(loop);
408         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
409                          IP_MULTICAST_LOOP, &loop, loop_size);
410         if (ret)
411                 goto err_close_udp;
412         
413
414         /* Initialize TCP socket */
415         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
416         if (port_priv->listen_socket == -1) goto err_close_udp;
417
418         addr.sin_family = AF_INET;
419         addr.sin_port = htons(0); /* Random port */
420         addr.sin_addr = listen_on;
421         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
422         if (ret != 0) goto err_close_listen;
423         if (setnonblocking(port_priv->listen_socket))
424                 goto err_close_listen;
425
426         ret = listen(port_priv->listen_socket, 10);
427         if (ret)
428                 goto err_close_listen;
429
430         /* Determine our address and port*/
431         len = sizeof(addr);
432         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
433         if (ret) {
434                 ul_logerr("Non-loopback inet address not found\n");
435                 goto err_close_listen;
436         }
437
438         port_priv->addr.port = addr.sin_port;
439         if (listen_on.s_addr == INADDR_ANY)
440                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
441         else
442                 port_priv->addr.addr = listen_on;
443
444         /* Initialize epoll descriptor */
445         port_priv->epoll_fd = epoll_create(10);
446         if (port_priv->epoll_fd == -1)
447                 goto err_close_listen;
448
449         memset(&ev, 0, sizeof(ev));
450         ev.events = EPOLLIN | EPOLLET;
451         ev.data.fd = port_priv->listen_socket;
452         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
453                         port_priv->listen_socket, &ev);
454         if (ret)
455                 goto err_close_epoll;
456         
457         ev.events = EPOLLIN | EPOLLET;
458         ev.data.fd = port_priv->udp_socket;
459         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
460                         port_priv->udp_socket, &ev);
461         if (ret)
462                 goto err_close_epoll;
463
464         port_desc->proto = &proto_inet;
465         port_desc->proto_priv = port_priv;
466         port_desc->addr = &port_priv->addr;
467         return 0;
468 err_close_epoll:
469         close(port_priv->epoll_fd);
470 err_close_listen:
471         ret = errno;
472         close(port_priv->listen_socket);
473         errno = ret;
474 err_close_udp:
475         ret = errno;
476         close(port_priv->udp_socket);
477         errno = ret;
478 err_free:
479         ret = errno;
480         forb_free(port_priv);
481         errno = ret;
482         return -1;
483 }