1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
48 #include <arpa/inet.h>
51 #include <forb/proto_inet.h>
53 #include <netinet/in.h>
55 #include <sys/epoll.h>
56 #include <sys/ioctl.h>
57 #include <sys/socket.h>
58 #include <sys/types.h>
64 * @author Michal Sojka <sojkam1@fel.cvut.cz>
65 * @date Sun Oct 12 16:10:23 2008
67 * @brief FORB transport protocol based on INET family sockets.
69 * UDP is used for broadcasts and TCP for requests/replies. There
70 * exist two uni-drectional connections between any two communicating
74 extern UL_LOG_CUST(ulogd_forb_proto_inet);
76 #define MCAST_PORT 15514 /**< Port used for multicasts */
77 #define MCAST_ADDR "225.15.5.14"
79 /** Address used by inet protocol. All values are stored in network
83 uint16_t port; /**< TCP listening port */
86 /** INET protocol data for ports. */
88 int udp_socket; /**< Socket for sending and receiving broadcasts */
89 int listen_socket; /* */
90 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
91 struct inet_addr addr; /**< Address of this port */
92 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
93 struct in_addr multicast_addr;
94 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
97 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
98 struct inet_port, /* cust_head_t */
99 forb_peer_t, /* cust_item_t */
100 new_peers, /* cust_head_field */
101 lnode) /* cust_node_field */
104 /** INET protocol data associated with every peer */
106 int socket; /**< Connected socket to the peer */
109 /* static struct inet_port* */
110 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
113 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
115 const struct inet_addr *a = addr;
116 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
117 CORBA_unsigned_short hport = ntohs(a->port);
119 ret = CORBA_unsigned_long_serialize(codec, &haddr);
122 return CORBA_unsigned_short_serialize(codec, &hport);
126 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
129 CORBA_unsigned_long s_addr;
130 CORBA_unsigned_short port;
133 a = forb_malloc(sizeof(*a));
136 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
139 ret = CORBA_unsigned_short_deserialize(codec, &port);
140 a->addr.s_addr = htonl(s_addr);
141 a->port = htons(port);
146 static struct inet_peer *
147 inet_connect(forb_peer_t *peer)
149 struct inet_peer *ipeer;
150 struct sockaddr_in sa;
151 struct inet_addr *addr = peer->addr;
155 ul_logerr("No address to connect\n");
158 ipeer = forb_malloc(sizeof(*ipeer));
161 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
162 if (!ipeer->socket) {
163 ul_logerr("socket(): %s\n", strerror(errno));
166 sa.sin_family = AF_INET;
167 sa.sin_port = addr->port;
168 sa.sin_addr = addr->addr;
169 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
170 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
172 ul_logerr("connect error: %s\n", strerror(errno));
176 struct epoll_event ev;
177 struct inet_port *p = peer->port->desc.proto_priv;
178 memset(&ev, 0, sizeof(ev));
179 ev.events = EPOLLIN | EPOLLET;
180 ev.data.fd = ipeer->socket;
181 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
183 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
190 close(ipeer->socket);
199 inet_send(forb_peer_t *peer, const void *buf, size_t len)
201 struct inet_peer *ipeer = peer->proto_priv;
205 ipeer = inet_connect(peer);
209 peer->proto_priv = ipeer;
214 ul_logdeb("send fd=%d\n", ipeer->socket);
216 ret = send(ipeer->socket, buf, len, 0);
218 ul_logerr("send error: %s\n", strerror(errno));
229 /*----------------------------------------------------------------------
230 Portable function to set a socket into nonblocking mode.
231 Calling this on a socket causes all future read() and write() calls on
232 that socket to do only as much as they can immediately, and return
234 If no data can be read or written, they return -1 and set errno
235 to EAGAIN (or EWOULDBLOCK).
236 Thanks to Bjorn Reese for this code.
237 ----------------------------------------------------------------------*/
238 int setnonblocking(int fd)
242 /* If they have O_NONBLOCK, use the Posix way to do it */
243 #if defined(O_NONBLOCK)
244 /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
245 if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
247 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
249 /* Otherwise, use the old way of doing it */
251 return ioctl(fd, FIOBIO, &flags);
256 inet_accept_connection(forb_port_t *port)
258 struct inet_port *p = port->desc.proto_priv;
260 struct sockaddr_in addr;
261 socklen_t addrlen = sizeof(addr);
262 struct epoll_event ev;
266 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
271 ret = setnonblocking(client);
277 peer = forb_peer_new();
279 struct inet_peer *ipeer;
281 ipeer = forb_malloc(sizeof(*ipeer));
283 ipeer->socket = client;
284 peer->proto_priv = ipeer;
286 peer->state = FORB_PEER_DISCOVERED;
287 inet_port_new_peer_insert(p, peer);
288 //printf("New connection d=%d\n", client);
294 memset(&ev, 0, sizeof(ev));
295 ev.events = EPOLLIN | EPOLLET;
297 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
301 inet_recv(forb_port_t *port, void *buf, size_t len)
303 struct inet_port *iport = port->desc.proto_priv;
305 struct epoll_event ev;
308 bool exported_new_peer = false;
311 if (iport->last_recv_fd == -1) {
312 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
315 if (ev.data.fd == iport->listen_socket) {
316 ret = inet_accept_connection(port);
322 iport->last_recv_fd = ev.data.fd;
325 /* Check for first reception form a just connected peer */
326 ul_list_for_each(inet_port_new_peer, iport, peer) {
327 struct inet_peer *ipeer = peer->proto_priv;
328 //printf("checking new peer with fd=%d\n", ipeer->socket);
329 if (ipeer->socket == iport->last_recv_fd) {
330 inet_port_new_peer_delete(iport, peer);
332 if (port->new_peer) forb_peer_put(peer);
334 /* Let the upper layer assign forb ID
335 * to this peer according to the request*/
336 port->new_peer = peer;
337 exported_new_peer = true;
342 //printf("recv fd=%d\n", iport->last_recv_fd);
343 ret = recv(iport->last_recv_fd, buf, len, 0);
345 if (exported_new_peer) {
347 port->new_peer = NULL;
349 if (errno == EAGAIN) {
350 iport->last_recv_fd = -1;
355 if (exported_new_peer) {
357 port->new_peer = NULL;
359 close(iport->last_recv_fd);
360 /* TODO: Notify FORB about peer disconnect */
361 iport->last_recv_fd = -1;
367 return recv(iport->udp_socket, buf, len, 0);
372 inet_port_destroy(forb_port_t * port)
374 struct inet_port *pd = port->desc.proto_priv;
376 close(pd->udp_socket);
377 close(pd->listen_socket);
383 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
385 struct inet_port *p = port->desc.proto_priv;
386 struct sockaddr_in addr;
389 addr.sin_family = AF_INET;
390 addr.sin_port = htons(MCAST_PORT);
391 addr.sin_addr = p->multicast_addr;
393 ret = sendto(p->udp_socket, buf, len, 0,
394 (struct sockaddr*)&addr, sizeof(addr));
399 inet_peer_destroy(forb_peer_t *peer)
401 struct inet_peer *ipeer = peer->proto_priv;
403 peer->proto_priv = NULL;
404 close(ipeer->socket);
409 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
411 const struct inet_addr *a = addr;
414 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
420 static const forb_proto_t proto_inet = {
421 .hello_interval = 40 /* seconds */,
422 .port_destroy = inet_port_destroy,
423 .peer_destroy = inet_peer_destroy,
426 .broadcast = inet_broadcast,
427 .serialize_addr = inet_serialize_addr,
428 .deserialize_addr = inet_deserialize_addr,
429 .addr2str = inet_addr2str,
432 #define MAX_INTERFACES 10
433 int get_local_address(int sock, struct in_addr *addr)
436 struct ifreq *ifr, req[MAX_INTERFACES];
439 ifc.ifc_len = sizeof(req);
442 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
444 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
445 struct sockaddr_in ia;
446 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
447 ioctl(sock, SIOCGIFFLAGS, ifr);
448 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
457 * Initializes INET protocol port.
459 * @param port_desc Port description to initialize.
460 * @return Zero on success, -1 on error and errno is set
464 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
468 struct inet_port *port_priv;
469 struct sockaddr_in addr;
471 struct epoll_event ev;
473 port_priv = forb_malloc(sizeof(*port_priv));
477 memset(port_priv, 0, sizeof(*port_priv));
478 port_priv->last_recv_fd = -1;
479 inet_port_new_peer_init_head(port_priv);
481 /* Initialize UDP multicast socket */
482 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
483 if (port_priv->udp_socket == -1) goto err_free;
486 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
491 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
495 setnonblocking(port_priv->udp_socket);
498 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
499 mreq.imr_multiaddr = port_priv->multicast_addr;
500 mreq.imr_interface.s_addr = INADDR_ANY;
501 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
502 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
506 addr.sin_family = AF_INET;
507 addr.sin_port = htons(MCAST_PORT);
508 addr.sin_addr = port_priv->multicast_addr;
510 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
511 if (ret != 0) goto err_close_udp;
514 unsigned loop_size = sizeof(loop);
515 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
516 IP_MULTICAST_LOOP, &loop, loop_size);
521 /* Initialize TCP socket */
522 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
523 if (port_priv->listen_socket == -1) goto err_close_udp;
525 addr.sin_family = AF_INET;
526 addr.sin_port = htons(port);
527 addr.sin_addr = listen_on;
528 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
529 if (ret != 0) goto err_close_listen;
530 if (setnonblocking(port_priv->listen_socket))
531 goto err_close_listen;
533 ret = listen(port_priv->listen_socket, 10);
535 goto err_close_listen;
537 /* Determine our address and port*/
539 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
541 ul_logerr("Non-loopback inet address not found\n");
542 goto err_close_listen;
545 port_priv->addr.port = addr.sin_port;
546 if (listen_on.s_addr == INADDR_ANY)
547 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
549 port_priv->addr.addr = listen_on;
551 /* Initialize epoll descriptor */
552 port_priv->epoll_fd = epoll_create(10);
553 if (port_priv->epoll_fd == -1)
554 goto err_close_listen;
556 memset(&ev, 0, sizeof(ev));
557 ev.events = EPOLLIN | EPOLLET;
558 ev.data.fd = port_priv->listen_socket;
559 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
560 port_priv->listen_socket, &ev);
562 goto err_close_epoll;
564 ev.events = EPOLLIN | EPOLLET;
565 ev.data.fd = port_priv->udp_socket;
566 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
567 port_priv->udp_socket, &ev);
569 goto err_close_epoll;
571 port_desc->proto = &proto_inet;
572 port_desc->proto_priv = port_priv;
573 port_desc->addr = &port_priv->addr;
576 close(port_priv->epoll_fd);
579 close(port_priv->listen_socket);
583 close(port_priv->udp_socket);
587 forb_free(port_priv);