]> rtime.felk.cvut.cz Git - frescor/forb.git/commitdiff
Thread synchronization reworked to used syncobj inspired by DTM
authorMichal Sojka <sojkam1@fel.cvut.cz>
Mon, 22 Sep 2008 14:29:02 +0000 (16:29 +0200)
committerMichal Sojka <sojkam1@fel.cvut.cz>
Mon, 22 Sep 2008 14:29:02 +0000 (16:29 +0200)
Makefile.omk
forb-idl/forb-idl-c-stubs.c
proto.c
proto.h
syncobj.c [new file with mode: 0644]
syncobj.h [new file with mode: 0644]

index ff5d8b70a82492504fb9f623b0d9ae3d385c09c8..4c32c5a26174b27c8c7e4946a81f2d65a08c518d 100644 (file)
@@ -2,7 +2,7 @@ SUBDIRS = tests-idl tests
 EXTRA_RULES_SUBDIRS = forb-idl\r
 \r
 lib_LIBRARIES += forb\r
-forb_SOURCES = forb.c cdr.c sha1.c uuid.c iop.c proto.c\r
+forb_SOURCES = forb.c cdr.c sha1.c uuid.c iop.c proto.c syncobj.c\r
 forb_CLIENT_IDL = forb-idl.idl\r
 \r
 lib_LIBRARIES += idltest\r
@@ -23,6 +23,7 @@ renamed_include_HEADERS += \
        $(call to_forb_subdir, cdr.h) \\r
        $(call to_forb_subdir, iop.h) \\r
        $(call to_forb_subdir, uuid.h) \\r
+       $(call to_forb_subdir, syncobj.h) \\r
        $(call to_forb_subdir, proto.h)\r
 \r
 renamed_include_GEN_HEADERS = \\r
index a52960caa8cf208b4ad55634e8ddad7133dc3678..6e71aff462a3a32cd37948343f651de5912662e1 100644 (file)
@@ -69,7 +69,7 @@ cs_output_stub (IDL_tree     tree,
                    "    forb_request_t req;\n"
                    "    CDR_codec_init_static(&codec);\n"
                    "    ex_on_fail(CDR_buffer_init(&codec, 256, forb_iop_MESSAGE_HEADER_SIZE), FORB_EX_NO_MEMORY);\n");
-       fprintf(of, "    forb_request_init(&req, _obj->orb);\n");
+       fprintf(of, "    ex_on_fail(forb_request_init(&req, _obj->orb) == 0, FORB_EX_INTERNAL);\n");
        fprintf(of, "    forb_iop_prepare_request(&req, &codec, \"%s\", _obj, FORB_METHOD_INDEX(%s), ev);\n",
                iface_id, opname);
        fprintf(of, "    if (forb_exception_occured(ev)) goto exception;\n");
@@ -93,7 +93,7 @@ cs_output_stub (IDL_tree     tree,
        if (has_retval) {
          fprintf(of, "    ");
          forb_cbe_write_typespec(of, IDL_OP_DCL(tree).op_type_spec);
-         fprintf(of, "_deserialize(req.reply, &"FORB_RETVAL_VAR_NAME");\n");
+         fprintf(of, "_deserialize(req.cdr_reply, &"FORB_RETVAL_VAR_NAME");\n");
        }
        for (sub = IDL_OP_DCL (tree).parameter_dcls; sub; sub = IDL_LIST (sub).next) {
                IDL_tree        parm = IDL_LIST (sub).data;
@@ -103,7 +103,7 @@ cs_output_stub (IDL_tree     tree,
                  char *name = IDL_IDENT(IDL_PARAM_DCL(parm).simple_declarator).str;
                  fprintf(of, "    ");
                  forb_cbe_write_typespec(of, IDL_PARAM_DCL(parm).param_type_spec);
-                 fprintf(of, "_deserialize(req.reply, %s);\n", name);
+                 fprintf(of, "_deserialize(req.cdr_reply, %s);\n", name);
                }
        }
        fprintf(of, "exception:\n"
diff --git a/proto.c b/proto.c
index 507d89142922fe05e69cfeecbfbccc4c8b3b6d54..2151640dd16dd143dbd23cad0a11349d1cecaa33 100644 (file)
--- a/proto.c
+++ b/proto.c
@@ -130,22 +130,18 @@ process_reply(forb_port_t *port, CDR_Codec *codec)
                ul_logerr("Received reply to unknown request_id %ud\n", rh.request_id);
                return;
        }
-       fosa_mutex_lock(&req->mutex);
        if (rh.flags & forb_iop_FLAG_EXCEPTION) {
                forb_exception_deserialize(codec, req->env);
        } else {
-               req->reply = codec;
+               req->cdr_reply = codec;
        }
-       req->fin_mutex = &port->fin_mutex;
-       req->fin_cond = &port->fin_cond;
-       
-       fosa_cond_signal(&req->cond);
-       fosa_mutex_unlock(&req->mutex);
+       req->reply_processed = &port->reply_processed;
+
+       /* Resume the stub witing in forb_wait_for_reply() */
+       forb_syncobj_signal(&req->reply_ready);
 
        /* Wait for stub to process the results from the codec's buffer */
-       fosa_mutex_lock(&port->fin_mutex);
-       fosa_cond_wait(&port->fin_cond, &port->fin_mutex);
-       fosa_mutex_unlock(&port->fin_mutex);
+       forb_syncobj_wait(&port->reply_processed);
 }
 
 /** 
@@ -188,9 +184,8 @@ process_hello(forb_port_t *port, CDR_Codec *codec)
                        peer->addr = addr;
                        forb_peer_insert(forb, peer);
 
-                       fosa_mutex_lock(&port->hello_mutex);
-                       fosa_cond_signal(&port->hello_cond);
-                       fosa_mutex_unlock(&port->hello_mutex);
+                       /* Broadcast our hello packet now */
+                       forb_syncobj_signal(&port->hello);
                }
        }
 }
