]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/iop.c
forb: Set cancelation points in receiver thread
[frescor/forb.git] / src / iop.c
index c0434639f79881f33cd7aa4b35a607ad1434da5e..5a937aa344137a9afcb3f71e7c39a4325692e814 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -91,17 +91,15 @@ forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
 
 CORBA_boolean
 forb_iop_prepare_request(forb_request_t *req,
-                        char *iface,
-                        unsigned method_ind,
                         CORBA_Environment *env)
 {
        CORBA_boolean ret;
        forb_iop_request_header rh;
 
        rh.request_id = req->request_id;
-       rh.iface = iface;
+       rh.iface = req->interface;
        rh.objkey = forb_object_to_key(req->obj);
-       rh.method_index = method_ind;
+       rh.method_index = req->method_ind;
        rh.source = forb_object_to_forb(req->obj)->server_id;
        ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
        if (ret) {
@@ -110,8 +108,9 @@ forb_iop_prepare_request(forb_request_t *req,
                char str[50];
                ul_logdeb("preparing request: id=%d  dest=%s  iface=%s method=%d\n", req->request_id,
                          forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
-                         iface, method_ind);
+                         rh.iface, rh.method_index);
        }
+       req->end_of_header_index = req->cdr_request.wptr;
        return ret;
 }
 
@@ -160,6 +159,35 @@ free:
        return ret;
 }
 
