]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/iop.c
forb: Set cancelation points in receiver thread
[frescor/forb.git] / src / iop.c
index 1115c1f3b064999f0ddfdc79620a6b98d47fca2b..5a937aa344137a9afcb3f71e7c39a4325692e814 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -91,7 +91,6 @@ forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
 
 CORBA_boolean
 forb_iop_prepare_request(forb_request_t *req,
-                        unsigned *index,
                         CORBA_Environment *env)
 {
        CORBA_boolean ret;
@@ -111,7 +110,7 @@ forb_iop_prepare_request(forb_request_t *req,
                          forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
                          rh.iface, rh.method_index);
        }
-       *index = req->cdr_request.wptr;
+       req->end_of_header_index = req->cdr_request.wptr;
        return ret;
 }
 
@@ -598,12 +597,15 @@ void *forb_iop_receiver_thread(void *arg)
        size_t len;
        forb_iop_message_header mh;
        bool header_received = false;
+       int oldstate;
 
        while (!port->finish) {
                if (c->rptr == c->wptr) {
                        /* The buffer is empty now - start writing from begining*/
                        FORB_CDR_buffer_reset(c, 0);
                }
+
+               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
                /* TODO: If there is not enough space for reception,
                 * we should shift the already received data to the
                 * beginning of the buffer. */
@@ -617,10 +619,12 @@ void *forb_iop_receiver_thread(void *arg)
                c->wptr += rcvd;
                 c->wptr_last = c->wptr;
 
+               pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
+
                /* While there are some data in the buffer, process them. */
                while (FORB_CDR_data_size(c) > 0) {
                        len = FORB_CDR_data_size(c);
-                       /* Wait for and then process message header */
+                       /* Find and process message header (if there is any) */
                        if (!header_received) {
                                if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
                                        if (c->rptr % 8 != 0) {
@@ -637,7 +641,7 @@ void *forb_iop_receiver_thread(void *arg)
                                }
                        }
                        
-                       /* Wait for and then process the message body */
+                       /* If the whole message is received, then process it */
                        if (header_received) {
                                if (len >= mh.message_size) {
                                        process_message(port, &mh, c);
@@ -712,11 +716,13 @@ void *forb_iop_discovery_thread(void *arg)
 }
 
 /** 
- * Sends REQUEST message to another FORB.
+ * Sends REQUEST to another object.
  *
- * The request @a req has to be prepared by
- * forb_iop_prepare_request(). Then, this function adds a message
- * header, connects to the destination FORB and sends the request.
+ * The request @a req has to be prepared previously by calling
+ * forb_iop_prepare_request(). Then, when the request destination is
+ * remote, this function adds a message header, connects to the
+ * destination FORB and sends the request. In the case of local
+ * request, it is directly enqueued into the executor's queue.
  *
  * If no exception is reported, then the caller must wait for response
  * by calling forb_wait_for_reply().
@@ -725,7 +731,7 @@ void *forb_iop_discovery_thread(void *arg)
  * @param env Environment for returning exceptions
  */
 void
-forb_request_send(forb_request_t *req, unsigned index, CORBA_Environment *env)
+forb_request_send(forb_request_t *req, CORBA_Environment *env)
 {
        CORBA_boolean ret;
        forb_peer_t *peer;
@@ -757,10 +763,10 @@ forb_request_send(forb_request_t *req, unsigned index, CORBA_Environment *env)
                memset(exec_req, 0, sizeof(exec_req));
                exec_req->request_type = FORB_EXEC_REQ_LOCAL; 
                exec_req->input_request = req;
-               exec_req->obj = req->obj;
+               exec_req->obj = forb_object_duplicate(req->obj);
                exec_req->method_index = req->method_ind;
                exec_req->interface = req->interface;
-               req->cdr_request.rptr = index;
+               req->cdr_request.rptr = req->end_of_header_index;
                exec_req->codec = req->cdr_request;
                req->cdr_request.release_buffer = CORBA_FALSE;
                exec_req->request_id = req->request_id;