]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/iop.c
forb: Objects must be properly reference-counted
[frescor/forb.git] / src / iop.c
index 5508e3ca6f8ae19f5f0d8a2ba931f1f4f30578c8..6ba0ec3107796471f9272e20c36d43201acdb8d3 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -59,6 +59,9 @@
 #include "proto.h"
 #include <ul_log.h>
 #include "exec_req.h"
+#include <errno.h>
+#include "peer.h"
+#include "discovery.h"
 
 /** Version of the protocol */
 #define VER_MAJOR 0
@@ -70,7 +73,7 @@ extern UL_LOG_CUST(ulogd_forb_iop);
 
 
 CORBA_boolean
-forb_iop_prepend_message_header(CDR_Codec *codec, forb_iop_message_type mt)
+forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
 {
        CORBA_boolean ret;
        forb_iop_message_header mh;
@@ -78,8 +81,8 @@ forb_iop_prepend_message_header(CDR_Codec *codec, forb_iop_message_type mt)
        mh.proto_version.minor = VER_MINOR;
        mh.message_type = mt;
        mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
-       mh.message_size = CDR_data_size(codec);
-       ret = CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
+       mh.message_size = FORB_CDR_data_size(codec);
+       ret = FORB_CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
        if (ret) {
                ret = forb_iop_message_header_serialize(codec, &mh);
        }
@@ -88,42 +91,106 @@ forb_iop_prepend_message_header(CDR_Codec *codec, forb_iop_message_type mt)
 
 CORBA_boolean
 forb_iop_prepare_request(forb_request_t *req,
-                        char *iface,
-                        unsigned method_ind,
+                        unsigned *index,
                         CORBA_Environment *env)
 {
        CORBA_boolean ret;
        forb_iop_request_header rh;
 
        rh.request_id = req->request_id;
-       rh.iface = iface;
+       rh.iface = req->interface;
        rh.objkey = forb_object_to_key(req->obj);
-       rh.method_index = method_ind;
+       rh.method_index = req->method_ind;
        rh.source = forb_object_to_forb(req->obj)->server_id;
        ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
        if (ret) {
                /* Request body is 8 byte aligned */
-               ret = CDR_put_align(&req->cdr_request, 8);
+               ret = FORB_CDR_put_align(&req->cdr_request, 8);
+               char str[50];
+               ul_logdeb("preparing request: id=%d  dest=%s  iface=%s method=%d\n", req->request_id,
+                         forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
+                         rh.iface, rh.method_index);
        }
+       *index = req->cdr_request.wptr;
        return ret;
 }
 
-CORBA_boolean
-forb_iop_prepare_hello(CDR_Codec *codec,
+static CORBA_boolean
+forb_iop_prepare_hello(FORB_CDR_Codec *codec,
                       const forb_server_id *server_id,
                       const void *src_addr,
-                      CORBA_boolean (*serialize_addr)(CDR_Codec *codec, const void *addr))
+                      CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
+                      const CORBA_char *orb_id)
 {
        if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
        if (serialize_addr) {
                if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
        }
+       if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
+       /* All headers must be 8 byte aligned so align the length of
+        * this message */
+       if (!FORB_CDR_put_align(codec, 8)) return CORBA_FALSE; 
        if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
        return CORBA_TRUE;
 }
 
+int forb_iop_send_hello_to(forb_peer_t *peer)
+{
+       forb_port_t *port = peer->port;
+       FORB_CDR_Codec codec;
+       int ret;
+       FORB_CDR_codec_init_static(&codec, port->forb->orb);
+       if (!FORB_CDR_buffer_init(&codec, 1024, 0))
+               return -1;              
+       if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
+               ret = -1;
+               goto free;
+       }
+       if (!forb_iop_prepare_hello(&codec, &port->forb->server_id,
+                                   port->desc.addr,
+                                   port->desc.proto->serialize_addr,
+                                   port->forb->attr.orb_id)) {
+               ret = -1;
+               goto free;
+       }
+       ret = forb_proto_send(peer, &codec);
+       if (ret > 0) ret = 0;
+free:
+       FORB_CDR_codec_release_buffer(&codec);
+       return ret;
+}
+
+int
+forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer)
+{
+       forb_port_t *port = peer->port;
+       FORB_CDR_Codec codec;
+       int ret;
+       FORB_CDR_codec_init_static(&codec, port->forb->orb);
+       if (!FORB_CDR_buffer_init(&codec, 1024, 0))
+               return -1;              
+       if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
+               ret = -1;
+               goto free;
+       }
+       if (!forb_iop_prepare_hello(&codec, &peer->server_id,
+                                   peer->addr,
+                                   peer->port->desc.proto->serialize_addr,
+                                   peer->orb_id)) {
+               ret = -1;
+               goto free;
+       }
+       ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
+                 ""/*TODO:id*/, peer->orb_id, "", dest->orb_id);
+       ret = forb_proto_send(dest, &codec);
+       if (ret > 0) ret = 0;
+free:
+       FORB_CDR_codec_release_buffer(&codec);
+       return ret;
+}
+
 bool
-forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
+forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
 {
        /* FIXME: If we have multiple protocol versions, use different
         * type (independent from version) for return value instead of
@@ -140,6 +207,8 @@ forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
                                case forb_iop_HELLO:
                                        break;
                                default:
+                                       ul_logerr("rcvd wrong message type: %d\n",
+                                                 mh->message_type);
                                        return false;
                        }
                        codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
@@ -148,18 +217,20 @@ forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
                        return true;
                        break;                  
                default:
+                       ul_logerr("rcvd wrong protocol versio: %d.%d\n",
+                                 mh->proto_version.major, mh->proto_version.minor);
                        return false;
        }
 }
 
 static inline CORBA_boolean
-forb_exception_serialize(CDR_Codec *codec, struct forb_env *env)
+forb_exception_serialize(FORB_CDR_Codec *codec, struct forb_env *env)
 {
        return CORBA_long_serialize(codec, &env->major);
 }
 
 static inline CORBA_boolean
-forb_exception_deserialize(CDR_Codec *codec, struct forb_env *env)
+forb_exception_deserialize(FORB_CDR_Codec *codec, struct forb_env *env)
 {
        /* TODO: Declare exceptions in IDL and don't typecast here. */
        return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
@@ -168,26 +239,33 @@ forb_exception_deserialize(CDR_Codec *codec, struct forb_env *env)
 void
 forb_iop_send_reply(forb_t *forb,
           forb_server_id *dest,
-          CDR_Codec *codec,
+          FORB_CDR_Codec *codec,
           CORBA_long request_id,
           struct forb_env *env)
 {
        forb_iop_reply_header reply_header;
        CORBA_boolean ret;
        forb_peer_t *peer;
+       fosa_abs_time_t timeout;
        
        reply_header.request_id = request_id;
        reply_header.flags = 0;
-       if (forb_exception_occured(env)) {
+       if (forb_exception_occurred(env)) {
                reply_header.flags |= forb_iop_FLAG_EXCEPTION;
-               CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
-                                       forb_iop_REPLY_HEADER_SIZE);
+               FORB_CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
+                                            forb_iop_REPLY_HEADER_SIZE);
                forb_exception_serialize(codec, env);
        }
+       /* All headers must be 8 byte aligned so align the length of
+        * this message */
+       if (!FORB_CDR_put_align(codec, 8)) {
+               ul_logerr("Not enough space for tail align\n");
+               return;         /* FIXME: handle error (goto above)*/
+       }
        /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
         * header is shorter. We want reply data to be 8 byte
         * aligned */
-       ret = CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
+       ret = FORB_CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
        if (!ret) {
                goto err;
        }
@@ -197,10 +275,20 @@ forb_iop_send_reply(forb_t *forb,
        }
        forb_iop_prepend_message_header(codec, forb_iop_REPLY);
 
-       peer = forb_get_next_hop(forb, dest);
+       fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+       timeout = fosa_abs_time_incr(timeout,
+                                    fosa_msec_to_rel_time(1000));
+
+       peer = forb_get_next_hop(forb, dest, &timeout);
        if (!peer) {
-               ul_logerr("Reply destination not found\n");
+               char str[60];
+               forb_server_id_to_string(str, dest, sizeof(str));
+               ul_logerr("Reply destination not found: %s\n", str);
                goto err;
+       } else {
+               char str[60];
+               forb_server_id_to_string(str, dest, sizeof(str));
+               ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
        }
        forb_proto_send(peer, codec);
        forb_peer_put(peer);
@@ -208,9 +296,8 @@ err:
        ;
 }
 
