]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/proto_inet.c
forb: Fix incorrect parameter of forb_peer_put()
[frescor/forb.git] / src / proto_inet.c
index 1c356794219f2f04c5b4ac5815035731f1757cbb..e1771d11610c1b290acda68888a1842a1ce080ea 100644 (file)
@@ -60,6 +60,9 @@
 #include <unistd.h>
 #include <forb/config.h>
 #include "discovery.h"
+#include <stdlib.h>
+#include <netinet/tcp.h>
+#include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
 
 /**
  * @file   proto_inet.c
@@ -145,7 +148,25 @@ inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
        return ret;
 }
 
-static struct inet_peer *
+static int
+setnonblocking(int fd);
+
+static int
+setnodelay(int fd)
+{
+       int ret = 0;
+#if 0                          /* For nice graphs in benchmarks */
+       int yes = 1;
+       ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+       if (ret < 0) {
+               ul_logerr("setsockopt(TCP_NODELAY): %s\n", strerror(errno));
+       }
+#endif
+       return ret;
+       
+}
+
+static int
 inet_connect(forb_peer_t *peer)
 {
        struct inet_peer *ipeer;
@@ -155,7 +176,7 @@ inet_connect(forb_peer_t *peer)
 
        if (!addr) {
                ul_logerr("No address to connect\n");
-               return NULL;
+               goto err;
        }
        ipeer = forb_malloc(sizeof(*ipeer));
        if (!ipeer)
@@ -168,13 +189,16 @@ inet_connect(forb_peer_t *peer)
        sa.sin_family = AF_INET;
        sa.sin_port = addr->port;
        sa.sin_addr = addr->addr;
-       ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
+       ul_logtrash("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
        ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
        if (ret) {
                ul_logerr("connect error: %s\n", strerror(errno));
                goto err_close;
        }
 
+       setnonblocking(ipeer->socket);
+       setnodelay(ipeer->socket);
+
        struct epoll_event ev;
        struct inet_port *p = peer->port->desc.proto_priv;
        memset(&ev, 0, sizeof(ev));
@@ -186,34 +210,41 @@ inet_connect(forb_peer_t *peer)
                goto err_close;
        }
 
+       peer->proto_priv = ipeer;
 
-       return ipeer;
+#ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
+       ret = forb_iop_send_hello_to(peer);
+       if (ret) {
+               goto err_close;
+       }
+#endif
+
+       return 0;
 err_close:
        close(ipeer->socket);
 err_free:
        forb_free(ipeer);
 err:
-       return NULL;
+       return -1;
        
 }
 
