1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <forb/proto_inet.h>
#include <net/if.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <forb/config.h>
/**
 * @author Michal Sojka <sojkam1@fel.cvut.cz>
 * @date   Sun Oct 12 16:10:23 2008
 *
 * @brief  FORB transport protocol based on INET family sockets.
 *
 * UDP is used for broadcasts and TCP for requests/replies. There
 * exist two uni-directional connections between any two communicating
 * peers.
 */
/* Log domain referenced by this file; declared extern here, so it is
 * presumably defined in another translation unit. */
extern UL_LOG_CUST(ulogd_forb_proto_inet);

#define MCAST_PORT 15514	/**< Port used for multicasts */
#define MCAST_ADDR "225.15.5.14" /**< IPv4 multicast group the UDP socket joins and sends to */
/** Address used by inet protocol. All values are stored in network
 * byte order. */
struct inet_addr {
	struct in_addr addr;	/**< IPv4 address */
	uint16_t port;		/**< TCP listening port */
};
87 /** INET protocol data for ports. */
89 int udp_socket; /**< Socket for sending and receiving broadcasts */
90 int listen_socket; /* */
91 int epoll_fd; /**< File descriptor used by epoll() in inet_receive(). */
92 struct inet_addr addr; /**< Address of this port */
93 int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
94 struct in_addr multicast_addr;
95 ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
98 UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
99 struct inet_port, /* cust_head_t */
100 forb_peer_t, /* cust_item_t */
101 new_peers, /* cust_head_field */
102 lnode) /* cust_node_field */
/** INET protocol data associated with every peer */
struct inet_peer {
	int socket;		/**< Connected socket to the peer */
};
110 /* static struct inet_port* */
111 /* peer_to_inet_port(forb_peer_t *peer) { return peer->port->desc.proto_priv; } */
114 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
116 const struct inet_addr *a = addr;
117 CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
118 CORBA_unsigned_short hport = ntohs(a->port);
120 ret = CORBA_unsigned_long_serialize(codec, &haddr);
123 return CORBA_unsigned_short_serialize(codec, &hport);
127 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
130 CORBA_unsigned_long s_addr;
131 CORBA_unsigned_short port;
134 a = forb_malloc(sizeof(*a));
137 ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
140 ret = CORBA_unsigned_short_deserialize(codec, &port);
141 a->addr.s_addr = htonl(s_addr);
142 a->port = htons(port);
147 static struct inet_peer *
148 inet_connect(forb_peer_t *peer)
150 struct inet_peer *ipeer;
151 struct sockaddr_in sa;
152 struct inet_addr *addr = peer->addr;
156 ul_logerr("No address to connect\n");
159 ipeer = forb_malloc(sizeof(*ipeer));
162 ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
163 if (!ipeer->socket) {
164 ul_logerr("socket(): %s\n", strerror(errno));
167 sa.sin_family = AF_INET;
168 sa.sin_port = addr->port;
169 sa.sin_addr = addr->addr;
170 ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
171 ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
173 ul_logerr("connect error: %s\n", strerror(errno));
177 struct epoll_event ev;
178 struct inet_port *p = peer->port->desc.proto_priv;
179 memset(&ev, 0, sizeof(ev));
180 ev.events = EPOLLIN | EPOLLET;
181 ev.data.fd = ipeer->socket;
182 ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
184 ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
191 close(ipeer->socket);
200 inet_send(forb_peer_t *peer, const void *buf, size_t len)
202 struct inet_peer *ipeer = peer->proto_priv;
206 ipeer = inet_connect(peer);
210 peer->proto_priv = ipeer;
215 ul_logdeb("send fd=%d\n", ipeer->socket);
217 ret = send(ipeer->socket, buf, len, 0);
219 ul_logerr("send error: %s\n", strerror(errno));
/*----------------------------------------------------------------------
 Portable function to set a socket into nonblocking mode.
 Calling this on a socket causes all future read() and write() calls on
 that socket to do only as much as they can immediately, and return
 without waiting.
 If no data can be read or written, they return -1 and set errno
 to EAGAIN (or EWOULDBLOCK).
 Thanks to Bjorn Reese for this code.

 Returns the result of the underlying fcntl()/ioctl() call: -1 on
 failure (errno is set), another value on success.
----------------------------------------------------------------------*/
int setnonblocking(int fd)
{
	int flags;

	/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
	/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1)
		flags = 0;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
	/* Otherwise, use the old way of doing it */
	flags = 1;
	return ioctl(fd, FIONBIO, &flags);
#endif
}
257 inet_accept_connection(forb_port_t *port)
259 struct inet_port *p = port->desc.proto_priv;
261 struct sockaddr_in addr;
262 socklen_t addrlen = sizeof(addr);
263 struct epoll_event ev;
267 client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
272 ret = setnonblocking(client);
278 peer = forb_peer_new();
280 struct inet_peer *ipeer;
282 ipeer = forb_malloc(sizeof(*ipeer));
284 ipeer->socket = client;
285 peer->proto_priv = ipeer;
287 peer->state = FORB_PEER_DISCOVERED;
288 inet_port_new_peer_insert(p, peer);
289 //printf("New connection d=%d\n", client);
295 memset(&ev, 0, sizeof(ev));
296 ev.events = EPOLLIN | EPOLLET;
298 return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
302 inet_recv(forb_port_t *port, void *buf, size_t len)
304 struct inet_port *iport = port->desc.proto_priv;
306 struct epoll_event ev;
309 bool exported_new_peer = false;
312 if (iport->last_recv_fd == -1) {
313 nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
316 if (ev.data.fd == iport->listen_socket) {
317 ret = inet_accept_connection(port);
323 iport->last_recv_fd = ev.data.fd;
326 /* Check for first reception form a just connected peer */
327 ul_list_for_each(inet_port_new_peer, iport, peer) {
328 struct inet_peer *ipeer = peer->proto_priv;
329 //printf("checking new peer with fd=%d\n", ipeer->socket);
330 if (ipeer->socket == iport->last_recv_fd) {
331 inet_port_new_peer_delete(iport, peer);
333 if (port->new_peer) forb_peer_put(peer);
335 /* Let the upper layer assign forb ID
336 * to this peer according to the request*/
337 port->new_peer = peer;
338 exported_new_peer = true;
343 //printf("recv fd=%d\n", iport->last_recv_fd);
344 ret = recv(iport->last_recv_fd, buf, len, 0);
346 if (exported_new_peer) {
348 port->new_peer = NULL;
350 if (errno == EAGAIN) {
351 iport->last_recv_fd = -1;
356 if (exported_new_peer) {
358 port->new_peer = NULL;
360 close(iport->last_recv_fd);
361 /* TODO: Notify FORB about peer disconnect */
362 iport->last_recv_fd = -1;
368 return recv(iport->udp_socket, buf, len, 0);
373 inet_port_destroy(forb_port_t * port)
375 struct inet_port *pd = port->desc.proto_priv;
377 close(pd->udp_socket);
378 close(pd->listen_socket);
384 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
386 struct inet_port *p = port->desc.proto_priv;
387 struct sockaddr_in addr;
390 addr.sin_family = AF_INET;
391 addr.sin_port = htons(MCAST_PORT);
392 addr.sin_addr = p->multicast_addr;
394 ret = sendto(p->udp_socket, buf, len, 0,
395 (struct sockaddr*)&addr, sizeof(addr));
400 inet_peer_destroy(forb_peer_t *peer)
402 struct inet_peer *ipeer = peer->proto_priv;
404 peer->proto_priv = NULL;
405 close(ipeer->socket);
410 size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
412 const struct inet_addr *a = addr;
415 snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
421 static const forb_proto_t proto_inet = {
422 .hello_interval = 40 /* seconds */,
423 .port_destroy = inet_port_destroy,
424 .peer_destroy = inet_peer_destroy,
427 .broadcast = inet_broadcast,
428 .serialize_addr = inet_serialize_addr,
429 .deserialize_addr = inet_deserialize_addr,
430 .addr2str = inet_addr2str,
433 #define MAX_INTERFACES 10
434 int get_local_address(int sock, struct in_addr *addr)
437 struct ifreq *ifr, req[MAX_INTERFACES];
440 ifc.ifc_len = sizeof(req);
443 if (ioctl(sock, SIOCGIFCONF, &ifc) < 0)
445 for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
446 struct sockaddr_in ia;
447 memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
448 ioctl(sock, SIOCGIFFLAGS, ifr);
449 if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
458 * Initializes INET protocol port.
460 * @param port_desc Port description to initialize.
461 * @return Zero on success, -1 on error and errno is set
465 forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
469 struct inet_port *port_priv;
470 struct sockaddr_in addr;
472 struct epoll_event ev;
474 port_priv = forb_malloc(sizeof(*port_priv));
478 memset(port_priv, 0, sizeof(*port_priv));
479 port_priv->last_recv_fd = -1;
480 inet_port_new_peer_init_head(port_priv);
482 /* Initialize UDP multicast socket */
483 port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
484 if (port_priv->udp_socket == -1) goto err_free;
487 ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
492 setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
496 setnonblocking(port_priv->udp_socket);
499 inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
500 mreq.imr_multiaddr = port_priv->multicast_addr;
501 mreq.imr_interface.s_addr = INADDR_ANY;
502 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
503 IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
507 addr.sin_family = AF_INET;
508 addr.sin_port = htons(MCAST_PORT);
509 addr.sin_addr = port_priv->multicast_addr;
511 ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
512 if (ret != 0) goto err_close_udp;
515 unsigned loop_size = sizeof(loop);
516 ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
517 IP_MULTICAST_LOOP, &loop, loop_size);
522 /* Initialize TCP socket */
523 port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
524 if (port_priv->listen_socket == -1) goto err_close_udp;
526 addr.sin_family = AF_INET;
527 addr.sin_port = htons(port);
528 addr.sin_addr = listen_on;
529 ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
530 if (ret != 0) goto err_close_listen;
531 if (setnonblocking(port_priv->listen_socket))
532 goto err_close_listen;
534 ret = listen(port_priv->listen_socket, 10);
536 goto err_close_listen;
538 /* Determine our address and port*/
540 ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
542 ul_logerr("Non-loopback inet address not found\n");
543 goto err_close_listen;
546 port_priv->addr.port = addr.sin_port;
547 if (listen_on.s_addr == INADDR_ANY)
548 get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
550 port_priv->addr.addr = listen_on;
552 /* Initialize epoll descriptor */
553 port_priv->epoll_fd = epoll_create(10);
554 if (port_priv->epoll_fd == -1)
555 goto err_close_listen;
557 memset(&ev, 0, sizeof(ev));
558 ev.events = EPOLLIN | EPOLLET;
559 ev.data.fd = port_priv->listen_socket;
560 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
561 port_priv->listen_socket, &ev);
563 goto err_close_epoll;
565 #ifndef CONFIG_FORB_PROTO_INET_DEFAULT
566 ev.events = EPOLLIN | EPOLLET;
567 ev.data.fd = port_priv->udp_socket;
568 ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
569 port_priv->udp_socket, &ev);
571 goto err_close_epoll;
574 port_desc->proto = &proto_inet;
575 port_desc->proto_priv = port_priv;
576 port_desc->addr = &port_priv->addr;
579 close(port_priv->epoll_fd);
582 close(port_priv->listen_socket);
586 close(port_priv->udp_socket);
590 forb_free(port_priv);