1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
66 * @author Michal Sojka <sojkam1@fel.cvut.cz>
67 * @date Sun Oct 12 16:10:23 2008
69 * @brief FORB transport protocol based on INET family sockets.
71 * UDP is used for broadcasts and TCP for requests/replies. There
72 * exist two uni-drectional connections between any two communicating
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
78 #define MCAST_PORT 15514 /**< Port used for multicasts */
79 #define MCAST_ADDR "225.15.5.14"
81 /** Address used by inet protocol. All values are stored in network
85 uint16_t port; /**< TCP listening port */
88 /** INET protocol data for ports. */
90 int udp_socket; /**< Socket for sending and receiving broadcasts */
91 int listen_socket; /* */
92 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
93 struct inet_addr addr; /**< Address of this port */
94 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
95 struct in_addr multicast_addr;
96 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100 struct inet_port, /* cust_head_t */
101 forb_peer_t, /* cust_item_t */
102 new_peers, /* cust_head_field */
103 lnode) /* cust_node_field */
106 /** INET protocol data associated with every peer */
108 int socket; /**< Connected socket to the peer */
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
117 const struct inet_addr *a = addr;
118 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119 CORBA_unsigned_short hport = ntohs(a->port);
121 ret = CORBA_unsigned_long_serialize(codec, &haddr);
124 return CORBA_unsigned_short_serialize(codec, &hport);
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
131 CORBA_unsigned_long s_addr;
132 CORBA_unsigned_short port;
135 a = forb_malloc(sizeof(*a));
138 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
141 ret = CORBA_unsigned_short_deserialize(codec, &port);
142 a->addr.s_addr = htonl(s_addr);
143 a->port = htons(port);
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
151 struct inet_peer *ipeer;
152 struct sockaddr_in sa;
153 struct inet_addr *addr = peer->addr;
157 ul_logerr("No address to connect\n");
160 ipeer = forb_malloc(sizeof(*ipeer));
163 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164 if (!ipeer->socket) {
165 ul_logerr("socket(): %s\n", strerror(errno));
168 sa.sin_family = AF_INET;
169 sa.sin_port = addr->port;
170 sa.sin_addr = addr->addr;
171 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
174 ul_logerr("connect error: %s\n", strerror(errno));
178 struct epoll_event ev;
179 struct inet_port *p = peer->port->desc.proto_priv;
180 memset(&ev, 0, sizeof(ev));
181 ev.events = EPOLLIN | EPOLLET;
182 ev.data.fd = ipeer->socket;
183 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
185 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
192 close(ipeer->socket);
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
203 struct inet_peer *ipeer = peer->proto_priv;
207 ipeer = inet_connect(peer);
211 peer->proto_priv = ipeer;
216 ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
218 ret = send(ipeer->socket, buf, len, 0);
220 ul_logerr("send error: %s\n", strerror(errno));
231 /*----------------------------------------------------------------------
232 Portable function to set a socket into nonblocking mode.
233 Calling this on a socket causes all future read() and write() calls on
234 that socket to do only as much as they can immediately, and return
236 If no data can be read or written, they return -1 and set errno
237 to EAGAIN (or EWOULDBLOCK).
238 Thanks to Bjorn Reese for this code.
239 ----------------------------------------------------------------------*/
240 int setnonblocking(int fd)
244 /* If they have O_NONBLOCK, use the Posix way to do it */
245 #if defined(O_NONBLOCK)
246 /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
247 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
249 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
251 /* Otherwise, use the old way of doing it */
253 return ioctl(fd, FIOBIO, &flags);
258 inet_accept_connection(forb_port_t *port)
260 struct inet_port *p = port->desc.proto_priv;
262 struct sockaddr_in addr;
263 socklen_t addrlen = sizeof(addr);
264 struct epoll_event ev;
268 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
273 ret = setnonblocking(client);
279 peer = forb_peer_new();
281 struct inet_peer *ipeer;
283 ipeer = forb_malloc(sizeof(*ipeer));
285 ipeer->socket = client;
286 peer->proto_priv = ipeer;
288 peer->state = FORB_PEER_DISCOVERED;
289 inet_port_new_peer_insert(p, peer);
290 //printf("New connection d=%d\n", client);
296 memset(&ev, 0, sizeof(ev));
297 ev.events = EPOLLIN | EPOLLET;
299 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
303 inet_recv(forb_port_t *port, void *buf, size_t len)
305 struct inet_port *iport = port->desc.proto_priv;
307 struct epoll_event ev;
311 bool exported_new_peer = false;
314 if (iport->last_recv_fd == -1) {
315 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
316 if (nfds == -1 && errno == EINTR)
320 if (ev.data.fd == iport->listen_socket) {
321 ret = inet_accept_connection(port);
323 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
328 iport->last_recv_fd = ev.data.fd;
331 /* Check for first reception form a just connected peer */
332 ul_list_for_each(inet_port_new_peer, iport, peer) {
333 struct inet_peer *ipeer = peer->proto_priv;
334 //printf("checking new peer with fd=%d\n", ipeer->socket);
335 if (ipeer->socket == iport->last_recv_fd) {
336 inet_port_new_peer_delete(iport, peer);
338 if (port->new_peer) forb_peer_put(peer);
340 /* Let the upper layer assign forb ID
341 * to this peer according to the request*/
342 port->new_peer = peer;
343 exported_new_peer = true;
348 //printf("recv fd=%d\n", iport->last_recv_fd);
349 ret = recv(iport->last_recv_fd, buf, len, 0);
351 if (exported_new_peer) {
353 port->new_peer = NULL;
355 if (errno != EAGAIN) {
356 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
358 iport->last_recv_fd = -1;
362 if (exported_new_peer) {
364 port->new_peer = NULL;
366 ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
367 ul_list_for_each(forb_port_peer, port, peer) {
368 struct inet_peer *ipeer = peer->proto_priv;
369 if (ipeer && ipeer->socket == iport->last_recv_fd) {
370 forb_peer_disconnected(peer);
374 iport->last_recv_fd = -1;
377 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
381 return recv(iport->udp_socket, buf, len, 0);
386 inet_port_destroy(forb_port_t * port)
388 struct inet_port *pd = port->desc.proto_priv;
390 close(pd->udp_socket);
391 close(pd->listen_socket);
397 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
399 struct inet_port *p = port->desc.proto_priv;
400 struct sockaddr_in addr;
403 addr.sin_family = AF_INET;
404 addr.sin_port = htons(MCAST_PORT);
405 addr.sin_addr = p->multicast_addr;
407 ret = sendto(p->udp_socket, buf, len, 0,
408 (struct sockaddr*)&addr, sizeof(addr));
413 inet_peer_destroy(forb_peer_t *peer)
415 struct inet_peer *ipeer = peer->proto_priv;
417 peer->proto_priv = NULL;
418 ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
419 close(ipeer->socket);
424 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
426 const struct inet_addr *a = addr;
429 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
434 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
437 #include <fcb_contact_info.h>
440 static void inet_register_cb(forb_port_t *port)
442 struct inet_addr *ia;
444 ia = malloc(sizeof(*ia));
447 char *fcb_addr = getenv("FCB_ADDR");
448 if (!fcb_addr) fcb_addr = "127.0.0.1";
449 ia->addr.s_addr = inet_addr(fcb_addr);
450 ia->port = htons(FCB_TCP_PORT);
451 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
454 #define inet_register_cb NULL
457 static const forb_proto_t proto_inet = {
458 .hello_interval = 40 /* seconds */,
459 .port_destroy = inet_port_destroy,
460 .peer_destroy = inet_peer_destroy,
463 .broadcast = inet_broadcast,
464 .serialize_addr = inet_serialize_addr,
465 .deserialize_addr = inet_deserialize_addr,
466 .addr2str = inet_addr2str,
467 .register_cb = inet_register_cb,
470 #define MAX_INTERFACES 10
471 int get_local_address(int sock, struct in_addr *addr)
474 struct ifreq *ifr, req[MAX_INTERFACES];
477 ifc.ifc_len = sizeof(req);
480 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
482 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
483 struct sockaddr_in ia;
484 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
485 ioctl(sock, SIOCGIFFLAGS, ifr);
486 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
495 * Initializes INET protocol port.
497 * @param port_desc Port description to initialize.
498 * @return Zero on success, -1 on error and errno is set
502 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
506 struct inet_port *port_priv;
507 struct sockaddr_in addr;
509 struct epoll_event ev;
511 port_priv = forb_malloc(sizeof(*port_priv));
515 memset(port_priv, 0, sizeof(*port_priv));
516 port_priv->last_recv_fd = -1;
517 inet_port_new_peer_init_head(port_priv);
519 /* Initialize UDP multicast socket */
520 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
521 if (port_priv->udp_socket == -1) goto err_free;
524 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
529 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
533 setnonblocking(port_priv->udp_socket);
536 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
537 mreq.imr_multiaddr = port_priv->multicast_addr;
538 mreq.imr_interface.s_addr = INADDR_ANY;
539 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
540 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
544 addr.sin_family = AF_INET;
545 addr.sin_port = htons(MCAST_PORT);
546 addr.sin_addr = port_priv->multicast_addr;
548 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
549 if (ret != 0) goto err_close_udp;
552 unsigned loop_size = sizeof(loop);
553 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
554 IP_MULTICAST_LOOP, &loop, loop_size);
559 /* Initialize TCP socket */
560 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
561 if (port_priv->listen_socket == -1) goto err_close_udp;
564 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
566 goto err_close_listen;
568 addr.sin_family = AF_INET;
569 addr.sin_port = htons(port);
570 addr.sin_addr = listen_on;
571 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
572 if (ret != 0) goto err_close_listen;
573 if (setnonblocking(port_priv->listen_socket))
574 goto err_close_listen;
576 ret = listen(port_priv->listen_socket, 10);
578 goto err_close_listen;
580 /* Determine our address and port*/
582 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
584 ul_logerr("Non-loopback inet address not found\n");
585 goto err_close_listen;
588 port_priv->addr.port = addr.sin_port;
589 if (listen_on.s_addr == INADDR_ANY)
590 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
592 port_priv->addr.addr = listen_on;
594 /* Initialize epoll descriptor */
595 port_priv->epoll_fd = epoll_create(10);
596 if (port_priv->epoll_fd == -1)
597 goto err_close_listen;
599 memset(&ev, 0, sizeof(ev));
600 ev.events = EPOLLIN | EPOLLET;
601 ev.data.fd = port_priv->listen_socket;
602 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
603 port_priv->listen_socket, &ev);
605 goto err_close_epoll;
607 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
608 ev.events = EPOLLIN | EPOLLET;
609 ev.data.fd = port_priv->udp_socket;
610 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
611 port_priv->udp_socket, &ev);
613 goto err_close_epoll;
616 port_desc->proto = &proto_inet;
617 port_desc->proto_priv = port_priv;
618 port_desc->addr = &port_priv->addr;
621 close(port_priv->epoll_fd);
624 close(port_priv->listen_socket);
628 close(port_priv->udp_socket);
632 forb_free(port_priv);