rtime.felk.cvut.cz Git - frescor/fna.git/blobdiff - src_frescan/frescan_bwres_threads.c
renamings... redo the request and messages part... also now there will be two threads...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
index da7623e5451fe82fdf57006529c8acddb88e04f1..9d7aefe06edbcca0235f50b88ec234982cfa1636 100644 (file)
 #include <assert.h>
 #include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
 #include "frescan_bwres_threads.h"
+#include "frescan_bwres_messages.h"
+#include "frescan_bwres_requests.h"
 #include "frescan_config.h"
 #include "frescan_debug.h"
 #include "frescan_data.h"
-#include "frescan_bwres_messages.h"
-#include "frescan_requests.h"
 #include "frescan_servers.h"
 
+static void *frescan_manager_thread(void *arg);
 static void *frescan_acceptor_thread(void *arg);
-static void *frescan_master_neg_thread(void *arg);
 
 /**
- * frescan_acceptor_thread_create()
+ * frescan_manager_thread_create()
+ *
+ * This call creates the manager thread at each node, which waits
+ * on a request queue for LOCAL or EXTERNAL requests.
  */
 
-int frescan_acceptor_thread_create(frescan_network_t net)
+int frescan_manager_thread_create(frescan_network_t net)
 {
         int ret;
         fosa_thread_attr_t attr;
@@ -42,16 +45,16 @@ int frescan_acceptor_thread_create(frescan_network_t net)
                 return ret;
         }
 
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
         if (ret != 0) {
-                ERROR("could not set acceptor thread prio %d\n",
-                      FRESCAN_ACCEPTOR_THREAD_PRIO);
+                ERROR("could not set neg thread prio %d\n",
+                      FRESCAN_NEG_THREAD_PRIO);
                 return ret;
         }
 
-        ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
+        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
                                   &attr,
-                                  frescan_acceptor_thread,
+                                  frescan_master_neg_thread,
                                   (void *)(uint32_t)net);
 
         if (ret != 0) {
@@ -69,10 +72,10 @@ int frescan_acceptor_thread_create(frescan_network_t net)
 }
 
 /**
- * frescan_master_neg_thread_create()
+ * frescan_acceptor_thread_create()
  */
 
-int frescan_master_neg_thread_create(frescan_network_t net)
+int frescan_acceptor_thread_create(frescan_network_t net)
 {
         int ret;
         fosa_thread_attr_t attr;
@@ -83,16 +86,16 @@ int frescan_master_neg_thread_create(frescan_network_t net)
                 return ret;
         }
 
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
         if (ret != 0) {
-                ERROR("could not set neg thread prio %d\n",
-                      FRESCAN_NEG_THREAD_PRIO);
+                ERROR("could not set acceptor thread prio %d\n",
+                      FRESCAN_ACCEPTOR_THREAD_PRIO);
                 return ret;
         }
 
-        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
+        ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
                                   &attr,
-                                  frescan_master_neg_thread,
+                                  frescan_acceptor_thread,
                                   (void *)(uint32_t)net);
 
         if (ret != 0) {
@@ -109,10 +112,80 @@ int frescan_master_neg_thread_create(frescan_network_t net)
         return 0;
 }
 
