&reply_codec,
exec_req->obj,
&env);
-
- forb_iop_send_reply(forb, &exec_req->source,
- &reply_codec,
- exec_req->request_id, &env);
- FORB_CDR_codec_release_buffer(&reply_codec);
+
+	/* Local invocation case: the reply is handed over to the waiting
+	 * stub in place instead of being sent with forb_iop_send_reply().
+	 *
+	 * The test must be a comparison. An assignment here would store
+	 * FORB_EXEC_REQ_LOCAL (value 0) into request_type and evaluate to
+	 * false, so this branch would never be taken. */
+	if (exec_req->request_type == FORB_EXEC_REQ_LOCAL) {
+		forb_request_t *request = exec_req->input_request;
+		forb_executor_t *executor = exec_req->obj->executor;
+
+		request->cdr_reply = &reply_codec;
+		/* Tell the stub where to signal that reply processing is
+		 * finished. */
+		request->reply_processed = &executor->reply_processed;
+		/* Notify the stub that the reply is ready. */
+		forb_syncobj_signal(&request->reply_ready);
+		/* The codec's buffer is released right after this statement,
+		 * so wait until the stub has processed the results in it. */
+		forb_syncobj_wait(&executor->reply_processed);
+	} else {
+		forb_iop_send_reply(forb, &exec_req->source,
+				    &reply_codec,
+				    exec_req->request_id, &env);
+	}
+ FORB_CDR_codec_release_buffer(&reply_codec);
forb_exec_req_destroy(exec_req);
}
#include <forb/cdr.h>
#include <ul_list.h>
#include "executor.h"
+#include "request.h"
+
+/**
+ * Type of request (local/remote).
+ *
+ * Stored in forb_exec_req_t::request_type. FORB_EXEC_REQ_LOCAL is the
+ * first enumerator and therefore has the value 0, so a zero-filled
+ * request reads as local until request_type is set explicitly.
+ */
+enum forb_exec_req_type {
+ FORB_EXEC_REQ_LOCAL, /**< Local invocation, no serialization used. */
+ FORB_EXEC_REQ_REMOTE /**< Remote invocation; the reply is sent with forb_iop_send_reply(). */
+};
/**
* Request for ::forb_executor_t.
unsigned method_index; /**< Mehotd number to be invoked on the object @a obj. */
FORB_CDR_Codec codec; /**< Bufffer with serialized request parameters. */
ul_list_node_t node; /**< Node for forb_executor_t::requests. */
+ enum forb_exec_req_type request_type; /**< Execution method. */
+ forb_request_t *input_request; /**< Input request data for the case of local invocation */
} forb_exec_req_t;
UL_LIST_CUST_DEC(forb_exec_req_nolock, /* cust_prefix */
if (ret) return ret;
forb_exec_req_nolock_init_head(executor);
+
+ forb_syncobj_init(&executor->reply_processed, 0);
return 0;
}
#include <fosa.h>
#include <ul_list.h>
#include <forb/object_type.h>
+#include <forb/syncobj.h>
+
/**
* Executor structure.
fosa_mutex_t mutex; /**< Mutex for protecting forb_executor_t::requests. */
fosa_cond_t new_request_in_empty_list; /**< Signaled when a request was added to the empty list. */
ul_list_head_t requests; /**< List of pending requests for this executor. */
+ forb_syncobj_t reply_processed; /**< Synchronization object for signaling the receiver thread to continue processing after the reply is processed by a stub. */
} forb_executor_t;
exec_req = forb_malloc(sizeof(*exec_req));
if (exec_req) {
memset(exec_req, 0, sizeof(exec_req));
+ exec_req->request_type = FORB_EXEC_REQ_REMOTE;
exec_req->request_id = request_header.request_id;
exec_req->source = request_header.source;
exec_req->obj = obj;
size_t len;
fosa_abs_time_t timeout;
forb_t *forb = forb_object_to_forb(req->obj);
+ forb_exec_req_t *exec_req;
if (!forb) {
env->major = FORB_EX_INTERNAL;
return;
}
+	/* Local invocation case: the destination of the message is only
+	 * a different executor thread, so the request is queued directly
+	 * on the target object's executor instead of being sent out. */
+	if (forb_object_is_local(req->obj)) {
+		exec_req = forb_malloc(sizeof(*exec_req));
+		if (!exec_req) {
+			/* Report allocation failure the same way as the
+			 * missing-forb case above. */
+			env->major = FORB_EX_INTERNAL;
+			return;
+		}
+		/* Clear the whole structure; sizeof(exec_req) would only
+		 * cover the size of the pointer. */
+		memset(exec_req, 0, sizeof(*exec_req));
+		exec_req->request_type = FORB_EXEC_REQ_LOCAL;
+		exec_req->input_request = req;
+		exec_req->obj = req->obj;
+		/* NOTE(review): method_index is not filled in here and stays
+		 * 0 from the memset above - confirm how the executor learns
+		 * which method to invoke for a local request. */
+		exec_req->codec = req->cdr_request;
+		exec_req->request_id = req->request_id;
+		forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
+		return;
+	}
+
ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
if (!ret) {
/* This should never happen */
#include <forb/forb-internal.h>
-
/**
* Helper structure for sending ORB requests.
*
#include <forb/iop.h>
#include <forb/cdr.h>
#include <forb/object.h>
+#include <semaphore.h>
#include "myinterface.h"
CORBA_long myinterface_add(myinterface _obj, const CORBA_long a, const CORBA_long b, CORBA_Environment *ev)
{
CORBA_long _forb_retval;
- executor *exec_current;
+ forb_executor_t *exec_current;
if (ev) ev->major = FORB_EX_NONE;
/* local object */
- if (forb_object_is_local(_obj)) {
+ if (forb_object_is_local(_obj) && !forb_get_current_executor(exec_current) &&
+ (forb_object_get_executor(_obj) == exec_current)) {
if (!_obj->interface ||
strncmp(_obj->interface->name, "myinterface", 11) != 0) {
ev->major = FORB_EX_BAD_OPERATION;
return _forb_retval;
}
- if (!forb_get_current_executor(exec_current) && (forb_object_get_executor(_obj) == exec_current))
- /* direct invocation */
- _forb_retval = _myinterface_impl(_obj)->add(_obj, a, b, ev);
- } else {
- /* TODO inter-thread invocation*/
- }
+ /* direct invocation */
+ _forb_retval = _myinterface_impl(_obj)->add(_obj, a, b, ev);
} else {
/* remote object - the same as before */
forb_request_t req;