]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/proto_inet.c
proto_inet: send wraped in loop to handle interrupted syscalls
[frescor/forb.git] / src / proto_inet.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "proto.h"
48 #include <arpa/inet.h>
49 #include <dirent.h>
50 #include <fcntl.h>
51 #include <forb/proto_inet.h>
52 #include <net/if.h>
53 #include <netinet/in.h>
54 #include <stdio.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
59 #include <ul_log.h>
60 #include <unistd.h>
61
62 /**
63  * @file   proto_inet.c
64  * @author Michal Sojka <sojkam1@fel.cvut.cz>
65  * @date   Sun Oct 12 16:10:23 2008
66  * 
67  * @brief  FORB transport protocol based on INET family sockets.
68  * 
69  * UDP is used for broadcasts and TCP for requests/replies. There
70  * exist two uni-drectional connections between any two communicating
71  * peers.
72  */
73
74 extern UL_LOG_CUST(ulogd_forb_proto_inet);
75
76 #define MCAST_PORT 15514        /**< Port used for multicasts */
77 #define MCAST_ADDR "225.15.5.14"
78
79 /** Address used by inet protocol. All values are stored in network
80  * byte order. */
81 struct inet_addr {
82         struct in_addr addr;
83         uint16_t port;          /**< TCP listening port */
84 };
85
86 /** INET protocol data for ports. */
87 struct inet_port {
88         int udp_socket;         /**< Socket for sending and receiving broadcasts */
89         int listen_socket;      /*  */
90         int epoll_fd;           /**< File descriptor used by epoll() in inet_receive(). */
91         struct inet_addr addr;  /**< Address of this port */
92         int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
93         struct in_addr multicast_addr;
94 };
95
96 /** INET protocol data associated with every peer */
97 struct inet_peer {
98         int socket;             /**< Connected socket to the peer */
99 };
100
101 /* static struct inet_port* */
102 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
103
104 static CORBA_boolean
105 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
106 {
107         const struct inet_addr *a = addr;
108         CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
109         CORBA_unsigned_short hport = ntohs(a->port);
110         CORBA_boolean ret;
111         ret = CORBA_unsigned_long_serialize(codec, &haddr);
112         if (!ret)
113                 return ret;
114         return CORBA_unsigned_short_serialize(codec, &hport);
115 }
116
117 static CORBA_boolean
118 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
119 {
120         struct inet_addr *a;
121         CORBA_unsigned_long s_addr;
122         CORBA_unsigned_short port;
123         CORBA_boolean ret;
124
125         a = forb_malloc(sizeof(*a));
126         if (!a)
127                 return CORBA_FALSE;
128         ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
129         if (!ret)
130                 return ret;
131         ret = CORBA_unsigned_short_deserialize(codec, &port);
132         a->addr.s_addr = htonl(s_addr);
133         a->port = htons(port);
134         *addr = a;
135         return ret;
136 }
137
138 static struct inet_peer *
139 inet_connect(forb_peer_t *peer)
140 {
141         struct inet_peer *ipeer;
142         struct sockaddr_in sa;
143         struct inet_addr *addr = peer->addr;
144         int ret;
145
146         if (!addr)
147                 return NULL;
148         ipeer = forb_malloc(sizeof(*ipeer));
149         if (!ipeer)
150                 goto err;
151         ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
152         if (!ipeer->socket)
153                 goto err_free;
154         sa.sin_family = AF_INET;
155         sa.sin_port = addr->port;
156         sa.sin_addr = addr->addr;
157         ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
158         ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
159         if (ret)
160                 goto err_close;
161
162         return ipeer;
163 err_close:
164         close(ipeer->socket);
165 err_free:
166         forb_free(ipeer);
167 err:
168         return NULL;
169         
170 }
171
172 static size_t
173 inet_send(forb_peer_t *peer, const void *buf, size_t len)
174 {
175         struct inet_peer *ipeer = peer->proto_priv;
176         size_t ret, sent;
177
178         if (!ipeer) {
179                 ipeer = inet_connect(peer);
180                 if (!ipeer)
181                         return -1;
182                 peer->proto_priv = ipeer;
183                 
184         }
185
186         sent = 0;
187         do {
188                 ret = send(ipeer->socket, buf, len, 0);
189                 if (ret < 0)
190                         return ret;
191                 sent += ret;
192                 buf += ret;
193                 len -= ret;
194         } while (len > 0);
195
196         return sent;
197 }
198
199 /*----------------------------------------------------------------------
200  Portable function to set a socket into nonblocking mode.
201  Calling this on a socket causes all future read() and write() calls on
202  that socket to do only as much as they can immediately, and return 
203  without waiting.
204  If no data can be read or written, they return -1 and set errno
205  to EAGAIN (or EWOULDBLOCK).
206  Thanks to Bjorn Reese for this code.
207 ----------------------------------------------------------------------*/
208 int setnonblocking(int fd)
209 {
210     int flags;
211
212     /* If they have O_NONBLOCK, use the Posix way to do it */
213 #if defined(O_NONBLOCK)
214     /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
215     if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
216         flags = 0;
217     return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
218 #else
219     /* Otherwise, use the old way of doing it */
220     flags = 1;
221     return ioctl(fd, FIOBIO, &flags);
222 #endif
223 }     
224
225 int inet_accept_connection(struct inet_port *p)
226 {
227         int client;
228         struct sockaddr_in addr;
229         socklen_t addrlen = sizeof(addr);
230         struct epoll_event ev;
231         int ret;
232                                 
233         client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
234         if (client < 0){
235                 //perror("accept");
236                 return -1;
237         }
238         ret = setnonblocking(client);
239         if (ret) {
240                 close(client);
241                 return -1;
242         }
243         memset(&ev, 0, sizeof(ev));
244         ev.events = EPOLLIN | EPOLLET;
245         ev.data.fd = client;
246         return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
247 }
248
249 static size_t
250 inet_recv(forb_port_t *port, void *buf, size_t len)
251 {
252         struct inet_port *iport = port->desc.proto_priv;
253 #if 1
254         struct epoll_event ev;
255         int ret, nfds;
256         for (;;) {
257                 if (iport->last_recv_fd == -1) {
258                         nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
259                         if (nfds < 1)
260                                 return -1;
261                         if (ev.data.fd == iport->listen_socket) {
262                                 ret = inet_accept_connection(iport);
263                                 if (ret)
264                                         return -1;
265                                 else
266                                         continue;
267                         } else {
268                                 iport->last_recv_fd = ev.data.fd;
269                         }
270                 }
271                 ret = recv(iport->last_recv_fd, buf, len, 0);
272                 if (ret == -1 && errno == EAGAIN) {
273                         iport->last_recv_fd = -1;
274                         continue;
275                 }
276                 if (ret == 0) {
277                         close(iport->last_recv_fd);
278                         /* TODO: Notify FORB about peer disconnect */
279                         iport->last_recv_fd = -1;
280                         continue;
281                 }
282                 return ret;
283         }
284 #else
285         return recv(iport->udp_socket, buf, len, 0);    
286 #endif
287 }
288
289 static int
290 inet_port_destroy(forb_port_t * port)
291 {
292         struct inet_port *pd = port->desc.proto_priv;
293         close(pd->epoll_fd);
294         close(pd->udp_socket);
295         close(pd->listen_socket);
296         forb_free(pd);
297         return 0;
298 }
299
300 static size_t
301 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
302 {
303         struct inet_port *p = port->desc.proto_priv;
304         struct sockaddr_in addr;
305         int ret;
306         
307         addr.sin_family = AF_INET;
308         addr.sin_port = htons(MCAST_PORT);
309         addr.sin_addr = p->multicast_addr;
310         
311         ret = sendto(p->udp_socket, buf, len, 0,
312                      (struct sockaddr*)&addr, sizeof(addr));
313         return ret;
314 }
315
316 static void
317 inet_peer_destroy(forb_peer_t *peer)
318 {
319         struct inet_peer *ipeer = peer->proto_priv;
320         if (ipeer) {
321                 peer->proto_priv = NULL;
322                 close(ipeer->socket);
323                 free(ipeer);
324         }
325 }
326
327 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
328 {
329         const struct inet_addr *a = addr;
330         size_t ret = 0;
331         if (addr) {
332                 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
333         }
334         return ret;
335 }
336
337
338 static const forb_proto_t proto_inet = {
339         .hello_interval = 40 /* seconds */,
340         .port_destroy = inet_port_destroy,
341         .peer_destroy = inet_peer_destroy,
342         .send = inet_send,
343         .recv = inet_recv,
344         .broadcast = inet_broadcast,
345         .serialize_addr = inet_serialize_addr,
346         .deserialize_addr = inet_deserialize_addr,
347         .addr2str = inet_addr2str,
348 };
349
350 #define MAX_INTERFACES 10
351 int get_local_address(int sock, struct in_addr *addr)
352 {
353         struct ifconf  ifc;
354         struct ifreq   *ifr, req[MAX_INTERFACES];
355         
356
357         ifc.ifc_len = sizeof(req);
358         ifc.ifc_req = req;
359
360         if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
361                 return -1;
362         for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
363                 struct sockaddr_in ia;
364                 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
365                 ioctl(sock, SIOCGIFFLAGS, ifr);
366                 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
367                         *addr = ia.sin_addr;
368                         return 0;
369                 }
370         }
371         return -1;
372 }
373
374 /** 
375  * Initializes INET protocol port.
376  * 
377  * @param port_desc Port description to initialize.
378  * @return Zero on success, -1 on error and errno is set
379  * appropriately.
380  */
381 int
382 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on)
383 {
384         int ret;       
385         struct inet_port *port_priv;
386         struct sockaddr_in addr;
387         socklen_t len;
388         struct epoll_event ev;
389         
390         port_priv = forb_malloc(sizeof(*port_priv));
391         if (!port_priv)
392                 return -1;
393
394         memset(port_priv, 0, sizeof(*port_priv));
395         port_priv->last_recv_fd = -1;
396         
397         /* Initialize UDP multicast socket */
398         port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
399         if (port_priv->udp_socket == -1) goto err_free;
400         
401         int yes = 1;
402         ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
403         if (ret)
404                 goto err_close_udp;
405
406         int reuse = 1;
407         setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
408         if (ret)
409                 goto err_close_udp;
410
411         setnonblocking(port_priv->udp_socket);
412
413         struct ip_mreq mreq;
414         inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
415         mreq.imr_multiaddr = port_priv->multicast_addr;
416         mreq.imr_interface.s_addr = INADDR_ANY;
417         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
418                          IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
419         if (ret)
420                 goto err_close_udp;
421
422         addr.sin_family = AF_INET;
423         addr.sin_port = htons(MCAST_PORT);
424         addr.sin_addr = port_priv->multicast_addr;
425
426         ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
427         if (ret != 0) goto err_close_udp;
428
429         char loop = 1;
430         unsigned loop_size = sizeof(loop);
431         ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
432                          IP_MULTICAST_LOOP, &loop, loop_size);
433         if (ret)
434                 goto err_close_udp;
435         
436
437         /* Initialize TCP socket */
438         port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
439         if (port_priv->listen_socket == -1) goto err_close_udp;
440
441         addr.sin_family = AF_INET;
442         addr.sin_port = htons(0); /* Random port */
443         addr.sin_addr = listen_on;
444         ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
445         if (ret != 0) goto err_close_listen;
446         if (setnonblocking(port_priv->listen_socket))
447                 goto err_close_listen;
448
449         ret = listen(port_priv->listen_socket, 10);
450         if (ret)
451                 goto err_close_listen;
452
453         /* Determine our address and port*/
454         len = sizeof(addr);
455         ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
456         if (ret) {
457                 ul_logerr("Non-loopback inet address not found\n");
458                 goto err_close_listen;
459         }
460
461         port_priv->addr.port = addr.sin_port;
462         if (listen_on.s_addr == INADDR_ANY)
463                 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
464         else
465                 port_priv->addr.addr = listen_on;
466
467         /* Initialize epoll descriptor */
468         port_priv->epoll_fd = epoll_create(10);
469         if (port_priv->epoll_fd == -1)
470                 goto err_close_listen;
471
472         memset(&ev, 0, sizeof(ev));
473         ev.events = EPOLLIN | EPOLLET;
474         ev.data.fd = port_priv->listen_socket;
475         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
476                         port_priv->listen_socket, &ev);
477         if (ret)
478                 goto err_close_epoll;
479         
480         ev.events = EPOLLIN | EPOLLET;
481         ev.data.fd = port_priv->udp_socket;
482         ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
483                         port_priv->udp_socket, &ev);
484         if (ret)
485                 goto err_close_epoll;
486
487         port_desc->proto = &proto_inet;
488         port_desc->proto_priv = port_priv;
489         port_desc->addr = &port_priv->addr;
490         return 0;
491 err_close_epoll:
492         close(port_priv->epoll_fd);
493 err_close_listen:
494         ret = errno;
495         close(port_priv->listen_socket);
496         errno = ret;
497 err_close_udp:
498         ret = errno;
499         close(port_priv->udp_socket);
500         errno = ret;
501 err_free:
502         ret = errno;
503         forb_free(port_priv);
504         errno = ret;
505         return -1;
506 }