rtime.felk.cvut.cz Git - frescor/forb.git/commitdiff
Merge branch 'master' of rtime.felk.cvut.cz:/frescor/frsh-forb
author Petr Benes <benesp16@fel.cvut.cz>
Tue, 8 Feb 2011 17:05:38 +0000 (18:05 +0100)
committer Petr Benes <benesp16@fel.cvut.cz>
Tue, 8 Feb 2011 17:05:38 +0000 (18:05 +0100)
src/exec_req.c
src/exec_req.h
src/executor.c
src/executor.h
src/iop.c
src/request.h
src/tests/executor_calls.c

index e5b9c766d47e7982d8e4131abd1ad0d373563c4a..2f84c103357c72977875b70d93b45208bd6c9a06 100644 (file)
@@ -93,11 +93,26 @@ void forb_exec_req_process(forb_exec_req_t *exec_req)
                                         &reply_codec,
                                         exec_req->obj,
                                         &env);
-       
-       forb_iop_send_reply(forb, &exec_req->source,
-                           &reply_codec,
-                           exec_req->request_id, &env);
-       FORB_CDR_codec_release_buffer(&reply_codec);
+
+       // The local invocation case
+       if ((exec_req->request_type = FORB_EXEC_REQ_LOCAL)) {
+               forb_request_t *request = exec_req->input_request;
+               forb_executor_t *executor = exec_req->obj->executor;
+
+               request->cdr_reply = &reply_codec;
+               /* Tell the stub where to signal that reply processing is
+               * finished */
+               request->reply_processed = &executor->reply_processed;          
+               // notify that the reply is ready
+               forb_syncobj_signal(&request->reply_ready);
+               /* Wait for stub to process the results from the codec's buffer */              
+               forb_syncobj_wait(&executor->reply_processed);          
+       } else {
+               forb_iop_send_reply(forb, &exec_req->source,
+                               &reply_codec,
+                               exec_req->request_id, &env);            
+       }
+       FORB_CDR_codec_release_buffer(&reply_codec);                    
        forb_exec_req_destroy(exec_req);
 }
 
index 57b17ba7da6610ed980ac10a24704b1f3f15b080..f496df9d4f89b32fd344d2895cb35c106f44a1fa 100644 (file)
 #include <forb/cdr.h>
 #include <ul_list.h>
 #include "executor.h"
+#include "request.h"
+
+/**
+ * Type of request (local/remote); forb_exec_req_process() checks it to pick how the reply is delivered.
+ */
+enum forb_exec_req_type {
+       FORB_EXEC_REQ_LOCAL,            /**< Value 0. Set by forb_request_send() for local objects; no serialization used. */
+       FORB_EXEC_REQ_REMOTE            /**< Set by process_request(). */
+};
 
 /**
  * Request for ::forb_executor_t.
@@ -74,6 +83,8 @@ typedef struct forb_exec_req {
        unsigned method_index;  /**< Method number to be invoked on the object @a obj. */
        FORB_CDR_Codec codec;   /**< Buffer with serialized request parameters. */
        ul_list_node_t node;    /**< Node for forb_executor_t::requests. */
+       enum forb_exec_req_type request_type; /**< Execution method. */
+       forb_request_t *input_request; /**< Input request data for the case of local invocation */
 } forb_exec_req_t;
 
 UL_LIST_CUST_DEC(forb_exec_req_nolock, /* cust_prefix */
index 9e725d012a2a7b6b72c6a7dc9f54a591b5c7b095..7af1923975a17d4e479a24ddbbb21c66ae4f1816 100644 (file)
@@ -86,6 +86,8 @@ int forb_executor_init(forb_executor_t *executor)
        if (ret) return ret;
 
        forb_exec_req_nolock_init_head(executor);
+       
+       forb_syncobj_init(&executor->reply_processed, 0);
        return 0;
 }
 
index 8f9864ebd8bb91a6132ee7f0715a9f6e8f3cf82a..2c3cdc388cd3bf28befe8abe2bb218d14260f91d 100644 (file)
@@ -59,6 +59,8 @@
 #include <fosa.h>
 #include <ul_list.h>
 #include <forb/object_type.h>
