1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
66 * @author Michal Sojka <sojkam1@fel.cvut.cz>
67 * @date Sun Oct 12 16:10:23 2008
69 * @brief FORB transport protocol based on INET family sockets.
71 * UDP is used for broadcasts and TCP for requests/replies. There
72 * exist two uni-directional connections between any two communicating
/* NOTE(review): presumably declares the log domain used by the
 * ul_logerr()/ul_logdeb() calls in this file -- confirm against the ul_log headers. */
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
78 #define MCAST_PORT 15514 /**< Port used for multicasts */
79 #define MCAST_ADDR "225.15.5.14" /**< Multicast group address; parsed with inet_aton() when a port is initialized */
/* NOTE(review): only the `port` member of this structure is visible in this
 * view; other code in this file also accesses an `addr` member holding a
 * struct in_addr (e.g. a->addr.s_addr). */
81 /** Address used by inet protocol. All values are stored in network
85 uint16_t port; /**< TCP listening port */
88 /** INET protocol data for ports. */
90 int udp_socket; /**< Socket for sending and receiving broadcasts */
91 int listen_socket; /**< TCP socket on which incoming connections are accepted */
92 int epoll_fd; /**< File descriptor passed to epoll_wait() in inet_recv(). */
93 struct inet_addr addr; /**< Address of this port */
94 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer; -1 when no descriptor is pending */
95 struct in_addr multicast_addr; /**< Multicast group address used as destination by inet_broadcast() */
96 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
/* Declares the inet_port_new_peer_* list operations used in this file
 * (init_head, insert, delete and iteration) for the new_peers list of
 * struct inet_port, whose items are forb_peer_t linked through `lnode`. */
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100 struct inet_port, /* cust_head_t */
101 forb_peer_t, /* cust_item_t */
102 new_peers, /* cust_head_field */
103 lnode) /* cust_node_field */
106 /** INET protocol data associated with every peer */
108 int socket; /**< Connected socket to the peer */
/* Disabled helper, kept for reference: maps a peer to the private data of its port. */
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
/*
 * Serializes an INET address into @a codec: first the IPv4 address as an
 * unsigned long, then the port as an unsigned short. Both values are
 * converted from network to host byte order before being passed to the codec.
 *
 * @param codec CDR codec to write to.
 * @param addr  Points to a struct inet_addr.
 * @return On the visible path, the result of serializing the port.
 *
 * NOTE(review): the return-type line and the check of `ret` after the first
 * serialize call are not visible in this view of the file.
 */
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
117 const struct inet_addr *a = addr;
118 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119 CORBA_unsigned_short hport = ntohs(a->port);
121 ret = CORBA_unsigned_long_serialize(codec, &haddr);
124 return CORBA_unsigned_short_serialize(codec, &hport);
/*
 * Reads an INET address from @a codec into a struct inet_addr allocated
 * with forb_malloc(). The address (unsigned long) and the port (unsigned
 * short) are read in that order and converted back to network byte order.
 *
 * @param codec CDR codec to read from.
 * @param addr  Output pointer for the allocated address.
 *
 * NOTE(review): the allocation check, the handling of a failed deserialize
 * call (including who frees `a`) and the store through @a addr are not
 * visible in this view -- confirm them in the full file.
 */
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
131 CORBA_unsigned_long s_addr;
132 CORBA_unsigned_short port;
135 a = forb_malloc(sizeof(*a));
138 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
141 ret = CORBA_unsigned_short_deserialize(codec, &port);
/* Values travel in host order; struct inet_addr keeps network order. */
142 a->addr.s_addr = htonl(s_addr);
143 a->port = htons(port);
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
151 struct inet_peer *ipeer;
152 struct sockaddr_in sa;
153 struct inet_addr *addr = peer->addr;
157 ul_logerr("No address to connect\n");
160 ipeer = forb_malloc(sizeof(*ipeer));
163 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164 if (!ipeer->socket) {
165 ul_logerr("socket(): %s\n", strerror(errno));
168 sa.sin_family = AF_INET;
169 sa.sin_port = addr->port;
170 sa.sin_addr = addr->addr;
171 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
174 ul_logerr("connect error: %s\n", strerror(errno));
178 struct epoll_event ev;
179 struct inet_port *p = peer->port->desc.proto_priv;
180 memset(&ev, 0, sizeof(ev));
181 ev.events = EPOLLIN | EPOLLET;
182 ev.data.fd = ipeer->socket;
183 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
185 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
192 close(ipeer->socket);
/*
 * Sends @a len bytes from @a buf to @a peer over the peer's TCP socket.
 * The per-peer data obtained from inet_connect() is stored in
 * peer->proto_priv.
 *
 * NOTE(review): ul_logdeb() below prints the size_t `len` with "%d";
 * "%zu" would match the argument type.
 * NOTE(review): the condition guarding the connect (presumably "no
 * proto_priv yet") and the handling of send()'s return value are not
 * visible in this view of the file.
 */
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
203 struct inet_peer *ipeer = peer->proto_priv;
207 ipeer = inet_connect(peer);
211 peer->proto_priv = ipeer;
216 ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
218 ret = send(ipeer->socket, buf, len, 0);
220 ul_logerr("send error: %s\n", strerror(errno));
/**
 * Portable function to set a descriptor into non-blocking mode.
 *
 * Calling this on a socket causes all future read() and write() calls on
 * that socket to do only as much as they can immediately. If no data can
 * be read or written, they return -1 and set errno to EAGAIN (or
 * EWOULDBLOCK).
 * Thanks to Bjorn Reese for this code.
 *
 * @param fd Descriptor to switch to non-blocking mode.
 * @return Result of the underlying fcntl()/ioctl() call: -1 on error with
 *         errno set.
 */
int setnonblocking(int fd)
{
	int flags;

#if defined(O_NONBLOCK)
	/* If they have O_NONBLOCK, use the Posix way to do it. */
	/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1)
		flags = 0; /* could not read the flags: set O_NONBLOCK alone */
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Otherwise, use the old way of doing it. The request is FIONBIO;
	 * the former spelling "FIOBIO" does not exist. */
	flags = 1;
	return ioctl(fd, FIONBIO, &flags);
#endif
}
/*
 * Accepts one pending connection on the port's listening socket.
 *
 * The accepted socket is switched to non-blocking mode, wrapped in a new
 * peer (state FORB_PEER_DISCOVERED) that is queued on the port's new_peers
 * list, and registered with the port's epoll descriptor for edge-triggered
 * input events.
 *
 * @param port Port whose listening socket has a pending connection.
 * @return On the visible path, the result of the final epoll_ctl() call.
 *
 * NOTE(review): the error checks after accept(), setnonblocking(),
 * forb_peer_new() and forb_malloc(), and the assignment of ev.data.fd, are
 * not visible in this view. inet_recv() identifies sockets by ev.data.fd,
 * so confirm it is set to `client` before the epoll_ctl() call.
 */
