]> rtime.felk.cvut.cz Git - frescor/fna.git/commitdiff
joining threads module
authorsangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Tue, 15 Apr 2008 07:43:54 +0000 (07:43 +0000)
committersangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Tue, 15 Apr 2008 07:43:54 +0000 (07:43 +0000)
git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@1106 35b4ef3e-fd22-0410-ab77-dab3279adceb

src_frescan/frescan_acceptor_threads.c
src_frescan/frescan_acceptor_threads.h

index 01153218ce135570637ba203210c0515f0e74486..65118b487b0f9c0592cfc7624f01ddd551d9fa39 100644 (file)
@@ -1,9 +1,10 @@
 /*!
- * @file frescan_acceptor_threads.c
+ * @file frescan_negotiation_threads.c
  *
- * @brief FRESCAN acceptor threads
+ * @brief FRESCAN negotiation threads
  *
- * This module contains the acceptor threads, with an operation to create them.
+ * This module contains the acceptor threads and the master thread for local
+ * negotiations, with functions to create them.
  *
  * @version 0.01
  *
 
 #include <assert.h>
 #include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
-#include "frescan_acceptor_threads.h"
+#include "frescan_negotiation_threads.h"
 #include "frescan_config.h"
 #include "frescan_debug.h"
 #include "frescan_data.h"
 #include "frescan_negotiation_messages.h"
+#include "frescan_requests_queue.h"
+#include "frescan_servers.h"
 
 static void *frescan_acceptor_thread(void *arg);
+static void *frescan_master_neg_thread(void *arg);
 
 /**
  * frescan_acceptor_thread_create()
@@ -45,7 +49,7 @@ int frescan_acceptor_thread_create(frescan_network_t net)
                 return ret;
         }
 
-        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
+        ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
                                   &attr,
                                   frescan_acceptor_thread,
                                   (void *)(uint32_t)net);
@@ -64,13 +68,51 @@ int frescan_acceptor_thread_create(frescan_network_t net)
         return 0;
 }
 
+/**
+ * frescan_master_neg_thread_create()
+ *
+ * Creates the master negotiation thread for the network @p net.
+ *
+ * The thread attributes are initialised, their priority is set to
+ * FRESCAN_NEG_THREAD_PRIO, and a thread running frescan_master_neg_thread()
+ * is created with @p net (cast to a pointer) as its argument. The thread id
+ * is stored in the_networks[net].neg_thread_id. The attributes object is
+ * destroyed once the thread has been created.
+ *
+ * @param net network identifier, used as index into the_networks
+ * @return 0 on success; otherwise the non-zero value returned by the fosa
+ *         call that failed (an ERROR message is emitted first)
+ *
+ * NOTE(review): the error returns that follow a successful
+ * fosa_thread_attr_init() (priority or creation failure) leave without
+ * calling fosa_thread_attr_destroy() -- confirm whether the attributes
+ * object has to be released on those paths.
+ *
+ * NOTE(review): net travels as (void *)(uint32_t)net; if pointers are wider
+ * than 32 bits on the target this cast pair may produce a compiler
+ * warning -- confirm against the supported platforms.
+ */
+
+int frescan_master_neg_thread_create(frescan_network_t net)
+{
+        int ret;
+        fosa_thread_attr_t attr;
+
+        ret = fosa_thread_attr_init(&attr);
+        if (ret != 0) {
+                ERROR("could not init thread attributes\n");
+                return ret;
+        }
+
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
+        if (ret != 0) {
+                ERROR("could not set neg thread prio %d\n",
+                      FRESCAN_NEG_THREAD_PRIO);
+                return ret; // attr is not destroyed on this path
+        }
+
+        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
+                                  &attr,
+                                  frescan_master_neg_thread,
+                                  (void *)(uint32_t)net); // net is the thread argument
+
+        if (ret != 0) {
+                ERROR("could not create the negotiator thread\n");
+                return ret; // attr is not destroyed on this path
+        }
+
+        ret = fosa_thread_attr_destroy(&attr);
+        if (ret != 0) {
+                ERROR("could not destroy thread attributes\n");
+                return ret; // the thread has already been created here
+        }
+
+        return 0;
+}
+
 /**
  * frescan_acceptor_thread()
  *
- * a loop waiting for negotiation requests on the network. When it receives
- * a request it acts like a normal thread requesting a negotiation to the
- * negotiator thread. Once it gets the results it sends them back to the
- * node that asked the negotiation.
+ * a loop waiting for messages on the network and parsing them.
  */
 
 static void *frescan_acceptor_thread(void *arg)
@@ -108,3 +150,75 @@ static void *frescan_acceptor_thread(void *arg)
 
         return NULL;
 }