@@ -308,13 +303,10 @@ static void *port_discovery_thread(void *arg)
                                 CDR_data_size(&codec));
 
                /* Wait for next hello interval or until somebody
-                * signal us. */
+                * signals us. */
                fosa_abs_time_incr(hello_time, hello_interval);
                /* sem_timedwait would be more appropriate */
-               fosa_mutex_lock(&port->hello_mutex);
-               fosa_cond_timedwait(&port->hello_cond, &port->hello_mutex,
-                                   &hello_time);
-               fosa_mutex_unlock(&port->hello_mutex);
+               forb_syncobj_timedwait(&port->hello, &hello_time);
        }
        CDR_codec_release_buffer(&codec);
        return NULL;
@@ -343,11 +335,8 @@ int forb_register_port(forb_t *forb, forb_port_t *port)
        forb_port_insert(forb, port);
        fosa_mutex_unlock(&forb->port_mutex);
 
-       fosa_mutex_init(&port->hello_mutex, 0);
-       fosa_cond_init(&port->hello_cond);
-
-       fosa_mutex_init(&port->fin_mutex, 0);
-       fosa_cond_init(&port->fin_cond);
+       forb_syncobj_init(&port->hello, 0);
+       forb_syncobj_init(&port->reply_processed, 0);
 
        CDR_codec_init_static(&port->codec);
        if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
@@ -459,22 +448,28 @@ GAVL_CUST_NODE_INT_IMP(forb_request_nolock /* cust_prefix */,
  * 
  * @param req Request structure to initialize
  * @param obj Destination object
+ *
+ * @return Zero on success, FOSA error code on error.
  */
-void
+int
 forb_request_init(forb_request_t *req, forb_object obj)
 {
+       int ret = 0;
+       
        forb_t *forb = forb_obj_to_forb(obj);
        req->obj = obj;
        fosa_mutex_lock(&forb->id_mutex);
        req->request_id = forb_data(obj->orb)->request_id++;
        fosa_mutex_unlock(&forb->id_mutex);
+
+       ret = forb_syncobj_init(&req->reply_ready, 0);
+       return ret;
 }
 
 void
 forb_request_destroy(forb_request_t *req)
 {
-       fosa_cond_destroy(&req->cond);
-       fosa_mutex_destroy(&req->mutex);
+       forb_syncobj_destroy(&req->reply_ready);
 }
 
 
@@ -503,7 +498,10 @@ forb_send_request(forb_request_t *req, CORBA_Environment *env)
                env->major = FORB_EX_INTERNAL;
                return;
        }
-       ret = forb_iop_prepend_message_header(req->codec, forb_iop_REQUEST);
+
+       req->env = env;     /* Remember, where to return exceptions */
+
+       ret = forb_iop_prepend_message_header(req->cdr_request, forb_iop_REQUEST);
        if (!ret) {
                /* This should never happen */
                env->major = FORB_EX_INTERNAL;
@@ -515,22 +513,11 @@ forb_send_request(forb_request_t *req, CORBA_Environment *env)
                return;
        }
        
-       iret = fosa_mutex_init(&req->mutex, 0);
-       if (iret != 0) {
-               env->major = FORB_EX_INTERNAL;
-               goto exception;
-       }
-       iret = fosa_cond_init(&req->cond);
-       if (iret != 0) {
-               env->major = FORB_EX_INTERNAL;
-               goto exception;
-       }
        forb_request_insert(forb, req);
 