+/**
+ * frescan_manager_thread() - endless loop that dequeues requests and serves
+ * them by type; arg carries the frescan_network_t id cast to a pointer.
+ */
+static void *frescan_manager_thread(void *arg)
+{
+        int err;
+        frescan_request_id_t request;
+        frescan_req_type_t   type;
+        frescan_robj_id_t    reply;
+        frescan_contract_t   *contract;
+        frescan_neg_return_info_t *neg_return_info;
+        frescan_server_params_t server_params;
+        frescan_network_t net = (uint32_t)arg;
+
+        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
+
+        while(1) {
+                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
+                // take the next request from the queue, then read its type
+                err = frescan_requestqueue_dequeue(&request);
+                assert(err == 0);
+
+                err = frescan_request_get_type(request, &type);
+                assert(err == 0);
+
+                switch(type) {
+                        case FRESCAN_NEGOTIATE:
+                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                      "FRESCAN_NEGOTIATE request\n");
+                                err = frescan_request_get_contract(request, &contract);
+                                assert(err == 0);
+                                err = frescan_request_get_reply(request, &reply);
+                                assert(err == 0);
+                                err = frescan_request_get_return_info
+                                                (request, (void *)&neg_return_info);
+                                assert(err == 0);
+
+                        // TODO: sched test + add contract to table
+                        // so far always accepted with the min values
+                                neg_return_info->error = 0;
+                                server_params.values = contract->min_values;
+                                server_params.prio   = contract->prio;
+                                // the new server's id is stored in neg_return_info->id
+                                err = frescan_servers_create
+                                                (net,
+                                                &server_params,
+                                                &neg_return_info->id);
+                                assert(err == 0);
+                                // presumably wakes the requester waiting on reply - TODO confirm
+                                err = frescan_replyobject_signal(reply);
+                                assert(err == 0);
+                                break;
+
+                        case FRESCAN_RENEGOTIATE:
+                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                      "FRESCAN_RENEGOTIATE request\n");
+                                break; // not handled yet: only logged
+
+                        case FRESCAN_CANCEL:
+                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                      "FRESCAN_CANCEL request\n");
+                                break; // not handled yet: only logged
+
+                        default:
+                                ERROR("wrong request type %d\n", type);
+                }
+        }
+
+        return NULL; // not reached: the while(1) loop above is never left
+}
+
 /**
  * frescan_acceptor_thread()
- *
- * a loop waiting for messages on the network and parsing them.
  */
 
 static void *frescan_acceptor_thread(void *arg)
@@ -150,77 +223,3 @@ static void *frescan_acceptor_thread(void *arg)
 
         return NULL;
 }
-
-/**
- * frescan_master_neg_thread
- *
- * the thread that negotiates LOCAL contracts in the MASTER node.
- */
-
-static void *frescan_master_neg_thread(void *arg)
-{
-        int err;
-        frescan_request_id_t request;
-        frescan_req_type_t   type;
-        frescan_robj_id_t    reply;
-        frescan_contract_t   *contract;
-        frescan_neg_return_info_t *neg_return_info;
-        frescan_server_params_t server_params;
-        frescan_network_t net = (uint32_t)arg;
-
-        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
-
-        while(1) {
-                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
-
-                err = frescan_requestqueue_dequeue(&request);
-                assert(err == 0);
-
-                err = frescan_request_get_type(request, &type);
-                assert(err == 0);
-
-                switch(type) {
-                case FRESCAN_NEGOTIATE:
-                        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                "FRESCAN_NEGOTIATE request\n");
-                        err = frescan_request_get_contract(request, &contract);
-                        assert(err == 0);
-                        err = frescan_request_get_reply(request, &reply);
-                        assert(err == 0);
-                        err = frescan_request_get_return_info
-                                        (request, (void *)&neg_return_info);
-                        assert(err == 0);
-
-                        // TODO: sched test + add contract to table
-                        // so far always accepted witht he min values
-                        neg_return_info->error = 0;
-                        server_params.values = contract->min_values;
-                        server_params.prio   = contract->prio;
-
-                        err = frescan_servers_create
-                                       (net,
-                                        &server_params,
-                                        &neg_return_info->id);
-                        assert(err == 0);
-
-                        err = frescan_replyobject_signal(reply);
-                        assert(err == 0);
-                        break;
-
-                case FRESCAN_RENEGOTIATE:
-                        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                "FRESCAN_RENEGOTIATE request\n");
-                        break;
-
-                case FRESCAN_CANCEL:
-                        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                "FRESCAN_CANCEL request\n");
-                        break;
-
-                default:
-                        ERROR("wrong request type %d\n", type);
-                }
-        }
-
-        return NULL;
-}