]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/iop.c
Add more debug messages
[frescor/forb.git] / src / iop.c
index b88fdd255cf0e7ee13f096d83c2d62322b30f564..4aa81319c96a4ed2b2599d88c38780fc9b1c8655 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -61,6 +61,7 @@
 #include "exec_req.h"
 #include <errno.h>
 #include "peer.h"
+#include "discovery.h"
 
 /** Version of the protocol */
 #define VER_MAJOR 0
@@ -110,16 +111,18 @@ forb_iop_prepare_request(forb_request_t *req,
        return ret;
 }
 
-CORBA_boolean
+static CORBA_boolean
 forb_iop_prepare_hello(FORB_CDR_Codec *codec,
                       const forb_server_id *server_id,
                       const void *src_addr,
-                      CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr))
+                      CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
+                      const CORBA_string orb_id)
 {
        if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
        if (serialize_addr) {
                if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
        }
+       if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
        if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
        return CORBA_TRUE;
 }
@@ -206,8 +209,14 @@ forb_iop_send_reply(forb_t *forb,
 
        peer = forb_get_next_hop(forb, dest, &timeout);
        if (!peer) {
-               ul_logerr("Reply destination not found\n");
+               char str[60];
+               forb_server_id_to_string(str, dest, sizeof(str));
+               ul_logerr("Reply destination not found: %s\n", str);
                goto err;
+       } else {
+               char str[60];
+               forb_server_id_to_string(str, dest, sizeof(str));
+               ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
        }
        forb_proto_send(peer, codec);
        forb_peer_put(peer);
@@ -215,7 +224,6 @@ err:
        ;
 }
 
-
 static void
 process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
 {
@@ -229,6 +237,7 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
        forb_exec_req_t *exec_req;
        uint32_t req_size, data_size, header_size;
        char str[32];
+       forb_peer_t *peer;
 
        data_size = FORB_CDR_data_size(codec);
        ret = forb_iop_request_header_deserialize(codec, &request_header);
@@ -252,6 +261,26 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
                  request_header.request_id,
                  request_header.iface,
                  request_header.method_index);
+
+       if (port->new_peer) {
+               /* Request from a new peer was reported by the underlaying protocol */
+               peer = forb_peer_find(port->forb, &request_header.source);
+               if (peer) {
+                       /* We already know this peer */
+                       /* TODO: Can it be in FORB_PEER_WANTED state?
+                        * If yes, we cannot simply igore this. */
+                       ul_logmsg("new_peer was already known\n");
+                       forb_peer_put(port->new_peer);
+                       port->new_peer = NULL;
+               } else {
+                       ul_logdeb("discovered new_peer from incomming connection\n");
+                       peer = port->new_peer;
+                       port->new_peer = NULL;
+                       forb_new_peer_discovered(port, peer, request_header.source,
+                                                peer->addr, peer->orb_id);
+               }
+               
+       }
        
        obj = forb_key_to_object(forb, request_header.objkey);
        if (!obj) {
@@ -261,11 +290,15 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
        }
 
        n = strlen(request_header.iface);
