rtime.felk.cvut.cz Git - frescor/fna.git/blobdiff - src_frescan/frescan_bwres_threads.c
updated everything to the new structure... it compiles ok... next step, make sure...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
index 9d7aefe06edbcca0235f50b88ec234982cfa1636..2f78a87ddc257711d9c56bff0abe430c5c6ed069 100644 (file)
@@ -24,9 +24,6 @@
 #include "frescan_data.h"
 #include "frescan_servers.h"
 
-static void *frescan_manager_thread(void *arg);
-static void *frescan_acceptor_thread(void *arg);
-
 /**
  * frescan_manager_thread_create()
  *
@@ -34,80 +31,59 @@ static void *frescan_acceptor_thread(void *arg);
  * in a request queue for LOCAL or EXTERNAL requests.
  */
 
+static void *frescan_manager_thread(void *arg);
+
 int frescan_manager_thread_create(frescan_network_t net)
 {
         int ret;
         fosa_thread_attr_t attr;
 
         ret = fosa_thread_attr_init(&attr);
-        if (ret != 0) {
-                ERROR("could not init thread attributes\n");
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
-        if (ret != 0) {
-                ERROR("could not set neg thread prio %d\n",
-                      FRESCAN_NEG_THREAD_PRIO);
-                return ret;
-        }
+        if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
-                                  &attr,
-                                  frescan_master_neg_thread,
-                                  (void *)(uint32_t)net);
-
-        if (ret != 0) {
-                ERROR("could not create the negotiator thread\n");
-                return ret;
-        }
+        ret = fosa_thread_create(&the_networks[net].manager_thread_id,
+                                 &attr,
+                                 frescan_manager_thread,
+                                 (void *)(uint32_t)net);
+        if (ret != 0) return ret;
 
         ret = fosa_thread_attr_destroy(&attr);
-        if (ret != 0) {
-                ERROR("could not destroy thread attributes\n");
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         return 0;
 }
 
 /**
  * frescan_acceptor_thread_create()
+ *
+ * This call creates the acceptor thread, which waits for negotiation
+ * messages from the network and converts them into requests.
  */
 
+static void *frescan_acceptor_thread(void *arg);
+
 int frescan_acceptor_thread_create(frescan_network_t net)
 {
         int ret;
         fosa_thread_attr_t attr;
 
         ret = fosa_thread_attr_init(&attr);
-        if (ret != 0) {
-                ERROR("could not init thread attributes\n");
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
-        if (ret != 0) {
-                ERROR("could not set acceptor thread prio %d\n",
-                      FRESCAN_ACCEPTOR_THREAD_PRIO);
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
-                                  &attr,
-                                  frescan_acceptor_thread,
-                                  (void *)(uint32_t)net);
-
-        if (ret != 0) {
-                ERROR("could not create the negotiator thread\n");
-                return ret;
-        }
+                                 &attr,
+                                 frescan_acceptor_thread,
+                                 (void *)(uint32_t)net);
+        if (ret != 0) return ret;
 
         ret = fosa_thread_attr_destroy(&attr);
-        if (ret != 0) {
-                ERROR("could not destroy thread attributes\n");
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         return 0;
 }
@@ -116,72 +92,56 @@ int frescan_acceptor_thread_create(frescan_network_t net)
  * frescan_manager_thread
  */
 
+static int frescan_manager_neg(frescan_request_data_t *req_data);
+static int frescan_manager_reneg(frescan_request_data_t *req_data);
+static int frescan_manager_cancel(frescan_request_data_t *req_data);
+static int frescan_manager_repneg(frescan_request_data_t *req_data);
+
 static void *frescan_manager_thread(void *arg)
 {
-        int err;
-        frescan_request_id_t request;
-        frescan_req_type_t   type;
-        frescan_robj_id_t    reply;
-        frescan_contract_t   *contract;
-        frescan_neg_return_info_t *neg_return_info;
-        frescan_server_params_t server_params;
+        int ret;
+        frescan_request_id_t   req;
+        frescan_request_data_t *req_data;
         frescan_network_t net = (uint32_t)arg;
 
-        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
+        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
 
         while(1) {
-                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
-
-                err = frescan_requestqueue_dequeue(&request);
-                assert(err == 0);
-
-                err = frescan_request_get_type(request, &type);
-                assert(err == 0);
-
-                switch(type) {
-                        case FRESCAN_NEGOTIATE:
-                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                      "FRESCAN_NEGOTIATE request\n");
-                                err = frescan_request_get_contract(request, &contract);
-                                assert(err == 0);
-                                err = frescan_request_get_reply(request, &reply);
-                                assert(err == 0);
-                                err = frescan_request_get_return_info
-                                                (request, (void *)&neg_return_info);
-                                assert(err == 0);
-
-                        // TODO: sched test + add contract to table
-                        // so far always accepted witht he min values
-                                neg_return_info->error = 0;
-                                server_params.values = contract->min_values;
-                                server_params.prio   = contract->prio;
-
-                                err = frescan_servers_create
-                                                (net,
-                                                &server_params,
-                                                &neg_return_info->id);
-                                assert(err == 0);
-
-                                err = frescan_replyobject_signal(reply);
-                                assert(err == 0);
-                                break;
+                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
 
-                        case FRESCAN_RENEGOTIATE:
-                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                      "FRESCAN_RENEGOTIATE request\n");
-                                break;
+                ret = frescan_requests_dequeue(&req);
+                assert(ret == 0);
 
-                        case FRESCAN_CANCEL:
-                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                      "FRESCAN_CANCEL request\n");
-                                break;
+                ret = frescan_requests_get_data(req, &req_data);
+                assert(ret == 0);
 
+                switch(req_data->type) {
+                        case FRESCAN_REQ_NEG:
+                                ret = frescan_manager_neg(req_data);
+                                assert(ret == 0);
+                                break;
+                        case FRESCAN_REQ_RENEG:
+                                ret = frescan_manager_reneg(req_data);
+                                assert(ret == 0);
+                                break;
+                        case FRESCAN_REQ_CANCEL:
+                                ret = frescan_manager_cancel(req_data);
+                                assert(ret == 0);
+                                break;
+                        case FRESCAN_REP_NEG:
+                                ret = frescan_manager_repneg(req_data);
+                                assert(ret == 0);
+                                break;
                         default:
-                                ERROR("wrong request type %d\n", type);
+                                ERROR("request type not supported\n");
+                                assert(0);
                 }
-        }
 
-        return NULL;
+                if(req_data->request_node != the_networks[net].local_node) {
+                        ret = frescan_requests_free(req);
+                        assert(ret == 0);
+                }
+        }
 }
 
 /**
@@ -191,35 +151,91 @@ static void *frescan_manager_thread(void *arg)
 static void *frescan_acceptor_thread(void *arg)
 {
         int ret;
-        frescan_recv_params_t params;
-        uint8_t msg[200];
+        frescan_recv_params_t recv_params;
+        uint8_t msg[2000]; // TODO: use a constant with the max neg message size
         size_t recv_bytes;
         frescan_node_t from;
         frescan_prio_t prio;
+        frescan_request_id_t req;
+        frescan_request_data_t *req_data;
 
-        DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-              "master acceptor thread starts\n");
+        DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG, "acceptor th starts\n");
 
-        params.net = (frescan_network_t)(uint32_t)arg;
-        params.channel = FRESCAN_NEG_CHANNEL;
-        params.flags = FRESCAN_SYNC;
+        recv_params.net     = (frescan_network_t)(uint32_t)arg;
+        recv_params.channel = FRESCAN_NEG_CHANNEL;
+        recv_params.flags   = FRESCAN_SYNC;
 
         while(1) {
                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-                      "waiting for msg, net:%u chan:%u flags:%u\n",
-                      params.net, params.channel, params.flags);
-
-                ret = frescan_recv(&params, msg, 200,
-                                   &recv_bytes, &from, &prio);
+                      "waiting for a msg, net:%u chan:%u flags:%u\n",
+                      recv_params.net, recv_params.channel, recv_params.flags);
+
+                ret = frescan_recv(&recv_params,
+                                   msg,
+                                   sizeof(msg),
+                                   &recv_bytes,
+                                   &from,
+                                   &prio);
                 assert(ret == 0);
 
                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
                       "msg received, from:%u size:%u prio:%u\n",
                       from, recv_bytes, prio);
 
-                ret = frescan_message_parse(params.net, msg, recv_bytes, from);
+                ret = frescan_requests_alloc(&req);
+                assert(ret == 0);
+
+                ret = frescan_requests_get_data(req, &req_data);
+                assert(ret == 0);
+
+                ret = frescan_message_to_request(msg, req_data);
+                assert(ret == 0);
+
+                req_data->request_node = from;
+
+                ret = frescan_requests_enqueue(req);
                 assert(ret == 0);
         }
 
         return NULL;
 }
+
+/**
+ * frescan_manager_neg
+ */
+
+static int frescan_manager_neg(frescan_request_data_t *req_data)
+{
+        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
+        return 0;
+}
+
+/**
+ * frescan_manager_reneg
+ */
+
+static int frescan_manager_reneg(frescan_request_data_t *req_data)
+{
+        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
+        return 0;
+}
+
+/**
+ * frescan_manager_cancel
+ */
+
+static int frescan_manager_cancel(frescan_request_data_t *req_data)
+{
+        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
+        return 0;
+}
+
+/**
+ * frescan_manager_repneg
+ */
+
+static int frescan_manager_repneg(frescan_request_data_t *req_data)
+{
+        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
+        return 0;
+}