1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
61 #include <forb/config.h>
62 #include "discovery.h"
66 * @author Michal Sojka <sojkam1@fel.cvut.cz>
67 * @date Sun Oct 12 16:10:23 2008
69 * @brief FORB transport protocol based on INET family sockets.
71 * UDP is used for broadcasts and TCP for requests/replies. There
72 * exist two uni-directional connections between any two communicating
/* NOTE(review): presumably declares the log domain that the ul_logerr() and
 * ul_logdeb() calls in this file report to -- confirm against the ul_log
 * headers. */
76 extern UL_LOG_CUST(ulogd_forb_proto_inet);
78 #define MCAST_PORT 15514 /**< Port used for multicasts */
/** IPv4 multicast group that the UDP socket joins in forb_inet_port_init()
 * and that inet_broadcast() uses as destination. */
79 #define MCAST_ADDR "225.15.5.14"
81 /** Address used by inet protocol. All values are stored in network
85 uint16_t port; /**< TCP listening port (kept in network byte order) */
88 /** INET protocol data for ports. */
90 int udp_socket; /**< Socket for sending and receiving broadcasts */
91 int listen_socket; /**< TCP socket on which incoming connections are accepted */
92 int epoll_fd; /**< File descriptor passed to epoll_wait() in inet_recv(). */
93 struct inet_addr addr; /**< Address of this port */
94 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer; -1 when no descriptor is pending */
95 struct in_addr multicast_addr; /**< Multicast group address parsed from MCAST_ADDR; destination used by inet_broadcast() */
96 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
/* List of forb_peer_t items (linked through their lnode member) hanging off
 * the new_peers head of struct inet_port. NOTE(review): presumably this macro
 * generates the inet_port_new_peer_init_head/_insert/_delete helpers and the
 * iteration support used elsewhere in this file -- confirm in the ulut
 * headers. */
99 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
100 struct inet_port, /* cust_head_t */
101 forb_peer_t, /* cust_item_t */
102 new_peers, /* cust_head_field */
103 lnode) /* cust_node_field */
106 /** INET protocol data associated with every peer (referenced from the peer's proto_priv member) */
108 int socket; /**< Connected TCP socket to the peer */
111 /* static struct inet_port* */
112 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
/**
 * Serializes an INET address into a CDR codec.
 *
 * The address and the port are converted from network to host byte order
 * and written as an unsigned long followed by an unsigned short.
 *
 * @param codec Codec to serialize into.
 * @param addr  Points to the struct inet_addr to serialize.
 * @return Result of the serialize call that was executed last.
 */
115 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
117 const struct inet_addr *a = addr;
118 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
119 CORBA_unsigned_short hport = ntohs(a->port);
/* Address first, port second; inet_deserialize_addr() reads in the same order */
121 ret = CORBA_unsigned_long_serialize(codec, &haddr);
124 return CORBA_unsigned_short_serialize(codec, &hport);
/**
 * Deserializes an INET address from a CDR codec.
 *
 * Reads an unsigned long (address) and an unsigned short (port), converts
 * both to network byte order and stores them in a struct inet_addr
 * allocated with forb_malloc().
 *
 * @param codec Codec to read from.
 * @param addr  Output pointer. NOTE(review): presumably receives the newly
 *              allocated address, which the caller then owns -- confirm.
 */