-       if (strncmp(request_header.iface, obj->interface->name, n) != 0) {
+       ret = strncmp(request_header.iface, obj->interface->name, n);
+       forb_free(request_header.iface);
+       request_header.iface = NULL;
+       if (ret != 0) {
                env.major = FORB_EX_INV_OBJREF;
                ul_logerr("Object reference has incorrect type\n");
                goto send_execption;
        }
+       
        if (request_header.method_index >= obj->interface->num_methods) {
                env.major = FORB_EX_INV_IDENT;
                ul_logerr("To high method number\n");
@@ -282,6 +315,7 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
        /* Enqueue execution request */
        exec_req = forb_malloc(sizeof(*exec_req));
        if (exec_req) {
+               memset(exec_req, 0, sizeof(exec_req));
                exec_req->request_id = request_header.request_id;
                exec_req->source = request_header.source;
                exec_req->obj = obj;
@@ -324,6 +358,7 @@ send_execption:
        forb_iop_send_reply(port->forb, &request_header.source,
                            &reply_codec, request_header.request_id, &env);
        FORB_CDR_codec_release_buffer(&reply_codec);
+       /* TODO: relese exec_req etc. */
 }
 
 static void
@@ -379,49 +414,36 @@ process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
        void *addr = NULL;
        forb_peer_t *peer;
        forb_t *forb = port->forb;
+       CORBA_string peer_orb_id = NULL;
 
 /*     printf("Hello received at port %p\n", port); */
 
        forb_server_id_deserialize(codec, &server_id);
-       {
-               char str[60];
-               ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
-       }
        if (port->desc.proto->deserialize_addr) {
                port->desc.proto->deserialize_addr(codec, &addr);
        }
+       {
+               char str[60], addrstr[60];
+               if (port->desc.proto->addr2str) {
+                       port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
+               } else
+                       addrstr[0] = 0;
+               ul_logdeb("rcvd hello from %s (addr %s)\n",
+                         forb_server_id_to_string(str, &server_id, sizeof(str)),
+                         addrstr);
+       }
+       CORBA_string_deserialize(codec, &peer_orb_id);
        if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
                peer = forb_peer_find(forb, &server_id);
                if (peer && peer->state == FORB_PEER_DISCOVERED) {
                        /* TODO: Update last hello receive time */
                        if (addr)
                                forb_free(addr);
+                       if (peer_orb_id)
+                               forb_free(peer_orb_id);
                        forb_peer_put(peer);
                } else {
-                       /* New peer discovered */
-                       bool notify_waiters = false;
-                       if (peer /* && peer->state == FORB_PEER_WANTED */) {
-                               notify_waiters = true;
-                       } else {
-                               peer = forb_peer_new();
-                       }
-                       if (peer) {
-                               fosa_mutex_lock(&forb->peer_mutex);
-                               peer->server_id = server_id;
-                               peer->port = port;
-                               peer->addr = addr;
-                               peer->state = FORB_PEER_DISCOVERED;
-                               if (notify_waiters) {
-                                       fosa_cond_broadcast(&peer->cond);
-                               } else {
-                                       forb_peer_nolock_insert(forb, forb_peer_get(peer));
-                               }
-                               forb_port_peer_ins_tail(port, forb_peer_get(peer));
-                               fosa_mutex_unlock(&forb->peer_mutex);
-                               forb_peer_put(peer);
-                       }
-                       /* Broadcast our hello packet now */
-                       forb_syncobj_signal(&port->hello);
+                       forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
                }
        }
 }
@@ -450,11 +472,17 @@ process_message(forb_port_t *port, const forb_iop_message_header *mh,
                        ul_logmsg("rcvd unknown message type\n");
                        break;
        }
+       if (port->new_peer) {
+               /* If for some reaseon the new_peer was not processed so free it here. */
+               ul_logmsg("Forgotten new_peer\n");
+               forb_peer_put(port->new_peer);
+               port->new_peer = NULL;
+       }
        if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
                size_t processed = data_size - FORB_CDR_data_size(codec);
                ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%d); fixing\n",
                          mh->message_type, mh->message_size, processed);
-                         ;
+               ;
                codec->rptr += mh->message_size - processed;
        }
 }
@@ -573,7 +601,7 @@ void *forb_iop_discovery_thread(void *arg)
 
                FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
                forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
-                                      proto->serialize_addr);
+                                      proto->serialize_addr, port->forb->attr.orb_id);
 /*             printf("Broadcasting hello from port %p\n", port);  */
                proto->broadcast(port, &codec.buffer[codec.rptr],
                                 FORB_CDR_data_size(&codec));
@@ -582,5 +610,3 @@ void *forb_iop_discovery_thread(void *arg)
        pthread_cleanup_pop(1);
        return NULL;
 }
-
-