From 0b79d55fff90ed4d0a48bbb97c9c3f787e575f41 Mon Sep 17 00:00:00 2001 From: Michal Sojka Date: Sun, 10 May 2009 23:14:55 +0200 Subject: [PATCH] Make proto_inet usable also as local protocol Now inet protocol's TCP socket does not listen on a specific port so that multiple applications on a single node can communicate using this protocol. UDP socket listens on a fixed port and since SO_REUSEADDR is enabled, multiple applications can receive the same content. --- src/Makefile.omk | 6 +- src/forb.c | 24 ++++++++ src/proto_inet.c | 112 ++++++++++++++++++++++++------------ src/tests/Makefile.omk | 8 ++- src/tests/discovery.c | 6 +- src/tests/test_proto_inet.c | 7 +-- 6 files changed, 113 insertions(+), 50 deletions(-) diff --git a/src/Makefile.omk b/src/Makefile.omk index aa04362..713e3e0 100644 --- a/src/Makefile.omk +++ b/src/Makefile.omk @@ -48,11 +48,13 @@ renamed_include_GEN_HEADERS = \ $(call to_forb_subdir,types.h) default_CONFIG = CONFIG_FORB_PROTO_UNIX=y \ - CONFIG_FORB_RECV_BUF_SIZE=4096 + CONFIG_FORB_RECV_BUF_SIZE=4096 \ + CONFIG_FORB_PROTO_INET_DEFAULT=n config_include_HEADERS = forb/config.h config_DEFINES = CONFIG_FORB_PROTO_UNIX \ - CONFIG_FORB_RECV_BUF_SIZE + CONFIG_FORB_RECV_BUF_SIZE \ + CONFIG_FORB_PROTO_INET_DEFAULT include-pass_HOOKS = log_domains.inc #request_gavl.inc diff --git a/src/forb.c b/src/forb.c index 2008ebb..a60c71f 100644 --- a/src/forb.c +++ b/src/forb.c @@ -79,6 +79,10 @@ #ifdef CONFIG_FORB_PROTO_UNIX #include #endif +#ifdef CONFIG_FORB_PROTO_INET_DEFAULT +#include +#endif + #ifdef DEBUG #define UL_LOGL_DEF UL_LOGL_DEB @@ -227,6 +231,26 @@ forb_init(int *argc, char **argv[], const struct forb_init_attr *attr) goto err2; unix_ok:; } +#endif +#ifdef CONFIG_FORB_PROTO_INET_DEFAULT + { + forb_port_t *port = forb_malloc(sizeof(*port)); + if (port) { + struct in_addr listen_on; + + memset(port, 0, sizeof(*port)); + listen_on.s_addr = INADDR_ANY; + ret = forb_inet_port_init(&port->desc, listen_on); + if (ret) goto err_free_inet; + ret = forb_register_port(orb, port); + if (ret) goto err_free_inet; /* TODO: forb_inet_port_done() */ + goto inet_ok; + } + err_free_inet: + free(port); + goto err2; + inet_ok:; + } #endif return orb; diff --git a/src/proto_inet.c b/src/proto_inet.c index 35235c4..65aa6af 100644 --- a/src/proto_inet.c +++ b/src/proto_inet.c @@ -73,12 +73,14 @@ extern UL_LOG_CUST(ulogd_forb_proto_inet); -#define PORT 15514 +#define MCAST_PORT 15514 /**< Port used for multicasts */ #define MCAST_ADDR "225.15.5.14" -/** Address used by inet protocol. */ +/** Address used by inet protocol. All values are stored in network + * byte order. */ struct inet_addr { struct in_addr addr; + uint16_t port; /**< TCP listening port */ }; /** INET protocol data for ports. */ @@ -103,22 +105,32 @@ static CORBA_boolean inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr) { const struct inet_addr *a = addr; - CORBA_long haddr = ntohl(a->addr.s_addr); - return CORBA_long_serialize(codec, &haddr); + CORBA_unsigned_long haddr = ntohl(a->addr.s_addr); + CORBA_unsigned_short hport = ntohs(a->port); + CORBA_boolean ret; + ret = CORBA_unsigned_long_serialize(codec, &haddr); + if (!ret) + return ret; + return CORBA_unsigned_short_serialize(codec, &hport); } static CORBA_boolean inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr) { struct inet_addr *a; - CORBA_long s_addr; + CORBA_unsigned_long s_addr; + CORBA_unsigned_short port; CORBA_boolean ret; a = forb_malloc(sizeof(*a)); if (!a) return CORBA_FALSE; - ret = CORBA_long_deserialize(codec, &s_addr); + ret = CORBA_unsigned_long_deserialize(codec, &s_addr); + if (!ret) + return ret; + ret = CORBA_unsigned_short_deserialize(codec, &port); a->addr.s_addr = htonl(s_addr); + a->port = htons(port); *addr = a; return ret; } @@ -140,8 +152,9 @@ inet_connect(forb_peer_t *peer) if (!ipeer->socket) goto err_free; sa.sin_family = AF_INET; - sa.sin_port = htons(PORT); + sa.sin_port = addr->port; sa.sin_addr = addr->addr; + ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port)); ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa)); if (ret) goto err_close; @@ -202,7 +215,7 @@ int inet_accept_connection(struct inet_port *p) { int client; struct sockaddr_in addr; - socklen_t addrlen; + socklen_t addrlen = sizeof(addr); struct epoll_event ev; int ret; @@ -216,6 +229,7 @@ int inet_accept_connection(struct inet_port *p) close(client); return -1; } + memset(&ev, 0, sizeof(ev)); ev.events = EPOLLIN | EPOLLET; ev.data.fd = client; return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev); @@ -248,6 +262,12 @@ inet_recv(forb_port_t *port, void *buf, size_t len) iport->last_recv_fd = -1; continue; } + if (ret == 0) { + close(iport->last_recv_fd); + /* TODO: Notify FORB about peer disconnect */ + iport->last_recv_fd = -1; + continue; + } return ret; } #else @@ -274,7 +294,7 @@ inet_broadcast(forb_port_t *port, const void *buf, size_t len) int ret; addr.sin_family = AF_INET; - addr.sin_port = htons(PORT); + addr.sin_port = htons(MCAST_PORT); addr.sin_addr = p->multicast_addr; ret = sendto(p->udp_socket, buf, len, 0, @@ -286,9 +306,11 @@ static void inet_peer_destroy(forb_peer_t *peer) { struct inet_peer *ipeer = peer->proto_priv; - peer->proto_priv = NULL; - close(ipeer->socket); - free(ipeer); + if (ipeer) { + peer->proto_priv = NULL; + close(ipeer->socket); + free(ipeer); + } } static const forb_proto_t proto_inet = { @@ -303,7 +325,7 @@ static const forb_proto_t proto_inet = { }; #define MAX_INTERFACES 10 -int get_local_address(int sock, struct inet_addr *addr) +int get_local_address(int sock, struct in_addr *addr) { struct ifconf ifc; struct ifreq *ifr, req[MAX_INTERFACES]; @@ -319,7 +341,7 @@ int get_local_address(int sock, struct inet_addr *addr) memcpy(&ia, &ifr->ifr_addr, sizeof(ia)); ioctl(sock, SIOCGIFFLAGS, ifr); if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) { - addr->addr = ia.sin_addr; + *addr = ia.sin_addr; return 0; } } @@ -336,7 +358,6 @@ int get_local_address(int sock, struct inet_addr *addr) int forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on) { - int ret; struct inet_port *port_priv; struct sockaddr_in addr; @@ -350,49 +371,63 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on) memset(port_priv, 0, sizeof(*port_priv)); port_priv->last_recv_fd = -1; + /* Initialize UDP multicast socket */ port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0); if (port_priv->udp_socket == -1) goto err_free; - port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0); - if (port_priv->listen_socket == -1) goto err_close_udp; - + int yes = 1; ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes)); if (ret) - goto err_close_listen; + goto err_close_udp; - addr.sin_family = AF_INET; - addr.sin_port = htons(PORT); - addr.sin_addr = listen_on; - - ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr)); - if (ret != 0) goto err_close_listen; - if (setnonblocking(port_priv->listen_socket)) - goto err_close_listen; + int reuse = 1; + setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse)); + if (ret) + goto err_close_udp; - ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr)); - if (ret != 0) goto err_close_listen; setnonblocking(port_priv->udp_socket); - inet_aton(MCAST_ADDR, &port_priv->multicast_addr); + struct ip_mreq mreq; + inet_aton(MCAST_ADDR, &port_priv->multicast_addr); mreq.imr_multiaddr = port_priv->multicast_addr; mreq.imr_interface.s_addr = INADDR_ANY; ret = setsockopt(port_priv->udp_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); if (ret) - goto err_close_listen; + goto err_close_udp; + + addr.sin_family = AF_INET; + addr.sin_port = htons(MCAST_PORT); + addr.sin_addr = port_priv->multicast_addr; + + ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr)); + if (ret != 0) goto err_close_udp; char loop = 1; unsigned loop_size = sizeof(loop); ret = setsockopt(port_priv->udp_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, loop_size); if (ret) - goto err_close_listen; + goto err_close_udp; + /* Initialize TCP socket */ + port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0); + if (port_priv->listen_socket == -1) goto err_close_udp; + + addr.sin_family = AF_INET; + addr.sin_port = htons(0); /* Random port */ + addr.sin_addr = listen_on; + ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr)); + if (ret != 0) goto err_close_listen; + if (setnonblocking(port_priv->listen_socket)) + goto err_close_listen; + ret = listen(port_priv->listen_socket, 10); if (ret) goto err_close_listen; + /* Determine our address and port*/ len = sizeof(addr); ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len); if (ret) { @@ -400,6 +435,13 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on) goto err_close_listen; } + port_priv->addr.port = addr.sin_port; + if (listen_on.s_addr == INADDR_ANY) + get_local_address(port_priv->listen_socket, &port_priv->addr.addr); + else + port_priv->addr.addr = listen_on; + + /* Initialize epoll descriptor */ port_priv->epoll_fd = epoll_create(10); if (port_priv->epoll_fd == -1) goto err_close_listen; @@ -419,12 +461,6 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on) if (ret) goto err_close_epoll; - if (listen_on.s_addr == INADDR_ANY) - get_local_address(port_priv->listen_socket, &port_priv->addr); - else - port_priv->addr.addr = listen_on; - - port_desc->proto = &proto_inet; port_desc->proto_priv = port_priv; port_desc->addr = &port_priv->addr; diff --git a/src/tests/Makefile.omk b/src/tests/Makefile.omk index 4530329..e079c14 100644 --- a/src/tests/Makefile.omk +++ b/src/tests/Makefile.omk @@ -1,7 +1,11 @@ SUBDIRS = $(ALL_OMK_SUBDIRS) -test_PROGRAMS = hello_inproc hello_remote test_proto_unix \ - test_proto_inet discovery syncobj regobjref +test_PROGRAMS += hello_inproc hello_remote test_proto_inet discovery \ + syncobj regobjref +ifeq ($(CONFIG_FORB_PROTO_UNIX),y) +test_PROGRAMS += test_proto_unix +endif + CFLAGS += -DTEST lib_LOADLIBES = forb ulut fosa rt diff --git a/src/tests/discovery.c b/src/tests/discovery.c index 0d388fe..ddb7996 100644 --- a/src/tests/discovery.c +++ b/src/tests/discovery.c @@ -64,11 +64,12 @@ #include "../proto.h" #include #include +#include #define NUM_ORBS 5 -#ifndef CONFIG_FORB_PROTO_UNIX -#error This test should only work with UNIX protocol enabled. +#if !CONFIG_FORB_PROTO_UNIX && !CONFIG_FORB_PROTO_INET_DEFAULT +#error This test should only work if there is some local protocol enabled. #endif int main(int argc, char *argv[]) @@ -111,5 +112,6 @@ int main(int argc, char *argv[]) } } + printf("OK\n"); return 0; } diff --git a/src/tests/test_proto_inet.c b/src/tests/test_proto_inet.c index d444193..4618d5c 100644 --- a/src/tests/test_proto_inet.c +++ b/src/tests/test_proto_inet.c @@ -89,7 +89,7 @@ int main() char tmsg[100]; memset(&peer, 0, sizeof(peer)); - peer.addr = &addr[i]; + peer.addr = port[i].desc.addr; peer.port = &port[0]; len = strlen(msg)+1; @@ -110,10 +110,6 @@ int main() } - - /* Multicasts are not supported on loopback inteface in Linux, - * so we skip the broadcast test. */ -#if 0 len = strlen(msg2)+1; ret = inet_broadcast(&port[0], msg2, len); if (ret != len) @@ -136,7 +132,6 @@ int main() error(1, errno, "port %d: broadcast received wrong data", i); } -#endif for (i=0; i