]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/proto_inet.c
Inet protocol sends HELLO as the first message after connect()
[frescor/forb.git] / src / proto_inet.c
index fae7ec2f5c1f628ea28b17dc0261676cb4749c0b..5cef70dc70f203a0c27d0a8625f9da52f72d0452 100644 (file)
@@ -58,6 +58,9 @@
 #include <sys/types.h>
 #include <ul_log.h>
 #include <unistd.h>
+#include <forb/config.h>
+#include "discovery.h"
+#include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
 
 /**
  * @file   proto_inet.c
 
 extern UL_LOG_CUST(ulogd_forb_proto_inet);
 
-#define PORT 15514
+#define MCAST_PORT 15514       /**< Port used for multicasts */
 #define MCAST_ADDR "225.15.5.14"
 
-/** Address used by inet protocol. */
+/** Address used by inet protocol. All values are stored in network
+ * byte order. */
 struct inet_addr {
        struct in_addr addr;
+       uint16_t port;          /**< TCP listening port */
 };
 
 /** INET protocol data for ports. */
@@ -89,8 +94,16 @@ struct inet_port {
        struct inet_addr addr;  /**< Address of this port */
        int last_recv_fd;       /**< Used in inet_recv() to read data longer than receiving buffer */
        struct in_addr multicast_addr;
+       ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
 };
 
+UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
+                struct inet_port,   /* cust_head_t */
+                forb_peer_t,        /* cust_item_t */
+                new_peers,          /* cust_head_field */
+                lnode)              /* cust_node_field */
+
+
 /** INET protocol data associated with every peer */
 struct inet_peer {
        int socket;             /**< Connected socket to the peer */
@@ -103,66 +116,124 @@ static CORBA_boolean
 inet_serialize_addr(FORB_CDR_Codec *codec, const void *addr)
 {
        const struct inet_addr *a = addr;
-       return CORBA_long_serialize(codec, &a->addr.s_addr);
+       CORBA_unsigned_long haddr = ntohl(a->addr.s_addr);
+       CORBA_unsigned_short hport = ntohs(a->port);
+       CORBA_boolean ret;
+       ret = CORBA_unsigned_long_serialize(codec, &haddr);
+       if (!ret)
+               return ret;
+       return CORBA_unsigned_short_serialize(codec, &hport);
 }
 
 static CORBA_boolean
 inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
 {
-       struct inet_addr *a = *addr;
-       CORBA_long s_addr;
+       struct inet_addr *a;
+       CORBA_unsigned_long s_addr;
+       CORBA_unsigned_short port;
        CORBA_boolean ret;
 
-       ret = CORBA_long_deserialize(codec, &s_addr);
-       a->addr.s_addr = s_addr;
+       a = forb_malloc(sizeof(*a));
+       if (!a)
+               return CORBA_FALSE;
+       ret = CORBA_unsigned_long_deserialize(codec, &s_addr);
+       if (!ret)
+               return ret;
+       ret = CORBA_unsigned_short_deserialize(codec, &port);
+       a->addr.s_addr = htonl(s_addr);
+       a->port = htons(port);
+       *addr = a;
        return ret;
 }
 
-static struct inet_peer *
+static int
 inet_connect(forb_peer_t *peer)
 {
        struct inet_peer *ipeer;
        struct sockaddr_in sa;
        struct inet_addr *addr = peer->addr;
        int ret;
-       
+
+       if (!addr) {
+               ul_logerr("No address to connect\n");
+               goto err;
+       }
        ipeer = forb_malloc(sizeof(*ipeer));
        if (!ipeer)
                goto err;
        ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
-       if (!ipeer->socket)
+       if (!ipeer->socket) {
+               ul_logerr("socket(): %s\n", strerror(errno));
                goto err_free;
+       }
        sa.sin_family = AF_INET;
-       sa.sin_port = htons(PORT);
+       sa.sin_port = addr->port;
        sa.sin_addr = addr->addr;
+       ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
        ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
-       if (ret)
+       if (ret) {
+               ul_logerr("connect error: %s\n", strerror(errno));
                goto err_close;
+       }
 
-       return ipeer;
+       struct epoll_event ev;
+       struct inet_port *p = peer->port->desc.proto_priv;
+       memset(&ev, 0, sizeof(ev));
+       ev.events = EPOLLIN | EPOLLET;
+       ev.data.fd = ipeer->socket;
+       ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
+       if (ret) {
+               ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
+               goto err_close;
+       }
+
+       peer->proto_priv = ipeer;
+
+#ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
+       ret = forb_iop_send_hello_to(peer);
+       if (ret) {
+               goto err_close;
+       }
+#endif
+
+       return 0;
 err_close:
        close(ipeer->socket);
 err_free:
        forb_free(ipeer);
 err:
-       return NULL;
+       return -1;
        
 }
 
-static size_t
+static ssize_t
 inet_send(forb_peer_t *peer, const void *buf, size_t len)
 {
        struct inet_peer *ipeer = peer->proto_priv;
+       ssize_t ret, sent;
 
        if (!ipeer) {
-               ipeer = inet_connect(peer);
-               if (!ipeer)
-                       return -1;
-               peer->proto_priv = ipeer;
-               
+               ret = inet_connect(peer);
+               if (ret) {
+                       return ret;
+               }
+               ipeer = peer->proto_priv;
        }
-       
-       return send(ipeer->socket, buf, len, 0);
+
+       sent = 0;
+       ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
+       do {
+               ret = send(ipeer->socket, buf, len, 0);
+               if (ret < 0) {
+                       ul_logerr("send error: %s\n", strerror(errno));
+                       return ret;
+               }
+               sent += ret;
+               buf += ret;
+               len -= ret;
+       } while (len > 0);
+
+       return sent;
 }
 
 /*----------------------------------------------------------------------
@@ -191,13 +262,16 @@ int setnonblocking(int fd)
 #endif
 }     
 
-int inet_accept_connection(struct inet_port *p)
+static int
+inet_accept_connection(forb_port_t *port)
 {
+       struct inet_port *p = port->desc.proto_priv;
        int client;
        struct sockaddr_in addr;
-       socklen_t addrlen;
+       socklen_t addrlen = sizeof(addr);
        struct epoll_event ev;
        int ret;
+       forb_peer_t *peer;
                                
        client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
        if (client < 0){
@@ -209,38 +283,105 @@ int inet_accept_connection(struct inet_port *p)
                close(client);
                return -1;
        }
+
+       peer = forb_peer_new();
+       if (peer) {
+               struct inet_peer *ipeer;
+
+               ipeer = forb_malloc(sizeof(*ipeer));
+               if (ipeer) {
+                       ipeer->socket = client;
+                       peer->proto_priv = ipeer;
+                       peer->port = port;
+                       peer->state = FORB_PEER_DISCOVERED;
+                       inet_port_new_peer_insert(p, peer);
+               } else {
+                       forb_peer_put(peer);
+               }
+       }
+       
+       memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = client;
        return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
 }
 
-static size_t
+static ssize_t
 inet_recv(forb_port_t *port, void *buf, size_t len)
 {
        struct inet_port *iport = port->desc.proto_priv;
 #if 1
        struct epoll_event ev;
-       int ret, nfds;
+       ssize_t ret;
+       int nfds;
+       forb_peer_t *peer;
+       bool exported_new_peer = false;
+       
        for (;;) {
                if (iport->last_recv_fd == -1) {
                        nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
+                       if (nfds == -1 && errno == EINTR)
+                               continue;
                        if (nfds < 1)
                                return -1;
                        if (ev.data.fd == iport->listen_socket) {
-                               ret = inet_accept_connection(iport);
-                               if (ret)
+                               ret = inet_accept_connection(port);
+                               if (ret) {
+                                       ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
                                        return -1;
-                               else
+                               else
                                        continue;
                        } else {
                                iport->last_recv_fd = ev.data.fd;
                        }
                }
+               /* Check for first reception form a just connected peer */
+               ul_list_for_each(inet_port_new_peer, iport, peer) {
+                       struct inet_peer *ipeer = peer->proto_priv;
+                       //printf("checking new peer with fd=%d\n", ipeer->socket);
+                       if (ipeer->socket == iport->last_recv_fd) {
+                               inet_port_new_peer_delete(iport, peer);
+
+                               if (port->new_peer) forb_peer_put(peer);
+
+                               /* Let the upper layer assign forb ID
+                                * to this peer according to the request*/
+                               port->new_peer = peer;
+                               exported_new_peer = true;
+                               break;
+                       }
+               }
+
+               //printf("recv fd=%d\n", iport->last_recv_fd);
                ret = recv(iport->last_recv_fd, buf, len, 0);
-               if (ret == -1 && errno == EAGAIN) {
+               if (ret == -1) {
+                       if (exported_new_peer) {
+                               forb_peer_put(peer);
+                               port->new_peer = NULL;
+                       }
+                       if (errno != EAGAIN) {
+                               ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
+                       }
                        iport->last_recv_fd = -1;
                        continue;
                }
+               if (ret == 0) {
+                       if (exported_new_peer) {
+                               forb_peer_put(peer);
+                               port->new_peer = NULL;
+                       }
+                       ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
+                       ul_list_for_each(forb_port_peer, port, peer) {
+                               struct inet_peer *ipeer = peer->proto_priv;
+                               if (ipeer && ipeer->socket == iport->last_recv_fd) {
+                                       forb_peer_disconnected(peer);
+                                       break;
+                               }
+                       }
+                       iport->last_recv_fd = -1;
+                       continue;
+               }
+               ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
                return ret;
        }
 #else
@@ -259,15 +400,15 @@ inet_port_destroy(forb_port_t * port)
        return 0;
 }
 
