]> rtime.felk.cvut.cz Git - frescor/forb.git/commitdiff
Discovery protocol works correctly now,
authorMichal Sojka <sojkam1@fel.cvut.cz>
Tue, 2 Sep 2008 13:47:55 +0000 (15:47 +0200)
committerMichal Sojka <sojkam1@fel.cvut.cz>
Tue, 2 Sep 2008 13:47:55 +0000 (15:47 +0200)
the main problem was the uninitialized data_endian member in CDR_codec.

cdr.c
config.target
forb-internal.h
forb.c
iop.c
proto.c
proto.h
proto_unix.c
tests/discovery.c

diff --git a/cdr.c b/cdr.c
index 803147d0a9a988259a4a23b43bb736d84879d6e3..126638026dbeef8a2855ebd8ea999b3bd6a666a3 100644 (file)
--- a/cdr.c
+++ b/cdr.c
@@ -444,6 +444,7 @@ CDR_codec_init_static(CDR_Codec *codec)
        memset(codec, 0, sizeof(CDR_Codec));
 
        codec->host_endian = FLAG_ENDIANNESS;
+       codec->data_endian = FLAG_ENDIANNESS;
 
        return codec;
 }
index eba98314f74002ccf7dc9b5ce48e131de0d668a6..a7a91f66bb28abb63bc409b665b7e5a986c3063a 100644 (file)
@@ -4,6 +4,6 @@ IDL_COMPILER = $(OUTPUT_DIR)/$(COMPILED_DIR_NAME)/bin/forb-idl
 
 export PLATFORM=AQuoSA
 
-CFLAGS = -g -Wall -O0 -D$(PLATFORM)
+CFLAGS = -g -Wall -O0 -D$(PLATFORM) -D_REENTRANT
 
 USE_LEAF_MAKEFILES=n
index a24d5662fdc220ab79c5f651fed7a777785e87a0..d36b620d97bec132c79a8f9e3544918fba8b7957 100644 (file)
@@ -101,7 +101,7 @@ GAVL_CUST_NODE_INT_DEC(forb_request_nolock /* cust_prefix */,
                       requests /* cust_root_node */,
                       node /* cust_item_node */,
                       request_id /* cust_item_key */,
-                      forb_request_cmp/* cust_cmp_fnc */)
+                      forb_request_cmp/* cust_cmp_fnc */);
 
 static inline void
 forb_request_insert(forb_t *forb, forb_request_t *req)
@@ -143,5 +143,12 @@ static inline int forb_server_id_cmp(const forb_server_id *id1, const forb_serve
        return memcmp(id1, id2, sizeof(forb_server_id));
 }
 
+static inline char *
+forb_server_id_to_string(char *dest, const forb_server_id *server_id, size_t n)
+{
+       return forb_uuid_to_string(dest, (forb_uuid_t*)server_id->uuid, n);
+       
+}
+
 
 #endif
diff --git a/forb.c b/forb.c
index a0b31bd48e8f43aab7959e96800a6ad80377dbf1..a74feb39cf134c4fa0e9b8cd7cc5690677c8cca9 100644 (file)
--- a/forb.c
+++ b/forb.c
@@ -25,7 +25,7 @@ GAVL_CUST_NODE_INT_IMP(forb_request_nolock /* cust_prefix */,
                       requests /* cust_root_node */,
                       node /* cust_item_node */,
                       request_id /* cust_item_key */,
-                      forb_request_cmp/* cust_cmp_fnc */)
+                      forb_request_cmp/* cust_cmp_fnc */);
 
 
 forb_orb forb_init(void)
@@ -77,7 +77,7 @@ void forb_destroy(forb_orb orb)
        forb_port_t *port;
 
        