-static size_t
+static ssize_t
 inet_send(forb_peer_t *peer, const void *buf, size_t len)
 {
        struct inet_peer *ipeer = peer->proto_priv;
-       size_t ret, sent;
+       ssize_t ret, sent;
 
        if (!ipeer) {
-               ipeer = inet_connect(peer);
-               if (!ipeer) {
-                       return -1;
+               ret = inet_connect(peer);
+               if (ret) {
+                       return ret;
                }
-               peer->proto_priv = ipeer;
-               
+               ipeer = peer->proto_priv;
        }
 
        sent = 0;
-       ul_logdeb("send fd=%d\n", ipeer->socket);
+       ul_logtrash("send fd=%d len=%zu\n", ipeer->socket, len);
        do {
                ret = send(ipeer->socket, buf, len, 0);
                if (ret < 0) {
@@ -275,6 +306,7 @@ inet_accept_connection(forb_port_t *port)
                close(client);
                return -1;
        }
+       setnodelay(client);
 
        peer = forb_peer_new();
        if (peer) {
@@ -287,7 +319,6 @@ inet_accept_connection(forb_port_t *port)
                        peer->port = port;
                        peer->state = FORB_PEER_DISCOVERED;
                        inet_port_new_peer_insert(p, peer);
-                       //printf("New connection d=%d\n", client);
                } else {
                        forb_peer_put(peer);
                }
@@ -299,26 +330,30 @@ inet_accept_connection(forb_port_t *port)
        return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
 }
 
-static size_t
+static ssize_t
 inet_recv(forb_port_t *port, void *buf, size_t len)
 {
        struct inet_port *iport = port->desc.proto_priv;
 #if 1
        struct epoll_event ev;
-       int ret, nfds;
+       ssize_t ret;
+       int nfds;
        forb_peer_t *peer;
        bool exported_new_peer = false;
        
        for (;;) {
                if (iport->last_recv_fd == -1) {
                        nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
+                       if (nfds == -1 && errno == EINTR)
+                               continue;
                        if (nfds < 1)
                                return -1;
                        if (ev.data.fd == iport->listen_socket) {
                                ret = inet_accept_connection(port);
-                               if (ret)
+                               if (ret) {
+                                       ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
                                        return -1;
-                               else
+                               else
                                        continue;
                        } else {
                                iport->last_recv_fd = ev.data.fd;
@@ -331,7 +366,8 @@ inet_recv(forb_port_t *port, void *buf, size_t len)
                        if (ipeer->socket == iport->last_recv_fd) {
                                inet_port_new_peer_delete(iport, peer);
 
-                               if (port->new_peer) forb_peer_put(peer);
+                               if (port->new_peer)
+                                       forb_peer_put(port->new_peer);
 
                                /* Let the upper layer assign forb ID
                                 * to this peer according to the request*/
@@ -348,21 +384,29 @@ inet_recv(forb_port_t *port, void *buf, size_t len)
                                forb_peer_put(peer);
                                port->new_peer = NULL;
                        }
-                       if (errno == EAGAIN) {
-                               iport->last_recv_fd = -1;
-                               continue;
+                       if (errno != EAGAIN) {
+                               ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
                        }
+                       iport->last_recv_fd = -1;
+                       continue;
                }
                if (ret == 0) {
                        if (exported_new_peer) {
                                forb_peer_put(peer);
                                port->new_peer = NULL;
                        }
-                       close(iport->last_recv_fd);
-                       /* TODO: Notify FORB about peer disconnect */
+                       ul_logtrash("recv fd=%d disconnect\n", iport->last_recv_fd);
+                       ul_list_for_each(forb_port_peer, port, peer) {
+                               struct inet_peer *ipeer = peer->proto_priv;
+                               if (ipeer && ipeer->socket == iport->last_recv_fd) {
+                                       forb_peer_disconnected(peer);
+                                       break;
+                               }
+                       }
                        iport->last_recv_fd = -1;
                        continue;
                }
+               ul_logtrash("recv fd=%d len=%zd\n", iport->last_recv_fd, ret);
                return ret;
        }
 #else
@@ -374,19 +418,24 @@ static int
 inet_port_destroy(forb_port_t * port)
 {
        struct inet_port *pd = port->desc.proto_priv;
+       forb_peer_t *peer;
        close(pd->epoll_fd);
        close(pd->udp_socket);
        close(pd->listen_socket);
+       ul_list_for_each_cut(inet_port_new_peer, pd, peer) {
+               forb_peer_put(peer);
+       }
        forb_free(pd);
        return 0;
 }
 
-static size_t
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
+static ssize_t
 inet_broadcast(forb_port_t *port, const void *buf, size_t len)
 {
        struct inet_port *p = port->desc.proto_priv;
        struct sockaddr_in addr;
-       int ret;
+       ssize_t ret;
        
        addr.sin_family = AF_INET;
        addr.sin_port = htons(MCAST_PORT);
@@ -396,6 +445,7 @@ inet_broadcast(forb_port_t *port, const void *buf, size_t len)
                     (struct sockaddr*)&addr, sizeof(addr));
        return ret;
 }
+#endif
 
 static void
 inet_peer_destroy(forb_peer_t *peer)
@@ -403,6 +453,8 @@ inet_peer_destroy(forb_peer_t *peer)
        struct inet_peer *ipeer = peer->proto_priv;
        if (ipeer) {
                peer->proto_priv = NULL;
+               ul_logtrash("destroying peer fd=%d (orb_id=%s)\n",
+                           ipeer->socket, peer->orb_id);
                close(ipeer->socket);
                free(ipeer);
        }
@@ -422,7 +474,6 @@ size_t inet_addr2str(char *dest, size_t maxlen, const void *addr)
 
 #include <fcb.h>
 #include <fcb_contact_info.h>
-#include <stdlib.h>
 
 static void inet_register_cb(forb_port_t *port)
 {
@@ -447,7 +498,9 @@ static const forb_proto_t proto_inet = {
        .peer_destroy = inet_peer_destroy,
        .send = inet_send,
        .recv = inet_recv,
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
        .broadcast = inet_broadcast,
+#endif
        .serialize_addr = inet_serialize_addr,
        .deserialize_addr = inet_deserialize_addr,
        .addr2str = inet_addr2str,
@@ -459,7 +512,19 @@ int get_local_address(int sock, struct in_addr *addr)
 {
        struct ifconf  ifc;
        struct ifreq   *ifr, req[MAX_INTERFACES];
+       char *env;
+       bool loopback = false;
+       int ret;
+       struct in_addr env_addr;
        
+       env = getenv("FORB_EXTERNAL_IP");
+       if (env) {
+               if (inet_aton(env, &env_addr) == 0) {
+                       ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
+                       errno = EINVAL;
+                       return -1;
+               }
+       }
 
        ifc.ifc_len = sizeof(req);
        ifc.ifc_req = req;
@@ -469,13 +534,32 @@ int get_local_address(int sock, struct in_addr *addr)
        for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
                struct sockaddr_in ia;
                memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
-               ioctl(sock, SIOCGIFFLAGS, ifr);
-               if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
-                       *addr = ia.sin_addr;
-                       return 0;
+               ret = ioctl(sock, SIOCGIFFLAGS, ifr);
+               if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
+                       if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
+                               *addr = env_addr;
+                               return 0;
+                       }
+                       if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
+                               if (!env) {
+                                       *addr = ia.sin_addr;
+                                       return 0;
+                               }
+                       } else {
+                               *addr = ia.sin_addr;
+                               loopback = true;
+                       }
                }
        }
-       return -1;
+       if (env) {
+               ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
+               errno = ENODEV;
+               return -1;
+       }
+       if (loopback)
+               return 0;
+       else
+               return -1;
 }
 
 /** 
@@ -519,6 +603,7 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
 
        setnonblocking(port_priv->udp_socket);
 
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
        struct ip_mreq mreq;
        inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
        mreq.imr_multiaddr = port_priv->multicast_addr;
@@ -527,6 +612,7 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
                         IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
        if (ret)
                goto err_close_udp;
+#endif
 
        addr.sin_family = AF_INET;
        addr.sin_port = htons(MCAST_PORT);
@@ -547,6 +633,11 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
        port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
        if (port_priv->listen_socket == -1) goto err_close_udp;
 
+       reuse = 1;
+       setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
+       if (ret)
+               goto err_close_listen;
+
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr = listen_on;
@@ -568,9 +659,11 @@ forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
        }
 
        port_priv->addr.port = addr.sin_port;
-       if (listen_on.s_addr == INADDR_ANY)
-               get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
-       else
+       if (listen_on.s_addr == INADDR_ANY) {
+               if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
+                       goto err_close_listen;
+               }
+       } else
                port_priv->addr.addr = listen_on;
 
        /* Initialize epoll descriptor */