-       fosa_mutex_lock(&req->mutex); /* Unlocked in forb_wait_for_reply() */
-       size = forb_proto_send(peer, req->codec);
+       size = forb_proto_send(peer, req->cdr_request);
        if (size <= 0) {
-               fosa_mutex_unlock(&req->mutex);
+               forb_syncobj_destroy(&req->reply_ready);
                forb_request_delete(forb, req);
                env->major = FORB_EX_COMM_FAILURE;
        }
@@ -541,16 +528,12 @@ exception:
 void
 forb_wait_for_reply(forb_request_t *req, CORBA_Environment *env)
 {
-       req->env = env;         /* Tell, where to return exceptions */
-       fosa_cond_wait(&req->cond, &req->mutex); 
-       fosa_mutex_unlock(&req->mutex); /* Locked in forb_send_request() */
+       forb_syncobj_wait(&req->reply_ready); 
 }
 
 void
 forb_reply_processed(forb_request_t *req)
 {
        /* Signal, that we are done */
-       fosa_mutex_lock(req->fin_mutex);
-       fosa_cond_signal(req->fin_cond);
-       fosa_mutex_unlock(req->fin_mutex);
+       forb_syncobj_signal(req->reply_processed);
 }
diff --git a/proto.h b/proto.h
index 767466254b59434eb2505f4fff15202d0bc0d773..d58ffe487878260037f364641b32b48537db15c3 100644 (file)
--- a/proto.h
+++ b/proto.h
@@ -12,6 +12,7 @@
 #define FORB_PROTO_H
 
 #include <forb/forb-internal.h>
+#include "syncobj.h"
 
 /**
  * Helper structure for sending ORB requests.
  */
 struct forb_request {
        CORBA_unsigned_long request_id;
-       fosa_mutex_t mutex;
-       fosa_cond_t cond;       /**< Synchronization object for waiting for reply */
-       CDR_Codec *codec;
-       CDR_Codec *reply;
+       CDR_Codec *cdr_request;
+       CDR_Codec *cdr_reply;
        gavl_node_t node;
        forb_object obj;
        struct forb_env *env;   /**< Where to stere exception returned in reply */
-       fosa_mutex_t *fin_mutex;
-       fosa_cond_t *fin_cond;  /**< Synchronization object for waiting for finished processing */
+       forb_syncobj_t reply_ready; /**< Synchronization object for waiting for reply */
+       forb_syncobj_t *reply_processed; /**< Synchronization object for receiver thread to wait for stub. */
 };
 
 typedef struct forb_request forb_request_t;
@@ -124,10 +123,8 @@ struct forb_port {
        forb_t *forb;                     /**< FORB, this port is registered in. */
        fosa_thread_id_t receiver_thread; /**< The thread running forb_port_receiver_thread() */
        fosa_thread_id_t discovery_thread;/**< The thread for periodic sending HELLO messages */
-       fosa_cond_t hello_cond;           /**< Condition variable for signaling the discovery thread to send the hello messages now. */
-       fosa_mutex_t hello_mutex;         /**< Mutex for @c hello_cond */
-       fosa_cond_t fin_cond;             /**< Condition variable for signaling the receiver thread to continue processing after the reply is passed to a stub. */
-       fosa_mutex_t fin_mutex;           /**< Mutex for @c fin_cond */
+       forb_syncobj_t hello;             /**< Synchronization object for signaling the discovery thread to send the hello messages now. */
+       forb_syncobj_t reply_processed;   /**< Synchronization object for signaling the receiver thread to continue processing after the reply is processed by a stub. */
        CDR_Codec codec;                  /**< Receiving buffer for receiver thread */
        void *addr;                       /**< Port's address in a protocol specific format. */
        ul_list_node_t node;              /**< Node in forb's port list */
@@ -228,7 +225,7 @@ int forb_register_port(forb_t *forb, forb_port_t *port);
 void forb_destroy_port(forb_port_t *port);
 size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec);
 
-void
+int
 forb_request_init(forb_request_t *req, forb_object obj);
 void
 forb_request_destroy(forb_request_t *req);