-       ul_list_for_each(forb_port, forb, port) {
+       ul_list_for_each_cut(forb_port, forb, port) {
                forb_destroy_port(port);
        }
        forb_free(forb);
@@ -202,4 +202,3 @@ forb_server_id_init(forb_server_id *server_id)
 {
        forb_uuid_generate((forb_uuid_t*)server_id->uuid);
 }
-
diff --git a/iop.c b/iop.c
index 953b3cfcec02b7120a09093d147172f692667042..540ae61302f70a474f8a17bff892bb211492e1ba 100644 (file)
--- a/iop.c
+++ b/iop.c
@@ -83,6 +83,7 @@ forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
                        switch (mh->message_type) {
                                case forb_iop_REQUEST:
                                case forb_iop_REPLY:
+                               case forb_iop_HELLO:
                                        break;
                                default:
                                        return false;
diff --git a/proto.c b/proto.c
index b0a1831660aeae5b5b1f8851cffee2226a7ec9b2..4acd9adfa092648fefd7f51849fb47c42549d783 100644 (file)
--- a/proto.c
+++ b/proto.c
@@ -29,6 +29,17 @@ process_reply(forb_port_t *port, CDR_Codec *codec)
        /* TODO */
 }
 
+/** 
+ * Process incomming HELLO messages.
+ *
+ * For every incomming HELLO message the peer table is searched
+ * whether it already contains a record for that peer or not. If not,
+ * the new peer is added to the table and another hello message is
+ * sent so that the new peer discovers us quickly.
+ * 
+ * @param port 
+ * @param codec 
+ */
 static void
 process_hello(forb_port_t *port, CDR_Codec *codec)
 {
@@ -37,24 +48,32 @@ process_hello(forb_port_t *port, CDR_Codec *codec)
        forb_peer_t *peer;
        forb_t *forb = port->forb;
 
-       printf("Hello received at port %p\n", port);
+/*     printf("Hello received at port %p\n", port); */
 
        forb_server_id_deserialize(codec, &server_id);
        if (port->proto->deserialize_addr) {
                port->proto->deserialize_addr(codec, &addr);
        }
-       peer = forb_peer_find(forb, &server_id);
-       if (peer) {
-               /* TODO: Update last hello receive time */
-       } else {
-               /* New peer discovered */
-               peer = forb_malloc(sizeof(*peer));
-               peer->server_id = server_id;
-               peer->port = port;
-               peer->addr = addr;
-               forb_peer_insert(forb, peer);
+       if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
+               peer = forb_peer_find(forb, &server_id);
+               if (peer) {
+                       /* TODO: Update last hello receive time */
+               } else {
+                       /* New peer discovered */
+/*                     char str[100]; */
+/*                     printf("New peer %s discovered port %p\n", */
+/*                            forb_server_id_to_string(str, &server_id, sizeof(str)), port); */
+                       peer = forb_malloc(sizeof(*peer));
+                       peer->server_id = server_id;
+                       peer->port = port;
+                       peer->addr = addr;
+                       forb_peer_insert(forb, peer);
+
+                       fosa_mutex_lock(&port->hello_mutex);
+                       fosa_cond_signal(&port->hello_cond);
+                       fosa_mutex_unlock(&port->hello_mutex);
+               }
        }
-       
 }
 
 static void
@@ -100,15 +119,16 @@ static void *port_receiver_thread(void *arg)
                                   &c->buffer[c->wptr],
                                   c->wptr_max - c->wptr);
                c->wptr += rcvd;
+                c->wptr_last = c->wptr;
 
                /* While there are some data in the buffer, process them. */
-               while (c->wptr - c->rptr > 0) {
-                       len = c->wptr - c->rptr;
+               while (CDR_data_size(c) > 0) {
+                       len = CDR_data_size(c);
                        /* Wait for and then process message header */
                        if (!header_received) {
                                if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
                                        header_received = forb_iop_process_message_header(&mh, c);
-                                       len = c->wptr - c->rptr;
+                                       len = CDR_data_size(c);
                                } else {
                                        break; /* Wait for more data to arrive*/
                                }
@@ -131,8 +151,8 @@ static void *port_receiver_thread(void *arg)
 
 /** 
  * Thread run for every port to broadcast HELLO messages. These
- * messages are used for a FORB to discover all peers and detect their
- * disconnection.
+ * messages are used for a FORB to discover all peers (and in future
+ * also to detect their disconnection).
  * 
  * @param arg Pointer to ::forb_port_t typecasted to void *.
  * 
@@ -143,18 +163,31 @@ static void *port_discovery_thread(void *arg)
        forb_port_t *port = arg;
        const forb_proto_t *proto = port->proto;
        CDR_Codec codec;
-
+       fosa_abs_time_t hello_time;
+       fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
+       
        CDR_codec_init_static(&codec);
        CDR_buffer_init(&codec, 1024, 0);
-               
+
+
+       fosa_clock_get_time(FOSA_CLOCK_REALTIME, &hello_time);
+       
        while (!port->finish) {
                CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
                forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
                                       proto->serialize_addr);
-               printf("Broadcasting hello from port %p\n", port); 
+/*             printf("Broadcasting hello from port %p\n", port);  */
                proto->broadcast(port, &codec.buffer[codec.rptr],
-                                codec.wptr - codec.rptr);
-               sleep(proto->hello_interval);
+                                CDR_data_size(&codec));
+
+               /* Wait for next hello interval or until somebody
+                * signal us. */
+               fosa_abs_time_incr(hello_time, hello_interval);
+               /* sem_timedwait would be more appropriate */
+               fosa_mutex_lock(&port->hello_mutex);
+               fosa_cond_timedwait(&port->hello_cond, &port->hello_mutex,
+                                   &hello_time);
+               fosa_mutex_unlock(&port->hello_mutex);
        }
        CDR_codec_release_buffer(&codec);
        return NULL;