-static size_t
+static ssize_t
 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
 {
        struct inet_port *p = port->desc.proto_priv;
        struct sockaddr_in addr;
-       int ret;
+       ssize_t ret;
        
        addr.sin_family = AF_INET;
-       addr.sin_port = htons(PORT);
+       addr.sin_port = htons(MCAST_PORT);
        addr.sin_addr = p->multicast_addr;
        
        ret = sendto(p->udp_socket, buf, len, 0,
@@ -279,11 +420,47 @@ static void
 inet_peer_destroy(forb_peer_t *peer)
 {
        struct inet_peer *ipeer = peer->proto_priv;
-       peer->proto_priv = NULL;
-       close(ipeer->socket);
-       free(ipeer);
+       if (ipeer) {
+               peer->proto_priv = NULL;
+               ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
+               close(ipeer->socket);
+               free(ipeer);
+       }
+}
+
+size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
+{
+       const struct inet_addr *a = addr;
+       size_t ret = 0;
+       if (addr) {
+               snprintf(dest, maxlen, "%s:%d", inet_ntoa(a->addr), ntohs(a->port));
+       }
+       return ret;
 }
 
+#if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
+
+#include <fcb.h>
+#include <fcb_contact_info.h>
+#include <stdlib.h>
+
+static void inet_register_cb(forb_port_t *port)
+{
+       struct inet_addr *ia;
+       
+       ia = malloc(sizeof(*ia));
+       if (!ia) return;
+
+       char *fcb_addr = getenv("FCB_ADDR");
+       if (!fcb_addr) fcb_addr = "127.0.0.1";
+       ia->addr.s_addr = inet_addr(fcb_addr);
+       ia->port = htons(FCB_TCP_PORT);
+       forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
+}
+#else
+#define inet_register_cb NULL
+#endif
+
 static const forb_proto_t proto_inet = {
        .hello_interval = 40 /* seconds */,
        .port_destroy = inet_port_destroy,
@@ -293,10 +470,12 @@ static const forb_proto_t proto_inet = {
        .broadcast = inet_broadcast,
        .serialize_addr = inet_serialize_addr,
        .deserialize_addr = inet_deserialize_addr,
+       .addr2str = inet_addr2str,
+       .register_cb = inet_register_cb,
 };
 
 #define MAX_INTERFACES 10
-int get_local_address(int sock, struct inet_addr *addr)
+int get_local_address(int sock, struct in_addr *addr)
 {
        struct ifconf  ifc;
        struct ifreq   *ifr, req[MAX_INTERFACES];
@@ -312,7 +491,7 @@ int get_local_address(int sock, struct inet_addr *addr)
                memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
                ioctl(sock, SIOCGIFFLAGS, ifr);
                if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
-                       addr->addr = ia.sin_addr;
+                       *addr = ia.sin_addr;
                        return 0;
                }
        }
@@ -327,9 +506,9 @@ int get_local_address(int sock, struct inet_addr *addr)
  * appropriately.
  */
 int
-forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on)
+forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
+                   uint16_t port)
 {
-       
        int ret;       
        struct inet_port *port_priv;
        struct sockaddr_in addr;
@@ -342,50 +521,70 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on)
 
        memset(port_priv, 0, sizeof(*port_priv));
        port_priv->last_recv_fd = -1;