-
 static void
-process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
+process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
 {
        forb_iop_request_header request_header;
        CORBA_boolean ret;
@@ -218,26 +305,27 @@ process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
        size_t n;
        forb_t *forb = port->forb;
        struct forb_env env;
-       CDR_Codec reply_codec;
+       FORB_CDR_Codec reply_codec;
        forb_exec_req_t *exec_req;
        uint32_t req_size, data_size, header_size;
        char str[32];
+       forb_peer_t *peer;
 
-       data_size = CDR_data_size(codec);
+       data_size = FORB_CDR_data_size(codec);
        ret = forb_iop_request_header_deserialize(codec, &request_header);
        if (!ret) {
                ul_logerr("Malformed request recevied\n");
                env.major = FORB_EX_COMM_FAILURE;
                goto out;
        }
-       ret = CDR_get_align(codec, 8);
+       ret = FORB_CDR_get_align(codec, 8);
        if (!ret) {
                ul_logerr("Malformed request recevied\n");
                env.major = FORB_EX_COMM_FAILURE;
                goto out;
        }
 
-       header_size = data_size - CDR_data_size(codec);
+       header_size = data_size - FORB_CDR_data_size(codec);
        req_size = message_size - header_size;
 
        ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
@@ -245,6 +333,26 @@ process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
                  request_header.request_id,
                  request_header.iface,
                  request_header.method_index);
+
+       if (port->new_peer) {
+               /* Request from a new peer was reported by the underlaying protocol */
+               peer = forb_peer_find(port->forb, &request_header.source);
+               if (peer) {
+                       /* We already know this peer */
+                       /* TODO: Can it be in FORB_PEER_WANTED state?
+                        * If yes, we cannot simply igore this. */
+                       ul_logmsg("new_peer was already known\n");
+                       forb_peer_put(port->new_peer);
+                       port->new_peer = NULL;
+               } else {
+                       ul_logdeb("discovered new_peer from incomming connection\n");
+                       peer = port->new_peer;
+                       port->new_peer = NULL;
+                       forb_new_peer_discovered(port, peer, request_header.source,
+                                                peer->addr, peer->orb_id);
+               }
+               
+       }
        
        obj = forb_key_to_object(forb, request_header.objkey);
        if (!obj) {
@@ -254,11 +362,15 @@ process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
        }
 
        n = strlen(request_header.iface);
