]> rtime.felk.cvut.cz Git - frescor/forb.git/commitdiff
Waiting for peer discovery is almost finished
authorMichal Sojka <sojkam1@fel.cvut.cz>
Tue, 7 Oct 2008 16:15:09 +0000 (18:15 +0200)
committerMichal Sojka <sojkam1@fel.cvut.cz>
Tue, 7 Oct 2008 16:15:09 +0000 (18:15 +0200)
There are probably still some race conditions which appear sometimes
in the discovery test.

src/iop.c
src/peer.c
src/peer.h
src/proto.c
src/proto.h
src/request.c
src/tests/discovery.c
src/tests/hello_remote.c
src/tests/objref/objref.c

index 5508e3ca6f8ae19f5f0d8a2ba931f1f4f30578c8..38b915a651de2423568e037f749c45f8542c30af 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -175,6 +175,7 @@ forb_iop_send_reply(forb_t *forb,
        forb_iop_reply_header reply_header;
        CORBA_boolean ret;
        forb_peer_t *peer;
+       fosa_abs_time_t timeout;
        
        reply_header.request_id = request_id;
        reply_header.flags = 0;
@@ -197,7 +198,11 @@ forb_iop_send_reply(forb_t *forb,
        }
        forb_iop_prepend_message_header(codec, forb_iop_REPLY);
 
-       peer = forb_get_next_hop(forb, dest);
+       fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+       timeout = fosa_abs_time_incr(timeout,
+                                    fosa_msec_to_rel_time(1000));
+
+       peer = forb_get_next_hop(forb, dest, &timeout);
        if (!peer) {
                ul_logerr("Reply destination not found\n");
                goto err;
@@ -378,20 +383,30 @@ process_hello(forb_port_t *port, CDR_Codec *codec)
        }
        if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
                peer = forb_peer_find(forb, &server_id);
-               if (peer) {
+               if (peer && peer->state == FORB_PEER_DISCOVERED) {
                        /* TODO: Update last hello receive time */
                        forb_peer_put(peer);
                } else {
                        /* New peer discovered */
-/*                     char str[100]; */
-/*                     printf("New peer %s discovered port %p\n", */
-/*                            forb_server_id_to_string(str, &server_id, sizeof(str)), port); */
-                       peer = forb_peer_new();
+                       bool notify_waiters = false;
+                       if (peer /* && peer->state == FORB_PEER_WANTED */) {
+                               notify_waiters = true;
+                       } else {
+                               peer = forb_peer_new();
+                       }
                        if (peer) {
+                               fosa_mutex_lock(&forb->peer_mutex);
                                peer->server_id = server_id;
                                peer->port = port;
                                peer->addr = addr;
-                               forb_peer_insert(forb, peer);
+                               peer->state = FORB_PEER_DISCOVERED;
+                               if (notify_waiters) {
+                                       fosa_cond_broadcast(&peer->cond);
+                               } else {
+                                       forb_peer_get(peer);
+                                       forb_peer_nolock_insert(forb, peer);
+                               }
+                               fosa_mutex_unlock(&forb->peer_mutex);
                                forb_peer_put(peer);
                        }
                        /* Broadcast our hello packet now */
index 0151fcd4cec75a7e7de2a7c444be0e74ede95c9a..ca4e2fc58e5d7ad7b64991b6d50d33ee68db4bbf 100644 (file)
@@ -66,6 +66,8 @@ forb_peer_new(void)
        
        peer = forb_malloc(sizeof(*peer));
        if (peer) {
+               memset(peer, 0, sizeof(*peer));
+               fosa_cond_init(&peer->cond);
                forb_ref_init(&peer->ref);
        }
        return peer;
@@ -97,3 +99,44 @@ forb_peer_delete_by_port(forb_t *forb, forb_port_t *port)
        }
        fosa_mutex_unlock(&port->forb->peer_mutex);
 }
+
+/**
+ * Finds the peer with the given server ID, optionally waiting until
+ * it is discovered.
+ *
+ * If the peer is not known and @a timeout is given, a placeholder
+ * peer in the FORB_PEER_WANTED state is inserted into the peer tree
+ * and the caller waits on the peer's condition variable (together
+ * with forb->peer_mutex) until the peer leaves the
+ * FORB_PEER_WANTED state or the wait times out.
+ *
+ * @param forb FORB instance whose peers are searched.
+ * @param server_id Server ID of the wanted peer.
+ * @param timeout Absolute time passed to fosa_cond_timedwait(). If
+ * NULL and the peer is not known, NULL is returned immediately.
+ *
+ * @return The peer, with a reference taken by forb_peer_get() on
+ * behalf of the caller, or NULL if the peer is not known, could not
+ * be allocated or was not discovered before the timeout.
+ *
+ * NOTE(review): if the peer already exists in the FORB_PEER_WANTED
+ * state and @a timeout is NULL, NULL is handed to
+ * fosa_cond_timedwait() -- confirm that this is supported.
+ */
+forb_peer_t *
+forb_peer_find_timed(forb_t *forb, forb_server_id *server_id,
+                    fosa_abs_time_t *timeout)
+{
+       forb_peer_t *peer;
+       
+       fosa_mutex_lock(&forb->peer_mutex);
+       peer = forb_peer_nolock_find(forb, server_id);
+       if (!peer && !timeout) goto unlock;
+       if (peer) {
+               forb_peer_get(peer);
+       } else {
+               /* We are the first who want to contact this peer */
+               peer = forb_peer_new();
+               if (!peer) goto unlock;
+               peer->server_id = *server_id;
+               peer->state = FORB_PEER_WANTED;
+               forb_peer_get(peer);
+               forb_peer_nolock_insert(forb, peer);
+       }
+       while (peer->state == FORB_PEER_WANTED) {
+               int ret;
+               /* Wait for the peer to be discovered. */
+               ret = fosa_cond_timedwait(&peer->cond,
+                                         &forb->peer_mutex,
+                                         timeout);
+               if (ret == FOSA_ETIMEDOUT) {
+                       forb_peer_put(peer);
+                       peer = NULL;
+                       /* TODO: How to remove the peer from the tree?
+                        * We should check refcnt and if equal to 1,
+                        * we should remove it.  */
+
+                       /* Stop waiting: the loop condition must not
+                        * be evaluated again, because it would
+                        * dereference the now-NULL peer. */
+                       break;
+               }
+       }
+unlock:        
+       fosa_mutex_unlock(&forb->peer_mutex);
+
+       return peer;
+}
index ba8e2969f3e1222edb0fe835fc81330744928259..d0a51869ed4c74d8cd85fc4e652274d31442655a 100644 (file)
 #include "port.h"
 #include "refcnt.h"
 
+/** Discovery state of a peer. */
+enum forb_peer_state {
+       FORB_PEER_WANTED,       /**< Somebody is waiting for the peer to be discovered. In this state, only @a server_id field is set. */
+       FORB_PEER_DISCOVERED,   /**< The peer is already discovered, so its address is known and messages can be sent to it. */
+};
 
 /**
  * Description of a peer. We consider peers only in one-hop
 struct forb_peer {
        forb_server_id server_id; /**< Server_id of the peer */
        forb_port_t *port;     /**< Port this peer is connected to */
-        void *addr;            /**< Protocol specific address of the peer */
+        void *addr;            /**< Protocol specific address of the peer. */
        void *proto_priv;      /**< Protocol private data (e.g. info about established connection) */
        gavl_node_t node;      /**< Node of port's peers tree */
        forb_ref_t ref;        /**< Reference count */
+        enum forb_peer_state state; /**< Whether the peer is only wanted or already discovered */
+        fosa_cond_t cond;      /**< Condition variable for waiting after the peer is discovered. Used together with forb::peer_mutex. */
 };
 
 typedef struct forb_peer forb_peer_t;
