1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
64 #include <netinet/tcp.h>
65 #include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
69 * @author Michal Sojka <sojkam1@fel.cvut.cz>
70 * @date Sun Oct 12 16:10:23 2008
72 * @brief FORB transport protocol based on INET family sockets.
74 * UDP is used for broadcasts and TCP for requests/replies. There
75 * exist two uni-drectional connections between any two communicating
79 extern UL_LOG_CUST(ulogd_forb_proto_inet);
81 #define MCAST_PORT 15514 /**< Port used for multicasts */
82 #define MCAST_ADDR "225.15.5.14"
84 /** Address used by inet protocol. All values are stored in network
88 uint16_t port; /**< TCP listening port */
91 /** INET protocol data for ports. */
93 int udp_socket; /**< Socket for sending and receiving broadcasts */
94 int listen_socket; /* */
95 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
96 struct inet_addr addr; /**< Address of this port */
97 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
98 struct in_addr multicast_addr;
99 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
102 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
103 struct inet_port, /* cust_head_t */
104 forb_peer_t, /* cust_item_t */
105 new_peers, /* cust_head_field */
106 lnode) /* cust_node_field */
109 /** INET protocol data associated with every peer */
111 int socket; /**< Connected socket to the peer */
114 /* static struct inet_port* */
115 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
118 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
120 const struct inet_addr *a = addr;
121 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
122 CORBA_unsigned_short hport = ntohs(a->port);
124 ret = CORBA_unsigned_long_serialize(codec, &haddr);
127 return CORBA_unsigned_short_serialize(codec, &hport);
131 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
134 CORBA_unsigned_long s_addr;
135 CORBA_unsigned_short port;
138 a = forb_malloc(sizeof(*a));
141 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
144 ret = CORBA_unsigned_short_deserialize(codec, &port);
145 a->addr.s_addr = htonl(s_addr);
146 a->port = htons(port);
152 setnonblocking(int fd);
158 #if 0 /* For nice graphs in benchmarks */
160 ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
162 ul_logerr("setsockopt(TCP_NODELAY): %s\n", strerror(errno));
170 inet_connect(forb_peer_t *peer)
172 struct inet_peer *ipeer;
173 struct sockaddr_in sa;
174 struct inet_addr *addr = peer->addr;
178 ul_logerr("No address to connect\n");
181 ipeer = forb_malloc(sizeof(*ipeer));
184 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
185 if (!ipeer->socket) {
186 ul_logerr("socket(): %s\n", strerror(errno));
189 sa.sin_family = AF_INET;
190 sa.sin_port = addr->port;
191 sa.sin_addr = addr->addr;
192 ul_logtrash("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
193 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
195 ul_logerr("connect error: %s\n", strerror(errno));
199 setnonblocking(ipeer->socket);
200 setnodelay(ipeer->socket);
202 struct epoll_event ev;
203 struct inet_port *p = peer->port->desc.proto_priv;
204 memset(&ev, 0, sizeof(ev));
205 ev.events = EPOLLIN | EPOLLET;
206 ev.data.fd = ipeer->socket;
207 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
209 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
213 peer->proto_priv = ipeer;
215 #ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
216 ret = forb_iop_send_hello_to(peer);
224 close(ipeer->socket);
233 inet_send(forb_peer_t *peer, const void *buf, size_t len)
235 struct inet_peer *ipeer = peer->proto_priv;
239 ret = inet_connect(peer);
243 ipeer = peer->proto_priv;
247 ul_logtrash("send fd=%d len=%zu\n", ipeer->socket, len);
249 ret = send(ipeer->socket, buf, len, 0);
251 ul_logerr("send error: %s\n", strerror(errno));
262 /*----------------------------------------------------------------------
263 Portable function to set a socket into nonblocking mode.
264 Calling this on a socket causes all future read() and write() calls on
265 that socket to do only as much as they can immediately, and return
267 If no data can be read or written, they return -1 and set errno
268 to EAGAIN (or EWOULDBLOCK).
269 Thanks to Bjorn Reese for this code.
270 ----------------------------------------------------------------------*/
271 int setnonblocking(int fd)
275 /* If they have O_NONBLOCK, use the Posix way to do it */
276 #if defined(O_NONBLOCK)
277 /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
278 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
280 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
282 /* Otherwise, use the old way of doing it */
284 return ioctl(fd, FIOBIO, &flags);
289 inet_accept_connection(forb_port_t *port)
291 struct inet_port *p = port->desc.proto_priv;
293 struct sockaddr_in addr;
294 socklen_t addrlen = sizeof(addr);
295 struct epoll_event ev;
299 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
304 ret = setnonblocking(client);
311 peer = forb_peer_new();
313 struct inet_peer *ipeer;
315 ipeer = forb_malloc(sizeof(*ipeer));
317 ipeer->socket = client;
318 peer->proto_priv = ipeer;
320 peer->state = FORB_PEER_DISCOVERED;
321 inet_port_new_peer_insert(p, peer);
327 memset(&ev, 0, sizeof(ev));
328 ev.events = EPOLLIN | EPOLLET;
330 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
334 inet_recv(forb_port_t *port, void *buf, size_t len)
336 struct inet_port *iport = port->desc.proto_priv;
338 struct epoll_event ev;
342 bool exported_new_peer = false;
345 if (iport->last_recv_fd == -1) {
346 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
347 if (nfds == -1 && errno == EINTR)
351 if (ev.data.fd == iport->listen_socket) {
352 ret = inet_accept_connection(port);
354 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
359 iport->last_recv_fd = ev.data.fd;
362 /* Check for first reception form a just connected peer */
363 ul_list_for_each(inet_port_new_peer, iport, peer) {
364 struct inet_peer *ipeer = peer->proto_priv;
365 //printf("checking new peer with fd=%d\n", ipeer->socket);
366 if (ipeer->socket == iport->last_recv_fd) {
367 inet_port_new_peer_delete(iport, peer);
370 forb_peer_put(port->new_peer);
372 /* Let the upper layer assign forb ID
373 * to this peer according to the request*/
374 port->new_peer = peer;
375 exported_new_peer = true;
380 //printf("recv fd=%d\n", iport->last_recv_fd);
381 ret = recv(iport->last_recv_fd, buf, len, 0);
383 if (exported_new_peer) {
385 port->new_peer = NULL;
387 if (errno != EAGAIN) {
388 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
390 iport->last_recv_fd = -1;
394 if (exported_new_peer) {
396 port->new_peer = NULL;
398 ul_logtrash("recv fd=%d disconnect\n", iport->last_recv_fd);
399 ul_list_for_each(forb_port_peer, port, peer) {
400 struct inet_peer *ipeer = peer->proto_priv;
401 if (ipeer && ipeer->socket == iport->last_recv_fd) {
402 forb_peer_disconnected(peer);
406 iport->last_recv_fd = -1;
409 ul_logtrash("recv fd=%d len=%zd\n", iport->last_recv_fd, ret);
413 return recv(iport->udp_socket, buf, len, 0);
418 inet_port_destroy(forb_port_t * port)
420 struct inet_port *pd = port->desc.proto_priv;
423 close(pd->udp_socket);
424 close(pd->listen_socket);
425 ul_list_for_each_cut(inet_port_new_peer, pd, peer) {
432 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
434 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
436 struct inet_port *p = port->desc.proto_priv;
437 struct sockaddr_in addr;
440 addr.sin_family = AF_INET;
441 addr.sin_port = htons(MCAST_PORT);
442 addr.sin_addr = p->multicast_addr;
444 ret = sendto(p->udp_socket, buf, len, 0,
445 (struct sockaddr*)&addr, sizeof(addr));
451 inet_peer_destroy(forb_peer_t *peer)
453 struct inet_peer *ipeer = peer->proto_priv;
455 peer->proto_priv = NULL;
456 ul_logtrash("destroying peer fd=%d (orb_id=%s)\n",
457 ipeer->socket, peer->orb_id);
458 close(ipeer->socket);
463 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
465 const struct inet_addr *a = addr;
468 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
473 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
476 #include <fcb_contact_info.h>
478 static void inet_register_cb(forb_port_t *port)
480 struct inet_addr *ia;
482 ia = malloc(sizeof(*ia));
485 char *fcb_addr = getenv("FCB_ADDR");
486 if (!fcb_addr) fcb_addr = "127.0.0.1";
487 ia->addr.s_addr = inet_addr(fcb_addr);
488 ia->port = htons(FCB_TCP_PORT);
489 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
492 #define inet_register_cb NULL
495 static const forb_proto_t proto_inet = {
496 .hello_interval = 40 /* seconds */,
497 .port_destroy = inet_port_destroy,
498 .peer_destroy = inet_peer_destroy,
501 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
502 .broadcast = inet_broadcast,
504 .serialize_addr = inet_serialize_addr,
505 .deserialize_addr = inet_deserialize_addr,
506 .addr2str = inet_addr2str,
507 .register_cb = inet_register_cb,
510 #define MAX_INTERFACES 10
511 int get_local_address(int sock, struct in_addr *addr)
514 struct ifreq *ifr, req[MAX_INTERFACES];
516 bool loopback = false;
518 struct in_addr env_addr;
520 env = getenv("FORB_EXTERNAL_IP");
522 if (inet_aton(env, &env_addr) == 0) {
523 ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
529 ifc.ifc_len = sizeof(req);
532 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
534 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
535 struct sockaddr_in ia;
536 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
537 ret = ioctl(sock, SIOCGIFFLAGS, ifr);
538 if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
539 if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
543 if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
555 ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
566 * Initializes INET protocol port.
568 * @param port_desc Port description to initialize.
569 * @return Zero on success, -1 on error and errno is set
573 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
577 struct inet_port *port_priv;
578 struct sockaddr_in addr;
580 struct epoll_event ev;
582 port_priv = forb_malloc(sizeof(*port_priv));
586 memset(port_priv, 0, sizeof(*port_priv));
587 port_priv->last_recv_fd = -1;
588 inet_port_new_peer_init_head(port_priv);
590 /* Initialize UDP multicast socket */
591 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
592 if (port_priv->udp_socket == -1) goto err_free;
595 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
600 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
604 setnonblocking(port_priv->udp_socket);
606 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
608 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
609 mreq.imr_multiaddr = port_priv->multicast_addr;
610 mreq.imr_interface.s_addr = INADDR_ANY;
611 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
612 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
617 addr.sin_family = AF_INET;
618 addr.sin_port = htons(MCAST_PORT);
619 addr.sin_addr = port_priv->multicast_addr;
621 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
622 if (ret != 0) goto err_close_udp;
625 unsigned loop_size = sizeof(loop);
626 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
627 IP_MULTICAST_LOOP, &loop, loop_size);
632 /* Initialize TCP socket */
633 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
634 if (port_priv->listen_socket == -1) goto err_close_udp;
637 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
639 goto err_close_listen;
641 addr.sin_family = AF_INET;
642 addr.sin_port = htons(port);
643 addr.sin_addr = listen_on;
644 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
645 if (ret != 0) goto err_close_listen;
646 if (setnonblocking(port_priv->listen_socket))
647 goto err_close_listen;
649 ret = listen(port_priv->listen_socket, 10);
651 goto err_close_listen;
653 /* Determine our address and port*/
655 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
657 ul_logerr("Non-loopback inet address not found\n");
658 goto err_close_listen;
661 port_priv->addr.port = addr.sin_port;
662 if (listen_on.s_addr == INADDR_ANY) {
663 if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
664 goto err_close_listen;
667 port_priv->addr.addr = listen_on;
669 /* Initialize epoll descriptor */
670 port_priv->epoll_fd = epoll_create(10);
671 if (port_priv->epoll_fd == -1)
672 goto err_close_listen;
674 memset(&ev, 0, sizeof(ev));
675 ev.events = EPOLLIN | EPOLLET;
676 ev.data.fd = port_priv->listen_socket;
677 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
678 port_priv->listen_socket, &ev);
680 goto err_close_epoll;
682 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
683 ev.events = EPOLLIN | EPOLLET;
684 ev.data.fd = port_priv->udp_socket;
685 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
686 port_priv->udp_socket, &ev);
688 goto err_close_epoll;
691 port_desc->proto = &proto_inet;
692 port_desc->proto_priv = port_priv;
693 port_desc->addr = &port_priv->addr;
696 close(port_priv->epoll_fd);
699 close(port_priv->listen_socket);
703 close(port_priv->udp_socket);
707 forb_free(port_priv);