EXTRA_RULES_SUBDIRS = forb-idl\r
\r
lib_LIBRARIES += forb\r
-forb_SOURCES = forb.c cdr.c sha1.c uuid.c iop.c proto.c\r
+forb_SOURCES = forb.c cdr.c sha1.c uuid.c iop.c proto.c syncobj.c\r
forb_CLIENT_IDL = forb-idl.idl\r
\r
lib_LIBRARIES += idltest\r
$(call to_forb_subdir, cdr.h) \\r
$(call to_forb_subdir, iop.h) \\r
$(call to_forb_subdir, uuid.h) \\r
+ $(call to_forb_subdir, syncobj.h) \\r
$(call to_forb_subdir, proto.h)\r
\r
renamed_include_GEN_HEADERS = \\r
" forb_request_t req;\n"
" CDR_codec_init_static(&codec);\n"
" ex_on_fail(CDR_buffer_init(&codec, 256, forb_iop_MESSAGE_HEADER_SIZE), FORB_EX_NO_MEMORY);\n");
- fprintf(of, " forb_request_init(&req, _obj->orb);\n");
+ fprintf(of, " ex_on_fail(forb_request_init(&req, _obj->orb) == 0, FORB_EX_INTERNAL);\n");
fprintf(of, " forb_iop_prepare_request(&req, &codec, \"%s\", _obj, FORB_METHOD_INDEX(%s), ev);\n",
iface_id, opname);
fprintf(of, " if (forb_exception_occured(ev)) goto exception;\n");
if (has_retval) {
fprintf(of, " ");
forb_cbe_write_typespec(of, IDL_OP_DCL(tree).op_type_spec);
- fprintf(of, "_deserialize(req.reply, &"FORB_RETVAL_VAR_NAME");\n");
+ fprintf(of, "_deserialize(req.cdr_reply, &"FORB_RETVAL_VAR_NAME");\n");
}
for (sub = IDL_OP_DCL (tree).parameter_dcls; sub; sub = IDL_LIST (sub).next) {
IDL_tree parm = IDL_LIST (sub).data;
char *name = IDL_IDENT(IDL_PARAM_DCL(parm).simple_declarator).str;
fprintf(of, " ");
forb_cbe_write_typespec(of, IDL_PARAM_DCL(parm).param_type_spec);
- fprintf(of, "_deserialize(req.reply, %s);\n", name);
+ fprintf(of, "_deserialize(req.cdr_reply, %s);\n", name);
}
}
fprintf(of, "exception:\n"
ul_logerr("Received reply to unknown request_id %ud\n", rh.request_id);
return;
}
- fosa_mutex_lock(&req->mutex);
if (rh.flags & forb_iop_FLAG_EXCEPTION) {
forb_exception_deserialize(codec, req->env);
} else {
- req->reply = codec;
+ req->cdr_reply = codec;
}
- req->fin_mutex = &port->fin_mutex;
- req->fin_cond = &port->fin_cond;
-
- fosa_cond_signal(&req->cond);
- fosa_mutex_unlock(&req->mutex);
+ req->reply_processed = &port->reply_processed;
+
+ /* Resume the stub witing in forb_wait_for_reply() */
+ forb_syncobj_signal(&req->reply_ready);
/* Wait for stub to process the results from the codec's buffer */
- fosa_mutex_lock(&port->fin_mutex);
- fosa_cond_wait(&port->fin_cond, &port->fin_mutex);
- fosa_mutex_unlock(&port->fin_mutex);
+ forb_syncobj_wait(&port->reply_processed);
}
/**
peer->addr = addr;
forb_peer_insert(forb, peer);
- fosa_mutex_lock(&port->hello_mutex);
- fosa_cond_signal(&port->hello_cond);
- fosa_mutex_unlock(&port->hello_mutex);
+ /* Broadcast our hello packet now */
+ forb_syncobj_signal(&port->hello);
}
}
}
CDR_data_size(&codec));
/* Wait for next hello interval or until somebody
- * signal us. */
+ * signals us. */
fosa_abs_time_incr(hello_time, hello_interval);
/* sem_timedwait would be more appropriate */
- fosa_mutex_lock(&port->hello_mutex);
- fosa_cond_timedwait(&port->hello_cond, &port->hello_mutex,
- &hello_time);
- fosa_mutex_unlock(&port->hello_mutex);
+ forb_syncobj_timedwait(&port->hello, &hello_time);
}
CDR_codec_release_buffer(&codec);
return NULL;
forb_port_insert(forb, port);
fosa_mutex_unlock(&forb->port_mutex);
- fosa_mutex_init(&port->hello_mutex, 0);
- fosa_cond_init(&port->hello_cond);
-
- fosa_mutex_init(&port->fin_mutex, 0);
- fosa_cond_init(&port->fin_cond);
+ forb_syncobj_init(&port->hello, 0);
+ forb_syncobj_init(&port->reply_processed, 0);
CDR_codec_init_static(&port->codec);
if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
*
* @param req Request structure to initialize
* @param obj Destination object
+ *
+ * @return Zero on success, FOSA error code on error.
*/
-void
+int
forb_request_init(forb_request_t *req, forb_object obj)
{
+ int ret = 0;
+
forb_t *forb = forb_obj_to_forb(obj);
req->obj = obj;
fosa_mutex_lock(&forb->id_mutex);
req->request_id = forb_data(obj->orb)->request_id++;
fosa_mutex_unlock(&forb->id_mutex);
+
+ ret = forb_syncobj_init(&req->reply_ready, 0);
+ return ret;
}
void
forb_request_destroy(forb_request_t *req)
{
- fosa_cond_destroy(&req->cond);
- fosa_mutex_destroy(&req->mutex);
+ forb_syncobj_destroy(&req->reply_ready);
}
env->major = FORB_EX_INTERNAL;
return;
}
- ret = forb_iop_prepend_message_header(req->codec, forb_iop_REQUEST);
+
+ req->env = env; /* Remember, where to return exceptions */
+
+ ret = forb_iop_prepend_message_header(req->cdr_request, forb_iop_REQUEST);
if (!ret) {
/* This should never happen */
env->major = FORB_EX_INTERNAL;
return;
}
- iret = fosa_mutex_init(&req->mutex, 0);
- if (iret != 0) {
- env->major = FORB_EX_INTERNAL;
- goto exception;
- }
- iret = fosa_cond_init(&req->cond);
- if (iret != 0) {
- env->major = FORB_EX_INTERNAL;
- goto exception;
- }
forb_request_insert(forb, req);
- fosa_mutex_lock(&req->mutex); /* Unlocked in forb_wait_for_reply() */
- size = forb_proto_send(peer, req->codec);
+ size = forb_proto_send(peer, req->cdr_request);
if (size <= 0) {
- fosa_mutex_unlock(&req->mutex);
+ forb_syncobj_destroy(&req->reply_ready);
forb_request_delete(forb, req);
env->major = FORB_EX_COMM_FAILURE;
}
void
forb_wait_for_reply(forb_request_t *req, CORBA_Environment *env)
{
- req->env = env; /* Tell, where to return exceptions */
- fosa_cond_wait(&req->cond, &req->mutex);
- fosa_mutex_unlock(&req->mutex); /* Locked in forb_send_request() */
+ forb_syncobj_wait(&req->reply_ready);
}
void
forb_reply_processed(forb_request_t *req)
{
/* Signal, that we are done */
- fosa_mutex_lock(req->fin_mutex);
- fosa_cond_signal(req->fin_cond);
- fosa_mutex_unlock(req->fin_mutex);
+ forb_syncobj_signal(req->reply_processed);
}
#define FORB_PROTO_H
#include <forb/forb-internal.h>
+#include "syncobj.h"
/**
* Helper structure for sending ORB requests.
*/
struct forb_request {
CORBA_unsigned_long request_id;
- fosa_mutex_t mutex;
- fosa_cond_t cond; /**< Synchronization object for waiting for reply */
- CDR_Codec *codec;
- CDR_Codec *reply;
+ CDR_Codec *cdr_request;
+ CDR_Codec *cdr_reply;
gavl_node_t node;
forb_object obj;
struct forb_env *env; /**< Where to stere exception returned in reply */
- fosa_mutex_t *fin_mutex;
- fosa_cond_t *fin_cond; /**< Synchronization object for waiting for finished processing */
+ forb_syncobj_t reply_ready; /**< Synchronization object for waiting for reply */
+ forb_syncobj_t *reply_processed; /**< Synchronization object for receiver thread to wait for stub. */
};
typedef struct forb_request forb_request_t;
forb_t *forb; /**< FORB, this port is registered in. */
fosa_thread_id_t receiver_thread; /**< The thread running forb_port_receiver_thread() */
fosa_thread_id_t discovery_thread;/**< The thread for periodic sending HELLO messages */
- fosa_cond_t hello_cond; /**< Condition variable for signaling the discovery thread to send the hello messages now. */
- fosa_mutex_t hello_mutex; /**< Mutex for @c hello_cond */
- fosa_cond_t fin_cond; /**< Condition variable for signaling the receiver thread to continue processing after the reply is passed to a stub. */
- fosa_mutex_t fin_mutex; /**< Mutex for @c fin_cond */
+ forb_syncobj_t hello; /**< Synchronization object for signaling the discovery thread to send the hello messages now. */
+ forb_syncobj_t reply_processed; /**< Synchronization object for signaling the receiver thread to continue processing after the reply is processed by a stub. */
CDR_Codec codec; /**< Receiving buffer for receiver thread */
void *addr; /**< Port's address in a protocol specific format. */
ul_list_node_t node; /**< Node in forb's port list */
void forb_destroy_port(forb_port_t *port);
size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec);
-void
+int
forb_request_init(forb_request_t *req, forb_object obj);
void
forb_request_destroy(forb_request_t *req);
--- /dev/null
+//----------------------------------------------------------------------
+// Copyright (C) 2006 - 2007 by the FRESCOR consortium:
+//
+// Universidad de Cantabria, SPAIN
+// University of York, UK
+// Scuola Superiore Sant'Anna, ITALY
+// Kaiserslautern University, GERMANY
+// Univ. Politecnica Valencia, SPAIN
+// Czech Technical University in Prague, CZECH REPUBLIC
+// ENEA SWEDEN
+// Thales Communication S.A. FRANCE
+// Visual Tools S.A. SPAIN
+// Rapita Systems Ltd UK
+// Evidence ITALY
+//
+// See http://www.frescor.org
+//
+// The FRESCOR project (FP6/2005/IST/5-034026) is funded
+// in part by the European Union Sixth Framework Programme
+// The European Union is not liable of any use that may be
+// made of this code.
+//
+//
+// based on previous work (FSF) done in the FIRST project
+//
+// Copyright (C) 2005 Mälardalen University, SWEDEN
+// Scuola Superiore S.Anna, ITALY
+// Universidad de Cantabria, SPAIN
+// University of York, UK
+//
+// This file is part of DTM (Distributed Transaction Manager)
+//
+// DTM is free software; you can redistribute it and/or modify it
+// under terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 2, or (at your option) any
+// later version. DTM is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details. You should have received a
+// copy of the GNU General Public License along with DTM; see file
+// COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,
+// Cambridge, MA 02139, USA.
+//
+// As a special exception, including DTM header files in a file,
+// instantiating DTM generics or templates, or linking other files
+// with DTM objects to produce an executable application, does not
+// by itself cause the resulting executable application to be covered
+// by the GNU General Public License. This exception does not
+// however invalidate any other reasons why the executable file might be
+// covered by the GNU Public License.
+// -----------------------------------------------------------------------
+
+/*!
+ * @file
+ *
+ * @brief Synchronization object used in FORB
+ *
+ * @author
+ * Michael Gonzalez Harbour <mgh@unican.es>
+ * Michal Sojka <sojkam1@fel.cvut.cz>
+ * Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ * @comments
+ *
+ * This module contains the definition of the data object and operations to
+ * create a pool of objects, obtain the id of an unused object, wait upon it,
+ * signal it, and finish using it.
+ *
+ * @license
+ *
+ * See the COPYING file in the FORB's root directory
+ *
+ */
+
+#include "syncobj.h"
+
+/**
+ * Initializes the synchronization object.
+ *
+ * @return Zero on success, FOSA error code on error.
+ **/
+
+int forb_syncobj_init(forb_syncobj_t *syncobj, int ceiling)
+{
+ int err;
+
+ err = fosa_cond_init(&syncobj->cond);
+ if (err != 0) return err;
+
+ syncobj->is_work_done = false;
+
+ err = fosa_mutex_init(&syncobj->mutex, ceiling);
+ if (err != 0) return err;
+
+ return 0;
+}
+
+/**
+ * Destroys all resources related to the synchronization object.
+ **/
+
+int forb_syncobj_destroy(forb_syncobj_t *syncobj)
+{
+ int err;
+
+ err = fosa_cond_destroy(&syncobj->cond);
+ if (err != 0) return err;
+
+ err = fosa_mutex_destroy(&syncobj->mutex);
+ if (err != 0) return err;
+
+ return 0;
+}
+
+/**
+ * Signal the synchronization object.
+ **/
+
+int forb_syncobj_signal(forb_syncobj_t *syncobj)
+{
+ int err;
+
+ err = fosa_mutex_lock(&syncobj->mutex);
+ if (err != 0) return err;
+
+ syncobj->is_work_done = true;
+
+ err = fosa_cond_signal(&syncobj->cond);
+ if (err != 0) goto locked_error;
+
+ err = fosa_mutex_unlock(&syncobj->mutex);
+ if (err != 0) return err;
+
+ return 0;
+
+locked_error:
+ fosa_mutex_unlock(&syncobj->mutex);
+ return err;
+}
+
+/**
+ * Wait on the synchronization object.
+ **/
+
+int forb_syncobj_wait(forb_syncobj_t *syncobj)
+{
+ int err;
+
+ err = fosa_mutex_lock(&syncobj->mutex);
+ if (err != 0) return err;
+
+ while (syncobj->is_work_done == false) {
+ err = fosa_cond_wait(&syncobj->cond,
+ &syncobj->mutex);
+ if (err != 0) goto locked_error;
+ }
+
+ syncobj->is_work_done = false;
+
+ err = fosa_mutex_unlock(&syncobj->mutex);
+ if (err != 0) return err;
+
+ return 0;
+
+locked_error:
+ fosa_mutex_unlock(&syncobj->mutex);
+ return err;
+}
+
+/**
+ * Wait on the synchronization object with a timeout.
+ *
+ **/
+
+int forb_syncobj_timedwait(forb_syncobj_t *syncobj,
+ const struct timespec *abstime)
+{
+ int err;
+
+ err = fosa_mutex_lock(&syncobj->mutex);
+ if (err != 0) return err;
+
+ while (syncobj->is_work_done == false) {
+ err = fosa_cond_timedwait(&syncobj->cond,
+ &syncobj->mutex,
+ abstime);
+ if (err != 0) goto locked_error;
+ }
+
+ syncobj->is_work_done = false;
+
+ err = fosa_mutex_unlock(&syncobj->mutex);
+ if (err != 0) return err;
+
+ return 0;
+
+locked_error:
+ fosa_mutex_unlock(&syncobj->mutex);
+ return err;
+}
--- /dev/null
+#ifndef SYNC_OBJ_H
+#define SYNC_OBJ_H
+
+//----------------------------------------------------------------------
+// Copyright (C) 2006 - 2007 by the FRESCOR consortium:
+//
+// Universidad de Cantabria, SPAIN
+// University of York, UK
+// Scuola Superiore Sant'Anna, ITALY
+// Kaiserslautern University, GERMANY
+// Univ. Politecnica Valencia, SPAIN
+// Czech Technical University in Prague, CZECH REPUBLIC
+// ENEA SWEDEN
+// Thales Communication S.A. FRANCE
+// Visual Tools S.A. SPAIN
+// Rapita Systems Ltd UK
+// Evidence ITALY
+//
+// See http://www.frescor.org
+//
+// The FRESCOR project (FP6/2005/IST/5-034026) is funded
+// in part by the European Union Sixth Framework Programme
+// The European Union is not liable of any use that may be
+// made of this code.
+//
+//
+// based on previous work (FSF) done in the FIRST project
+//
+// Copyright (C) 2005 Mälardalen University, SWEDEN
+// Scuola Superiore S.Anna, ITALY
+// Universidad de Cantabria, SPAIN
+// University of York, UK
+//
+// This file is part of DTM (Distributed Transaction Manager)
+//
+// DTM is free software; you can redistribute it and/or modify it
+// under terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 2, or (at your option) any
+// later version. DTM is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details. You should have received a
+// copy of the GNU General Public License along with DTM; see file
+// COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,
+// Cambridge, MA 02139, USA.
+//
+// As a special exception, including DTM header files in a file,
+// instantiating DTM generics or templates, or linking other files
+// with DTM objects to produce an executable application, does not
+// by itself cause the resulting executable application to be covered
+// by the GNU General Public License. This exception does not
+// however invalidate any other reasons why the executable file might be
+// covered by the GNU Public License.
+// -----------------------------------------------------------------------
+
+/*!
+ * @file dtm_reply_objects.h
+ *
+ * @brief DTM reply objects
+ *
+ * This module contains the definition of the data object and operations to
+ * create a pool of objects, obtain the id of an unused object, wait upon it,
+ * signal it, and finish using it.
+ *
+ * @author Michael Gonzalez Harbour <mgh@unican.es>
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ */
+
+#ifndef _SYNC_OBJ_H_
+#define _SYNC_OBJ_H_
+
+#include <fosa_mutexes_and_condvars.h>
+#include <time.h> /* for timespec */
+#include <fosa_opaque_types.h> /* for FOSA_ETIMEDOUT */
+
+typedef struct forb_syncobj {
+ bool is_work_done;
+ fosa_cond_t cond;
+ fosa_mutex_t mutex;
+} forb_syncobj_t;
+
+
+extern int forb_syncobj_init(forb_syncobj_t *syncobj, int ceiling);
+extern int forb_syncobj_destroy(forb_syncobj_t *syncobj);
+extern int forb_syncobj_signal(forb_syncobj_t *syncobj);
+extern int forb_syncobj_wait(forb_syncobj_t *syncobj);
+extern int forb_syncobj_timedwait(forb_syncobj_t *syncobj,
+ const struct timespec *abstime);
+#define FORB_ETIMEDOUT FOSA_ETIMEDOUT
+
+#endif // _SYNC_OBJ_H_
+
+
+#endif