rtime.felk.cvut.cz Git - frescor/forb.git/commitdiff
Added redistribution of HELLO packets
author: Michal Sojka <sojkam1@fel.cvut.cz>
Thu, 28 May 2009 03:29:59 +0000 (05:29 +0200)
committer: Michal Sojka <sojkam1@fel.cvut.cz>
Thu, 28 May 2009 03:29:59 +0000 (05:29 +0200)
In FRSH_FORB, the contract broker acts as a central node and redistributes
HELLO packets to all connected applications. This way, every application
can connect to every other one.

src/discovery.c
src/forb.h
src/iop.c
src/iop.h
src/proto_inet.c

index cc859906278e3215b93f83bbb5a684ca312de45c..a0391a9a39e76021315139a6c38dac2b00232f02 100644 (file)
@@ -56,6 +56,8 @@
 #include "discovery.h"
 #include <ul_log.h>
 #include "object.h"
+#include <forb/config.h>
+#include "iop.h"
 
 extern UL_LOG_CUST(ulogd_forb_discovery);
 
@@ -231,6 +233,18 @@ void forb_new_peer_discovered(forb_port_t *port, forb_peer_t *peer,
                          forb_server_id_to_string(str, &peer->server_id, sizeof(str)),
                          orb_id);
        }
+#ifdef CONFIG_FORB_PROTO_INET_DEFAULT
+       if (forb->attr.redistribute_hellos) {
+               forb_peer_t *p;
+               ul_list_for_each(forb_port_peer, port, p) {
+                       if (p != peer &&
+                           forb_server_id_cmp(&p->server_id, &forb->server_id) != 0) {
+                               forb_iop_redistribute_hello_to(p, peer); /* Introduce new peer to others */
+                               forb_iop_redistribute_hello_to(peer, p); /* Introduce other peers to the new one */
+                       }
+               }
+       }
+#endif
        if (forb->attr.peer_discovery_callback) {
                forb_orb peer_orb = forb_object_new(forb->orb, &peer->server_id, 0);
                forb->attr.peer_discovery_callback(peer_orb, orb_id);
index 60d6842c998fbd5a57438c89a6c817ff196a7328..187d0d33e4131c6212bb211f28b76bbc02f2a512 100644 (file)
@@ -160,6 +160,7 @@ typedef struct forb_init_attr {
        void (*peer_dead_callback)(const forb_orb peer_orb, const char *orb_id);
        uint16_t fixed_tcp_port; /**< If we want FORB's inet protocol to listen on a fixed port... */
        forb_server_id fixed_server_id; /**< If we do not want the ID to be random. (HACK) */
+       bool redistribute_hellos;
 
 } forb_init_attr_t;
 
index c0434639f79881f33cd7aa4b35a607ad1434da5e..94c351d6ddeaefb17f8398c7c83a4298b346f46a 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -160,6 +160,35 @@ free:
        return ret;
 }
 
+int /* 0 if forb_proto_send() reports > 0; -1 if the buffer or HELLO cannot be prepared; otherwise the send result unchanged */
+forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer) /* sends a HELLO describing `peer` to `dest` */
+{
+       forb_port_t *port = peer->port;
+       FORB_CDR_Codec codec;
+       int ret;
+       FORB_CDR_codec_init_static(&codec, port->forb->orb);
+       if (!FORB_CDR_buffer_init(&codec, 1024, 0)) /* 1024: presumably the initial buffer size in bytes - TODO confirm */
+               return -1;              /* returns directly, bypassing the free: label (presumably nothing was allocated - confirm) */
+       if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) { /* presumably leaves room for the message header - TODO confirm */
+               ret = -1;
+               goto free;
+       }
+       if (!forb_iop_prepare_hello(&codec, &peer->server_id, /* HELLO content comes from `peer`, not from `dest` */
+                                   peer->addr,
+                                   peer->port->desc.proto->serialize_addr, /* peer->port is the same pointer as the local `port` */
+                                   peer->orb_id)) {
+               ret = -1;
+               goto free;
+       }
+       ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
+                 ""/*TODO:id*/, peer->orb_id, "", dest->orb_id); /* both server-ID fields are still empty placeholders */
+       ret = forb_proto_send(dest, &codec); /* the message goes to `dest` */
+       if (ret > 0) ret = 0; /* collapse any positive send result to 0 */
+free: /* common exit: reached after the send and after a reset/prepare failure */
+       FORB_CDR_codec_release_buffer(&codec);
+       return ret;
+}
+
 bool
 forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
 {
@@ -480,15 +509,27 @@ process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
                peer = forb_peer_find(forb, &server_id);
                if (peer && peer->state == FORB_PEER_DISCOVERED) {
                        /* TODO: Update last hello receive time */
-                       if (addr)
-                               forb_free(addr);
-                       if (peer_orb_id)
-                               forb_free(peer_orb_id);
-                       forb_peer_put(peer);
+                       if (port->new_peer) {
+                               ul_logdeb("peer already discovered but not connected - replacing\n");
+                               forb_peer_disconnected(peer);
+                               forb_peer_put(peer);
+                               peer = port->new_peer;
+                               port->new_peer = NULL;
+                               forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
+                       } else {
+                               ul_logdeb("peer already discovered - ignoring\n");
+                               if (addr)
+                                       forb_free(addr);
+                               if (peer_orb_id)
+                                       forb_free(peer_orb_id);
+                               forb_peer_put(peer);
+                       }
                } else {
                        if (port->new_peer) {
-                               if (peer)
+                               if (peer) {
                                        ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
+                                       forb_peer_put(peer);
+                               }
                                peer = port->new_peer;
                                port->new_peer = NULL;
                        }
index 38971f6a28b32e9499e093ddaeda02ae60fab51d..e257309e0fa4ce8867cf9858bba3e9dc1d1b6a26 100644 (file)
--- a/src/iop.h
+++ b/src/iop.h
@@ -95,5 +95,7 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env);
 
 int
 forb_iop_send_hello_to(forb_peer_t *peer);
+int
+forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer);
 
 #endif
index 32a45c8c8e8ef0df90c72cdc8e8435df8bb80e8e..94296c1535d6ce050bedb943ca7873f5e3571bd3 100644 (file)
@@ -147,6 +147,8 @@ inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr)
        return ret;
 }
 
+int setnonblocking(int fd);
+
 static int
 inet_connect(forb_peer_t *peer)
 {
@@ -177,6 +179,8 @@ inet_connect(forb_peer_t *peer)
                goto err_close;
        }
 
+       setnonblocking(ipeer->socket);
+
        struct epoll_event ev;
        struct inet_port *p = peer->port->desc.proto_priv;
        memset(&ev, 0, sizeof(ev));