@@ -142,4 +148,8 @@ forb_peer_find(forb_t *forb, forb_server_id *server_id)
        return ret;
 }
 
+forb_peer_t *
+forb_peer_find_timed(forb_t *forb, forb_server_id *server_id,
+                    fosa_abs_time_t *timeout);
+
 #endif
index cfea17b23e7c45a8d73b599b8e7eaa89494cafde..6d89450c5a4c6a29bdd3168052d8dccb9d9097c3 100644 (file)
@@ -69,10 +69,10 @@ size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
  * @return Next hop peer or NULL, if the destination is not found.
  */
 forb_peer_t *
-forb_get_next_hop(forb_t *forb, forb_server_id *server_id)
+forb_get_next_hop(forb_t *forb, forb_server_id *server_id, fosa_abs_time_t *timeout)
 {
        forb_peer_t *peer;
-       peer = forb_peer_find(forb, server_id);
+       peer = forb_peer_find_timed(forb, server_id, timeout);
        return peer;
 
 }
index dc0dbfbf1b47433df9443dfd4f28ab0393294844..c68c423410740b972db91b342b1c75da929f0c13 100644 (file)
@@ -145,7 +145,7 @@ struct forb_proto {
 size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec);
 
 forb_peer_t *
-forb_get_next_hop(forb_t *forb, forb_server_id *server);
+forb_get_next_hop(forb_t *forb, forb_server_id *server, fosa_abs_time_t *timeout);
 
 
 #endif