+#include <forb/syncobj.h>
+
 
 /**
  * Executor structure.
@@ -74,6 +76,7 @@ typedef struct forb_executor {
        fosa_mutex_t mutex;     /**< Mutex for protecting forb_executor_t::requests. */
        fosa_cond_t new_request_in_empty_list; /**< Signaled when a request was added to the empty list. */
        ul_list_head_t requests; /**< List of pending requests for this executor. */
+       forb_syncobj_t reply_processed;   /**< Synchronization object for signaling the receiver thread to continue processing after the reply is processed by a stub. */       
 } forb_executor_t;
 
 
index 51330693aba314314b387304cfc6f198b840d9a2..c7ab0c9eeb6e53dfa3ca469a08400888b42b1551 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -388,6 +388,7 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
        exec_req = forb_malloc(sizeof(*exec_req));
        if (exec_req) {
                memset(exec_req, 0, sizeof(exec_req));
+               exec_req->request_type = FORB_EXEC_REQ_REMOTE; 
                exec_req->request_id = request_header.request_id;
                exec_req->source = request_header.source;
                exec_req->obj = obj;
@@ -732,6 +733,7 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env)
        size_t len;
        fosa_abs_time_t timeout;
        forb_t *forb = forb_object_to_forb(req->obj);
+       forb_exec_req_t *exec_req;
 
        if (!forb) {
                env->major = FORB_EX_INTERNAL;
@@ -748,6 +750,21 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env)
                return;
        }
 
+       /* Local invocation: the destination is only a different executor thread.
+        * NOTE(review): allocation failure reports FORB_EX_INTERNAL -- confirm the code. */
+       if (forb_object_is_local(req->obj)) {
+               exec_req = forb_malloc(sizeof(*exec_req));
+               if (!exec_req) { env->major = FORB_EX_INTERNAL; return; }
+               memset(exec_req, 0, sizeof(*exec_req)); /* whole struct; method_index stays 0 */
+               exec_req->request_type = FORB_EXEC_REQ_LOCAL;
+               exec_req->input_request = req;
+               exec_req->obj = req->obj;
+               exec_req->codec = req->cdr_request;
+               exec_req->request_id = req->request_id;
+               /* Queue on the target object's executor; nothing is sent through IOP. */
+               forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
+               return;
+       }
+
        ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
        if (!ret) {
                /* This should never happen */
index 1b1d0fc9c1fa39b4931ee573f2027160c7b74154..16f9049ed4d4c8f8adf0b837dfe80acaf3712d73 100644 (file)
@@ -66,7 +66,6 @@
 #include <forb/forb-internal.h>
 
 
-
 /**
  * Helper structure for sending ORB requests.
  *
index 896c2d9a1f1db4a91f17d8b37748a03521854081..981145f9c719baf0e440560d60d239085173eef8 100644 (file)
@@ -8,6 +8,7 @@
 #include <forb/iop.h>
 #include <forb/cdr.h>
 #include <forb/object.h>
+#include <semaphore.h>
 
 #include "myinterface.h"
 
 CORBA_long myinterface_add(myinterface _obj, const CORBA_long a, const CORBA_long b, CORBA_Environment *ev)
 {
   CORBA_long _forb_retval;
-  executor *exec_current;
+  forb_executor_t *exec_current;
   if (ev) ev->major = FORB_EX_NONE;
   /* local object */
-  if (forb_object_is_local(_obj)) {
+  if (forb_object_is_local(_obj) && !forb_get_current_executor(exec_current) && 
+                       (forb_object_get_executor(_obj) == exec_current)) {
     if (!_obj->interface ||
         strncmp(_obj->interface->name, "myinterface", 11) != 0) {
       ev->major = FORB_EX_BAD_OPERATION;
       return _forb_retval;
     }
-    if (!forb_get_current_executor(exec_current) && (forb_object_get_executor(_obj) == exec_current))
-      /* direct invocation */
-      _forb_retval = _myinterface_impl(_obj)->add(_obj, a, b, ev);
-    } else {
-      /* TODO inter-thread invocation*/
-    }
+    /* direct invocation */
+    _forb_retval = _myinterface_impl(_obj)->add(_obj, a, b, ev);
   } else {
     /* remote object - the same as before */
     forb_request_t req;