1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
64 #include <netinet/tcp.h>
65 #include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
69 * @author Michal Sojka <sojkam1@fel.cvut.cz>
70 * @date Sun Oct 12 16:10:23 2008
72 * @brief FORB transport protocol based on INET family sockets.
74 * UDP is used for broadcasts and TCP for requests/replies. There
75 * exist two uni-directional connections between any two communicating
/* Presumably the log domain behind the ul_log*() calls in this file; it is only declared here (extern). */
79 extern UL_LOG_CUST(ulogd_forb_proto_inet);
81 #define MCAST_PORT 15514 /**< Port used for multicasts */
82 #define MCAST_ADDR "225.15.5.14" /**< IPv4 multicast group; converted with inet_aton() in forb_inet_port_init() */
84 /** Address used by inet protocol. All values are stored in network
88 uint16_t port; /**< TCP listening port */
91 /** INET protocol data for ports. */
93 int udp_socket; /**< Socket for sending and receiving broadcasts */
94 int listen_socket; /**< TCP socket on which incoming connections are accepted */
95 int epoll_fd; /**< File descriptor used by epoll_wait() in inet_recv(). */
96 struct inet_addr addr; /**< Address of this port */
97 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer; -1 when no descriptor is pending */
98 struct in_addr multicast_addr; /**< Multicast group address used as the destination in inet_broadcast() */
99 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
/* Declares the inet_port_new_peer_*() list helpers used in this file
 * (init_head, insert, delete and iteration) for the list rooted at
 * inet_port::new_peers and linked through forb_peer_t::lnode. */
102 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
103 struct inet_port, /* cust_head_t */
104 forb_peer_t, /* cust_item_t */
105 new_peers, /* cust_head_field */
106 lnode) /* cust_node_field */
109 /** INET protocol data associated with every peer; stored in forb_peer_t::proto_priv. */
111 int socket; /**< Connected socket to the peer */
114 /* static struct inet_port* */
115 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
/**
 * Serializes an INET address into a CDR codec.
 *
 * The address and port are read from a struct inet_addr, converted from
 * network to host byte order, and written as a CORBA unsigned long (IPv4
 * address) followed by a CORBA unsigned short (TCP port).
 *
 * @param codec Codec the values are written to.
 * @param addr Pointer to the struct inet_addr to serialize.
 * @return On the path visible here, the result of serializing the port.
 */
118 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
120 const struct inet_addr *a = addr;
/* Convert both fields to host byte order before handing them to the codec. */
121 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
122 CORBA_unsigned_short hport = ntohs(a->port);
124 ret = CORBA_unsigned_long_serialize(codec, &haddr);
127 return CORBA_unsigned_short_serialize(codec, &hport);
/**
 * Deserializes an INET address from a CDR codec.
 *
 * A struct inet_addr is allocated with forb_malloc(); the IPv4 address
 * (CORBA unsigned long) and the TCP port (CORBA unsigned short) are read in
 * that order and stored in network byte order.
 *
 * @param codec Codec to read from.
 * @param addr Output parameter; presumably receives the newly allocated
 *             struct inet_addr (the assignment is not visible here).
 *
 * NOTE(review): confirm the allocated structure is released when one of the
 * deserialization calls fails.
 */
131 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
134 CORBA_unsigned_long s_addr;
135 CORBA_unsigned_short port;
138 a = forb_malloc(sizeof(*a));
141 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
144 ret = CORBA_unsigned_short_deserialize(codec, &port);
/* Values travel in host byte order; convert back to network byte order. */
145 a->addr.s_addr = htonl(s_addr);
146 a->port = htons(port);
/* Forward declaration: setnonblocking() is defined further down but is already called from inet_connect(). */
152 setnonblocking(int fd);
/* Fragment of the helper called as setnodelay() from inet_connect()
 * (presumably; its signature line is not visible here).
 * The code guarded by "#if 0" is compiled out; when enabled it sets
 * TCP_NODELAY on fd and logs a failure of setsockopt(). */
