]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - proto.c
Thread synchronization reworked to used syncobj inspired by DTM
[frescor/forb.git] / proto.c
diff --git a/proto.c b/proto.c
index 507d89142922fe05e69cfeecbfbccc4c8b3b6d54..2151640dd16dd143dbd23cad0a11349d1cecaa33 100644 (file)
--- a/proto.c
+++ b/proto.c
@@ -130,22 +130,18 @@ process_reply(forb_port_t *port, CDR_Codec *codec)
                ul_logerr("Received reply to unknown request_id %ud\n", rh.request_id);
                return;
        }
-       fosa_mutex_lock(&req->mutex);
        if (rh.flags & forb_iop_FLAG_EXCEPTION) {
                forb_exception_deserialize(codec, req->env);
        } else {
-               req->reply = codec;
+               req->cdr_reply = codec;
        }
-       req->fin_mutex = &port->fin_mutex;
-       req->fin_cond = &port->fin_cond;
-       
-       fosa_cond_signal(&req->cond);
-       fosa_mutex_unlock(&req->mutex);
+       req->reply_processed = &port->reply_processed;
+
+       /* Resume the stub witing in forb_wait_for_reply() */
+       forb_syncobj_signal(&req->reply_ready);
 
        /* Wait for stub to process the results from the codec's buffer */
-       fosa_mutex_lock(&port->fin_mutex);
-       fosa_cond_wait(&port->fin_cond, &port->fin_mutex);
-       fosa_mutex_unlock(&port->fin_mutex);
+       forb_syncobj_wait(&port->reply_processed);
 }
 
 /** 
@@ -188,9 +184,8 @@ process_hello(forb_port_t *port, CDR_Codec *codec)
                        peer->addr = addr;
                        forb_peer_insert(forb, peer);
 
-                       fosa_mutex_lock(&port->hello_mutex);
-                       fosa_cond_signal(&port->hello_cond);
-                       fosa_mutex_unlock(&port->hello_mutex);
+                       /* Broadcast our hello packet now */
+                       forb_syncobj_signal(&port->hello);
                }
        }
 }
@@ -308,13 +303,10 @@ static void *port_discovery_thread(void *arg)
                                 CDR_data_size(&codec));
 
                /* Wait for next hello interval or until somebody
-                * signal us. */
+                * signals us. */
                fosa_abs_time_incr(hello_time, hello_interval);
                /* sem_timedwait would be more appropriate */
-               fosa_mutex_lock(&port->hello_mutex);
-               fosa_cond_timedwait(&port->hello_cond, &port->hello_mutex,
-                                   &hello_time);
-               fosa_mutex_unlock(&port->hello_mutex);
+               forb_syncobj_timedwait(&port->hello, &hello_time);
        }
        CDR_codec_release_buffer(&codec);
        return NULL;
@@ -343,11 +335,8 @@ int forb_register_port(forb_t *forb, forb_port_t *port)
        forb_port_insert(forb, port);
        fosa_mutex_unlock(&forb->port_mutex);
 
-       fosa_mutex_init(&port->hello_mutex, 0);
-       fosa_cond_init(&port->hello_cond);
-
-       fosa_mutex_init(&port->fin_mutex, 0);
-       fosa_cond_init(&port->fin_cond);
+       forb_syncobj_init(&port->hello, 0);
+       forb_syncobj_init(&port->reply_processed, 0);
 
        CDR_codec_init_static(&port->codec);
        if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
@@ -459,22 +448,28 @@ GAVL_CUST_NODE_INT_IMP(forb_request_nolock /* cust_prefix */,
  * 
  * @param req Request structure to initialize
  * @param obj Destination object
+ *
+ * @return Zero on success, FOSA error code on error.
  */
-void
+int
 forb_request_init(forb_request_t *req, forb_object obj)
 {
+       int ret = 0;
+       
        forb_t *forb = forb_obj_to_forb(obj);
        req->obj = obj;
        fosa_mutex_lock(&forb->id_mutex);
        req->request_id = forb_data(obj->orb)->request_id++;
        fosa_mutex_unlock(&forb->id_mutex);
+
+       ret = forb_syncobj_init(&req->reply_ready, 0);
+       return ret;
 }
 
 void
 forb_request_destroy(forb_request_t *req)
 {
-       fosa_cond_destroy(&req->cond);
-       fosa_mutex_destroy(&req->mutex);
+       forb_syncobj_destroy(&req->reply_ready);
 }
 
 
@@ -503,7 +498,10 @@ forb_send_request(forb_request_t *req, CORBA_Environment *env)
                env->major = FORB_EX_INTERNAL;
                return;
        }
-       ret = forb_iop_prepend_message_header(req->codec, forb_iop_REQUEST);
+
+       req->env = env;     /* Remember, where to return exceptions */
+
+       ret = forb_iop_prepend_message_header(req->cdr_request, forb_iop_REQUEST);
        if (!ret) {
                /* This should never happen */
                env->major = FORB_EX_INTERNAL;
@@ -515,22 +513,11 @@ forb_send_request(forb_request_t *req, CORBA_Environment *env)
                return;
        }
        
-       iret = fosa_mutex_init(&req->mutex, 0);
-       if (iret != 0) {
-               env->major = FORB_EX_INTERNAL;
-               goto exception;
-       }
-       iret = fosa_cond_init(&req->cond);
-       if (iret != 0) {
-               env->major = FORB_EX_INTERNAL;
-               goto exception;
-       }
        forb_request_insert(forb, req);
 
-       fosa_mutex_lock(&req->mutex); /* Unlocked in forb_wait_for_reply() */
-       size = forb_proto_send(peer, req->codec);
+       size = forb_proto_send(peer, req->cdr_request);
        if (size <= 0) {
-               fosa_mutex_unlock(&req->mutex);
+               forb_syncobj_destroy(&req->reply_ready);
                forb_request_delete(forb, req);
                env->major = FORB_EX_COMM_FAILURE;
        }
@@ -541,16 +528,12 @@ exception:
 void
 forb_wait_for_reply(forb_request_t *req, CORBA_Environment *env)
 {
-       req->env = env;         /* Tell, where to return exceptions */
-       fosa_cond_wait(&req->cond, &req->mutex); 
-       fosa_mutex_unlock(&req->mutex); /* Locked in forb_send_request() */
+       forb_syncobj_wait(&req->reply_ready); 
 }
 
 void
 forb_reply_processed(forb_request_t *req)
 {
        /* Signal, that we are done */
-       fosa_mutex_lock(req->fin_mutex);
-       fosa_cond_signal(req->fin_cond);
-       fosa_mutex_unlock(req->fin_mutex);
+       forb_syncobj_signal(req->reply_processed);
 }