rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/proto_inet.c
Inet protocol sends HELLO as the first message after connect()
[frescor/forb.git] / src / proto_inet.c
index 771e371c3c172c0147890350e117023765e8a27d..5cef70dc70f203a0c27d0a8625f9da52f72d0452 100644 (file)
@@ -60,6 +60,7 @@
 #include <unistd.h>
 #include <forb/config.h>
 #include "discovery.h"
+#include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
 
 /**
  * @file   proto_inet.c
@@ -145,7 +146,7 @@ inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
        return ret;
 }
 
-static struct inet_peer *
+static int
 inet_connect(forb_peer_t *peer)
 {
        struct inet_peer *ipeer;
@@ -155,7 +156,7 @@ inet_connect(forb_peer_t *peer)
 
        if (!addr) {
                ul_logerr("No address to connect\n");
-               return NULL;
+               goto err;
        }
        ipeer = forb_malloc(sizeof(*ipeer));
        if (!ipeer)
@@ -186,14 +187,22 @@ inet_connect(forb_peer_t *peer)
                goto err_close;
        }
 
+       peer->proto_priv = ipeer;
+
+#ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
+       ret = forb_iop_send_hello_to(peer);
+       if (ret) {
+               goto err_close;
+       }
+#endif
 
-       return ipeer;
+       return 0;
 err_close:
        close(ipeer->socket);
 err_free:
        forb_free(ipeer);
 err:
-       return NULL;
+       return -1;
        
 }
 
@@ -204,16 +213,15 @@ inet_send(forb_peer_t *peer, const void *buf, size_t len)
        ssize_t ret, sent;
 
        if (!ipeer) {
-               ipeer = inet_connect(peer);
-               if (!ipeer) {
-                       return -1;
+               ret = inet_connect(peer);
+               if (ret) {
+                       return ret;
                }
-               peer->proto_priv = ipeer;
-               
+               ipeer = peer->proto_priv;
        }
 
        sent = 0;
-       ul_logdeb("send fd=%d\n", ipeer->socket);
+       ul_logdeb("send fd=%d len=%d\n", ipeer->socket, len);
        do {
                ret = send(ipeer->socket, buf, len, 0);
                if (ret < 0) {
@@ -287,7 +295,6 @@ inet_accept_connection(forb_port_t *port)
                        peer->port = port;
                        peer->state = FORB_PEER_DISCOVERED;
                        inet_port_new_peer_insert(p, peer);
-                       //printf("New connection d=%d\n", client);
                } else {
                        forb_peer_put(peer);
                }
@@ -313,13 +320,16 @@ inet_recv(forb_port_t *port, void *buf, size_t len)
        for (;;) {
                if (iport->last_recv_fd == -1) {
                        nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
+                       if (nfds == -1 && errno == EINTR)
+                               continue;
                        if (nfds < 1)
                                return -1;
                        if (ev.data.fd == iport->listen_socket) {
                                ret = inet_accept_connection(port);
-                               if (ret)
+                               if (ret) {
+                                       ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
                                        return -1;
-                               else
+                               else
                                        continue;
                        } else {
                                iport->last_recv_fd = ev.data.fd;
@@ -349,21 +359,29 @@ inet_recv(forb_port_t *port, void *buf, size_t len)
                                forb_peer_put(peer);
                                port->new_peer = NULL;
                        }
-                       if (errno == EAGAIN) {
-                               iport->last_recv_fd = -1;
-                               continue;
+                       if (errno != EAGAIN) {
+                               ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
                        }
+                       iport->last_recv_fd = -1;
+                       continue;
                }
                if (ret == 0) {
                        if (exported_new_peer) {
                                forb_peer_put(peer);
                                port->new_peer = NULL;
                        }
-                       close(iport->last_recv_fd);
-                       /* TODO: Notify FORB about peer disconnect */
+                       ul_logdeb("recv fd=%d disconnect\n", iport->last_recv_fd);
+                       ul_list_for_each(forb_port_peer, port, peer) {
+                               struct inet_peer *ipeer = peer->proto_priv;
+                               if (ipeer && ipeer->socket == iport->last_recv_fd) {
+                                       forb_peer_disconnected(peer);
+                                       break;
+                               }
+                       }
                        iport->last_recv_fd = -1;
                        continue;
                }
+               ul_logdeb("recv fd=%d len=%d\n", iport->last_recv_fd, ret);
                return ret;
        }
 #else
@@ -404,6 +422,7 @@ inet_peer_destroy(forb_peer_t *peer)
        struct inet_peer *ipeer = peer->proto_priv;
        if (ipeer) {
                peer->proto_priv = NULL;
+               ul_logdeb("destroying peer fd=%d\n", ipeer->socket);
                close(ipeer->socket);
                free(ipeer);
        }