diff --git a/syncobj.c b/syncobj.c
new file mode 100644 (file)
index 0000000..966a110
--- /dev/null
+++ b/syncobj.c
@@ -0,0 +1,200 @@
+//----------------------------------------------------------------------
+//  Copyright (C) 2006 - 2007 by the FRESCOR consortium:
+//
+//    Universidad de Cantabria,              SPAIN
+//    University of York,                    UK
+//    Scuola Superiore Sant'Anna,            ITALY
+//    Kaiserslautern University,             GERMANY
+//    Univ. Politecnica  Valencia,           SPAIN
+//    Czech Technical University in Prague,  CZECH REPUBLIC
+//    ENEA                                   SWEDEN
+//    Thales Communication S.A.              FRANCE
+//    Visual Tools S.A.                      SPAIN
+//    Rapita Systems Ltd                     UK
+//    Evidence                               ITALY
+//
+//    See http://www.frescor.org
+//
+//        The FRESCOR project (FP6/2005/IST/5-034026) is funded
+//        in part by the European Union Sixth Framework Programme
+//        The European Union is not liable of any use that may be
+//        made of this code.
+//
+//
+//  based on previous work (FSF) done in the FIRST project
+//
+//   Copyright (C) 2005  Mälardalen University, SWEDEN
+//                       Scuola Superiore S.Anna, ITALY
+//                       Universidad de Cantabria, SPAIN
+//                       University of York, UK
+//
+// This file is part of DTM (Distributed Transaction Manager)
+//
+// DTM is free software; you can redistribute it and/or modify it
+// under terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 2, or (at your option) any
+// later version.  DTM is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details. You should have received a
+// copy of the GNU General Public License along with DTM; see file
+// COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,
+// Cambridge, MA 02139, USA.
+//
+// As a special exception, including DTM header files in a file,
+// instantiating DTM generics or templates, or linking other files
+// with DTM objects to produce an executable application, does not
+// by itself cause the resulting executable application to be covered
+// by the GNU General Public License. This exception does not
+// however invalidate any other reasons why the executable file might be
+// covered by the GNU Public License.
+// -----------------------------------------------------------------------
+
+/*!
+ * @file 
+ *
+ * @brief Synchronization object used in FORB
+ *
+ * @author
+ *      Michael Gonzalez Harbour <mgh@unican.es>
+ *      Michal Sojka <sojkam1@fel.cvut.cz>
+ *      Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ * @comments
+ *
+ * This module contains the definition of the data object and operations to
+ * create a pool of objects, obtain the id of an unused object, wait upon it,
+ * signal it, and finish using it.
+ *
+ * @license
+ *
+ * See the COPYING file in the FORB's root directory
+ *
+ */
+
+#include "syncobj.h"
+
+/**
+ * Initializes the synchronization object.
+ *
+ * @return Zero on success, FOSA error code on error.
+ **/
+
+int forb_syncobj_init(forb_syncobj_t *syncobj, int ceiling)
+{
+        int err;
+
+       err = fosa_cond_init(&syncobj->cond);
+        if (err != 0) return err;
+
+        syncobj->is_work_done = false;
+
+        err = fosa_mutex_init(&syncobj->mutex, ceiling);
+        if (err != 0) return err;
+
+        return 0;
+}
+
+/**
+ * Destroys all resources related to the synchronization object.
+ **/
+
+int forb_syncobj_destroy(forb_syncobj_t *syncobj)
+{
+        int err;
+
+       err = fosa_cond_destroy(&syncobj->cond);
+       if (err != 0) return err;
+       
+       err = fosa_mutex_destroy(&syncobj->mutex);
+       if (err != 0) return err;
+
+        return 0;
+}
+
+/**
+ * Signal the synchronization object.
+ **/
+
+int forb_syncobj_signal(forb_syncobj_t *syncobj)
+{
+        int err;
+
+        err = fosa_mutex_lock(&syncobj->mutex);
+        if (err != 0) return err;
+
+       syncobj->is_work_done = true;
+
+       err = fosa_cond_signal(&syncobj->cond);
+       if (err != 0) goto locked_error;
+
+        err = fosa_mutex_unlock(&syncobj->mutex);
+        if (err != 0) return err;
+
+        return 0;
+
+locked_error:
+        fosa_mutex_unlock(&syncobj->mutex);
+        return err;
+}
+
+/**
+ * Wait on the synchronization object.
+ **/
+
+int forb_syncobj_wait(forb_syncobj_t *syncobj)
+{
+        int err;
+
+        err = fosa_mutex_lock(&syncobj->mutex);
+        if (err != 0) return err;
+
+                while (syncobj->is_work_done == false) {
+                        err = fosa_cond_wait(&syncobj->cond,
+                                             &syncobj->mutex);
+                        if (err != 0) goto locked_error;
+                }
+
+                syncobj->is_work_done = false;
+
+        err = fosa_mutex_unlock(&syncobj->mutex);
+        if (err != 0) return err;
+
+        return 0;
+
+locked_error:
+        fosa_mutex_unlock(&syncobj->mutex);
+        return err;
+}
+
+/**
+ * Wait on the synchronization object with a timeout.
+ *
+ **/
+
+int forb_syncobj_timedwait(forb_syncobj_t *syncobj,
+                          const struct timespec *abstime)
+{
+        int err;
+
+        err = fosa_mutex_lock(&syncobj->mutex);
+        if (err != 0) return err;
+
+        while (syncobj->is_work_done == false) {
+                err = fosa_cond_timedwait(&syncobj->cond,
+                                          &syncobj->mutex,
+                                          abstime);
+                if (err != 0) goto locked_error;
+        }
+
+        syncobj->is_work_done = false;
+
+        err = fosa_mutex_unlock(&syncobj->mutex);
+        if (err != 0) return err;
+
+        return 0;
+
+locked_error:
+        fosa_mutex_unlock(&syncobj->mutex);
+        return err;
+}
diff --git a/syncobj.h b/syncobj.h
new file mode 100644 (file)
index 0000000..f56bce6
--- /dev/null
+++ b/syncobj.h
@@ -0,0 +1,96 @@
+#ifndef SYNC_OBJ_H
+#define SYNC_OBJ_H
+
+//----------------------------------------------------------------------
+//  Copyright (C) 2006 - 2007 by the FRESCOR consortium:
+//
+//    Universidad de Cantabria,              SPAIN
+//    University of York,                    UK
+//    Scuola Superiore Sant'Anna,            ITALY
+//    Kaiserslautern University,             GERMANY
+//    Univ. Politecnica  Valencia,           SPAIN
+//    Czech Technical University in Prague,  CZECH REPUBLIC
+//    ENEA                                   SWEDEN
+//    Thales Communication S.A.              FRANCE
+//    Visual Tools S.A.                      SPAIN
+//    Rapita Systems Ltd                     UK
+//    Evidence                               ITALY
+//
+//    See http://www.frescor.org
+//
+//        The FRESCOR project (FP6/2005/IST/5-034026) is funded
+//        in part by the European Union Sixth Framework Programme
+//        The European Union is not liable of any use that may be
+//        made of this code.
+//
+//
+//  based on previous work (FSF) done in the FIRST project
+//
+//   Copyright (C) 2005  Mälardalen University, SWEDEN
+//                       Scuola Superiore S.Anna, ITALY
+//                       Universidad de Cantabria, SPAIN
+//                       University of York, UK
+//
+// This file is part of DTM (Distributed Transaction Manager)
+//
+// DTM is free software; you can redistribute it and/or modify it
+// under terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 2, or (at your option) any
+// later version.  DTM is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details. You should have received a
+// copy of the GNU General Public License along with DTM; see file
+// COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,
+// Cambridge, MA 02139, USA.
+//
+// As a special exception, including DTM header files in a file,
+// instantiating DTM generics or templates, or linking other files
+// with DTM objects to produce an executable application, does not
+// by itself cause the resulting executable application to be covered
+// by the GNU General Public License. This exception does not
+// however invalidate any other reasons why the executable file might be
+// covered by the GNU Public License.
+// -----------------------------------------------------------------------
+
+/*!
+ * @file dtm_reply_objects.h
+ *
+ * @brief DTM reply objects
+ *
+ * This module contains the definition of the data object and operations to
+ * create a pool of objects, obtain the id of an unused object, wait upon it,
+ * signal it, and finish using it.
+ *
+ * @author Michael Gonzalez Harbour <mgh@unican.es>
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ */
+
+#ifndef _SYNC_OBJ_H_
+#define _SYNC_OBJ_H_
+
+#include <fosa_mutexes_and_condvars.h>
+#include <time.h> /* for timespec */
+#include <fosa_opaque_types.h> /* for FOSA_ETIMEDOUT */
+
+typedef struct forb_syncobj {
+        bool is_work_done;
+        fosa_cond_t cond;
+        fosa_mutex_t mutex;
+} forb_syncobj_t;
+
+
+extern int forb_syncobj_init(forb_syncobj_t *syncobj, int ceiling);
+extern int forb_syncobj_destroy(forb_syncobj_t *syncobj);
+extern int forb_syncobj_signal(forb_syncobj_t *syncobj);
+extern int forb_syncobj_wait(forb_syncobj_t *syncobj);
+extern int forb_syncobj_timedwait(forb_syncobj_t *syncobj,
+                                 const struct timespec *abstime);
+#define FORB_ETIMEDOUT FOSA_ETIMEDOUT
+
+#endif // _SYNC_OBJ_H_
+
+
+#endif