-       if (strncmp(request_header.iface, obj->interface->name, n) != 0) {
+       ret = strncmp(request_header.iface, obj->interface->name, n);
+       forb_free(request_header.iface);
+       request_header.iface = NULL;
+       if (ret != 0) {
                env.major = FORB_EX_INV_OBJREF;
                ul_logerr("Object reference has incorrect type\n");
                goto send_execption;
        }
+       
        if (request_header.method_index >= obj->interface->num_methods) {
                env.major = FORB_EX_INV_IDENT;
                ul_logerr("To high method number\n");
@@ -275,22 +387,25 @@ process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
        /* Enqueue execution request */
        exec_req = forb_malloc(sizeof(*exec_req));
        if (exec_req) {
+               memset(exec_req, 0, sizeof(exec_req));
+               exec_req->request_type = FORB_EXEC_REQ_REMOTE; 
                exec_req->request_id = request_header.request_id;
                exec_req->source = request_header.source;
                exec_req->obj = obj;
                exec_req->method_index = request_header.method_index;
                /* Copy the request to exec_req */
-               CDR_codec_init_static(&exec_req->codec);
-               ret = CDR_buffer_init(&exec_req->codec,
+               FORB_CDR_codec_init_static(&exec_req->codec, codec->orb);
+               ret = FORB_CDR_buffer_init(&exec_req->codec,
                                      req_size,
                                      0);
                if (!ret) {
                        env.major = FORB_EX_NO_MEMORY;
-                       ul_logerr("No memory for executor request bufer\n");
+                       ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
+                                 req_size, header_size);
                        goto send_execption;
                }
                exec_req->codec.data_endian = codec->data_endian;
-               CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
+               FORB_CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
                /* TODO: Use better data structure for incomming
                   buffer to achieve zero-copy behaviour. */
                forb_exec_req_ins_tail(obj->executor, exec_req);
@@ -304,8 +419,8 @@ out:
        return;
        
 send_execption:
-       CDR_codec_init_static(&reply_codec);    
-       ret = CDR_buffer_init(&reply_codec, 4096,
+       FORB_CDR_codec_init_static(&reply_codec, codec->orb);   
+       ret = FORB_CDR_buffer_init(&reply_codec, 4096,
                              forb_iop_MESSAGE_HEADER_SIZE +
                              forb_iop_REPLY_HEADER_SIZE);
        if (!ret) {
@@ -313,12 +428,14 @@ send_execption:
                return;
        }
 
-       forb_iop_send_reply(port->forb, &request_header.source, &reply_codec, request_header.request_id, &env);
-       CDR_codec_release_buffer(&reply_codec);
+       forb_iop_send_reply(port->forb, &request_header.source,
+                           &reply_codec, request_header.request_id, &env);
+       FORB_CDR_codec_release_buffer(&reply_codec);
+       /* TODO: relese exec_req etc. */
 }
 
 static void
-process_reply(forb_port_t *port, CDR_Codec *codec)
+process_reply(forb_port_t *port, FORB_CDR_Codec *codec)
 {
        forb_iop_reply_header rh;
        forb_t *forb = port->forb;
@@ -326,21 +443,26 @@ process_reply(forb_port_t *port, CDR_Codec *codec)
 
        forb_iop_reply_header_deserialize(codec, &rh);
        /* Reply data are 8 byte aligned */
-       CDR_get_align(codec, 8);
+       FORB_CDR_get_align(codec, 8);
        ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
        req = forb_request_find(forb, &rh.request_id);
        if (!req) {
                ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
                return;
        }
+       forb_request_delete(forb, req); /* Deregister request from forb */
+
        if (rh.flags & forb_iop_FLAG_EXCEPTION) {
                forb_exception_deserialize(codec, req->env);
        } else {
                req->cdr_reply = codec;
        }
+
+       /* Tell the stub where to signal that reply processing is
+        * finished */
        req->reply_processed = &port->reply_processed;
 
-       /* Resume the stub witing in forb_wait_for_reply() */
+       /* Resume the stub waiting in forb_wait_for_reply() */
        forb_syncobj_signal(&req->reply_ready);
 
        /* Wait for stub to process the results from the codec's buffer */
@@ -359,52 +481,70 @@ process_reply(forb_port_t *port, CDR_Codec *codec)
  * @param codec Buffer with the hello message
  */
 static void
-process_hello(forb_port_t *port, CDR_Codec *codec)
+process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
 {
        forb_server_id server_id;
        void *addr = NULL;
        forb_peer_t *peer;
        forb_t *forb = port->forb;
+       CORBA_string peer_orb_id = NULL;
 
 /*     printf("Hello received at port %p\n", port); */
 
        forb_server_id_deserialize(codec, &server_id);
-       {
-               char str[60];
-               ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
+       if (port->desc.proto->deserialize_addr) {
+               port->desc.proto->deserialize_addr(codec, &addr);
        }
-       if (port->proto->deserialize_addr) {
-               port->proto->deserialize_addr(codec, &addr);
+       {
+               char str[60], addrstr[60];
+               if (port->desc.proto->addr2str) {
+                       port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
+               } else
+                       addrstr[0] = 0;
+               ul_logdeb("rcvd hello from %s (addr %s)\n",
+                         forb_server_id_to_string(str, &server_id, sizeof(str)),
+                         addrstr);
        }
+       CORBA_string_deserialize(codec, &peer_orb_id);
        if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
                peer = forb_peer_find(forb, &server_id);
-               if (peer) {
+               if (peer && peer->state == FORB_PEER_DISCOVERED) {
                        /* TODO: Update last hello receive time */
-                       forb_peer_put(peer);
-               } else {
-                       /* New peer discovered */
-/*                     char str[100]; */
-/*                     printf("New peer %s discovered port %p\n", */
-/*                            forb_server_id_to_string(str, &server_id, sizeof(str)), port); */
-                       peer = forb_peer_new();
-                       if (peer) {
-                               peer->server_id = server_id;
-                               peer->port = port;
-                               peer->addr = addr;
-                               forb_peer_insert(forb, peer);
+                       if (port->new_peer) {
+                               ul_logdeb("peer already discovered but not connected - replacing\n");
+                               forb_peer_disconnected(peer);
+                               forb_peer_put(peer);
+                               peer = port->new_peer;
+                               port->new_peer = NULL;
+                               forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
+                       } else {
+                               ul_logdeb("peer already discovered - ignoring\n");
+                               if (addr)
+                                       forb_free(addr);
+                               if (peer_orb_id)
+                                       forb_free(peer_orb_id);
                                forb_peer_put(peer);
                        }
-                       /* Broadcast our hello packet now */
-                       forb_syncobj_signal(&port->hello);
+               } else {
+                       if (port->new_peer) {
+                               if (peer) {
+                                       ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
+                                       forb_peer_put(peer);
+                               }
+                               peer = port->new_peer;
+                               port->new_peer = NULL;
+                       }
+
+                       forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
                }
        }
 }
 
 static void
 process_message(forb_port_t *port, const forb_iop_message_header *mh,
-               CDR_Codec *codec)
+               FORB_CDR_Codec *codec)
 {
-       CORBA_long data_size = CDR_data_size(codec);
+       CORBA_long data_size = FORB_CDR_data_size(codec);
        /* TODO: Check destination address, whether the message is for
         * us or should be routed. */
 
@@ -424,10 +564,21 @@ process_message(forb_port_t *port, const forb_iop_message_header *mh,
                        ul_logmsg("rcvd unknown message type\n");
                        break;
        }
-       if (CDR_data_size(codec) != data_size - mh->message_size) {
-               ul_logerr("Message of type %d handled incorrectly (size=%d, processed=%d)\n",
-                         mh->message_type, mh->message_size,
-                         data_size - CDR_data_size(codec));
+       if (port->new_peer) {
+               /* If for some reaseon the new_peer was not processed so free it here. */
+               ul_logmsg("Forgotten new_peer\n");
+               forb_peer_put(port->new_peer);
+               port->new_peer = NULL;
+       }
+
+       FORB_CDR_get_align(codec, 8);
+
+       if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
+               size_t processed = data_size - FORB_CDR_data_size(codec);
+               ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%zu); fixing\n",
+                         mh->message_type, mh->message_size, processed);
+               ;
+               codec->rptr += mh->message_size - processed;
        }
 }
 
@@ -441,16 +592,17 @@ process_message(forb_port_t *port, const forb_iop_message_header *mh,
 void *forb_iop_receiver_thread(void *arg)
 {
        forb_port_t *port = arg;
-       const forb_proto_t *proto = port->proto;
-       CDR_Codec *c = &port->codec;
-       size_t rcvd, len;
+       const forb_proto_t *proto = port->desc.proto;
+       FORB_CDR_Codec *c = &port->codec;
+       ssize_t rcvd;
+       size_t len;
        forb_iop_message_header mh;
        bool header_received = false;
 
        while (!port->finish) {
                if (c->rptr == c->wptr) {
                        /* The buffer is empty now - start writing from begining*/
-                       CDR_buffer_reset(c, 0);
+                       FORB_CDR_buffer_reset(c, 0);
                }
                /* TODO: If there is not enough space for reception,
                 * we should shift the already received data to the
@@ -458,12 +610,16 @@ void *forb_iop_receiver_thread(void *arg)
                rcvd = proto->recv(port,
                                   &c->buffer[c->wptr],
                                   c->wptr_max - c->wptr);
+               if (rcvd < 0) {
+                       ul_logerr("recv returned error %zd (%s), exiting\n", rcvd, strerror(errno));
+                       return NULL;
+               }
                c->wptr += rcvd;
                 c->wptr_last = c->wptr;
 
                /* While there are some data in the buffer, process them. */
-               while (CDR_data_size(c) > 0) {
-                       len = CDR_data_size(c);
+               while (FORB_CDR_data_size(c) > 0) {
+                       len = FORB_CDR_data_size(c);
                        /* Wait for and then process message header */
                        if (!header_received) {
                                if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
@@ -471,7 +627,11 @@ void *forb_iop_receiver_thread(void *arg)
                                                ul_logerr("Header doesn't start at 8 byte bounday\n");
                                        }
                                        header_received = forb_iop_process_message_header(&mh, c);
-                                       len = CDR_data_size(c);
+                                       if (!header_received) {
+                                               ul_logerr("Wrong header received\n");
+                                               /* TODO: We should probably reset the buffer here */
+                                       }
+                                       len = FORB_CDR_data_size(c);
                                } else {
                                        break; /* Wait for more data to arrive*/
                                }
@@ -492,6 +652,14 @@ void *forb_iop_receiver_thread(void *arg)
        return NULL;
 }
 
+static void
+discovery_cleanup(void *codec)
+{
+       FORB_CDR_codec_release_buffer((FORB_CDR_Codec*)codec);
+       /* TODO: Broadcast some kind of bye bye message */
+}
+
+
 /** 
  * Thread run for every port to broadcast HELLO messages. These
  * messages are used for a FORB to discover all peers (and in future
@@ -504,15 +672,17 @@ void *forb_iop_receiver_thread(void *arg)
 void *forb_iop_discovery_thread(void *arg)
 {
        forb_port_t *port = arg;
-       const forb_proto_t *proto = port->proto;
-       CDR_Codec codec;
+       const forb_proto_t *proto = port->desc.proto;
+       FORB_CDR_Codec codec;
        fosa_abs_time_t hello_time;
        fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
        int ret;
-       
-       CDR_codec_init_static(&codec);
-       CDR_buffer_init(&codec, 1024, 0);
 
+       FORB_CDR_codec_init_static(&codec, port->forb->orb);
+       FORB_CDR_buffer_init(&codec, 1024, 0);
+
+       pthread_cleanup_push(discovery_cleanup, &codec);
+       
        /* Next hello interval is now */
        fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
        
@@ -524,19 +694,120 @@ void *forb_iop_discovery_thread(void *arg)
                if (ret == FOSA_ETIMEDOUT) {
                        hello_time = fosa_abs_time_incr(hello_time, hello_interval);
                } else if (ret != 0) {
-                       ul_logerr("hello syncobj error\n");
+                       ul_logerr("hello syncobj error: %s\n", strerror(ret));
                }
 
-               CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
-               forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
-                                      proto->serialize_addr);
+               if (port->finish) break;
+
+               FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
+               forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
+                                      proto->serialize_addr, port->forb->attr.orb_id);
 /*             printf("Broadcasting hello from port %p\n", port);  */
                proto->broadcast(port, &codec.buffer[codec.rptr],
-                                CDR_data_size(&codec));
-/*             } */
+                                FORB_CDR_data_size(&codec));
        }
-       CDR_codec_release_buffer(&codec);
+
+       pthread_cleanup_pop(1);
        return NULL;
 }
 
+/** 
+ * Sends REQUEST message to another FORB.
+ *
+ * The request @a req has to be prepared by
+ * forb_iop_prepare_request(). Then, this function adds a message
+ * header, connects to the destination FORB and sends the request.
+ *
+ * If no exception is reported, then the caller must wait for response
+ * by calling forb_wait_for_reply().
+ * 
+ * @param req A request prepared by forb_iop_prepare_request()
+ * @param env Environment for returning exceptions
+ */
+void
+forb_request_send(forb_request_t *req, unsigned index, CORBA_Environment *env)
+{
+       CORBA_boolean ret;
+       forb_peer_t *peer;
+       ssize_t size;
+       size_t len;
+       fosa_abs_time_t timeout;
+       forb_t *forb = forb_object_to_forb(req->obj);
+       forb_exec_req_t *exec_req;
+
+       if (!forb) {
+               env->major = FORB_EX_INTERNAL;
+               return;
+       }
+
+       req->env = env;     /* Remember, where to return exceptions */
+
+       /* All headers must be 8 byte aligned so align the length of
+        * this message */
+       if (!FORB_CDR_put_align(&req->cdr_request, 8)) {
+               env->major = FORB_EX_INTERNAL;
+               ul_logerr("Not enough space for tail align\n");
+               return;
+       }
+
+       /* Local invocation case, destination of a message is only 
+        * a different executor thread */
+       if (forb_object_is_local(req->obj)) {
+               exec_req = forb_malloc(sizeof(*exec_req));
+               memset(exec_req, 0, sizeof(exec_req));
+               exec_req->request_type = FORB_EXEC_REQ_LOCAL; 
+               exec_req->input_request = req;
+               exec_req->obj = forb_object_duplicate(req->obj);
+               exec_req->method_index = req->method_ind;
+               exec_req->interface = req->interface;
+               req->cdr_request.rptr = index;
+               exec_req->codec = req->cdr_request;
+               req->cdr_request.release_buffer = CORBA_FALSE;
+               exec_req->request_id = req->request_id;
+               forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
+               return;
+       }
+
+       ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
+       if (!ret) {
+               /* This should never happen */
+               env->major = FORB_EX_INTERNAL;
+               return;
+       }
+
+       fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+       timeout = fosa_abs_time_incr(timeout,
+                                    fosa_msec_to_rel_time(1000));
+       peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
+       if (!peer) {
+               char str[50];
+               ul_logerr("Cannot find peer to send request for server %s\n",
+                         forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
+               env->major = FORB_EX_COMM_FAILURE;
+               return;
+       }
+       /* Register the request with forb so we can match incomming
+        * reply to this request. */
+       ret = forb_request_insert(forb, req);
+       if (ret <= 0) {
+               ul_logerr("Insert request error %d\n", ret);
+               env->major = FORB_EX_INTERNAL;
+               goto err_peer_put;
+       }
 
+       {
+               char str[50];
+               ul_logdeb("sending request: id=%d  dest=%s\n", req->request_id,
+                         forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
+       }
+       len = FORB_CDR_data_size(&req->cdr_request);
+       fosa_mutex_lock(&peer->send_lock);
+       size = forb_proto_send(peer, &req->cdr_request);
+       fosa_mutex_unlock(&peer->send_lock);
+       if (size <= 0 || size != len) {
+               env->major = FORB_EX_COMM_FAILURE;
+               /* Request is deleted when the stub calls forb_request_destroy() */
+       }
+ err_peer_put:
+       forb_peer_put(peer);
+}