@@ -183,6 +216,9 @@ int forb_register_port(forb_t *forb, forb_port_t *port)
        forb_port_insert(forb, port);
        fosa_mutex_unlock(&forb->port_mutex);
 
+       fosa_mutex_init(&port->hello_mutex, 0);
+       fosa_cond_init(&port->hello_cond);
+
        CDR_codec_init_static(&port->codec);
        if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
                ret = FOSA_ENOMEM;
@@ -216,9 +252,10 @@ void forb_destroy_port(forb_port_t *port)
        forb_t *forb = port->forb;
        
        port->finish = true;    /* Exit all the threads */
-       fosa_mutex_lock(&forb->port_mutex);
-       forb_port_delete(forb, port);
-       fosa_mutex_unlock(&forb->port_mutex);
+
+/*     fosa_mutex_lock(&forb->port_mutex); */
+/*     forb_port_delete(forb, port); */
+/*     fosa_mutex_unlock(&forb->port_mutex); */
 
        /* TODO: Wait for the thread to finish - we need some FOSA API for this */
        //fosa_cond_wait(port->threads)
@@ -246,7 +283,7 @@ void forb_destroy_port(forb_port_t *port)
 size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
 {
        return peer->port->proto->send(peer, &codec->buffer[codec->rptr],
-                                      codec->wptr - codec->rptr);
+                                      CDR_data_size(codec));
 }
 
 
diff --git a/proto.h b/proto.h
index dea791d9bb832006af35d5a18774e2f7c4273241..f2b1a4b12bd6e3a1ff610841a2823c91b6d7f3e0 100644 (file)
--- a/proto.h
+++ b/proto.h
@@ -104,6 +104,8 @@ struct forb_port {
        forb_t *forb;                     /**< FORB, this port is registered in. */
        fosa_thread_id_t receiver_thread; /**< The thread running forb_port_receiver_thread() */
        fosa_thread_id_t discovery_thread;/**< The thread for periodic sending HELLO messages */
+       fosa_cond_t hello_cond;           /**< Condition variable for signaling the discovery thread to send the hello messages now. */
+       fosa_mutex_t hello_mutex;         /**< Mutex for @c hello_cond */
        CDR_Codec codec;                  /**< Receiving buffer for receiver thread */
        void *addr;                       /**< Port's address in a protocol specific format. */
        ul_list_node_t node;              /**< Node in forb's port list */
index f8060b1b72c7bc9a3922e35769c30a0bf9a244b6..9d73f4a8e842ff3cc8b73be110fa07ba743a5898 100644 (file)
@@ -81,12 +81,14 @@ unix_broadcast(forb_port_t *port, const void *buf, size_t len)
                {
                        strcpy(addr.sun_path, "/tmp/");
                        strncat(addr.sun_path, dirent->d_name, sizeof(addr.sun_path));
-                       printf("Broadcasting to %s\n", addr.sun_path);
+/*                     printf("Broadcasting to %s\n", addr.sun_path); */
                        ret = sendto(p->socket, buf, len, 0,
                                     (struct sockaddr*)&addr, sizeof(addr));
-                       /* We do not care about errors in brodcasts */
+                       /* We do not care about errors in brodcasts -
+                        * the socket may nomore be active */
                        if (ret != len) {
-                               return ret;
+/*                             perror("unix_broadcast"); */
+/*                             return ret; */
                        }
                }
                        
index fab4e181fc3122c73141ffc47eae6f77f7a05922..5e5c36fa5a25e479addfbcec9575aa2f161fb54e 100644 (file)
@@ -49,7 +49,7 @@ int main(int argc, char *argv[])
                forb_peer_t *peer;
                for (j=0; j<NUM_ORBS && all_peers_found; j++) {
                        if (i==j) continue;
-                       peer = forb_peer_find(forb_data(orb[i]), &forb_data(orb[i])->server_id);
+                       peer = forb_peer_find(forb_data(orb[i]), &forb_data(orb[j])->server_id);
 
                        all_peers_found &= (peer != NULL);
                        if (!all_peers_found) {