1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
64 #include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
68 * @author Michal Sojka <sojkam1@fel.cvut.cz>
69 * @date Sun Oct 12 16:10:23 2008
71 * @brief FORB transport protocol based on INET family sockets.
73 * UDP is used for broadcasts and TCP for requests/replies. There
74 * exist two unidirectional connections between any two communicating
78 extern UL_LOG_CUST(ulogd_forb_proto_inet);
/* Multicast endpoint: forb_inet_port_init() parses MCAST_ADDR into the
 * port's multicast_addr and binds the UDP socket to MCAST_PORT;
 * inet_broadcast() sends datagrams to that address and port. */
80 #define MCAST_PORT 15514 /**< Port used for multicasts */
81 #define MCAST_ADDR "225.15.5.14" /**< IPv4 multicast group address in dotted-quad form (parsed with inet_aton()) */
83 /** Address used by inet protocol. All values are stored in network
87 uint16_t port; /**< TCP listening port */
90 /** INET protocol data for ports. */
92 int udp_socket; /**< Socket for sending and receiving broadcasts */
93 int listen_socket; /**< TCP socket on which incoming connections are accepted */
94 int epoll_fd; /**< File descriptor used by epoll_wait() in inet_recv(). */
95 struct inet_addr addr; /**< Address of this port */
96 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer; -1 when no descriptor is pending */
97 struct in_addr multicast_addr; /**< Multicast group address used as the destination in inet_broadcast() */
98 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
/* Declares the inet_port_new_peer_*() list operations used in this file
 * (init_head, insert, delete, and iteration through ul_list_for_each())
 * for the new_peers list of struct inet_port, whose items are
 * forb_peer_t objects linked through their lnode field. */
101 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
102 struct inet_port, /* cust_head_t */
103 forb_peer_t, /* cust_item_t */
104 new_peers, /* cust_head_field */
105 lnode) /* cust_node_field */
108 /** INET protocol data associated with every peer */
110 int socket; /**< Connected socket to the peer */
113 /* static struct inet_port* */
114 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
/**
 * Serializes an inet address into @a codec.
 *
 * The IPv4 address and the TCP port are converted from network to host
 * byte order and handed to the CORBA serializers as an unsigned long
 * followed by an unsigned short.
 *
 * @param codec Codec the address is written to.
 * @param addr  Pointer to the struct inet_addr to serialize.
 * @return Value returned by the CORBA serialization routines.
 */
117 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
119 const struct inet_addr *a = addr;
/* struct inet_addr keeps network byte order; the codec gets host order. */
120 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
121 CORBA_unsigned_short hport = ntohs(a->port);
123 ret = CORBA_unsigned_long_serialize(codec, &haddr);
126 return CORBA_unsigned_short_serialize(codec, &hport);
/**
 * Reads an inet address from @a codec into a newly allocated
 * struct inet_addr.
 *
 * The address (unsigned long) and port (unsigned short) are read in the
 * same order inet_serialize_addr() writes them and converted back to
 * network byte order.
 *
 * @param codec Codec to read from.
 * @param addr  Output location for the address (presumably receives the
 *              allocated struct inet_addr -- confirm).
 */
130 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
133 CORBA_unsigned_long s_addr;
134 CORBA_unsigned_short port;
/* NOTE(review): the structure is allocated before anything is decoded;
 * confirm that every failure path releases it again. */
137 a = forb_malloc(sizeof(*a));
140 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
143 ret = CORBA_unsigned_short_deserialize(codec, &port);
/* Values travel in host byte order; store them in network byte order. */
144 a->addr.s_addr = htonl(s_addr);
145 a->port = htons(port);
/* Forward declaration: setnonblocking() is defined later in this file but
 * is already needed by inet_connect(). */