+int /* -1 if the message cannot be built; otherwise forb_proto_send()'s result with positive values mapped to 0 */
+forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer) /* Prepares a hello message describing `peer` and sends it to `dest`. */
+{
+       forb_port_t *port = peer->port;
+       FORB_CDR_Codec codec;
+       int ret;
+       FORB_CDR_codec_init_static(&codec, port->forb->orb);
+       if (!FORB_CDR_buffer_init(&codec, 1024, 0))
+               return -1;              /* returns directly, bypassing the release at `free` - presumably nothing to release when init fails */
+       if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) { /* presumably leaves room for the message header before the payload - TODO confirm */
+               ret = -1;
+               goto free;
+       }
+       if (!forb_iop_prepare_hello(&codec, &peer->server_id,
+                                   peer->addr,
+                                   peer->port->desc.proto->serialize_addr, /* address serializer comes from the protocol of peer's own port */
+                                   peer->orb_id)) {
+               ret = -1;
+               goto free;
+       }
+       ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
+                 ""/*TODO:id*/, peer->orb_id, "", dest->orb_id); /* the two id fields are logged as empty strings for now; only the orb_ids are printed */
+       ret = forb_proto_send(dest, &codec);
+       if (ret > 0) ret = 0; /* collapse any positive send result to 0; zero and negative values pass through unchanged */
+free: /* common exit for every path taken after the buffer was initialized */
+       FORB_CDR_codec_release_buffer(&codec);
+       return ret;
+}
+
 bool
 forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
 {
@@ -359,6 +387,7 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
        exec_req = forb_malloc(sizeof(*exec_req));
        if (exec_req) {
                memset(exec_req, 0, sizeof(exec_req));
+               exec_req->request_type = FORB_EXEC_REQ_REMOTE; 
                exec_req->request_id = request_header.request_id;
                exec_req->source = request_header.source;
                exec_req->obj = obj;
@@ -480,15 +509,27 @@ process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
                peer = forb_peer_find(forb, &server_id);
                if (peer && peer->state == FORB_PEER_DISCOVERED) {
                        /* TODO: Update last hello receive time */
-                       if (addr)
-                               forb_free(addr);
-                       if (peer_orb_id)
-                               forb_free(peer_orb_id);
-                       forb_peer_put(peer);
+                       if (port->new_peer) {
+                               ul_logdeb("peer already discovered but not connected - replacing\n");
+                               forb_peer_disconnected(peer);
+                               forb_peer_put(peer);
+                               peer = port->new_peer;
+                               port->new_peer = NULL;
+                               forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
+                       } else {
+                               ul_logdeb("peer already discovered - ignoring\n");
+                               if (addr)
+                                       forb_free(addr);
+                               if (peer_orb_id)
+                                       forb_free(peer_orb_id);
+                               forb_peer_put(peer);
+                       }
                } else {
                        if (port->new_peer) {
-                               if (peer)
+                               if (peer) {
                                        ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
+                                       forb_peer_put(peer);
+                               }
                                peer = port->new_peer;
                                port->new_peer = NULL;
                        }
@@ -533,7 +574,7 @@ process_message(forb_port_t *port, const forb_iop_message_header *mh,
 
        if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
                size_t processed = data_size - FORB_CDR_data_size(codec);
-               ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%d); fixing\n",
+               ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%zu); fixing\n",
                          mh->message_type, mh->message_size, processed);
                ;
                codec->rptr += mh->message_size - processed;
@@ -556,12 +597,15 @@ void *forb_iop_receiver_thread(void *arg)
        size_t len;
        forb_iop_message_header mh;
        bool header_received = false;
+       int oldstate;
 
        while (!port->finish) {
                if (c->rptr == c->wptr) {
                        /* The buffer is empty now - start writing from begining*/
                        FORB_CDR_buffer_reset(c, 0);
                }
+
+               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
                /* TODO: If there is not enough space for reception,
                 * we should shift the already received data to the
                 * beginning of the buffer. */
@@ -569,16 +613,18 @@ void *forb_iop_receiver_thread(void *arg)
                                   &c->buffer[c->wptr],
                                   c->wptr_max - c->wptr);
                if (rcvd < 0) {
-                       ul_logerr("recv returned error %d (%s), exiting\n", rcvd, strerror(errno));
+                       ul_logerr("recv returned error %zd (%s), exiting\n", rcvd, strerror(errno));
                        return NULL;
                }
                c->wptr += rcvd;
                 c->wptr_last = c->wptr;
 
+               pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
+
                /* While there are some data in the buffer, process them. */
                while (FORB_CDR_data_size(c) > 0) {
                        len = FORB_CDR_data_size(c);
-                       /* Wait for and then process message header */
+                       /* Find and process message header (if there is any) */
                        if (!header_received) {
                                if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
                                        if (c->rptr % 8 != 0) {
@@ -595,7 +641,7 @@ void *forb_iop_receiver_thread(void *arg)
                                }
                        }
                        
-                       /* Wait for and then process the message body */
+                       /* If the whole message is received, then process it */
                        if (header_received) {
                                if (len >= mh.message_size) {
                                        process_message(port, &mh, c);
@@ -670,11 +716,13 @@ void *forb_iop_discovery_thread(void *arg)
 }
 
 /** 
- * Sends REQUEST message to another FORB.
+ * Sends REQUEST to another object.
  *
- * The request @a req has to be prepared by
- * forb_iop_prepare_request(). Then, this function adds a message
- * header, connects to the destination FORB and sends the request.
+ * The request @a req has to be prepared previously by calling
+ * forb_iop_prepare_request(). Then, when the request destination is
+ * remote, this function adds a message header, connects to the
+ * destination FORB and sends the request. In the case of a local
+ * request, it is directly enqueued into the executor's queue.
  *
  * If no exception is reported, then the caller must wait for response
  * by calling forb_wait_for_reply().
@@ -691,6 +739,7 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env)
        size_t len;
        fosa_abs_time_t timeout;
        forb_t *forb = forb_object_to_forb(req->obj);
+       forb_exec_req_t *exec_req;
 
        if (!forb) {
                env->major = FORB_EX_INTERNAL;
@@ -707,6 +756,24 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env)
                return;
        }
 
+       /* Local invocation case, destination of a message is only 
+        * a different executor thread */
+       if (forb_object_is_local(req->obj)) {
+               exec_req = forb_malloc(sizeof(*exec_req));
+               memset(exec_req, 0, sizeof(exec_req));
+               exec_req->request_type = FORB_EXEC_REQ_LOCAL; 
+               exec_req->input_request = req;
+               exec_req->obj = forb_object_duplicate(req->obj);
+               exec_req->method_index = req->method_ind;
+               exec_req->interface = req->interface;
+               req->cdr_request.rptr = req->end_of_header_index;
+               exec_req->codec = req->cdr_request;
+               req->cdr_request.release_buffer = CORBA_FALSE;
+               exec_req->request_id = req->request_id;
+               forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
+               return;
+       }
+
        ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
        if (!ret) {
                /* This should never happen */