258 inet_accept_connection(forb_port_t *port)
260 struct inet_port *p = port->desc.proto_priv;
262 struct sockaddr_in addr;
263 socklen_t addrlen = sizeof(addr);
264 struct epoll_event ev;
268 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
273 ret = setnonblocking(client);
279 peer = forb_peer_new();
281 struct inet_peer *ipeer;
283 ipeer = forb_malloc(sizeof(*ipeer));
285 ipeer->socket = client;
286 peer->proto_priv = ipeer;
/* Keep the peer on new_peers until inet_recv() sees its first data. */
288 peer->state = FORB_PEER_DISCOVERED;
289 inet_port_new_peer_insert(p, peer);
290 //printf("New connection d=%d\n", client);
296 memset(&ev, 0, sizeof(ev));
297 ev.events = EPOLLIN | EPOLLET;
299 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
/*
 * Receives data for @a port into @a buf (at most @a len bytes).
 *
 * When no descriptor is pending (last_recv_fd == -1) the function waits in
 * epoll_wait() for one event. An event on the listening socket is handled
 * by inet_accept_connection(); any other descriptor becomes last_recv_fd
 * and is read with recv(). If the descriptor belongs to a peer still on
 * the new_peers list, that peer is removed from the list and exposed to
 * the upper layer through port->new_peer.
 *
 * NOTE(review): loop structure, the tests on `nfds`/`ret`, the return
 * statements and the condition selecting the final UDP recv() are not
 * visible in this view; the inline comments below describe visible lines only.
 */