150 int setnonblocking(int fd);
153 inet_connect(forb_peer_t *peer)
155 struct inet_peer *ipeer;
156 struct sockaddr_in sa;
157 struct inet_addr *addr = peer->addr;
161 ul_logerr("No address to connect\n");
164 ipeer = forb_malloc(sizeof(*ipeer));
167 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
168 if (!ipeer->socket) {
169 ul_logerr("socket(): %s\n", strerror(errno));
172 sa.sin_family = AF_INET;
173 sa.sin_port = addr->port;
174 sa.sin_addr = addr->addr;
175 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
176 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
178 ul_logerr("connect error: %s\n", strerror(errno));
182 setnonblocking(ipeer->socket);
184 struct epoll_event ev;
185 struct inet_port *p = peer->port->desc.proto_priv;
186 memset(&ev, 0, sizeof(ev));
187 ev.events = EPOLLIN | EPOLLET;
188 ev.data.fd = ipeer->socket;
189 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
191 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
195 peer->proto_priv = ipeer;
197 #ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
198 ret = forb_iop_send_hello_to(peer);
206 close(ipeer->socket);
215 inet_send(forb_peer_t *peer, const void *buf, size_t len)
217 struct inet_peer *ipeer = peer->proto_priv;
221 ret = inet_connect(peer);
225 ipeer = peer->proto_priv;
229 ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
231 ret = send(ipeer->socket, buf, len, 0);
233 ul_logerr("send error: %s\n", strerror(errno));
/*----------------------------------------------------------------------
 Portable function to set a socket into nonblocking mode.
 Calling this on a socket causes all future read() and write() calls on
 that socket to do only as much as they can immediately, and return
 without waiting.
 If no data can be read or written, they return -1 and set errno
 to EAGAIN (or EWOULDBLOCK).
 Thanks to Bjorn Reese for this code.

 Returns the result of the underlying fcntl()/ioctl() call, i.e. -1 on
 failure (for example when fd is not an open descriptor).
----------------------------------------------------------------------*/
int setnonblocking(int fd)
{
	int flags;

	/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
	/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
	if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
		flags = 0;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Otherwise, use the old way of doing it. The request is spelled
	 * FIONBIO; "FIOBIO" does not exist and would not compile here. */
	flags = 1;
	return ioctl(fd, FIONBIO, &flags);
#endif
}
/**
 * Accepts one pending connection on the port's listening socket.
 *
 * The accepted socket is switched to non-blocking mode, wrapped in a new
 * peer (state FORB_PEER_DISCOVERED) that is queued on the port's
 * new_peers list, and registered with the port's epoll descriptor for
 * edge-triggered input events.
 *
 * @param port Port whose listening socket reported an event.
 * @return Result of the final epoll_ctl() call.
 */
271 inet_accept_connection(forb_port_t *port)
273 struct inet_port *p = port->desc.proto_priv;
275 struct sockaddr_in addr;
276 socklen_t addrlen = sizeof(addr);
277 struct epoll_event ev;
281 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
/* inet_recv() relies on edge-triggered epoll, so the socket must never block. */
286 ret = setnonblocking(client);
/* Create the peer object that represents the remote side of this connection. */
292 peer = forb_peer_new();
294 struct inet_peer *ipeer;
296 ipeer = forb_malloc(sizeof(*ipeer));
298 ipeer->socket = client;
299 peer->proto_priv = ipeer;
301 peer->state = FORB_PEER_DISCOVERED;
/* Keep the peer on new_peers until inet_recv() sees its first data. */
302 inet_port_new_peer_insert(p, peer);
308 memset(&ev, 0, sizeof(ev));
309 ev.events = EPOLLIN | EPOLLET;
311 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
/**
 * Receives data for @a port into @a buf.
 *
 * When no descriptor is pending (last_recv_fd == -1) the function waits
 * in epoll_wait() without a timeout. An event on the listening socket is
 * handled by inet_accept_connection(); any other descriptor is
 * remembered in last_recv_fd and read with recv(). A peer that sends its
 * first data is removed from the new_peers list and exported to the
 * upper layer through port->new_peer.
 *
 * @param port Port to receive on.
 * @param buf  Destination buffer.
 * @param len  Size of @a buf in bytes.
 */
315 inet_recv(forb_port_t *port, void *buf, size_t len)
317 struct inet_port *iport = port->desc.proto_priv;
319 struct epoll_event ev;
323 bool exported_new_peer = false;
/* last_recv_fd == -1 means no descriptor is left over from the previous
 * call, so wait for a new event. */
326 if (iport->last_recv_fd == -1) {
/* One event at a time, no timeout. */
327 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
/* A signal interrupted the wait; this is not treated as an error. */
328 if (nfds == -1 && errno == EINTR)
332 if (ev.data.fd == iport->listen_socket) {
333 ret = inet_accept_connection(port);
335 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
340 iport->last_recv_fd = ev.data.fd;
343 /* Check for first reception from a just connected peer */
344 ul_list_for_each(inet_port_new_peer, iport, peer) {
345 struct inet_peer *ipeer = peer->proto_priv;
346 //printf("checking new peer with fd=%d\n", ipeer->socket);
347 if (ipeer->socket == iport->last_recv_fd) {
348 inet_port_new_peer_delete(iport, peer);
/* NOTE(review): this drops a reference on the newly found peer, not on
 * the peer previously stored in port->new_peer -- confirm this is the
 * intended object. */
350 if (port->new_peer) forb_peer_put(peer);
352 /* Let the upper layer assign forb ID
353 * to this peer according to the request*/
354 port->new_peer = peer;
355 exported_new_peer = true;
360 //printf("recv fd=%d\n", iport->last_recv_fd);
361 ret = recv(iport->last_recv_fd, buf, len, 0);
/* Error handling: a peer exported above is withdrawn again. */
363 if (exported_new_peer) {
365 port->new_peer = NULL;
/* EAGAIN only means the non-blocking socket has been drained. */
367 if (errno != EAGAIN) {
368 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
/* Forget the descriptor so that the next round waits in epoll_wait(). */
370 iport->last_recv_fd = -1;
/* Disconnect handling: withdraw an exported peer and notify the upper
 * layer about the peer that owned the socket. */
374 if (exported_new_peer) {
376 port->new_peer = NULL;
378 ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
379 ul_list_for_each(forb_port_peer, port, peer) {
380 struct inet_peer *ipeer = peer->proto_priv;
/* Peers that were never connected have no protocol private data. */
381 if (ipeer && ipeer->socket == iport->last_recv_fd) {
382 forb_peer_disconnected(peer);
386 iport->last_recv_fd = -1;
/* NOTE(review): ret holds the result of recv(); if it is declared as
 * ssize_t, "%d" does not match it (use %zd) -- confirm the declaration. */
389 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
/* Alternative implementation reading the UDP socket directly; presumably
 * selected by conditional compilation -- confirm. */
393 return recv(iport->udp_socket, buf, len, 0);
/**
 * Releases the sockets owned by the inet protocol data of @a port.
 *
 * NOTE(review): forb_inet_port_init() also creates an epoll descriptor
 * (epoll_fd); confirm it is closed here as well, otherwise it leaks.
 *
 * @param port Port being destroyed.
 */
398 inet_port_destroy(forb_port_t * port)
400 struct inet_port *pd = port->desc.proto_priv;
402 close(pd->udp_socket);
403 close(pd->listen_socket);
/**
 * Sends @a len bytes from @a buf as one UDP datagram to the port's
 * multicast address on port MCAST_PORT.
 *
 * @param port Port whose UDP socket is used for sending.
 * @param buf  Data to send.
 * @param len  Number of bytes to send.
 */
409 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
411 struct inet_port *p = port->desc.proto_priv;
412 struct sockaddr_in addr;
415 addr.sin_family = AF_INET;
416 addr.sin_port = htons(MCAST_PORT);
/* multicast_addr is already stored in network byte order. */
417 addr.sin_addr = p->multicast_addr;
419 ret = sendto(p->udp_socket, buf, len, 0,
420 (struct sockaddr*)&addr, sizeof(addr));
/**
 * Releases the inet protocol data of @a peer: detaches it from the peer
 * and closes the connected socket.
 *
 * @param peer Peer being destroyed.
 */
425 inet_peer_destroy(forb_peer_t *peer)
427 struct inet_peer *ipeer = peer->proto_priv;
/* Detach first so that the peer no longer refers to the closed socket. */
429 peer->proto_priv = NULL;
430 ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
431 close(ipeer->socket);
/**
 * Formats an inet address as "a.b.c.d:port" into @a dest.
 *
 * @param dest   Destination buffer.
 * @param maxlen Size of @a dest; the output is limited by snprintf().
 * @param addr   Pointer to the struct inet_addr to format.
 */
436 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
438 const struct inet_addr *a = addr;
/* inet_ntoa() returns a pointer to a static buffer; the result is copied
 * into dest right away. The port is converted to host byte order. */
441 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
/* The callback below exists only when both CONFIG_FCB and
 * CONFIG_FORB_PROTO_INET_DEFAULT are enabled; otherwise
 * inet_register_cb is defined as NULL. */
446 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
449 #include <fcb_contact_info.h>
/**
 * Registration callback: announces the FCB server as a discovered peer.
 *
 * The server address is taken from the FCB_ADDR environment variable
 * (default "127.0.0.1") and the port is FCB_TCP_PORT.
 *
 * @param port Port passed on to forb_new_peer_discovered().
 */
451 static void inet_register_cb(forb_port_t *port)
453 struct inet_addr *ia;
/* NOTE(review): the other allocations in this file use forb_malloc();
 * confirm that plain malloc() matches the way this address is freed. */
455 ia = malloc(sizeof(*ia));
458 char *fcb_addr = getenv("FCB_ADDR");
459 if (!fcb_addr) fcb_addr = "127.0.0.1";
/* inet_addr() yields the address in network byte order. */
460 ia->addr.s_addr = inet_addr(fcb_addr);
461 ia->port = htons(FCB_TCP_PORT);
462 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
465 #define inet_register_cb NULL
/** Operations table of the INET protocol; installed into the port
 * description by forb_inet_port_init(). */
468 static const forb_proto_t proto_inet = {
469 .hello_interval = 40 /* seconds */,
470 .port_destroy = inet_port_destroy,
471 .peer_destroy = inet_peer_destroy,
474 .broadcast = inet_broadcast,
475 .serialize_addr = inet_serialize_addr,
476 .deserialize_addr = inet_deserialize_addr,
477 .addr2str = inet_addr2str,
/* NULL unless CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT. */
478 .register_cb = inet_register_cb,
/* Capacity of the interface table requested with SIOCGIFCONF. */
481 #define MAX_INTERFACES 10
/**
 * Determines a local IPv4 address from the configured network interfaces.
 *
 * The interfaces are queried with SIOCGIFCONF/SIOCGIFFLAGS on @a sock and
 * only interfaces that are up are considered. When the environment
 * variable FORB_EXTERNAL_IP is set it is parsed with inet_aton() and
 * compared against the interface addresses.
 *
 * @param sock Socket used for the interface ioctl() calls.
 * @param addr Output parameter for the selected address.
 */
482 int get_local_address(int sock, struct in_addr *addr)
485 struct ifreq *ifr, req[MAX_INTERFACES];
487 bool loopback = false;
489 struct in_addr env_addr;
491 env = getenv("FORB_EXTERNAL_IP");
493 if (inet_aton(env, &env_addr) == 0) {
494 ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
500 ifc.ifc_len = sizeof(req);
503 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
/* NOTE(review): the loop always visits all MAX_INTERFACES slots rather
 * than the number of entries reported back in ifc.ifc_len; confirm that
 * unused slots cannot be mistaken for real interfaces. */
505 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
506 struct sockaddr_in ia;
/* Copy the address out before SIOCGIFFLAGS is issued on the same ifreq. */
507 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
508 ret = ioctl(sock, SIOCGIFFLAGS, ifr);
509 if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
510 if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
514 if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
526 ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
537 * Initializes INET protocol port.
 *
 * Creates the UDP socket used for multicasts, the TCP listening socket
 * and the epoll descriptor watching them, and stores the resulting
 * protocol data, operations table and address in @a port_desc.
 *
539 * @param port_desc Port description to initialize.
 * @param listen_on Address the TCP listening socket is bound to; for
 *                  INADDR_ANY the published address is obtained from
 *                  get_local_address().
540 * @return Zero on success, -1 on error and errno is set
544 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
548 struct inet_port *port_priv;
549 struct sockaddr_in addr;
551 struct epoll_event ev;
553 port_priv = forb_malloc(sizeof(*port_priv));
557 memset(port_priv, 0, sizeof(*port_priv));
/* -1 tells inet_recv() that no descriptor is pending. */
558 port_priv->last_recv_fd = -1;
559 inet_port_new_peer_init_head(port_priv);
561 /* Initialize UDP multicast socket */
562 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
563 if (port_priv->udp_socket == -1) goto err_free;
566 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
/* NOTE(review): the result of this setsockopt() is not stored in ret;
 * verify that any check following it tests the right value. */
571 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
/* NOTE(review): the return value of setnonblocking() is ignored here,
 * unlike for the listening socket below. */
575 setnonblocking(port_priv->udp_socket);
577 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
/* Join the multicast group on the default interface (INADDR_ANY). */
579 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
580 mreq.imr_multiaddr = port_priv->multicast_addr;
581 mreq.imr_interface.s_addr = INADDR_ANY;
582 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
583 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
588 addr.sin_family = AF_INET;
589 addr.sin_port = htons(MCAST_PORT);
590 addr.sin_addr = port_priv->multicast_addr;
592 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
593 if (ret != 0) goto err_close_udp;
596 unsigned loop_size = sizeof(loop);
597 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
598 IP_MULTICAST_LOOP, &loop, loop_size);
603 /* Initialize TCP socket */
604 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
605 if (port_priv->listen_socket == -1) goto err_close_udp;
/* NOTE(review): as above, the setsockopt() result is not assigned to ret
 * before the error branch that follows. */
608 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
610 goto err_close_listen;
612 addr.sin_family = AF_INET;
/* The port argument is converted to network byte order here. */
613 addr.sin_port = htons(port);
614 addr.sin_addr = listen_on;
615 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
616 if (ret != 0) goto err_close_listen;
617 if (setnonblocking(port_priv->listen_socket))
618 goto err_close_listen;
620 ret = listen(port_priv->listen_socket, 10);
622 goto err_close_listen;
624 /* Determine our address and port*/
626 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
/* NOTE(review): this message follows getsockname(); if it reports a
 * getsockname() failure, the text about a missing non-loopback address
 * is misleading -- confirm. */
628 ul_logerr("Non-loopback inet address not found\n");
629 goto err_close_listen;
/* The port reported by getsockname() is already in network byte order. */
632 port_priv->addr.port = addr.sin_port;
/* A wildcard bind has no single address to publish; pick one from the
 * local interfaces instead. */
633 if (listen_on.s_addr == INADDR_ANY) {
634 if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
635 goto err_close_listen;
638 port_priv->addr.addr = listen_on;
640 /* Initialize epoll descriptor */
641 port_priv->epoll_fd = epoll_create(10);
642 if (port_priv->epoll_fd == -1)
643 goto err_close_listen;
645 memset(&ev, 0, sizeof(ev));
646 ev.events = EPOLLIN | EPOLLET;
647 ev.data.fd = port_priv->listen_socket;
648 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
649 port_priv->listen_socket, &ev);
651 goto err_close_epoll;
653 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
/* The UDP socket is watched only when the multicast setup is compiled in. */
654 ev.events = EPOLLIN | EPOLLET;
655 ev.data.fd = port_priv->udp_socket;
656 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
657 port_priv->udp_socket, &ev);
659 goto err_close_epoll;
/* Publish the protocol operations, private data and address. */
662 port_desc->proto = &proto_inet;
663 port_desc->proto_priv = port_priv;
664 port_desc->addr = &port_priv->addr;
/* Error unwinding: release resources in reverse order of acquisition. */
667 close(port_priv->epoll_fd);
670 close(port_priv->listen_socket);
674 close(port_priv->udp_socket);
678 forb_free(port_priv);