128 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
131 CORBA_unsigned_long s_addr;
132 CORBA_unsigned_short port;
135 a = forb_malloc(sizeof(*a));
/* Same order as written by inet_serialize_addr(): address, then port */
138 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
141 ret = CORBA_unsigned_short_deserialize(codec, &port);
/* The wire format carries host-order values; struct inet_addr keeps network order */
142 a->addr.s_addr = htonl(s_addr);
143 a->port = htons(port);
148 static struct inet_peer *
149 inet_connect(forb_peer_t *peer)
151 struct inet_peer *ipeer;
152 struct sockaddr_in sa;
153 struct inet_addr *addr = peer->addr;
157 ul_logerr("No address to connect\n");
160 ipeer = forb_malloc(sizeof(*ipeer));
163 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
164 if (!ipeer->socket) {
165 ul_logerr("socket(): %s\n", strerror(errno));
168 sa.sin_family = AF_INET;
169 sa.sin_port = addr->port;
170 sa.sin_addr = addr->addr;
171 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
172 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
174 ul_logerr("connect error: %s\n", strerror(errno));
178 struct epoll_event ev;
179 struct inet_port *p = peer->port->desc.proto_priv;
180 memset(&ev, 0, sizeof(ev));
181 ev.events = EPOLLIN | EPOLLET;
182 ev.data.fd = ipeer->socket;
183 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
185 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
192 close(ipeer->socket);
/**
 * Sends a buffer to a peer over the peer's TCP socket.
 *
 * @param peer Destination peer.
 * @param buf  Data to send.
 * @param len  Number of bytes in buf.
 */
201 inet_send(forb_peer_t *peer, const void *buf, size_t len)
203 struct inet_peer *ipeer = peer->proto_priv;
/* Open a connection with inet_connect() and remember it in the peer.
 * NOTE(review): presumably done only while the peer has no connection
 * yet -- confirm. */
207 ipeer = inet_connect(peer);
211 peer->proto_priv = ipeer;
216 ul_logdeb("send fd=%d\n", ipeer->socket);
/* NOTE(review): send() may transmit fewer than len bytes; verify that the
 * caller or the surrounding code copes with short writes. */
218 ret = send(ipeer->socket, buf, len, 0);
220 ul_logerr("send error: %s\n", strerror(errno));
/**
 * Portable function to set a socket into nonblocking mode.
 *
 * Calling this on a socket causes all future read() and write() calls on
 * that socket to do only as much as they can immediately, and return
 * without waiting. If no data can be read or written, they return -1 and
 * set errno to EAGAIN (or EWOULDBLOCK).
 *
 * Thanks to Bjorn Reese for this code.
 *
 * @param fd Descriptor to switch to nonblocking mode.
 * @return Result of the underlying fcntl()/ioctl() call: -1 on error with
 *         errno set, another value on success.
 */
int setnonblocking(int fd)
{
	int flags;

	/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
	/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1)
		flags = 0; /* Could not read the flags; start from none */
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Otherwise, use the old way of doing it. The request is FIONBIO;
	 * the name "FIOBIO" does not exist. */
	flags = 1;
	return ioctl(fd, FIONBIO, &flags);
#endif
}
/**
 * Accepts a pending connection on the listening socket of a port.
 *
 * The accepted socket is switched to non-blocking mode, wrapped in a new
 * peer that is queued on the port's new_peers list, and added to the
 * port's epoll set.
 *
 * @param port Port whose listening socket has a connection pending.
 * @return Result of the final epoll_ctl() call when that point is reached.
 */
258 inet_accept_connection(forb_port_t *port)
260 struct inet_port *p = port->desc.proto_priv;
262 struct sockaddr_in addr;
263 socklen_t addrlen = sizeof(addr);
264 struct epoll_event ev;
268 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
/* The socket is registered edge-triggered below, so reads on it must not block */
273 ret = setnonblocking(client);
279 peer = forb_peer_new();
281 struct inet_peer *ipeer;
283 ipeer = forb_malloc(sizeof(*ipeer));
285 ipeer->socket = client;
286 peer->proto_priv = ipeer;
/* Queue the peer on new_peers; inet_recv() removes it from that list when
 * the first data arrives on its socket */
288 peer->state = FORB_PEER_DISCOVERED;
289 inet_port_new_peer_insert(p, peer);
290 //printf("New connection d=%d\n", client);
/* Watch the accepted socket for incoming data */
296 memset(&ev, 0, sizeof(ev));
297 ev.events = EPOLLIN | EPOLLET;
299 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
/**
 * Receives data for a port.
 *
 * Waits on the port's epoll descriptor, accepts connections arriving on
 * the listening socket and reads from the descriptor that became readable.
 * On the first reception from a just connected peer, that peer is exported
 * to the upper layer through port->new_peer.
 *
 * @param port Port to receive on.
 * @param buf  Buffer for the received data.
 * @param len  Size of buf in bytes.
 */
303 inet_recv(forb_port_t *port, void *buf, size_t len)
305 struct inet_port *iport = port->desc.proto_priv;
307 struct epoll_event ev;
310 bool exported_new_peer = false;
/* last_recv_fd == -1 means no descriptor is pending: block until epoll
 * reports one event */
