1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
64 #include <netinet/tcp.h>
65 #include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
69 * @author Michal Sojka <sojkam1@fel.cvut.cz>
70 * @date Sun Oct 12 16:10:23 2008
72 * @brief FORB transport protocol based on INET family sockets.
74 * UDP is used for broadcasts and TCP for requests/replies. There
75 * exist two uni-directional connections between any two communicating
/* Declares the ulogd_forb_proto_inet log domain; presumably the domain the ul_logerr()/ul_logtrash() calls in this file report to -- confirm against the ul_log setup. */
79 extern UL_LOG_CUST(ulogd_forb_proto_inet);
81 #define MCAST_PORT 15514 /**< Port used for multicasts */
82 #define MCAST_ADDR "225.15.5.14" /**< Multicast group address; parsed with inet_aton() in forb_inet_port_init() */
/* NOTE(review): only the port member is visible in this view; code in this file also accesses an addr member of this structure (e.g. a->addr.s_addr in inet_serialize_addr()). */
84 /** Address used by inet protocol. All values are stored in network
88 uint16_t port; /**< TCP listening port */
91 /** INET protocol data for ports. */
93 int udp_socket; /**< Socket for sending and receiving broadcasts */
94 int listen_socket; /**< TCP socket that is bound and listened on in forb_inet_port_init(); inet_accept_connection() calls accept() on it */
95 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
96 struct inet_addr addr; /**< Address of this port */
97 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
98 struct in_addr multicast_addr; /**< Multicast group address (parsed from MCAST_ADDR); destination used by inet_broadcast() */
99 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
/*
 * Declares the inet_port_new_peer_* list helpers for the new_peers list of
 * struct inet_port. This file uses inet_port_new_peer_init_head(),
 * inet_port_new_peer_insert(), inet_port_new_peer_delete() and iterates the
 * list with ul_list_for_each()/ul_list_for_each_cut().
 */
102 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
103 struct inet_port, /* cust_head_t */
104 forb_peer_t, /* cust_item_t */
105 new_peers, /* cust_head_field */
106 lnode) /* cust_node_field */
/* Stored in forb_peer_t::proto_priv by inet_connect() and inet_accept_connection(). */
109 /** INET protocol data associated with every peer */
111 int socket; /**< Connected socket to the peer */
114 /* static struct inet_port* */
115 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
/**
 * Writes an INET address into a CDR codec.
 *
 * The IPv4 address and the TCP port are converted from network to host
 * byte order and serialized as an unsigned long followed by an unsigned
 * short.
 *
 * @param codec Codec to serialize into.
 * @param addr  Points to the struct inet_addr to serialize.
 * @return On the final path, the result of serializing the port.
 */
118 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
120 const struct inet_addr *a = addr;
121 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
122 CORBA_unsigned_short hport = ntohs(a->port);
/* Address first, then port; inet_deserialize_addr() reads them in the same order. */
124 ret = CORBA_unsigned_long_serialize(codec, &haddr);
127 return CORBA_unsigned_short_serialize(codec, &hport);
/**
 * Reads an INET address from a CDR codec.
 *
 * A struct inet_addr is allocated with forb_malloc(); an unsigned long
 * (IPv4 address) and an unsigned short (port) are deserialized and stored
 * in it after conversion from host to network byte order.
 *
 * @param codec Codec to read from.
 * @param addr  Output parameter; NOTE(review): the store through *addr is
 *              not visible in this view -- presumably it receives the
 *              allocated structure, confirm.
 */
131 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
134 CORBA_unsigned_long s_addr;
135 CORBA_unsigned_short port;
138 a = forb_malloc(sizeof(*a));
/* Same order as written by inet_serialize_addr(): address, then port. */
141 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
144 ret = CORBA_unsigned_short_deserialize(codec, &port);
/* Values travel in host byte order; struct inet_addr keeps them in network order. */
145 a->addr.s_addr = htonl(s_addr);
146 a->port = htons(port);
/* Forward declaration: inet_connect() calls setnonblocking() before its definition further down in this file. */
152 setnonblocking(int fd);
/*
 * TCP_NODELAY setup for a socket; the setsockopt() call shown here sits
 * under "#if 0", so it is compiled out. inet_connect() calls setnodelay()
 * on each socket it connects.
 */