+       inet_port_new_peer_init_head(port_priv);
        
+       /* Initialize UDP multicast socket */
        port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
        if (port_priv->udp_socket == -1) goto err_free;
-       port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
-       if (port_priv->listen_socket == -1) goto err_close_udp;
-
+       
        int yes = 1;
        ret = setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes));
        if (ret)
-               goto err_close_listen;
+               goto err_close_udp;
 
-       addr.sin_family = AF_INET;
-       addr.sin_port = htons(PORT);
-       addr.sin_addr = listen_on;
-       
-       ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
-       if (ret != 0) goto err_close_listen;
-       if (setnonblocking(port_priv->listen_socket))
-               goto err_close_listen;
+       int reuse = 1;
+       setsockopt(port_priv->udp_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
+       if (ret)
+               goto err_close_udp;
 
-       ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
-       if (ret != 0) goto err_close_listen;
        setnonblocking(port_priv->udp_socket);
-       inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
+
        struct ip_mreq mreq;
+       inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
        mreq.imr_multiaddr = port_priv->multicast_addr;
        mreq.imr_interface.s_addr = INADDR_ANY;
        ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
                         IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
        if (ret)
-               goto err_close_listen;
+               goto err_close_udp;
+
+       addr.sin_family = AF_INET;
+       addr.sin_port = htons(MCAST_PORT);
+       addr.sin_addr = port_priv->multicast_addr;
+
+       ret = bind(port_priv->udp_socket, (struct sockaddr *)&addr, sizeof(addr));
+       if (ret != 0) goto err_close_udp;
 
        char loop = 1;
        unsigned loop_size = sizeof(loop);
        ret = setsockopt(port_priv->udp_socket, IPPROTO_IP,
                         IP_MULTICAST_LOOP, &loop, loop_size);
        if (ret)
-               goto err_close_listen;
+               goto err_close_udp;
        
 
+       /* Initialize TCP socket */
+       port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
+       if (port_priv->listen_socket == -1) goto err_close_udp;
+
+       reuse = 1;
+       setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
+       if (ret)
+               goto err_close_listen;
+
+       addr.sin_family = AF_INET;
+       addr.sin_port = htons(port);
+       addr.sin_addr = listen_on;
+       ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
+       if (ret != 0) goto err_close_listen;
+       if (setnonblocking(port_priv->listen_socket))
+               goto err_close_listen;
+
        ret = listen(port_priv->listen_socket, 10);
        if (ret)
                goto err_close_listen;
 
+       /* Determine our address and port*/
        len = sizeof(addr);
        ret = getsockname(port_priv->listen_socket, (struct sockaddr *)&addr, &len);
        if (ret) {
@@ -393,29 +592,33 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on)
                goto err_close_listen;
        }
 
+       port_priv->addr.port = addr.sin_port;
+       if (listen_on.s_addr == INADDR_ANY)
+               get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
+       else
+               port_priv->addr.addr = listen_on;
+
+       /* Initialize epoll descriptor */
        port_priv->epoll_fd = epoll_create(10);
        if (port_priv->epoll_fd == -1)
                goto err_close_listen;
 
+       memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = port_priv->listen_socket;
        ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
                        port_priv->listen_socket, &ev);
        if (ret)
                goto err_close_epoll;
-       
+
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT 
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = port_priv->udp_socket;
        ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
                        port_priv->udp_socket, &ev);
        if (ret)
                goto err_close_epoll;
-
-       if (listen_on.s_addr == INADDR_ANY)
-               get_local_address(port_priv->listen_socket, &port_priv->addr);
-       else
-               port_priv->addr.addr = listen_on;
-               
+#endif
 
        port_desc->proto = &proto_inet;
        port_desc->proto_priv = port_priv;