1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <forb/proto_inet.h>
#include <net/if.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <forb/config.h>
#include "discovery.h"
/**
 * @file
 * @author Michal Sojka <sojkam1@fel.cvut.cz>
 * @date   Sun Oct 12 16:10:23 2008
 *
 * @brief  FORB transport protocol based on INET family sockets.
 *
 * UDP is used for broadcasts and TCP for requests/replies. There
 * exist two unidirectional connections between any two communicating
 * peers.
 */
extern UL_LOG_CUST(ulogd_forb_proto_inet);

#define MCAST_PORT 15514	/**< Port used for multicasts */
#define MCAST_ADDR "225.15.5.14" /**< IPv4 group address used for multicasts */

/** Address used by inet protocol. All values are stored in network
 * byte order. */
struct inet_addr {
	struct in_addr addr;	/**< IPv4 address */
	uint16_t port;		/**< TCP listening port */
};
88 /** INET protocol data for ports. */
90 int udp_socket; /**< Socket for sending and receiving broadcasts */
91 int listen_socket; /* */
92 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
93 struct inet_addr addr; /**< Address of this port */
94 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
95 struct in_addr multicast_addr;
96 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100 struct inet_port, /* cust_head_t */
101 forb_peer_t, /* cust_item_t */
102 new_peers, /* cust_head_field */
103 lnode) /* cust_node_field */
/** INET protocol data associated with every peer */
struct inet_peer {
	int socket;		/**< Connected socket to the peer */
};
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
117 const struct inet_addr *a = addr;
118 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119 CORBA_unsigned_short hport = ntohs(a->port);
121 ret = CORBA_unsigned_long_serialize(codec, &haddr);
124 return CORBA_unsigned_short_serialize(codec, &hport);
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
131 CORBA_unsigned_long s_addr;
132 CORBA_unsigned_short port;
135 a = forb_malloc(sizeof(*a));
138 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
141 ret = CORBA_unsigned_short_deserialize(codec, &port);
142 a->addr.s_addr = htonl(s_addr);
143 a->port = htons(port);
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
151 struct inet_peer *ipeer;
152 struct sockaddr_in sa;
153 struct inet_addr *addr = peer->addr;
157 ul_logerr("No address to connect\n");
160 ipeer = forb_malloc(sizeof(*ipeer));
163 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164 if (!ipeer->socket) {
165 ul_logerr("socket(): %s\n", strerror(errno));
168 sa.sin_family = AF_INET;
169 sa.sin_port = addr->port;
170 sa.sin_addr = addr->addr;
171 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
174 ul_logerr("connect error: %s\n", strerror(errno));
178 struct epoll_event ev;
179 struct inet_port *p = peer->port->desc.proto_priv;
180 memset(&ev, 0, sizeof(ev));
181 ev.events = EPOLLIN | EPOLLET;
182 ev.data.fd = ipeer->socket;
183 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
185 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
192 close(ipeer->socket);
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
203 struct inet_peer *ipeer = peer->proto_priv;
207 ipeer = inet_connect(peer);
211 peer->proto_priv = ipeer;
216 ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
218 ret = send(ipeer->socket, buf, len, 0);
220 ul_logerr("send error: %s\n", strerror(errno));
/*----------------------------------------------------------------------
 Portable function to set a socket into nonblocking mode.
 Calling this on a socket causes all future read() and write() calls on
 that socket to do only as much as they can immediately, and return
 without waiting.
 If no data can be read or written, they return -1 and set errno
 to EAGAIN (or EWOULDBLOCK).
 Thanks to Bjorn Reese for this code.

 Returns the result of the underlying fcntl()/ioctl() call: -1 on
 error, a different value on success.
----------------------------------------------------------------------*/
int setnonblocking(int fd)
{
	int flags;

	/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
	/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1)
		flags = 0;	/* best effort: still try to set the flag */
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Otherwise, use the old way of doing it */
	flags = 1;
	return ioctl(fd, FIONBIO, &flags);
#endif
}
258 inet_accept_connection(forb_port_t *port)
260 struct inet_port *p = port->desc.proto_priv;
262 struct sockaddr_in addr;
263 socklen_t addrlen = sizeof(addr);
264 struct epoll_event ev;
268 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
273 ret = setnonblocking(client);
279 peer = forb_peer_new();
281 struct inet_peer *ipeer;
283 ipeer = forb_malloc(sizeof(*ipeer));
285 ipeer->socket = client;
286 peer->proto_priv = ipeer;
288 peer->state = FORB_PEER_DISCOVERED;
289 inet_port_new_peer_insert(p, peer);
290 //printf("New connection d=%d\n", client);
296 memset(&ev, 0, sizeof(ev));
297 ev.events = EPOLLIN | EPOLLET;
299 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
303 inet_recv(forb_port_t *port, void *buf, size_t len)
305 struct inet_port *iport = port->desc.proto_priv;
307 struct epoll_event ev;
311 bool exported_new_peer = false;
314 if (iport->last_recv_fd == -1) {
315 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
318 if (ev.data.fd == iport->listen_socket) {
319 ret = inet_accept_connection(port);
321 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
326 iport->last_recv_fd = ev.data.fd;
329 /* Check for first reception form a just connected peer */
330 ul_list_for_each(inet_port_new_peer, iport, peer) {
331 struct inet_peer *ipeer = peer->proto_priv;
332 //printf("checking new peer with fd=%d\n", ipeer->socket);
333 if (ipeer->socket == iport->last_recv_fd) {
334 inet_port_new_peer_delete(iport, peer);
336 if (port->new_peer) forb_peer_put(peer);
338 /* Let the upper layer assign forb ID
339 * to this peer according to the request*/
340 port->new_peer = peer;
341 exported_new_peer = true;
346 //printf("recv fd=%d\n", iport->last_recv_fd);
347 ret = recv(iport->last_recv_fd, buf, len, 0);
349 if (exported_new_peer) {
351 port->new_peer = NULL;
353 if (errno != EAGAIN) {
354 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
356 iport->last_recv_fd = -1;
360 if (exported_new_peer) {
362 port->new_peer = NULL;
364 ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
365 ul_list_for_each(forb_port_peer, port, peer) {
366 struct inet_peer *ipeer = peer->proto_priv;
367 if (ipeer && ipeer->socket == iport->last_recv_fd) {
368 forb_peer_disconnected(peer);
372 iport->last_recv_fd = -1;
375 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
379 return recv(iport->udp_socket, buf, len, 0);
384 inet_port_destroy(forb_port_t * port)
386 struct inet_port *pd = port->desc.proto_priv;
388 close(pd->udp_socket);
389 close(pd->listen_socket);
395 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
397 struct inet_port *p = port->desc.proto_priv;
398 struct sockaddr_in addr;
401 addr.sin_family = AF_INET;
402 addr.sin_port = htons(MCAST_PORT);
403 addr.sin_addr = p->multicast_addr;
405 ret = sendto(p->udp_socket, buf, len, 0,
406 (struct sockaddr*)&addr, sizeof(addr));
411 inet_peer_destroy(forb_peer_t *peer)
413 struct inet_peer *ipeer = peer->proto_priv;
415 peer->proto_priv = NULL;
416 ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
417 close(ipeer->socket);
422 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
424 const struct inet_addr *a = addr;
427 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
#if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
#include <fcb_contact_info.h>

/**
 * Announces the contract broker as a discovered peer of the port.
 *
 * The broker's IPv4 address is taken from the FCB_ADDR environment
 * variable and defaults to 127.0.0.1; its TCP port is FCB_TCP_PORT.
 * Errors (allocation failure, unparsable address) are logged and the
 * peer is not registered.
 *
 * @param port Port on which the peer is announced.
 */
static void inet_register_cb(forb_port_t *port)
{
	const char *fcb_addr = getenv("FCB_ADDR");
	struct inet_addr *ia;

	if (!fcb_addr)
		fcb_addr = "127.0.0.1";

	ia = malloc(sizeof(*ia));
	if (!ia) {
		ul_logerr("Cannot allocate FCB address\n");
		return;
	}
	if (!inet_aton(fcb_addr, &ia->addr)) {
		ul_logerr("Invalid FCB_ADDR: %s\n", fcb_addr);
		free(ia);
		return;
	}
	ia->port = htons(FCB_TCP_PORT);
	forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
}
#else
#define inet_register_cb NULL
#endif
455 static const forb_proto_t proto_inet = {
456 .hello_interval = 40 /* seconds */,
457 .port_destroy = inet_port_destroy,
458 .peer_destroy = inet_peer_destroy,
461 .broadcast = inet_broadcast,
462 .serialize_addr = inet_serialize_addr,
463 .deserialize_addr = inet_deserialize_addr,
464 .addr2str = inet_addr2str,
465 .register_cb = inet_register_cb,
468 #define MAX_INTERFACES 10
469 int get_local_address(int sock, struct in_addr *addr)
472 struct ifreq *ifr, req[MAX_INTERFACES];
475 ifc.ifc_len = sizeof(req);
478 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
480 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
481 struct sockaddr_in ia;
482 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
483 ioctl(sock, SIOCGIFFLAGS, ifr);
484 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
493 * Initializes INET protocol port.
495 * @param port_desc Port description to initialize.
496 * @return Zero on success, -1 on error and errno is set
500 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
504 struct inet_port *port_priv;
505 struct sockaddr_in addr;
507 struct epoll_event ev;
509 port_priv = forb_malloc(sizeof(*port_priv));
513 memset(port_priv, 0, sizeof(*port_priv));
514 port_priv->last_recv_fd = -1;
515 inet_port_new_peer_init_head(port_priv);
517 /* Initialize UDP multicast socket */
518 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
519 if (port_priv->udp_socket == -1) goto err_free;
522 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
527 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
531 setnonblocking(port_priv->udp_socket);
534 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
535 mreq.imr_multiaddr = port_priv->multicast_addr;
536 mreq.imr_interface.s_addr = INADDR_ANY;
537 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
538 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
542 addr.sin_family = AF_INET;
543 addr.sin_port = htons(MCAST_PORT);
544 addr.sin_addr = port_priv->multicast_addr;
546 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
547 if (ret != 0) goto err_close_udp;
550 unsigned loop_size = sizeof(loop);
551 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
552 IP_MULTICAST_LOOP, &loop, loop_size);
557 /* Initialize TCP socket */
558 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
559 if (port_priv->listen_socket == -1) goto err_close_udp;
562 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
564 goto err_close_listen;
566 addr.sin_family = AF_INET;
567 addr.sin_port = htons(port);
568 addr.sin_addr = listen_on;
569 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
570 if (ret != 0) goto err_close_listen;
571 if (setnonblocking(port_priv->listen_socket))
572 goto err_close_listen;
574 ret = listen(port_priv->listen_socket, 10);
576 goto err_close_listen;
578 /* Determine our address and port*/
580 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
582 ul_logerr("Non-loopback inet address not found\n");
583 goto err_close_listen;
586 port_priv->addr.port = addr.sin_port;
587 if (listen_on.s_addr == INADDR_ANY)
588 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
590 port_priv->addr.addr = listen_on;
592 /* Initialize epoll descriptor */
593 port_priv->epoll_fd = epoll_create(10);
594 if (port_priv->epoll_fd == -1)
595 goto err_close_listen;
597 memset(&ev, 0, sizeof(ev));
598 ev.events = EPOLLIN | EPOLLET;
599 ev.data.fd = port_priv->listen_socket;
600 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
601 port_priv->listen_socket, &ev);
603 goto err_close_epoll;
605 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
606 ev.events = EPOLLIN | EPOLLET;
607 ev.data.fd = port_priv->udp_socket;
608 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
609 port_priv->udp_socket, &ev);
611 goto err_close_epoll;
614 port_desc->proto = &proto_inet;
615 port_desc->proto_priv = port_priv;
616 port_desc->addr = &port_priv->addr;
619 close(port_priv->epoll_fd);
622 close(port_priv->listen_socket);
626 close(port_priv->udp_socket);
630 forb_free(port_priv);