]> rtime.felk.cvut.cz Git - frescor/forb.git/commitdiff
Discovery protocol should work now, but it doesn't :(
author Michal Sojka <sojkam1@fel.cvut.cz>
Mon, 1 Sep 2008 20:47:39 +0000 (22:47 +0200)
committer Michal Sojka <sojkam1@fel.cvut.cz>
Mon, 1 Sep 2008 20:47:39 +0000 (22:47 +0200)
forb.c
iop.c
proto.c
proto.h

diff --git a/forb.c b/forb.c
index ca942a41ea9efcebd88c34fd5d961d7b0e0a7151..a0b31bd48e8f43aab7959e96800a6ad80377dbf1 100644 (file)
--- a/forb.c
+++ b/forb.c
@@ -63,7 +63,7 @@ forb_orb forb_init(void)
                }
        }
 #endif
-
+       /* TODO: atexit(destroy_all_forbs); */
        return object;
 
 err2:  forb_free(forb);
diff --git a/iop.c b/iop.c
index 754daf88e58983dca963cd3f1d6445d2c9cda09a..953b3cfcec02b7120a09093d147172f692667042 100644 (file)
--- a/iop.c
+++ b/iop.c
@@ -1,7 +1,17 @@
+/**
+ * @file   iop.c
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @date   Mon Sep  1 21:51:58 2008
+ * 
+ * @brief  Helper functions for the Inter-ORB protocol.
+ * 
+ * 
+ */
 #include <forb/iop.h>
 #include <forb/forb-idl.h>
 #include <forb/cdr.h>
 
+/** Version of the protocol */
 #define VER_MAJOR 0
 #define VER_MINOR 0
 
@@ -58,46 +68,6 @@ forb_iop_prepare_hello(CDR_Codec *codec,
        return CORBA_TRUE;
 }
 
-static void
-process_request(CDR_Codec *codec)
-{
-       forb_iop_request_header rh;
-       forb_iop_request_header_deserialize(codec, &rh);
-}
-
-static void
-process_reply(CDR_Codec *codec)
-{
-       forb_iop_reply_header rh;
-       forb_iop_reply_header_deserialize(codec, &rh);
-       /* TODO */
-}
-
-static void
-process_hello(CDR_Codec *codec/* , forb_port_t *port, forb_t *forb */)
-{
-       forb_server_id server_id;
-       forb_server_id_deserialize(codec, &server_id);
-}
-
-void
-forb_iop_process_message(forb_iop_message_header *mh, CDR_Codec *codec)
-{
-       switch (mh->message_type) {
-               case forb_iop_REQUEST:
-                       process_request(codec);
-                       break;
-               case forb_iop_REPLY:
-                       process_reply(codec);
-                       break;
-               case forb_iop_HELLO:
-                       process_hello(codec);
-                       break;
-               default:
-                       break;
-       }
-}
-
 bool
 forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
 {
diff --git a/proto.c b/proto.c
index 3e58bcd6129d31a203833f551b8fe34ef306e9f5..eec903a8954991c7c4cdbe7cd79a3f769244311e 100644 (file)
--- a/proto.c
+++ b/proto.c
@@ -2,6 +2,7 @@
 #include <forb/forb-idl.h>
 #include <forb/iop.h>
 #include <forb/config.h>
+#include <stdio.h>
 
 GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */
                       forb_t,          /* cust_root_t */
@@ -13,6 +14,67 @@ GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */
                       forb_server_id_cmp)/* cust_cmp_fnc */
 
 
+static void
+process_request(forb_port_t *port, CDR_Codec *codec)
+{
+       forb_iop_request_header rh;
+       forb_iop_request_header_deserialize(codec, &rh);
+}
+
+static void
+process_reply(forb_port_t *port, CDR_Codec *codec)
+{
+       forb_iop_reply_header rh;
+       forb_iop_reply_header_deserialize(codec, &rh);
+       /* TODO */
+}
+
+static void
+process_hello(forb_port_t *port, CDR_Codec *codec)
+{
+       forb_server_id server_id;
+       void *addr = NULL;
+       forb_peer_t *peer;
+       forb_t *forb = port->forb;
+
+       printf("Hello received at port %p\n", port);
+
+       forb_server_id_deserialize(codec, &server_id);
+       if (port->proto->deserialize_addr) {
+               port->proto->deserialize_addr(codec, &addr);
+       }
+       peer = forb_peer_find(forb, &server_id);
+       if (peer) {
+               /* TODO: Update last hello receive time */
+       } else {
+               /* New peer discovered */
+               peer = forb_malloc(sizeof(*peer));
+               peer->server_id = server_id;
+               peer->port = port;
+               peer->addr = addr;
+               forb_peer_insert(forb, peer);
+       }
+       
+}
+
+static void
+process_message(forb_port_t *port, forb_iop_message_header *mh, CDR_Codec *codec)
+{
+       switch (mh->message_type) {
+               case forb_iop_REQUEST:
+                       process_request(port, codec);
+                       break;
+               case forb_iop_REPLY:
+                       process_reply(port, codec);
+                       break;
+               case forb_iop_HELLO:
+                       process_hello(port, codec);
+                       break;
+               default:
+                       break;
+       }
+}
+
 /** 
  * Thread run for every port to receive FORB messages from that port. 
  * 
@@ -30,26 +92,37 @@ static void *port_receiver_thread(void *arg)
        bool header_received = false;
 
        while (!port->finish) {
+               if (c->rptr == c->wptr) {
+                       /* The buffer is empty now - start writing from the beginning */
+                       CDR_buffer_reset(c, 0);
+               }
                rcvd = proto->recv(port,
                                   &c->buffer[c->wptr],
                                   c->wptr_max - c->wptr);
                c->wptr += rcvd;