158 #if 0 /* For nice graphs in benchmarks */
160 ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
162 ul_logerr("setsockopt(TCP_NODELAY): %s\n", strerror(errno));
/**
 * Opens a TCP connection to a peer and registers it for reception.
 *
 * Steps visible here: allocate the per-peer data, create a stream socket,
 * connect() it to the address stored in peer->addr, switch it to
 * non-blocking mode, add it to the epoll set of the peer's port
 * (edge-triggered EPOLLIN), publish the per-peer data in
 * peer->proto_priv and, unless TEST is defined, send a hello message with
 * forb_iop_send_hello_to().
 *
 * @param peer Peer to connect to; peer->addr is read as a struct inet_addr.
 */
170 inet_connect(forb_peer_t *peer)
172 struct inet_peer *ipeer;
173 struct sockaddr_in sa;
174 struct inet_addr *addr = peer->addr;
178 ul_logerr("No address to connect\n");
181 ipeer = forb_malloc(sizeof(*ipeer));
184 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
/* NOTE(review): socket() reports failure with -1, not 0, so this test does not catch a failed call; forb_inet_port_init() compares against -1 instead. */
185 if (!ipeer->socket) {
186 ul_logerr("socket(): %s\n", strerror(errno));
/* addr->port and addr->addr are already kept in network byte order, so they are copied without conversion. */
189 sa.sin_family = AF_INET;
190 sa.sin_port = addr->port;
191 sa.sin_addr = addr->addr;
192 ul_logtrash("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
/* The socket is still blocking at this point; it is made non-blocking only after connect() has returned. */
193 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
195 ul_logerr("connect error: %s\n", strerror(errno));
199 setnonblocking(ipeer->socket);
200 setnodelay(ipeer->socket);
/* Register the new socket in the port's epoll set so that inet_recv() is woken up by data from this peer. */
202 struct epoll_event ev;
203 struct inet_port *p = peer->port->desc.proto_priv;
204 memset(&ev, 0, sizeof(ev));
205 ev.events = EPOLLIN | EPOLLET;
206 ev.data.fd = ipeer->socket;
207 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
209 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
/* proto_priv is set before the hello is sent; inet_send() reads the socket from peer->proto_priv. */
213 peer->proto_priv = ipeer;
215 #ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
216 ret = forb_iop_send_hello_to(peer);
/* Error path: the socket is closed again. NOTE(review): peer->proto_priv was already set above; confirm it does not keep pointing to released data when the hello fails. */
224 close(ipeer->socket);
/**
 * Sends a buffer to a peer over its TCP socket.
 *
 * @param peer Destination peer; its struct inet_peer is taken from
 *             peer->proto_priv.
 * @param buf  Data to send.
 * @param len  Number of bytes in buf.
 */
233 inet_send(forb_peer_t *peer, const void *buf, size_t len)
235 struct inet_peer *ipeer = peer->proto_priv;
/* Connect on demand (presumably when the peer has no proto_priv yet -- the condition is not visible in this view) and reload the per-peer data that inet_connect() stored. */
239 ret = inet_connect(peer);
243 ipeer = peer->proto_priv;
247 ul_logtrash("send fd=%d len=%zu\n", ipeer->socket, len);
/* NOTE(review): the socket is non-blocking, so send() may transmit fewer than len bytes or fail with EAGAIN; confirm how the caller handles short writes. */
249 ret = send(ipeer->socket, buf, len, 0);
251 ul_logerr("send error: %s\n", strerror(errno));
/**
 * Portable function to set a descriptor into non-blocking mode.
 *
 * After this call, read() and write() on the descriptor do only as much
 * as they can immediately and then return. If no data can be read or
 * written, they return -1 and set errno to EAGAIN (or EWOULDBLOCK).
 *
 * Thanks to Bjorn Reese for the original code.
 *
 * @param fd Descriptor to switch to non-blocking mode.
 * @return Result of the underlying fcntl()/ioctl() call: -1 on error
 *         (errno is set), another value on success.
 */
int setnonblocking(int fd)
{
#if defined(O_NONBLOCK)
	/* POSIX way. Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x
	 * and AIX 3.2.5. */
	int flags = fcntl(fd, F_GETFL, 0);

	/* If the current flags cannot be read, start from an empty set;
	 * F_SETFL below still reports an invalid descriptor. */
	if (flags == -1)
		flags = 0;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Old way: FIONBIO with a non-zero argument enables non-blocking
	 * I/O. (The request name used to be misspelled as FIOBIO.) */
	int on = 1;

	return ioctl(fd, FIONBIO, &on);
#endif
}
/**
 * Accepts one pending TCP connection on the port's listening socket.
 *
 * The accepted socket is made non-blocking, wrapped in a newly created
 * peer (state FORB_PEER_DISCOVERED) that is queued on the port's
 * new_peers list, and added to the port's epoll set.
 *
 * @param port Port whose listening socket has a pending connection.
 * @return On the final path, the result of epoll_ctl().
 */
289 inet_accept_connection(forb_port_t *port)
291 struct inet_port *p = port->desc.proto_priv;
293 struct sockaddr_in addr;
294 socklen_t addrlen = sizeof(addr);
295 struct epoll_event ev;
299 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
304 ret = setnonblocking(client);
311 peer = forb_peer_new();
313 struct inet_peer *ipeer;
315 ipeer = forb_malloc(sizeof(*ipeer));
317 ipeer->socket = client;
318 peer->proto_priv = ipeer;
/* The peer stays on new_peers until inet_recv() sees the first data on its socket and hands it to the upper layer through port->new_peer. */
320 peer->state = FORB_PEER_DISCOVERED;
321 inet_port_new_peer_insert(p, peer);
327 memset(&ev, 0, sizeof(ev));
328 ev.events = EPOLLIN | EPOLLET;
/* NOTE(review): the listening socket is registered edge-triggered and only one accept() is visible here; confirm that several connections arriving at once cannot leave some of them unaccepted. */
330 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
/**
 * Receives data for a port from any of its sockets.
 *
 * When no descriptor is remembered in last_recv_fd, the function waits in
 * epoll_wait() for one event. An event on the listening socket is handled
 * by inet_accept_connection(); any other descriptor is remembered in
 * last_recv_fd and read with recv(). The descriptor stays remembered
 * across calls (so that data longer than the buffer can be read) until
 * recv() fails or the peer disconnects, at which point last_recv_fd is
 * reset to -1.
 *
 * The first time data arrives from a just accepted peer, that peer is
 * removed from the new_peers list and exported through port->new_peer.
 *
 * @param port Port to receive on.
 * @param buf  Buffer for the received data.
 * @param len  Size of buf in bytes.
 */
334 inet_recv(forb_port_t *port, void *buf, size_t len)
336 struct inet_port *iport = port->desc.proto_priv;
338 struct epoll_event ev;
342 bool exported_new_peer = false;
345 if (iport->last_recv_fd == -1) {
/* Wait without timeout for a single event; an interrupted wait (EINTR) is treated separately from real errors. */
346 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
347 if (nfds == -1 && errno == EINTR)
351 if (ev.data.fd == iport->listen_socket) {
352 ret = inet_accept_connection(port);
354 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
359 iport->last_recv_fd = ev.data.fd;
362 /* Check for first reception from a just connected peer */
363 ul_list_for_each(inet_port_new_peer, iport, peer) {
364 struct inet_peer *ipeer = peer->proto_priv;
365 //printf("checking new peer with fd=%d\n", ipeer->socket);
366 if (ipeer->socket == iport->last_recv_fd) {
367 inet_port_new_peer_delete(iport, peer);
/* NOTE(review): when an earlier new_peer is still set, this drops a reference on `peer` (the one exported just below), not on the old port->new_peer -- confirm this is intended. */
369 if (port->new_peer) forb_peer_put(peer);
371 /* Let the upper layer assign forb ID
372 * to this peer according to the request*/
373 port->new_peer = peer;
374 exported_new_peer = true;
379 //printf("recv fd=%d\n", iport->last_recv_fd);
380 ret = recv(iport->last_recv_fd, buf, len, 0);
/* recv() failure handling (the condition itself is not visible in this view): withdraw a peer exported in this call, log everything except EAGAIN and forget the descriptor. */
382 if (exported_new_peer) {
384 port->new_peer = NULL;
386 if (errno != EAGAIN) {
387 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
389 iport->last_recv_fd = -1;
/* Disconnect handling: withdraw a peer exported in this call, then notify the upper layer about the registered peer that owns this socket. */
393 if (exported_new_peer) {
395 port->new_peer = NULL;
397 ul_logtrash("recv fd=%d disconnect\n", iport->last_recv_fd);
398 ul_list_for_each(forb_port_peer, port, peer) {
399 struct inet_peer *ipeer = peer->proto_priv;
400 if (ipeer && ipeer->socket == iport->last_recv_fd) {
401 forb_peer_disconnected(peer);
405 iport->last_recv_fd = -1;
408 ul_logtrash("recv fd=%d len=%zd\n", iport->last_recv_fd, ret);
/* NOTE(review): plain read from the UDP socket; whether this statement is reachable or compiled in cannot be told from this view -- confirm. */
412 return recv(iport->udp_socket, buf, len, 0);
/**
 * Releases the INET-specific resources of a port.
 *
 * Closes the UDP and the listening TCP socket and empties the list of
 * peers that connected but have not sent any data yet.
 *
 * @param port Port being destroyed.
 */
417 inet_port_destroy(forb_port_t * port)
419 struct inet_port *pd = port->desc.proto_priv;
/* NOTE(review): no close() of pd->epoll_fd is visible in this view; confirm the epoll descriptor is released as well. */
422 close(pd->udp_socket);
423 close(pd->listen_socket);
/* Cut every remaining entry off the new_peers list. */
424 ul_list_for_each_cut(inet_port_new_peer, pd, peer) {
/* inet_broadcast() is compiled only when CONFIG_FORB_PROTO_INET_DEFAULT is not defined. */
431 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
/**
 * Sends a datagram to the multicast group of the port.
 *
 * @param port Port whose UDP socket is used.
 * @param buf  Data to send.
 * @param len  Number of bytes in buf.
 */
433 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
435 struct inet_port *p = port->desc.proto_priv;
436 struct sockaddr_in addr;
/* Destination: the multicast group address stored in the port, UDP port MCAST_PORT. */
439 addr.sin_family = AF_INET;
440 addr.sin_port = htons(MCAST_PORT);
441 addr.sin_addr = p->multicast_addr;
443 ret = sendto(p->udp_socket, buf, len, 0,
444 (struct sockaddr*)&addr, sizeof(addr));
/**
 * Releases the INET-specific data of a peer.
 *
 * Detaches the struct inet_peer from the peer and closes its socket.
 *
 * @param peer Peer being destroyed.
 */
450 inet_peer_destroy(forb_peer_t *peer)
452 struct inet_peer *ipeer = peer->proto_priv;
/* Detach first so the peer no longer refers to data that is about to be released. */
454 peer->proto_priv = NULL;
455 ul_logtrash("destroying peer fd=%d (orb_id=%s)\n",
456 ipeer->socket, peer->orb_id);
457 close(ipeer->socket);
/**
 * Formats an INET address as "a.b.c.d:port".
 *
 * @param dest   Destination buffer.
 * @param maxlen Size of dest; the output is truncated to fit.
 * @param addr   Points to a struct inet_addr (network byte order).
 */
462 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
464 const struct inet_addr *a = addr;
/* inet_ntoa() returns a pointer to a static buffer; the text is copied into dest immediately. */
467 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
/* The callback exists only when both CONFIG_FCB and CONFIG_FORB_PROTO_INET_DEFAULT are enabled; otherwise inet_register_cb is defined as NULL. */
472 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
475 #include <fcb_contact_info.h>
/**
 * Announces the contract broker (FCB) as a discovered peer of the port.
 *
 * The broker address is taken from the FCB_ADDR environment variable and
 * defaults to 127.0.0.1; the port is FCB_TCP_PORT.
 *
 * @param port Port for which the peer is reported.
 */
477 static void inet_register_cb(forb_port_t *port)
479 struct inet_addr *ia;
/* NOTE(review): plain malloc() here, while the rest of the file allocates with forb_malloc() -- confirm which allocator the code that later frees this address expects. */
481 ia = malloc(sizeof(*ia));
484 char *fcb_addr = getenv("FCB_ADDR");
485 if (!fcb_addr) fcb_addr = "127.0.0.1";
/* NOTE(review): the result of inet_addr() is stored without validation, so a malformed FCB_ADDR is not detected here. */
486 ia->addr.s_addr = inet_addr(fcb_addr);
487 ia->port = htons(FCB_TCP_PORT);
488 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
491 #define inet_register_cb NULL
/*
 * Operations table of the INET protocol. forb_inet_port_init() stores a
 * pointer to it in port_desc->proto.
 */
494 static const forb_proto_t proto_inet = {
495 .hello_interval = 40 /* seconds */,
496 .port_destroy = inet_port_destroy,
497 .peer_destroy = inet_peer_destroy,
/* Broadcasting is offered only when CONFIG_FORB_PROTO_INET_DEFAULT is not defined. */
500 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
501 .broadcast = inet_broadcast,
503 .serialize_addr = inet_serialize_addr,
504 .deserialize_addr = inet_deserialize_addr,
505 .addr2str = inet_addr2str,
/* Either the function above or NULL, depending on the build configuration. */
506 .register_cb = inet_register_cb,
/* Capacity of the interface table passed to SIOCGIFCONF in get_local_address(). */
509 #define MAX_INTERFACES 10
/**
 * Picks the local IPv4 address that this port is known by.
 *
 * The interface list is obtained with SIOCGIFCONF and every entry that is
 * up (IFF_UP) is examined. If the FORB_EXTERNAL_IP environment variable
 * is set, it is parsed with inet_aton() and compared with the interface
 * addresses; an error is logged when it matches no local interface.
 * Loopback and non-loopback interfaces are told apart by IFF_LOOPBACK.
 *
 * @param sock Socket used for the interface ioctl() calls.
 * @param addr Output: the selected address.
 */
510 int get_local_address(int sock, struct in_addr *addr)
513 struct ifreq *ifr, req[MAX_INTERFACES];
515 bool loopback = false;
517 struct in_addr env_addr;
519 env = getenv("FORB_EXTERNAL_IP");
521 if (inet_aton(env, &env_addr) == 0) {
522 ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
528 ifc.ifc_len = sizeof(req);
531 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
/* NOTE(review): the loop always walks all MAX_INTERFACES slots rather than the number of entries SIOCGIFCONF reported in ifc.ifc_len; confirm that unused slots cannot be mistaken for real interfaces. */
533 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
534 struct sockaddr_in ia;
/* Copy the address first: SIOCGIFFLAGS below rewrites the union inside *ifr that holds ifr_addr. */
535 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
536 ret = ioctl(sock, SIOCGIFFLAGS, ifr);
537 if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
538 if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
542 if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
554 ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
565 * Initializes INET protocol port.
567 * @param port_desc Port description to initialize.
568 * @return Zero on success, -1 on error and errno is set
/*
 * Overview of the visible steps: allocate and zero the private port data,
 * create and configure the UDP socket (and, without
 * CONFIG_FORB_PROTO_INET_DEFAULT, join the MCAST_ADDR group and bind to
 * MCAST_PORT), create, bind and listen on the non-blocking TCP socket,
 * determine the address/port to advertise, create the epoll descriptor,
 * register the sockets in it and finally publish everything in port_desc.
 * Failures unwind through the err_* paths at the end, which close the
 * descriptors opened so far and free the private data.
 */
572 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
576 struct inet_port *port_priv;
577 struct sockaddr_in addr;
579 struct epoll_event ev;
581 port_priv = forb_malloc(sizeof(*port_priv));
585 memset(port_priv, 0, sizeof(*port_priv));
/* -1 means "no descriptor with pending data"; inet_recv() then waits in epoll_wait(). */
586 port_priv->last_recv_fd = -1;
587 inet_port_new_peer_init_head(port_priv);
589 /* Initialize UDP multicast socket */
590 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
591 if (port_priv->udp_socket == -1) goto err_free;
594 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
/* NOTE(review): the result of this setsockopt() is not stored in ret; if a check of ret follows, it tests the value left by the previous call -- confirm. */
599 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
603 setnonblocking(port_priv->udp_socket);
/* Multicast group membership and binding of the UDP socket are set up only in this configuration. */
605 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
607 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
608 mreq.imr_multiaddr = port_priv->multicast_addr;
609 mreq.imr_interface.s_addr = INADDR_ANY;
610 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
611 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
/* The UDP socket is bound to the multicast group address itself, port MCAST_PORT. */
616 addr.sin_family = AF_INET;
617 addr.sin_port = htons(MCAST_PORT);
618 addr.sin_addr = port_priv->multicast_addr;
620 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
621 if (ret != 0) goto err_close_udp;
624 unsigned loop_size = sizeof(loop);
625 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
626 IP_MULTICAST_LOOP, &loop, loop_size);
631 /* Initialize TCP socket */
632 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
633 if (port_priv->listen_socket == -1) goto err_close_udp;
/* NOTE(review): as above, the result of this setsockopt() is not stored in ret before the error jump that follows -- confirm what that jump tests. */
636 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
638 goto err_close_listen;
640 addr.sin_family = AF_INET;
641 addr.sin_port = htons(port);
642 addr.sin_addr = listen_on;
643 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
644 if (ret != 0) goto err_close_listen;
645 if (setnonblocking(port_priv->listen_socket))
646 goto err_close_listen;
648 ret = listen(port_priv->listen_socket, 10);
650 goto err_close_listen;
652 /* Determine our address and port*/
/* getsockname() yields the port that was actually bound; it is stored below in network byte order. */
654 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
656 ul_logerr("Non-loopback inet address not found\n");
657 goto err_close_listen;
660 port_priv->addr.port = addr.sin_port;
/* When listening on any address, an address of a local interface is looked up to advertise; otherwise the requested address is used as is. */
661 if (listen_on.s_addr == INADDR_ANY) {
662 if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
663 goto err_close_listen;
666 port_priv->addr.addr = listen_on;
668 /* Initialize epoll descriptor */
669 port_priv->epoll_fd = epoll_create(10);
670 if (port_priv->epoll_fd == -1)
671 goto err_close_listen;
/* Watch the listening socket for incoming connections (edge-triggered). */
673 memset(&ev, 0, sizeof(ev));
674 ev.events = EPOLLIN | EPOLLET;
675 ev.data.fd = port_priv->listen_socket;
676 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
677 port_priv->listen_socket, &ev);
679 goto err_close_epoll;
/* The UDP socket is watched too, but only in the configuration that uses multicast. */
681 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
682 ev.events = EPOLLIN | EPOLLET;
683 ev.data.fd = port_priv->udp_socket;
684 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
685 port_priv->udp_socket, &ev);
687 goto err_close_epoll;
/* Publish the protocol operations, the private data and the advertised address in the port description. */
690 port_desc->proto = &proto_inet;
691 port_desc->proto_priv = port_priv;
692 port_desc->addr = &port_priv->addr;
/* Error unwinding: release resources in reverse order of acquisition. */
695 close(port_priv->epoll_fd);
698 close(port_priv->listen_socket);
702 close(port_priv->udp_socket);
706 forb_free(port_priv);