313 if (iport->last_recv_fd == -1) {
314 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
/* An event on the listening socket announces an incoming connection */
317 if (ev.data.fd == iport->listen_socket) {
318 ret = inet_accept_connection(port);
/* Remember the readable descriptor so that later calls can continue
 * reading from it */
324 iport->last_recv_fd = ev.data.fd;
327 /* Check for first reception from a just connected peer */
328 ul_list_for_each(inet_port_new_peer, iport, peer) {
329 struct inet_peer *ipeer = peer->proto_priv;
330 //printf("checking new peer with fd=%d\n", ipeer->socket);
331 if (ipeer->socket == iport->last_recv_fd) {
332 inet_port_new_peer_delete(iport, peer);
/* NOTE(review): this releases `peer`, not the peer previously stored in
 * port->new_peer -- confirm which reference is meant to be dropped. */
334 if (port->new_peer) forb_peer_put(peer);
336 /* Let the upper layer assign forb ID
337 * to this peer according to the request*/
338 port->new_peer = peer;
339 exported_new_peer = true;
344 //printf("recv fd=%d\n", iport->last_recv_fd);
345 ret = recv(iport->last_recv_fd, buf, len, 0);
/* A peer exported by this call is withdrawn again when the read did not
 * deliver data */
347 if (exported_new_peer) {
349 port->new_peer = NULL;
/* EAGAIN: the socket is drained, go back to waiting in epoll_wait() */
351 if (errno == EAGAIN) {
352 iport->last_recv_fd = -1;
357 if (exported_new_peer) {
359 port->new_peer = NULL;
/* NOTE(review): presumably the path taken when the peer closed the
 * connection -- confirm. */
361 close(iport->last_recv_fd);
362 /* TODO: Notify FORB about peer disconnect */
363 iport->last_recv_fd = -1;
/* Read from the UDP (multicast) socket */
369 return recv(iport->udp_socket, buf, len, 0);
/**
 * Releases the INET specific resources of a port by closing its UDP and
 * listening sockets.
 *
 * @param port Port being destroyed.
 */
374 inet_port_destroy(forb_port_t * port)
376 struct inet_port *pd = port->desc.proto_priv;
378 close(pd->udp_socket);
379 close(pd->listen_socket);
/**
 * Sends a buffer to all peers by transmitting it over the UDP socket to
 * the multicast group of the port on MCAST_PORT.
 *
 * @param port Port to send from.
 * @param buf  Data to send.
 * @param len  Number of bytes in buf.
 */
385 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
387 struct inet_port *p = port->desc.proto_priv;
388 struct sockaddr_in addr;
/* Destination: multicast group address and the well-known multicast port */
391 addr.sin_family = AF_INET;
392 addr.sin_port = htons(MCAST_PORT);
393 addr.sin_addr = p->multicast_addr;
395 ret = sendto(p->udp_socket, buf, len, 0,
396 (struct sockaddr*)&addr, sizeof(addr));
/**
 * Releases the INET specific data of a peer.
 *
 * Detaches the data from peer->proto_priv and closes the peer's socket.
 *
 * @param peer Peer being destroyed.
 */
401 inet_peer_destroy(forb_peer_t *peer)
403 struct inet_peer *ipeer = peer->proto_priv;
/* Detach first so that the peer no longer refers to the data being released */
405 peer->proto_priv = NULL;
406 close(ipeer->socket);
/**
 * Formats an INET address as text in the form "a.b.c.d:port".
 *
 * @param dest   Destination buffer.
 * @param maxlen Size of dest in bytes.
 * @param addr   Points to the struct inet_addr to format.
 */
411 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
413 const struct inet_addr *a = addr;
/* inet_ntoa() formats into a static buffer, so this function is not reentrant */
416 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
/* Registration callback that announces the FCB server as a discovered peer.
 * It is compiled only when both CONFIG_FCB and
 * CONFIG_FORB_PROTO_INET_DEFAULT are enabled. */
421 #if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
424 #include <fcb_contact_info.h>
427 static void inet_register_cb(forb_port_t *port)
429 struct inet_addr *ia;
/* NOTE(review): plain malloc() is used here whereas the rest of this file
 * allocates with forb_malloc() -- confirm that the matching release
 * function is used for this address. */
431 ia = malloc(sizeof(*ia));
/* The FCB address comes from the FCB_ADDR environment variable and falls
 * back to the loopback address */
434 char *fcb_addr = getenv("FCB_ADDR");
435 if (!fcb_addr) fcb_addr = "127.0.0.1";
436 ia->addr.s_addr = inet_addr(fcb_addr);
437 ia->port = htons(FCB_TCP_PORT);
/* NOTE(review): ia is handed over here; presumably ownership passes to the
 * peer that gets created -- confirm. */
438 forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
/* In other configurations the protocol has no registration callback */
441 #define inet_register_cb NULL
/** Operations table of the INET protocol; forb_inet_port_init() installs it
 * into the port description. */
444 static const forb_proto_t proto_inet = {
445 .hello_interval = 40 /* seconds */,
446 .port_destroy = inet_port_destroy,
447 .peer_destroy = inet_peer_destroy,
450 .broadcast = inet_broadcast,
451 .serialize_addr = inet_serialize_addr,
452 .deserialize_addr = inet_deserialize_addr,
453 .addr2str = inet_addr2str,
/* Either the FCB registration function or NULL, depending on the build configuration */
454 .register_cb = inet_register_cb,
/** Number of interface entries reserved for the SIOCGIFCONF request. */
457 #define MAX_INTERFACES 10
/**
 * Looks for a local network interface that is up and is not a loopback
 * interface.
 *
 * @param sock Socket used for the interface ioctl() requests.
 * @param addr NOTE(review): presumably receives the address of the
 *             interface found -- confirm.
 */
458 int get_local_address(int sock, struct in_addr *addr)
461 struct ifreq *ifr, req[MAX_INTERFACES];
464 ifc.ifc_len = sizeof(req);
/* Ask the kernel for the list of configured interface addresses */
467 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
/* NOTE(review): the loop walks all MAX_INTERFACES entries rather than only
 * the entries the kernel filled in -- confirm this is intended. */
469 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
470 struct sockaddr_in ia;
/* Copy the address before querying the flags: ifr_addr and ifr_flags share
 * storage in struct ifreq, so SIOCGIFFLAGS overwrites the address */
471 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
472 ioctl(sock, SIOCGIFFLAGS, ifr);
473 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
482 * Initializes INET protocol port.
484 * @param port_desc Port description to initialize.
485 * @return Zero on success, -1 on error and errno is set
489 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
493 struct inet_port *port_priv;
494 struct sockaddr_in addr;
496 struct epoll_event ev;
/* Private state of the port; it is published through port_desc->proto_priv
 * at the end of this function */
498 port_priv = forb_malloc(sizeof(*port_priv));
/* Start from zeroed state with no pending receive descriptor and an empty
 * list of new peers */
502 memset(port_priv, 0, sizeof(*port_priv));
503 port_priv->last_recv_fd = -1;
504 inet_port_new_peer_init_head(port_priv);
506 /* Initialize UDP multicast socket */
507 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
508 if (port_priv->udp_socket == -1) goto err_free;
/* Permit sending to broadcast addresses */
511 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
/* Allow the multicast address/port to be bound by more than one socket.
 * NOTE(review): the result of this call is not checked. */
516 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
/* NOTE(review): the result of this call is not checked. */
520 setnonblocking(port_priv->udp_socket);
/* Join the multicast group MCAST_ADDR; INADDR_ANY leaves the choice of the
 * interface to the system */
523 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
524 mreq.imr_multiaddr = port_priv->multicast_addr;
525 mreq.imr_interface.s_addr = INADDR_ANY;
526 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
527 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
/* Bind the UDP socket to the multicast address and port */
531 addr.sin_family = AF_INET;
532 addr.sin_port = htons(MCAST_PORT);
533 addr.sin_addr = port_priv->multicast_addr;
535 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
536 if (ret != 0) goto err_close_udp;
/* Configure whether multicasts sent from this host are looped back to it */
539 unsigned loop_size = sizeof(loop);
540 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
541 IP_MULTICAST_LOOP, &loop, loop_size);
546 /* Initialize TCP socket */
547 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
548 if (port_priv->listen_socket == -1) goto err_close_udp;
/* Listen on listen_on:port; port is converted here, so it is expected in
 * host byte order */
550 addr.sin_family = AF_INET;
551 addr.sin_port = htons(port);
552 addr.sin_addr = listen_on;
553 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
554 if (ret != 0) goto err_close_listen;
555 if (setnonblocking(port_priv->listen_socket))
556 goto err_close_listen;
558 ret = listen(port_priv->listen_socket, 10);
560 goto err_close_listen;
562 /* Determine our address and port*/
564 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
/* NOTE(review): this message appears to be logged on the getsockname()
 * error path, where its wording is misleading -- confirm. */
566 ul_logerr("Non-loopback inet address not found\n");
567 goto err_close_listen;
/* The port number actually bound is read back from the socket; addr.sin_port
 * is already in network byte order */
570 port_priv->addr.port = addr.sin_port;
/* When listening on all interfaces, look up the address of one interface.
 * NOTE(review): the result of get_local_address() is not checked here. */
571 if (listen_on.s_addr == INADDR_ANY)
572 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
574 port_priv->addr.addr = listen_on;
576 /* Initialize epoll descriptor */
577 port_priv->epoll_fd = epoll_create(10);
578 if (port_priv->epoll_fd == -1)
579 goto err_close_listen;
/* Get notified about incoming connections on the listening socket */
581 memset(&ev, 0, sizeof(ev));
582 ev.events = EPOLLIN | EPOLLET;
583 ev.data.fd = port_priv->listen_socket;
584 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
585 port_priv->listen_socket, &ev);
587 goto err_close_epoll;
/* The UDP socket is added to the epoll set only when
 * CONFIG_FORB_PROTO_INET_DEFAULT is not defined */
589 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
590 ev.events = EPOLLIN | EPOLLET;
591 ev.data.fd = port_priv->udp_socket;
592 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
593 port_priv->udp_socket, &ev);
595 goto err_close_epoll;
/* Publish the protocol operations, the private state and the port address */
598 port_desc->proto = &proto_inet;
599 port_desc->proto_priv = port_priv;
600 port_desc->addr = &port_priv->addr;
/* Error path: resources are released in the reverse order of acquisition */
603 close(port_priv->epoll_fd);
606 close(port_priv->listen_socket);
610 close(port_priv->udp_socket);
614 forb_free(port_priv);