1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
64 #include <netinet/tcp.h>
65 #include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
69 * @author Michal Sojka <sojkam1@fel.cvut.cz>
70 * @date Sun Oct 12 16:10:23 2008
72 * @brief FORB transport protocol based on INET family sockets.
74 * UDP is used for broadcasts and TCP for requests/replies. There
75 * exist two uni-drectional connections between any two communicating
79 extern UL_LOG_CUST(ulogd_forb_proto_inet);
81 #define MCAST_PORT 15514 /**< Port used for multicasts */
82 #define MCAST_ADDR "225.15.5.14"
84 /** Address used by inet protocol. All values are stored in network
88 uint16_t port; /**< TCP listening port */
91 /** INET protocol data for ports. */
93 int udp_socket; /**< Socket for sending and receiving broadcasts */
94 int listen_socket; /* */
95 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
96 struct inet_addr addr; /**< Address of this port */
97 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
98 struct in_addr multicast_addr;
99 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
102 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
103 struct inet_port, /* cust_head_t */
104 forb_peer_t, /* cust_item_t */
105 new_peers, /* cust_head_field */
106 lnode) /* cust_node_field */
109 /** INET protocol data associated with every peer */
111 int socket; /**< Connected socket to the peer */
114 /* static struct inet_port* */
115 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
118 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
120 const struct inet_addr *a = addr;
121 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
122 CORBA_unsigned_short hport = ntohs(a->port);
124 ret = CORBA_unsigned_long_serialize(codec, &haddr);
127 return CORBA_unsigned_short_serialize(codec, &hport);
131 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
134 CORBA_unsigned_long s_addr;
135 CORBA_unsigned_short port;
138 a = forb_malloc(sizeof(*a));
141 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
144 ret = CORBA_unsigned_short_deserialize(codec, &port);
145 a->addr.s_addr = htonl(s_addr);
146 a->port = htons(port);
152 setnonblocking(int fd);
158 #if 0 /* For nice graphs in benchmarks */
160 ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
162 ul_logerr("setsockopt(TCP_NODELAY): %s\n", strerror(errno));
170 inet_connect(forb_peer_t *peer)
172 struct inet_peer *ipeer;
173 struct sockaddr_in sa;
174 struct inet_addr *addr = peer->addr;
178 ul_logerr("No address to connect\n");
181 ipeer = forb_malloc(sizeof(*ipeer));
184 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
185 if (!ipeer->socket) {
186 ul_logerr("socket(): %s\n", strerror(errno));
189 sa.sin_family = AF_INET;
190 sa.sin_port = addr->port;
191 sa.sin_addr = addr->addr;
192 ul_logtrash("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
193 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
195 ul_logerr("connect error: %s\n", strerror(errno));
199 setnonblocking(ipeer->socket);
200 setnodelay(ipeer->socket);
202 struct epoll_event ev;
203 struct inet_port *p = peer->port->desc.proto_priv;
204 memset(&ev, 0, sizeof(ev));
205 ev.events = EPOLLIN | EPOLLET;
206 ev.data.fd = ipeer->socket;
207 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
209 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
213 peer->proto_priv = ipeer;
215 #ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
216 ret = forb_iop_send_hello_to(peer);
224 close(ipeer->socket);
233 inet_send(forb_peer_t *peer, const void *buf, size_t len)
235 struct inet_peer *ipeer = peer->proto_priv;
239 ret = inet_connect(peer);
243 ipeer = peer->proto_priv;
247 ul_logtrash("send fd=%d len=%zu\n", ipeer->socket, len);
249 ret = send(ipeer->socket, buf, len, 0);
251 ul_logerr("send error: %s\n", strerror(errno));
262 /*----------------------------------------------------------------------
263 Portable function to set a socket into nonblocking mode.
264 Calling this on a socket causes all future read() and write() calls on
265 that socket to do only as much as they can immediately, and return
267 If no data can be read or written, they return -1 and set errno
268 to EAGAIN (or EWOULDBLOCK).
269 Thanks to Bjorn Reese for this code.
270 ----------------------------------------------------------------------*/
271 int setnonblocking(int fd)
275 /* If they have O_NONBLOCK, use the Posix way to do it */
276 #if defined(O_NONBLOCK)
277 /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
278 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
280 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
282 /* Otherwise, use the old way of doing it */
284 return ioctl(fd, FIOBIO, &flags);
289 inet_accept_connection(forb_port_t *port)
291 struct inet_port *p = port->desc.proto_priv;
293 struct sockaddr_in addr;
294 socklen_t addrlen = sizeof(addr);
295 struct epoll_event ev;
299 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
304 ret = setnonblocking(client);
311 peer = forb_peer_new();
313 struct inet_peer *ipeer;
315 ipeer = forb_malloc(sizeof(*ipeer));
317 ipeer->socket = client;
318 peer->proto_priv = ipeer;
320 peer->state = FORB_PEER_DISCOVERED;
321 inet_port_new_peer_insert(p, peer);
327 memset(&ev, 0, sizeof(ev));
328 ev.events = EPOLLIN | EPOLLET;
330 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
334 inet_recv(forb_port_t *port, void *buf, size_t len)
336 struct inet_port *iport = port->desc.proto_priv;
338 struct epoll_event ev;
342 bool exported_new_peer = false;
345 if (iport->last_recv_fd == -1) {
346 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
347 if (nfds == -1 && errno == EINTR)
351 if (ev.data.fd == iport->listen_socket) {
352 ret = inet_accept_connection(port);
354 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
359 iport->last_recv_fd = ev.data.fd;
362 /* Check for first reception form a just connected peer */
363 ul_list_for_each(inet_port_new_peer, iport, peer) {
364 struct inet_peer *ipeer = peer->proto_priv;
365 //printf("checking new peer with fd=%d\n", ipeer->socket);
366 if (ipeer->socket == iport->last_recv_fd) {
367 inet_port_new_peer_delete(iport, peer);
369 if (port->new_peer) forb_peer_put(peer);
371 /* Let the upper layer assign forb ID
372 * to this peer according to the request*/
373 port->new_peer = peer;
374 exported_new_peer = true;
379 //printf("recv fd=%d\n", iport->last_recv_fd);
380 ret = recv(iport->last_recv_fd, buf, len, 0);
382 if (exported_new_peer) {
384 port->new_peer = NULL;
386 if (errno != EAGAIN) {
387 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
389 iport->last_recv_fd = -1;
393 if (exported_new_peer) {
395 port->new_peer = NULL;
397 ul_logtrash("recv fd=%d disconnect\n", iport->last_recv_fd);
398 ul_list_for_each(forb_port_peer, port, peer) {
399 struct inet_peer *ipeer = peer->proto_priv;
400 if (ipeer && ipeer->socket == iport->last_recv_fd) {
401 forb_peer_disconnected(peer);
405 iport->last_recv_fd = -1;
408 ul_logtrash("recv fd=%d len=%zd\n", iport->last_recv_fd, ret);
412 return recv(iport->udp_socket, buf, len, 0);
417 inet_port_destroy(forb_port_t * port)
419 struct inet_port *pd = port->desc.proto_priv;
421 close(pd->udp_socket);
422 close(pd->listen_socket);
427 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
429 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
431 struct inet_port *p = port->desc.proto_priv;
432 struct sockaddr_in addr;
435 addr.sin_family = AF_INET;
436 addr.sin_port = htons(MCAST_PORT);
437 addr.sin_addr = p->multicast_addr;
439 ret = sendto(p->udp_socket, buf, len, 0,
440 (struct sockaddr*)&addr, sizeof(addr));
446 inet_peer_destroy(forb_peer_t *peer)
448 struct inet_peer *ipeer = peer->proto_priv;
450 peer->proto_priv = NULL;
451 ul_logtrash("destroying peer fd=%d (orb_id=%s)\n",
452 ipeer->socket, peer->orb_id);
453 close(ipeer->socket);
458 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
460 const struct inet_addr *a = addr;
463 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
468 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
471 #include <fcb_contact_info.h>
473 static void inet_register_cb(forb_port_t *port)
475 struct inet_addr *ia;
477 ia = malloc(sizeof(*ia));
480 char *fcb_addr = getenv("FCB_ADDR");
481 if (!fcb_addr) fcb_addr = "127.0.0.1";
482 ia->addr.s_addr = inet_addr(fcb_addr);
483 ia->port = htons(FCB_TCP_PORT);
484 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
487 #define inet_register_cb NULL
490 static const forb_proto_t proto_inet = {
491 .hello_interval = 40 /* seconds */,
492 .port_destroy = inet_port_destroy,
493 .peer_destroy = inet_peer_destroy,
496 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
497 .broadcast = inet_broadcast,
499 .serialize_addr = inet_serialize_addr,
500 .deserialize_addr = inet_deserialize_addr,
501 .addr2str = inet_addr2str,
502 .register_cb = inet_register_cb,
505 #define MAX_INTERFACES 10
506 int get_local_address(int sock, struct in_addr *addr)
509 struct ifreq *ifr, req[MAX_INTERFACES];
511 bool loopback = false;
513 struct in_addr env_addr;
515 env = getenv("FORB_EXTERNAL_IP");
517 if (inet_aton(env, &env_addr) == 0) {
518 ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
524 ifc.ifc_len = sizeof(req);
527 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
529 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
530 struct sockaddr_in ia;
531 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
532 ret = ioctl(sock, SIOCGIFFLAGS, ifr);
533 if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
534 if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
538 if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
550 ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
561 * Initializes INET protocol port.
563 * @param port_desc Port description to initialize.
564 * @return Zero on success, -1 on error and errno is set
568 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
572 struct inet_port *port_priv;
573 struct sockaddr_in addr;
575 struct epoll_event ev;
577 port_priv = forb_malloc(sizeof(*port_priv));
581 memset(port_priv, 0, sizeof(*port_priv));
582 port_priv->last_recv_fd = -1;
583 inet_port_new_peer_init_head(port_priv);
585 /* Initialize UDP multicast socket */
586 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
587 if (port_priv->udp_socket == -1) goto err_free;
590 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
595 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
599 setnonblocking(port_priv->udp_socket);
601 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
603 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
604 mreq.imr_multiaddr = port_priv->multicast_addr;
605 mreq.imr_interface.s_addr = INADDR_ANY;
606 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
607 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
612 addr.sin_family = AF_INET;
613 addr.sin_port = htons(MCAST_PORT);
614 addr.sin_addr = port_priv->multicast_addr;
616 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
617 if (ret != 0) goto err_close_udp;
620 unsigned loop_size = sizeof(loop);
621 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
622 IP_MULTICAST_LOOP, &loop, loop_size);
627 /* Initialize TCP socket */
628 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
629 if (port_priv->listen_socket == -1) goto err_close_udp;
632 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
634 goto err_close_listen;
636 addr.sin_family = AF_INET;
637 addr.sin_port = htons(port);
638 addr.sin_addr = listen_on;
639 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
640 if (ret != 0) goto err_close_listen;
641 if (setnonblocking(port_priv->listen_socket))
642 goto err_close_listen;
644 ret = listen(port_priv->listen_socket, 10);
646 goto err_close_listen;
648 /* Determine our address and port*/
650 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
652 ul_logerr("Non-loopback inet address not found\n");
653 goto err_close_listen;
656 port_priv->addr.port = addr.sin_port;
657 if (listen_on.s_addr == INADDR_ANY) {
658 if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
659 goto err_close_listen;
662 port_priv->addr.addr = listen_on;
664 /* Initialize epoll descriptor */
665 port_priv->epoll_fd = epoll_create(10);
666 if (port_priv->epoll_fd == -1)
667 goto err_close_listen;
669 memset(&ev, 0, sizeof(ev));
670 ev.events = EPOLLIN | EPOLLET;
671 ev.data.fd = port_priv->listen_socket;
672 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
673 port_priv->listen_socket, &ev);
675 goto err_close_epoll;
677 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
678 ev.events = EPOLLIN | EPOLLET;
679 ev.data.fd = port_priv->udp_socket;
680 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
681 port_priv->udp_socket, &ev);
683 goto err_close_epoll;
686 port_desc->proto = &proto_inet;
687 port_desc->proto_priv = port_priv;
688 port_desc->addr = &port_priv->addr;
691 close(port_priv->epoll_fd);
694 close(port_priv->listen_socket);
698 close(port_priv->udp_socket);
702 forb_free(port_priv);