158 #if 0 /* For nice graphs in benchmarks */
160 ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
162 ul_logerr("setsockopt(TCP_NODELAY): %s\n", strerror(errno));
170 inet_connect(forb_peer_t *peer)
172 struct inet_peer *ipeer;
173 struct sockaddr_in sa;
174 struct inet_addr *addr = peer->addr;
178 ul_logerr("No address to connect\n");
181 ipeer = forb_malloc(sizeof(*ipeer));
184 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
185 if (!ipeer->socket) {
186 ul_logerr("socket(): %s\n", strerror(errno));
189 sa.sin_family = AF_INET;
190 sa.sin_port = addr->port;
191 sa.sin_addr = addr->addr;
192 ul_logtrash("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
193 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
195 ul_logerr("connect error: %s\n", strerror(errno));
199 setnonblocking(ipeer->socket);
200 setnodelay(ipeer->socket);
202 struct epoll_event ev;
203 struct inet_port *p = peer->port->desc.proto_priv;
204 memset(&ev, 0, sizeof(ev));
205 ev.events = EPOLLIN | EPOLLET;
206 ev.data.fd = ipeer->socket;
207 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
209 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
213 peer->proto_priv = ipeer;
215 #ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
216 ret = forb_iop_send_hello_to(peer);
224 close(ipeer->socket);
/**
 * Sends a buffer to a peer over its TCP connection.
 *
 * The peer's struct inet_peer is taken from peer->proto_priv; the visible
 * code calls inet_connect() and re-reads proto_priv afterwards, presumably
 * when no connection exists yet (the condition itself is not visible here).
 *
 * @param peer Destination peer.
 * @param buf Data to send.
 * @param len Number of bytes in buf.
 *
 * NOTE(review): inet_connect() switches the socket to non-blocking mode, so
 * send() may accept fewer than len bytes or fail with EAGAIN; confirm that
 * short writes are handled.
 */
233 inet_send(forb_peer_t *peer, const void *buf, size_t len)
235 struct inet_peer *ipeer = peer->proto_priv;
239 ret = inet_connect(peer);
/* inet_connect() stores the freshly allocated inet_peer in proto_priv. */
243 ipeer = peer->proto_priv;
247 ul_logtrash("send fd=%d len=%zu\n", ipeer->socket, len);
249 ret = send(ipeer->socket, buf, len, 0);
251 ul_logerr("send error: %s\n", strerror(errno));
/**
 * Portable function to set a descriptor into non-blocking mode.
 *
 * Calling this on a socket causes all future read() and write() calls on
 * that socket to do only as much as they can immediately, and return
 * without waiting. If no data can be read or written, they return -1 and
 * set errno to EAGAIN (or EWOULDBLOCK).
 *
 * Thanks to Bjorn Reese for this code.
 *
 * @param fd Descriptor to modify.
 * @return Zero on success, -1 on error (errno is set by fcntl()/ioctl()).
 */
int setnonblocking(int fd)
{
	int flags;

#if defined(O_NONBLOCK)
	/* If they have O_NONBLOCK, use the POSIX way to do it. */
	/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1)
		flags = 0;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Otherwise, use the old way of doing it: FIONBIO with a non-zero
	 * argument enables non-blocking I/O. */
	flags = 1;
	return ioctl(fd, FIONBIO, &flags);
#endif
}
/**
 * Accepts one pending connection on the port's listening socket.
 *
 * The accepted socket is made non-blocking, wrapped in a new forb_peer_t
 * whose proto_priv holds a struct inet_peer, queued on the port's new_peers
 * list in state FORB_PEER_DISCOVERED and registered with the port's epoll
 * descriptor for edge-triggered input events.
 *
 * @param port Port whose listening socket signalled an incoming connection.
 * @return On the final path, the result of epoll_ctl().
 *
 * NOTE(review): the listening socket is registered with EPOLLET in
 * forb_inet_port_init() and only one accept() is visible here; confirm that
 * several connections arriving at once cannot be left waiting.
 */
289 inet_accept_connection(forb_port_t *port)
291 struct inet_port *p = port->desc.proto_priv;
293 struct sockaddr_in addr;
294 socklen_t addrlen = sizeof(addr);
295 struct epoll_event ev;
299 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
304 ret = setnonblocking(client);
311 peer = forb_peer_new();
313 struct inet_peer *ipeer;
315 ipeer = forb_malloc(sizeof(*ipeer));
317 ipeer->socket = client;
318 peer->proto_priv = ipeer;
/* The peer stays on new_peers until inet_recv() sees its first data. */
320 peer->state = FORB_PEER_DISCOVERED;
321 inet_port_new_peer_insert(p, peer);
327 memset(&ev, 0, sizeof(ev));
328 ev.events = EPOLLIN | EPOLLET;
330 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
/**
 * Receives data arriving on a port.
 *
 * When no descriptor is pending (last_recv_fd == -1) the function waits in
 * epoll_wait() for a single event. An event on the listening socket is
 * handed to inet_accept_connection(); otherwise the ready descriptor is
 * remembered in last_recv_fd so that it can be read repeatedly until it is
 * drained. The first data received from a freshly accepted peer moves that
 * peer from the new_peers list to port->new_peer. When recv() returns no
 * data the matching peer is reported through forb_peer_disconnected().
 *
 * @param port Port to receive on.
 * @param buf Destination buffer.
 * @param len Size of buf in bytes.
 */
334 inet_recv(forb_port_t *port, void *buf, size_t len)
336 struct inet_port *iport = port->desc.proto_priv;
338 struct epoll_event ev;
342 bool exported_new_peer = false;
/* -1 means the previously reported descriptor has been fully drained. */
345 if (iport->last_recv_fd == -1) {
346 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
347 if (nfds == -1 && errno == EINTR)
351 if (ev.data.fd == iport->listen_socket) {
352 ret = inet_accept_connection(port);
354 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
359 iport->last_recv_fd = ev.data.fd;
362 /* Check for first reception from a just connected peer */
363 ul_list_for_each(inet_port_new_peer, iport, peer) {
364 struct inet_peer *ipeer = peer->proto_priv;
365 //printf("checking new peer with fd=%d\n", ipeer->socket);
366 if (ipeer->socket == iport->last_recv_fd) {
367 inet_port_new_peer_delete(iport, peer);
369 if (port->new_peer) forb_peer_put(peer);
371 /* Let the upper layer assign forb ID
372 * to this peer according to the request*/
373 port->new_peer = peer;
374 exported_new_peer = true;
379 //printf("recv fd=%d\n", iport->last_recv_fd);
380 ret = recv(iport->last_recv_fd, buf, len, 0);
/* Receive failed: take back a peer exported just above. */
382 if (exported_new_peer) {
384 port->new_peer = NULL;
/* EAGAIN only means the non-blocking socket is drained; anything else is logged. */
386 if (errno != EAGAIN) {
387 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
389 iport->last_recv_fd = -1;
393 if (exported_new_peer) {
395 port->new_peer = NULL;
397 ul_logtrash("recv fd=%d disconnect\n", iport->last_recv_fd);
/* Find the peer owning the closed socket and notify the upper layer. */
398 ul_list_for_each(forb_port_peer, port, peer) {
399 struct inet_peer *ipeer = peer->proto_priv;
400 if (ipeer && ipeer->socket == iport->last_recv_fd) {
401 forb_peer_disconnected(peer);
405 iport->last_recv_fd = -1;
408 ul_logtrash("recv fd=%d len=%zd\n", iport->last_recv_fd, ret);
/* Data on the UDP socket is read with a single recv(). */
412 return recv(iport->udp_socket, buf, len, 0);
/**
 * Releases the INET-specific resources of a port by closing its UDP and TCP
 * listening sockets.
 *
 * @param port Port being destroyed.
 *
 * NOTE(review): forb_inet_port_init() also creates epoll_fd and allocates
 * the struct inet_port; confirm both are released here (not visible).
 */
417 inet_port_destroy(forb_port_t * port)
419 struct inet_port *pd = port->desc.proto_priv;
421 close(pd->udp_socket);
422 close(pd->listen_socket);
/**
 * Sends a datagram to the port's multicast group on MCAST_PORT.
 *
 * @param port Port whose UDP socket and multicast address are used.
 * @param buf Data to send.
 * @param len Number of bytes in buf.
 */
428 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
430 struct inet_port *p = port->desc.proto_priv;
431 struct sockaddr_in addr;
434 addr.sin_family = AF_INET;
435 addr.sin_port = htons(MCAST_PORT);
/* multicast_addr is filled in by forb_inet_port_init(). */
436 addr.sin_addr = p->multicast_addr;
438 ret = sendto(p->udp_socket, buf, len, 0,
439 (struct sockaddr*)&addr, sizeof(addr));
/**
 * Releases the INET-specific data of a peer: detaches the struct inet_peer
 * from peer->proto_priv and closes its socket.
 *
 * @param peer Peer being destroyed.
 *
 * NOTE(review): confirm that a peer which never connected
 * (proto_priv == NULL) is handled before ipeer is dereferenced.
 */
444 inet_peer_destroy(forb_peer_t *peer)
446 struct inet_peer *ipeer = peer->proto_priv;
448 peer->proto_priv = NULL;
449 ul_logtrash("destroying peer fd=%d (orb_id=%s)\n",
450 ipeer->socket, peer->orb_id);
451 close(ipeer->socket);
/**
 * Formats an INET address as "a.b.c.d:port" into dest.
 *
 * @param dest Destination buffer.
 * @param maxlen Size of dest; the output is truncated by snprintf() to fit.
 * @param addr Pointer to a struct inet_addr (network byte order).
 *
 * Note: inet_ntoa() returns a pointer to a static buffer, so the result
 * must be consumed before inet_ntoa() is called again.
 */
456 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
458 const struct inet_addr *a = addr;
461 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
/* The callback below exists only when both CONFIG_FCB and
 * CONFIG_FORB_PROTO_INET_DEFAULT are enabled; otherwise inet_register_cb
 * expands to NULL. */
466 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
469 #include <fcb_contact_info.h>
/**
 * Announces the FCB server as a discovered peer of the given port.
 *
 * The server address is taken from the FCB_ADDR environment variable
 * (default "127.0.0.1") and the port is FCB_TCP_PORT.
 *
 * @param port Port on which the peer is reported.
 *
 * NOTE(review): the address is allocated with malloc() whereas the rest of
 * this file uses forb_malloc(); confirm the allocation is checked and that
 * the two allocators are compatible with whoever frees the address.
 */
471 static void inet_register_cb(forb_port_t *port)
473 struct inet_addr *ia;
475 ia = malloc(sizeof(*ia));
478 char *fcb_addr = getenv("FCB_ADDR");
479 if (!fcb_addr) fcb_addr = "127.0.0.1";
/* inet_addr() yields the address in network byte order. */
480 ia->addr.s_addr = inet_addr(fcb_addr);
481 ia->port = htons(FCB_TCP_PORT);
482 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
485 #define inet_register_cb NULL
/** Operations table of the INET protocol; installed into
 * forb_port_desc::proto by forb_inet_port_init(). */
488 static const forb_proto_t proto_inet = {
489 .hello_interval = 40 /* seconds */,
490 .port_destroy = inet_port_destroy,
491 .peer_destroy = inet_peer_destroy,
494 .broadcast = inet_broadcast,
495 .serialize_addr = inet_serialize_addr,
496 .deserialize_addr = inet_deserialize_addr,
497 .addr2str = inet_addr2str,
/* NULL unless CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT. */
498 .register_cb = inet_register_cb,
/** Capacity of the interface table requested with SIOCGIFCONF. */
501 #define MAX_INTERFACES 10
/**
 * Determines the local IPv4 address to advertise.
 *
 * The interfaces reported by SIOCGIFCONF are scanned and only those that
 * are up (IFF_UP) are considered. If the FORB_EXTERNAL_IP environment
 * variable is set, it must parse as an IPv4 address and is compared with
 * the interface addresses; otherwise non-loopback interfaces are
 * distinguished from loopback ones (the selection statements themselves are
 * not visible here).
 *
 * @param sock Socket used for the ioctl() queries.
 * @param addr Output address.
 *
 * NOTE(review): the loop walks all MAX_INTERFACES slots of req[] although
 * SIOCGIFCONF reports the number of valid bytes in ifc.ifc_len; entries
 * beyond that may be uninitialized. Bounding the loop by
 * ifc.ifc_len / sizeof(struct ifreq) looks safer; confirm.
 */
502 int get_local_address(int sock, struct in_addr *addr)
505 struct ifreq *ifr, req[MAX_INTERFACES];
507 bool loopback = false;
509 struct in_addr env_addr;
511 env = getenv("FORB_EXTERNAL_IP");
513 if (inet_aton(env, &env_addr) == 0) {
514 ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
520 ifc.ifc_len = sizeof(req);
523 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
525 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
526 struct sockaddr_in ia;
/* Copy the address first: SIOCGIFFLAGS below overwrites the same union in *ifr. */
527 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
528 ret = ioctl(sock, SIOCGIFFLAGS, ifr);
529 if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
530 if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
534 if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
546 ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
557 * Initializes INET protocol port.
559 * @param port_desc Port description to initialize.
560 * @return Zero on success, -1 on error and errno is set
/*
 * Overview of the set-up performed below:
 *  1. a UDP socket with SO_BROADCAST and SO_REUSEADDR, switched to
 *     non-blocking mode; in the section starting at
 *     "#ifndef CONFIG_FORB_PROTO_INET_DEFAULT" it joins the MCAST_ADDR group;
 *     it is bound to MCAST_PORT;
 *  2. a TCP listening socket bound to listen_on:port, non-blocking,
 *     with a backlog of 10;
 *  3. the port address: the TCP port comes from getsockname(), the IP
 *     address from get_local_address() when listening on INADDR_ANY,
 *     otherwise listen_on itself;
 *  4. an epoll descriptor watching the listening socket and, in the second
 *     "#ifndef CONFIG_FORB_PROTO_INET_DEFAULT" section, the UDP socket.
 * On success port_desc->proto, port_desc->proto_priv and port_desc->addr
 * are filled in. Error paths jump to labels that release what was acquired,
 * in reverse order.
 */
564 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
568 struct inet_port *port_priv;
569 struct sockaddr_in addr;
571 struct epoll_event ev;
573 port_priv = forb_malloc(sizeof(*port_priv));
577 memset(port_priv, 0, sizeof(*port_priv));
/* No descriptor is pending for inet_recv() yet. */
578 port_priv->last_recv_fd = -1;
579 inet_port_new_peer_init_head(port_priv);
581 /* Initialize UDP multicast socket */
582 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
583 if (port_priv->udp_socket == -1) goto err_free;
586 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
591 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
595 setnonblocking(port_priv->udp_socket);
597 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
/* Join the multicast group on the default interface (INADDR_ANY). */
599 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
600 mreq.imr_multiaddr = port_priv->multicast_addr;
601 mreq.imr_interface.s_addr = INADDR_ANY;
602 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
603 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
608 addr.sin_family = AF_INET;
609 addr.sin_port = htons(MCAST_PORT);
610 addr.sin_addr = port_priv->multicast_addr;
612 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
613 if (ret != 0) goto err_close_udp;
616 unsigned loop_size = sizeof(loop);
617 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
618 IP_MULTICAST_LOOP, &loop, loop_size);
623 /* Initialize TCP socket */
624 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
625 if (port_priv->listen_socket == -1) goto err_close_udp;
628 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
630 goto err_close_listen;
632 addr.sin_family = AF_INET;
633 addr.sin_port = htons(port);
634 addr.sin_addr = listen_on;
635 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
636 if (ret != 0) goto err_close_listen;
637 if (setnonblocking(port_priv->listen_socket))
638 goto err_close_listen;
640 ret = listen(port_priv->listen_socket, 10);
642 goto err_close_listen;
644 /* Determine our address and port*/
646 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
/* NOTE(review): this message directly follows getsockname(); its wording
 * looks like it belongs to a get_local_address() failure instead - confirm. */
648 ul_logerr("Non-loopback inet address not found\n");
649 goto err_close_listen;
/* getsockname() returns the port in network byte order, as inet_addr expects. */
652 port_priv->addr.port = addr.sin_port;
653 if (listen_on.s_addr == INADDR_ANY) {
654 if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
655 goto err_close_listen;
658 port_priv->addr.addr = listen_on;
660 /* Initialize epoll descriptor */
661 port_priv->epoll_fd = epoll_create(10);
662 if (port_priv->epoll_fd == -1)
663 goto err_close_listen;
665 memset(&ev, 0, sizeof(ev));
666 ev.events = EPOLLIN | EPOLLET;
667 ev.data.fd = port_priv->listen_socket;
668 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
669 port_priv->listen_socket, &ev);
671 goto err_close_epoll;
673 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
674 ev.events = EPOLLIN | EPOLLET;
675 ev.data.fd = port_priv->udp_socket;
676 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
677 port_priv->udp_socket, &ev);
679 goto err_close_epoll;
/* Publish the initialized port to the caller. */
682 port_desc->proto = &proto_inet;
683 port_desc->proto_priv = port_priv;
684 port_desc->addr = &port_priv->addr;
/* Error unwinding: release resources in reverse order of acquisition. */
687 close(port_priv->epoll_fd);
690 close(port_priv->listen_socket);
694 close(port_priv->udp_socket);
698 forb_free(port_priv);