-               len = c->wptr - c->rptr;
-               /* Wait for and then process message header */
-               if (!header_received && len >= forb_iop_MESSAGE_HEADER_SIZE) {
-                       header_received = forb_iop_process_message_header(&mh, c);
-                       if (!header_received) { /* Reset the receiving buffer */
-                               c->rptr = c->wptr = 0;
-                       } 
-               }
-               len = c->wptr - c->rptr;
-               /* Wait for and then process the message body */
-               if (header_received && len >= mh.message_size) {
-                       forb_iop_process_message(&mh, c);
-                       header_received = false;
-                       if (c->rptr == c->wptr) {
-                               /* The buffer is empty now */
-                               c->rptr = c->wptr = 0;
+
+               /* While there are some data in the buffer, process them. */
+               while (c->wptr - c->rptr > 0) {
+                       len = c->wptr - c->rptr;
+                       /* Wait for and then process message header */
+                       if (!header_received) {
+                               if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
+                                       header_received = forb_iop_process_message_header(&mh, c);
+                                       len = c->wptr - c->rptr;
+                               } else {
+                                       break; /* Wait for more data to arrive*/
+                               }
+                       }
+                       
+                       /* Wait for and then process the message body */
+                       if (header_received) {
+                               if (len >= mh.message_size) {
+                                       process_message(port, &mh, c);
+                                       /* Wait for the next message */
+                                       header_received = false;
+                               } else {
+                                       break; /* Wait for more data to arrive*/
+                               }
                        }
                }
        }
@@ -65,26 +138,27 @@ static void *port_receiver_thread(void *arg)
  * 
  * @return Always NULL
  */
-static void *port_discovery_thread(void *arg)
-{
-       forb_port_t *port = arg;
-       const forb_proto_t *proto = port->proto;
-       CDR_Codec codec;
+       static void *port_discovery_thread(void *arg)
+       {
+               forb_port_t *port = arg;
+               const forb_proto_t *proto = port->proto;
+               CDR_Codec codec;
 
-       CDR_codec_init_static(&codec);
-       CDR_buffer_init(&codec, 1024, 0);
+               CDR_codec_init_static(&codec);
+               CDR_buffer_init(&codec, 1024, 0);
                
-       while (!port->finish) {
-               CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
-               forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
-                                      proto->serialize_addr);
-               proto->broadcast(port, &codec.buffer[codec.rptr],
-                                codec.wptr - codec.rptr);
-               sleep(proto->hello_interval);
+               while (!port->finish) {
+                       CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
+                       forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
+                                              proto->serialize_addr);
+                       printf("Broadcasting hello from port %p\n", port); 
+                       proto->broadcast(port, &codec.buffer[codec.rptr],
+                                        codec.wptr - codec.rptr);
+                       sleep(proto->hello_interval);
+               }
+               CDR_codec_release_buffer(&codec);
+               return NULL;
        }
-       CDR_codec_release_buffer(&codec);
-       return NULL;
-}
 
 /** 
  * Registers a new port in FORB and run receiver thread on it to
@@ -98,89 +172,89 @@ static void *port_discovery_thread(void *arg)
  *
  * @return Zero on success, FOSA error code on error.
  */