index c69295ee854b955c588f35297852fd4d6ac3cadf..a8f5e57157586950d724b78c787821763b6e5123 100644 (file)
@@ -152,7 +152,9 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env)
        CORBA_boolean ret;
        forb_peer_t *peer;
        size_t size;
+       fosa_abs_time_t timeout;
        forb_t *forb = forb_object_to_forb(req->obj);
+
        if (!forb) {
                env->major = FORB_EX_INTERNAL;
                return;
@@ -166,7 +168,11 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env)
                env->major = FORB_EX_INTERNAL;
                return;
        }
-       peer = forb_get_next_hop(forb, &req->obj->server);
+
+       fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+       timeout = fosa_abs_time_incr(timeout,
+                                    fosa_msec_to_rel_time(1000));
+       peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
        if (!peer) {
                char str[50];
                ul_logerr("Cannot find peer to send request to server: %s\n",
index e090b7a2b99021290b4a45f2f0145ec7c065f8bc..dbc6fe6baacbf28ed1a6340206e257f754e74fd6 100644 (file)
@@ -52,6 +52,7 @@
 #include <unistd.h>
 #include "../proto.h"
 #include <error.h>
+#include <fosa.h>
 
 #define NUM_ORBS 5
 
@@ -64,13 +65,15 @@ int main(int argc, char *argv[])
        forb_orb orb[NUM_ORBS];
        int i;
        bool all_peers_found = true;
+       fosa_abs_time_t timeout;
 
        for (i=0; i<NUM_ORBS; i++) {
                orb[i] = forb_init(&argc, &argv, NULL);
        }
 
-       /* Wait for discovering all the peers */
-       sleep(1);
+       fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+       timeout = fosa_abs_time_incr(timeout,
+                                    fosa_msec_to_rel_time(1000));
 
        /* Check whether all the peers are found */
        for (i=0; i<NUM_ORBS && all_peers_found; i++) {
@@ -78,7 +81,9 @@ int main(int argc, char *argv[])
                forb_peer_t *peer;
                for (j=0; j<NUM_ORBS && all_peers_found; j++) {
                        if (i==j) continue;
-                       peer = forb_peer_find(forb_data(orb[i]), &forb_data(orb[j])->server_id);
+                       peer = forb_peer_find_timed(forb_data(orb[i]),
+                                                   &forb_data(orb[j])->server_id,
+                                                   &timeout);
 
                        all_peers_found &= (peer != NULL);
                        if (!all_peers_found) {
index f24846ac4bc0119387084400b1128ad55e1f6cb1..06ebe76227fe82fe503f3871cff793afafb6db46 100644 (file)
@@ -102,8 +102,6 @@ void client(void *arg)
        
        orb = forb_init(&argc_g, &argv_g, "hello_remote_client");
 
-       sleep(1);               /* Hack to wait for dicovery of the two peers */
-
        hobj = forb_string_to_object(orb, objref);
        if (!hobj) {
                error(1, 0, "Cannot create object reference from the string (%s)\n", objref);
index 9efdcc63de66b590ea53842109df434c309cbf57..4e508e2a3561f00d52c5b26a6688b887ebdd8571 100644 (file)
@@ -131,8 +131,6 @@ void start_client(void *arg)
        
        orb = forb_init(&argc_g, &argv_g, "objref_client");
 
-       sleep(1);               /* Hack to wait for dicovery of the two peers */
-
        me = forb_client_new(orb, &client_impl, NULL);
        fosa_thread_create(&tid, NULL, client_thread, me);