303 inet_recv(forb_port_t *port, void *buf, size_t len)
305 struct inet_port *iport = port->desc.proto_priv;
307 struct epoll_event ev;
311 bool exported_new_peer = false;
/* -1 means the previously used descriptor is drained: wait for a new event. */
314 if (iport->last_recv_fd == -1) {
315 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
/* An event on the listening socket is a new connection, not data. */
318 if (ev.data.fd == iport->listen_socket) {
319 ret = inet_accept_connection(port);
321 ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
326 iport->last_recv_fd = ev.data.fd;
329 /* Check for first reception from a just connected peer */
330 ul_list_for_each(inet_port_new_peer, iport, peer) {
331 struct inet_peer *ipeer = peer->proto_priv;
332 //printf("checking new peer with fd=%d\n", ipeer->socket);
333 if (ipeer->socket == iport->last_recv_fd) {
334 inet_port_new_peer_delete(iport, peer);
/* NOTE(review): when port->new_peer is already set, this releases `peer`
 * and then overwrites port->new_peer below -- confirm that is the intended
 * reference to drop. */
336 if (port->new_peer) forb_peer_put(peer);
338 /* Let the upper layer assign forb ID
339 * to this peer according to the request*/
340 port->new_peer = peer;
341 exported_new_peer = true;
346 //printf("recv fd=%d\n", iport->last_recv_fd);
347 ret = recv(iport->last_recv_fd, buf, len, 0);
/* Failure path: withdraw a peer exported during this call. */
349 if (exported_new_peer) {
351 port->new_peer = NULL;
/* EAGAIN only means the non-blocking socket has no more data; it is not logged. */
353 if (errno != EAGAIN) {
354 ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
/* Forget the descriptor so that the next pass waits in epoll_wait() again. */
356 iport->last_recv_fd = -1;
/* Disconnect path: withdraw the exported peer and close the socket. */
360 if (exported_new_peer) {
362 port->new_peer = NULL;
364 ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
365 close(iport->last_recv_fd);
366 /* TODO: Notify FORB about peer disconnect */
367 iport->last_recv_fd = -1;
370 ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
/* Datagram path: read directly from the UDP socket. */
374 return recv(iport->udp_socket, buf, len, 0);
/*
 * Releases the sockets owned by an INET port: closes the UDP socket and
 * the TCP listening socket.
 *
 * NOTE(review): no close() of pd->epoll_fd and no release of `pd` itself
 * is visible in this view -- confirm both are handled in the full file.
 */
379 inet_port_destroy(forb_port_t * port)
381 struct inet_port *pd = port->desc.proto_priv;
383 close(pd->udp_socket);
384 close(pd->listen_socket);
/*
 * Sends @a len bytes from @a buf as one datagram on the port's UDP socket
 * to the port's multicast address at MCAST_PORT.
 *
 * NOTE(review): `addr` is filled field by field; no zeroing of the
 * structure and no handling of sendto()'s result are visible in this view.
 */
390 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
392 struct inet_port *p = port->desc.proto_priv;
393 struct sockaddr_in addr;
396 addr.sin_family = AF_INET;
397 addr.sin_port = htons(MCAST_PORT);
398 addr.sin_addr = p->multicast_addr;
400 ret = sendto(p->udp_socket, buf, len, 0,
401 (struct sockaddr*)&addr, sizeof(addr));
/*
 * Releases the INET-specific data of @a peer: detaches it from the peer
 * and closes the connected socket.
 *
 * NOTE(review): a NULL check of `ipeer` and the release of its memory are
 * not visible in this view -- confirm in the full file.
 */
406 inet_peer_destroy(forb_peer_t *peer)
408 struct inet_peer *ipeer = peer->proto_priv;
/* Detach first so the peer no longer refers to the data being released. */
410 peer->proto_priv = NULL;
411 close(ipeer->socket);
/*
 * Formats an INET address as "<dotted-quad>:<port>" into @a dest.
 *
 * @param dest   Output buffer.
 * @param maxlen Size of @a dest; snprintf() never writes more than this.
 * @param addr   Points to a struct inet_addr (network byte order).
 *
 * NOTE(review): the returned value and any NULL check of @a addr are not
 * visible in this view of the file.
 */
416 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
418 const struct inet_addr *a = addr;
421 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
/* The registration callback exists only when both CONFIG_FCB and
 * CONFIG_FORB_PROTO_INET_DEFAULT are enabled; otherwise it is NULL. */
426 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
429 #include <fcb_contact_info.h>
/*
 * Announces the FCB server to FORB as a discovered peer. Its address is
 * taken from the FCB_ADDR environment variable ("127.0.0.1" when unset)
 * and its port is FCB_TCP_PORT.
 *
 * NOTE(review): a check of malloc()'s result is not visible in this view,
 * and the value returned by inet_addr() for a malformed FCB_ADDR is used
 * without validation on the visible lines.
 */
432 static void inet_register_cb(forb_port_t *port)
434 struct inet_addr *ia;
436 ia = malloc(sizeof(*ia));
439 char *fcb_addr = getenv("FCB_ADDR");
440 if (!fcb_addr) fcb_addr = "127.0.0.1";
441 ia->addr.s_addr = inet_addr(fcb_addr);
442 ia->port = htons(FCB_TCP_PORT);
443 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
/* Fallback used when the callback is not compiled in. */
446 #define inet_register_cb NULL
/*
 * Operations table of the INET protocol; forb_inet_port_init() installs it
 * in port_desc->proto.
 *
 * NOTE(review): two initializer lines between .peer_destroy and .broadcast
 * are not visible in this view (presumably the send/receive callbacks).
 */
449 static const forb_proto_t proto_inet = {
450 .hello_interval = 40 /* seconds */,
451 .port_destroy = inet_port_destroy,
452 .peer_destroy = inet_peer_destroy,
455 .broadcast = inet_broadcast,
456 .serialize_addr = inet_serialize_addr,
457 .deserialize_addr = inet_deserialize_addr,
458 .addr2str = inet_addr2str,
459 .register_cb = inet_register_cb,
/* Maximum number of interface entries requested from the kernel. */
462 #define MAX_INTERFACES 10
/*
 * Looks for a local IPv4 address on an interface that is up and is not a
 * loopback interface, using the SIOCGIFCONF and SIOCGIFFLAGS ioctls on
 * @a sock.
 *
 * @param sock Socket used to issue the ioctls.
 * @param addr Output for the address found.
 *
 * NOTE(review): the loop walks all MAX_INTERFACES slots of `req` instead of
 * the ifc_len reported back by SIOCGIFCONF, and the result of SIOCGIFFLAGS
 * is not checked on the visible lines. Whether `req` is zeroed beforehand,
 * and what is stored/returned on a match, is not visible in this view.
 */
463 int get_local_address(int sock, struct in_addr *addr)
466 struct ifreq *ifr, req[MAX_INTERFACES];
469 ifc.ifc_len = sizeof(req);
472 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
474 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
475 struct sockaddr_in ia;
/* Save the address first: SIOCGIFFLAGS writes the flags into the same
 * union of struct ifreq that holds ifr_addr. */
476 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
477 ioctl(sock, SIOCGIFFLAGS, ifr);
478 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
487 * Initializes INET protocol port.
489 * @param port_desc Port description to initialize.
490 * @return Zero on success, -1 on error and errno is set
/*
 * Sets up the private data of an INET port: a non-blocking UDP socket
 * joined to the MCAST_ADDR group and bound to MCAST_PORT, a non-blocking
 * TCP listening socket bound to @a listen_on, and an epoll descriptor
 * watching the listening socket. On success the protocol table, the
 * private data and the port address are stored in @a port_desc.
 *
 * NOTE(review): the remaining parameters, most `if (ret)` checks, the
 * error labels and the return statements are not visible in this view;
 * the comments below describe the visible lines only.
 */
494 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
498 struct inet_port *port_priv;
499 struct sockaddr_in addr;
501 struct epoll_event ev;
503 port_priv = forb_malloc(sizeof(*port_priv));
507 memset(port_priv, 0, sizeof(*port_priv));
/* -1 tells inet_recv() that no descriptor is pending. */
508 port_priv->last_recv_fd = -1;
509 inet_port_new_peer_init_head(port_priv);
511 /* Initialize UDP multicast socket */
512 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
513 if (port_priv->udp_socket == -1) goto err_free;
516 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
/* NOTE(review): the results of this setsockopt() and of the following
 * setnonblocking() and inet_aton() calls are not checked on the visible lines. */
521 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
525 setnonblocking(port_priv->udp_socket);
/* Join the multicast group on the default interface (INADDR_ANY). */
528 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
529 mreq.imr_multiaddr = port_priv->multicast_addr;
530 mreq.imr_interface.s_addr = INADDR_ANY;
531 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
532 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
/* Bind the UDP socket to the multicast address and port. */
536 addr.sin_family = AF_INET;
537 addr.sin_port = htons(MCAST_PORT);
538 addr.sin_addr = port_priv->multicast_addr;
540 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
541 if (ret != 0) goto err_close_udp;
/* Multicast loopback setting; the value of `loop` is not visible in this view. */
544 unsigned loop_size = sizeof(loop);
545 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
546 IP_MULTICAST_LOOP, &loop, loop_size);
551 /* Initialize TCP socket */
552 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
553 if (port_priv->listen_socket == -1) goto err_close_udp;
556 setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
558 goto err_close_listen;
560 addr.sin_family = AF_INET;
561 addr.sin_port = htons(port);
562 addr.sin_addr = listen_on;
563 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
564 if (ret != 0) goto err_close_listen;
565 if (setnonblocking(port_priv->listen_socket))
566 goto err_close_listen;
568 ret = listen(port_priv->listen_socket, 10);
570 goto err_close_listen;
572 /* Determine our address and port*/
/* getsockname() yields the port the socket was actually bound to. */
574 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
/* NOTE(review): this message follows the getsockname() call on the visible
 * lines, yet its text refers to address lookup -- confirm which check guards it. */
576 ul_logerr("Non-loopback inet address not found\n");
577 goto err_close_listen;
580 port_priv->addr.port = addr.sin_port;
/* A wildcard listen address cannot be advertised to peers: look up a
 * concrete local address instead. NOTE(review): the result of
 * get_local_address() is not checked here. */
581 if (listen_on.s_addr == INADDR_ANY)
582 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
584 port_priv->addr.addr = listen_on;
586 /* Initialize epoll descriptor */
587 port_priv->epoll_fd = epoll_create(10);
588 if (port_priv->epoll_fd == -1)
589 goto err_close_listen;
591 memset(&ev, 0, sizeof(ev));
592 ev.events = EPOLLIN | EPOLLET;
593 ev.data.fd = port_priv->listen_socket;
594 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
595 port_priv->listen_socket, &ev);
597 goto err_close_epoll;
/* The UDP socket is watched by epoll only when INET is not the default protocol. */
599 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
600 ev.events = EPOLLIN | EPOLLET;
601 ev.data.fd = port_priv->udp_socket;
602 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
603 port_priv->udp_socket, &ev);
605 goto err_close_epoll;
/* Publish the initialized port to the caller. */
608 port_desc->proto = &proto_inet;
609 port_desc->proto_priv = port_priv;
610 port_desc->addr = &port_priv->addr;
/* Error unwinding: resources are released in reverse order of acquisition
 * (the labels themselves are not visible in this view). */
613 close(port_priv->epoll_fd);
616 close(port_priv->listen_socket);
620 close(port_priv->udp_socket);
624 forb_free(port_priv);