-int forb_register_port(forb_t *forb, forb_port_t *port)
-{
-       int ret;
+       int forb_register_port(forb_t *forb, forb_port_t *port)
+       {
+               int ret;
        
-       port->forb = forb;
-       port->finish = false;
+               port->forb = forb;
+               port->finish = false;
        
-       fosa_mutex_lock(&forb->port_mutex);
-       forb_port_insert(forb, port);
-       fosa_mutex_unlock(&forb->port_mutex);
-
-       CDR_codec_init_static(&port->codec);
-       if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
-               ret = FOSA_ENOMEM;
-               goto err;
-       }
-       ret = fosa_thread_create(&port->receiver_thread, NULL,
-                                port_receiver_thread, port);
-       if (ret != 0)
-               goto err2;
-       ret = fosa_thread_create(&port->discovery_thread, NULL,
-                                port_discovery_thread, port);
-       if (ret != 0)
-               goto err3;
-       return 0;
-err3:
-       port->finish = true;
-       /* TODO: Wait for the thread to finish - we need some FOSA API for this */
-err2:
-       CDR_codec_release_buffer(&port->codec);
-err:
-       fosa_mutex_lock(&forb->port_mutex);
-       forb_port_delete(forb, port);
-       fosa_mutex_unlock(&forb->port_mutex);
+               fosa_mutex_lock(&forb->port_mutex);
+               forb_port_insert(forb, port);
+               fosa_mutex_unlock(&forb->port_mutex);
+
+               CDR_codec_init_static(&port->codec);
+               if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
+                       ret = FOSA_ENOMEM;
+                       goto err;
+               }
+               ret = fosa_thread_create(&port->receiver_thread, NULL,
+                                        port_receiver_thread, port);
+               if (ret != 0)
+                       goto err2;
+               ret = fosa_thread_create(&port->discovery_thread, NULL,
+                                        port_discovery_thread, port);
+               if (ret != 0)
+                       goto err3;
+               return 0;
+       err3:
+               port->finish = true;
+               /* TODO: Wait for the thread to finish - we need some FOSA API for this */
+       err2:
+               CDR_codec_release_buffer(&port->codec);
+       err:
+               fosa_mutex_lock(&forb->port_mutex);
+               forb_port_delete(forb, port);
+               fosa_mutex_unlock(&forb->port_mutex);
        
-       return ret;
-}
+               return ret;
+       }
 
-void forb_destroy_port(forb_port_t *port)
-{
-       forb_peer_t *peer;
-       forb_t *forb = port->forb;
+       void forb_destroy_port(forb_port_t *port)
+       {
+               forb_peer_t *peer;
+               forb_t *forb = port->forb;
        
-       port->finish = true;    /* Exit all the threads */
-       fosa_mutex_lock(&forb->port_mutex);
-       forb_port_delete(forb, port);
-       fosa_mutex_unlock(&forb->port_mutex);
+               port->finish = true;    /* Exit all the threads */
+               fosa_mutex_lock(&forb->port_mutex);
+               forb_port_delete(forb, port);
+               fosa_mutex_unlock(&forb->port_mutex);
 
-       /* TODO: Wait for the thread to finish - we need some FOSA API for this */
-       //fosa_cond_wait(port->threads)
+               /* TODO: Wait for the thread to finish - we need some FOSA API for this */
+               //fosa_cond_wait(port->threads)
 
-       if (port->proto->port_destroy) {
-               port->proto->port_destroy(port);
-       }
+               if (port->proto->port_destroy) {
+                       port->proto->port_destroy(port);
+               }
 
-       fosa_mutex_lock(&port->forb->peer_mutex);
-       gavl_cust_for_each(forb_peer_nolock, forb, peer) {
-               if (peer->port == port) {
-                       if (port->proto->peer_destroy) {
-                               port->proto->peer_destroy(peer);
+               fosa_mutex_lock(&port->forb->peer_mutex);
+               gavl_cust_for_each(forb_peer_nolock, forb, peer) {
+                       if (peer->port == port) {
+                               if (port->proto->peer_destroy) {
+                                       port->proto->peer_destroy(peer);
+                               }
+                               /* TODO: Reference counting */
+                               forb_free(peer);
                        }
-                       /* TODO: Reference counting */
-                       forb_free(peer);
                }
-       }
-       fosa_mutex_unlock(&port->forb->peer_mutex);
+               fosa_mutex_unlock(&port->forb->peer_mutex);
 
-       /* TODO: reference counting */
-       forb_free(port);
-}
+               /* TODO: reference counting */
+               forb_free(port);
+       }
 
-size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
-{
-       return peer->port->proto->send(peer, &codec->buffer[codec->rptr],
-                                      codec->wptr - codec->rptr);
-}
+       size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
+       {
+               return peer->port->proto->send(peer, &codec->buffer[codec->rptr],
+                                              codec->wptr - codec->rptr);
+       }
 
 
-forb_peer_t *
-forb_get_next_hop(forb_t *forb, forb_server_id *server_id)
-{
-       forb_peer_t *peer;
-       peer = forb_peer_find(forb, server_id);
-       return peer;
+       forb_peer_t *
+               forb_get_next_hop(forb_t *forb, forb_server_id *server_id)
+       {
+               forb_peer_t *peer;
+               peer = forb_peer_find(forb, server_id);
+               return peer;
 
-}
+       }
diff --git a/proto.h b/proto.h
index a07a4a50486e4b503e0fea59caa4da90b6675693..dea791d9bb832006af35d5a18774e2f7c4273241 100644 (file)
--- a/proto.h
+++ b/proto.h
@@ -91,7 +91,7 @@ struct forb_proto {
        CORBA_boolean (*serialize_addr)(CDR_Codec *codec, const void *addr);
 
        /** Deserializes the protocol specific address */
-       CORBA_boolean (*deserialize_addr)(CDR_Codec *codec, void *addr);
+       CORBA_boolean (*deserialize_addr)(CDR_Codec *codec, void **addr);
 };
 
 /**