+
+/**
+ * frescan_master_neg_thread
+ *
+ * the thread that negotiates LOCAL contracts in the MASTER node.
+ *
+ * Loops forever: takes a request with frescan_requestqueue_dequeue(), reads
+ * its type and dispatches on it.
+ *
+ *  - FRESCAN_NEGOTIATE: fetches the contract, the reply object and the
+ *    return info of the request, sets the return error to 0, creates a
+ *    server from the contract's min_values and prio on the network given
+ *    in @p arg (passing &neg_return_info->id, presumably to receive the
+ *    new server id -- confirm in frescan_servers_create) and then signals
+ *    the reply object. No schedulability test is performed yet.
+ *  - FRESCAN_RENEGOTIATE and FRESCAN_CANCEL: only emit a debug message;
+ *    the reply object is not signalled.
+ *  - any other type: reported with ERROR() and otherwise ignored.
+ *
+ * The result of every call is checked with assert() only.
+ *
+ * @param arg network id cast to a pointer, as passed by
+ *            frescan_master_neg_thread_create()
+ * @return nothing in practice: the loop has no exit, so the final
+ *         return NULL is not reached
+ */
+
+static void *frescan_master_neg_thread(void *arg)
+{
+        int err;
+        frescan_request_id_t request;
+        frescan_req_type_t   type;
+        frescan_robj_id_t    reply;
+        frescan_contract_t   *contract;
+        frescan_neg_return_info_t *neg_return_info;
+        frescan_server_params_t server_params;
+
+        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
+
+        while(1) { // the breaks below only leave the switch, never this loop
+                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
+
+                err = frescan_requestqueue_dequeue(&request);
+                assert(err == 0);
+
+                err = frescan_request_get_type(request, &type);
+                assert(err == 0);
+
+                switch(type) {
+                case FRESCAN_NEGOTIATE:
+                        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                "FRESCAN_NEGOTIATE request\n");
+                        err = frescan_request_get_contract(request, &contract);
+                        assert(err == 0);
+                        err = frescan_request_get_reply(request, &reply);
+                        assert(err == 0);
+                        err = frescan_request_get_return_info
+                                        (request, (void *)&neg_return_info);
+                        assert(err == 0);
+
+                        // TODO: sched test + add contract to table
+                        // so far always accepted with the min values
+                        neg_return_info->error = 0;
+                        server_params.values = contract->min_values;
+                        server_params.prio   = contract->prio;
+                        err = frescan_servers_create
+                                       ((frescan_network_t)(uint32_t)arg,
+                                        &server_params,
+                                        &neg_return_info->id);
+                        assert(err == 0);
+
+                        err = frescan_replyobject_signal(reply);
+                        assert(err == 0);
+                        break;
+
+                case FRESCAN_RENEGOTIATE: // not implemented: logs only, no reply signalled
+                        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                "FRESCAN_RENEGOTIATE request\n");
+                        break;
+
+                case FRESCAN_CANCEL: // not implemented: logs only, no reply signalled
+                        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                "FRESCAN_CANCEL request\n");
+                        break;
+
+                default:
+                        ERROR("wrong request type %d\n", type); // request is dropped, loop continues
+                }
+        }
+
+        return NULL;
+}
index d9aa1a6ed109e3dfa1ecc3886d6a0e88f7d926a4..d4ba22586d025b293a21c5801c2394a496a09aa3 100644 (file)
@@ -1,9 +1,10 @@
 /*!
- * @file frescan_acceptor_threads.h
+ * @file frescan_negotiation_threads.h
  *
- * @brief FRESCAN acceptor threads
+ * @brief FRESCAN negotiation threads
  *
- * This module contains the acceptor threads, with an operation to create them.
+ * This module contains the acceptor threads and the master thread for local
+ * negotiations, with functions to create them.
  *
  * @version 0.01
  *
  *
  */
 
-#ifndef _FRESCAN_ACCEPTOR_THREADS_H_
-#define _FRESCAN_ACCEPTOR_THREADS_H_
+#ifndef _FRESCAN_NEGOTIATION_THREADS_H_
+#define _FRESCAN_NEGOTIATION_THREADS_H_
 
 #include "frescan.h"
 
+/**
+ * frescan_master_neg_thread_create()
+ *
+ * This call creates the thread in charge of LOCAL negotiations at the
+ * MASTER node, so in the rest of the nodes it doesn't have to be called at
+ * initialization. This thread will wait on a local request queue for
+ * LOCAL negotiation requests from threads in the same CPU.
+ * In the case of SLAVE nodes, the negotiation requests are simply performed
+ * by sending an appropriate message to the MASTER node and then waiting
+ * on a reply object until an acceptor thread signals it.
+ */
+
+extern int frescan_master_neg_thread_create(frescan_network_t net);
+
+/**
+ * frescan_acceptor_thread_create()
+ *
+ * This function is called in every node (including the MASTER node). The
+ * resulting set of threads, one at each node, waits for negotiation messages
+ * on a receive endpoint and performs the corresponding operations.
+ */
+
 extern int frescan_acceptor_thread_create(frescan_network_t net);
 
-#endif // _FRESCAN_ACCEPTOR_THREADS_H_
+#endif // _FRESCAN_NEGOTIATION_THREADS_H_