]> rtime.felk.cvut.cz Git - frescor/fna.git/blobdiff - src_frescan/frescan_bwres_threads.c
add group negotiations to frescan and change all the requests and messages to map...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
index fed6212169192b8f70a175cc093fffa477d33e65..38bcc21f535b0fd77157962bb3df874a555057d2 100644 (file)
@@ -69,6 +69,7 @@
 #include "frescan_bwres_threads.h"
 #include "frescan_bwres_messages.h"
 #include "frescan_bwres_requests.h"
+#include "frescan_bwres_robjs.h"
 #include "frescan_bwres_analysis.h"
 #include "frescan_bwres_mode_change.h"
 #include "frescan_config.h"
 #include "frescan_data.h"
 #include "frescan_servers.h"
 
+static void *frescan_manager_thread(void *arg);
+static void *frescan_acceptor_thread(void *arg);
+
+static void frescan_manager_gn_prepare_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data);
+
+static void frescan_manager_gn_restore_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data);
+
+static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data);
+static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data);
+static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data);
+
 /**
  * frescan_manager_thread_create()
  *
@@ -83,8 +99,6 @@
  * in a request queue for LOCAL or EXTERNAL requests.
  */
 
-static void *frescan_manager_thread(void *arg);
-
 int frescan_manager_thread_create(frescan_network_t net)
 {
         int ret;
@@ -93,10 +107,10 @@ int frescan_manager_thread_create(frescan_network_t net)
         ret = fosa_thread_attr_init(&attr);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_NEG_THREAD_PRIO);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].manager_thread_id,
+        ret = fosa_thread_create(&frescan_data[net].manager_thread_id,
                                  &attr,
                                  frescan_manager_thread,
                                  (void *)(uint32_t)net);
@@ -115,8 +129,6 @@ int frescan_manager_thread_create(frescan_network_t net)
  * messages from the network and converting them into requests.
  */
 
-static void *frescan_acceptor_thread(void *arg);
-
 int frescan_acceptor_thread_create(frescan_network_t net)
 {
         int ret;
@@ -125,10 +137,10 @@ int frescan_acceptor_thread_create(frescan_network_t net)
         ret = fosa_thread_attr_init(&attr);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_ACCEPTOR_PRIO);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
+        ret = fosa_thread_create(&frescan_data[net].acceptor_thread_id,
                                  &attr,
                                  frescan_acceptor_thread,
                                  (void *)(uint32_t)net);
@@ -144,54 +156,47 @@ int frescan_acceptor_thread_create(frescan_network_t net)
  * frescan_manager_thread
  */
 
-static void frescan_manager_neg(frescan_request_data_t *req_data);
-static void frescan_manager_reneg(frescan_request_data_t *req_data);
-static void frescan_manager_cancel(frescan_request_data_t *req_data);
-static void frescan_manager_repneg(frescan_request_data_t *req_data);
-static void frescan_manager_budget_change(frescan_request_data_t *req_data);
-
 static void *frescan_manager_thread(void *arg)
 {
         int ret;
-        frescan_request_id_t   req;
-        frescan_request_data_t *req_data;
+        frescan_bwres_request_id_t   req;
+        frescan_bwres_request_data_t *req_data;
         frescan_network_t net = (uint32_t)arg;
 
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
 
         while(1) {
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "wait for a request\n");
 
-                ret = frescan_requests_dequeue(&req);
+                ret = frescan_bwres_requests_dequeue(&req);
                 assert(ret == 0);
 
-                ret = frescan_requests_get_data(req, &req_data);
+                ret = frescan_bwres_requests_get_data(req, &req_data);
                 assert(ret == 0);
 
                 switch(req_data->type) {
-                        case FRESCAN_REQ_NEG:
-                                frescan_manager_neg(req_data);
-                                break;
-                        case FRESCAN_REQ_RENEG:
-                                frescan_manager_reneg(req_data);
-                                break;
-                        case FRESCAN_REQ_CANCEL:
-                                frescan_manager_cancel(req_data);
+                        case FRESCAN_BWRES_REQ_GN:
+                                frescan_manager_req_gn(req_data);
                                 break;
-                        case FRESCAN_REP_DEC_BUDGET:
-                        case FRESCAN_REP_INC_BUDGET:
-                                frescan_manager_budget_change(req_data);
+                        case FRESCAN_BWRES_REP_GN:
+                                frescan_manager_rep_gn(req_data);
                                 break;
-                        case FRESCAN_REP_NEG:
-                                frescan_manager_repneg(req_data);
+                        case FRESCAN_BWRES_REQ_MC:
+                                frescan_manager_req_mc(req_data);
                                 break;
+                        case FRESCAN_BWRES_REQ_RES:
+                        case FRESCAN_BWRES_REQ_RES_GET:
+                        case FRESCAN_BWRES_REP_RES_GET:
+                        case FRESCAN_BWRES_REQ_RES_SET:
+                        case FRESCAN_BWRES_REQ_RES_COMMIT:
+                        case FRESCAN_BWRES_REQ_RES_CANCEL:
                         default:
                                 FRESCAN_ERROR("request type not supported\n");
                                 assert(0);
                 }
 
-                if(req_data->request_node != the_networks[net].local_node) {
-                        ret = frescan_requests_free(req); // acceptor request
+                if(req_data->request_node != frescan_data[net].local_node) {
+                        ret = frescan_bwres_requests_free(req);
                         assert(ret == 0);
                 }
         }
@@ -204,16 +209,16 @@ static void *frescan_manager_thread(void *arg)
 static void *frescan_acceptor_thread(void *arg)
 {
         int ret;
-        frescan_request_id_t req;
+        frescan_bwres_request_id_t req;
         frescan_network_t net = (uint32_t)arg;
 
-        DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
+        DEBUG(FRESCAN_BWRES_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
 
         while(1) {
                 ret = frescan_messages_recv_request(net, &req);
                 assert(ret == 0);
 
-                ret = frescan_requests_enqueue(req);
+                ret = frescan_bwres_requests_enqueue(req);
                 assert(ret == 0);
         }
 
@@ -221,256 +226,200 @@ static void *frescan_acceptor_thread(void *arg)
 }
 
 /**
- * frescan_manager_neg
+ * frescan_manager_req_gn
  */
 
-static void frescan_manager_neg(frescan_request_data_t *req_data)
+static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data)
 {
         int ret;
+        frescan_node_t me;
         bool accepted;
-        frescan_sa_scenario_t *scenario;
-
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
-
-        scenario = &the_networks[req_data->net].scenario;
+        frescan_bwres_sa_scenario_t *scenario;
 
-        if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "add contract to scenario\n");
+        me = frescan_data[req_data->net].local_node;
 
-                ret = frescan_sa_add_contract
-                                (scenario,
-                                 req_data->ss,
-                                 req_data->request_node,
-                                 req_data->contract);
+        if (me != FRESCAN_BWRES_MASTER_NODE) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "send gn req to master\n");
+                ret = frescan_messages_send_request(req_data);
                 assert(ret == 0);
+                return;
+        }
 
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "perform sched analysis\n");
-
-                ret = frescan_sa_sched_test(scenario, &accepted);
-                assert(ret == 0);
+        scenario = &frescan_data[req_data->net].scenario;
 
-                if (accepted) {
-                        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                              "schedulable! distribute spare capacity\n");
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "prepare new scenario\n");
+        frescan_manager_gn_prepare_scenario(scenario, req_data);
 
-                        ret = frescan_sa_spare_capacity(scenario);
-                        assert(ret == 0);
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "perform sched analysis\n");
+        ret = frescan_bwres_sa_sched_test(scenario, &accepted);
+        assert(ret == 0);
 
-                        req_data->return_value = FRESCAN_REQ_ACCEPTED;
+        if (accepted) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "ACCEPTED!\n");
+                req_data->return_value = FRESCAN_BWRES_REQ_ACCEPTED;
 
-                        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                              "perform the mode change protocol!\n");
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "spare capacity and mode change\n");
 
-                        ret = frescan_bwres_mode_change_protocol(req_data);
-                        assert(ret == 0);
+                ret = frescan_bwres_sa_spare_capacity(scenario);
+                assert(ret == 0);
 
-                        if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
-                                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                                      "signal local request\n");
-                                ret = frescan_bwres_robjs_signal
-                                                        (req_data->robj);
-                                assert(ret == 0);
-                        }
-                } else {
-                        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                              "not schedulable!\n");
-
-                        ret = frescan_sa_remove_contract
-                                        (scenario,
-                                         req_data->ss,
-                                         req_data->request_node);
-                        assert(ret == 0);
+                ret = frescan_bwres_mode_change_protocol(req_data);
+                assert(ret == 0);
+        } else {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "FAILED!\n");
+                req_data->return_value = FRESCAN_BWRES_REQ_NOT_ACCEPTED;
+                frescan_manager_gn_restore_scenario(scenario, req_data);
+        }
 
-                        req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
-
-                        if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
-                                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                                      "signal local request\n");
-                                ret = frescan_bwres_robjs_signal(req_data->robj);
-                                assert(ret == 0);
-                        } else {
-                                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                                      "sending reply\n");
-                                req_data->type = FRESCAN_REP_NEG;
-                                ret = frescan_messages_send_request(req_data);
-                                assert(ret == 0);
-                        }
-                }
+        if (req_data->request_node == me) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "signal robj\n");
+                ret = frescan_bwres_robjs_signal(req_data->robj);
+                assert(ret == 0);
         } else {
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "send negotiation request to master\n");
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "send reply\n");
+                req_data->type = FRESCAN_BWRES_REP_GN;
                 ret = frescan_messages_send_request(req_data);
                 assert(ret == 0);
         }
 }
 
 /**
- * frescan_manager_reneg
+ * frescan_manager_gn_prepare_scenario
  */
 
-static void frescan_manager_reneg(frescan_request_data_t *req_data)
+static void frescan_manager_gn_prepare_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data)
 {
-        int ret;
-        bool is_schedulable;
-        frsh_contract_t old_contract;
-        frescan_sa_scenario_t *scenario;
-
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
-
-        scenario = &the_networks[req_data->net].scenario;
+        int ret, i;
+        frescan_server_params_t server_params;
+
+        // NEG-GROUP
+        server_params.budget         = 0;
+        server_params.period.tv_sec  = 0;
+        server_params.period.tv_nsec = 0;
+        server_params.prio           = 0;
+
+        for(i=0; i<req_data->contracts_to_neg->size; i++) {
+                ret = frescan_servers_create(req_data->net,
+                                             &server_params,
+                                             &req_data->ss_new->ss[i]);
+                assert(ret == 0);
 
-        if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
-                // scheduling analysis
-                ret = frescan_sa_update_contract
+                ret = frescan_bwres_sa_add_contract
                                 (scenario,
-                                 req_data->ss,
+                                 req_data->ss_new->ss[i],
                                  req_data->request_node,
-                                 req_data->contract,
-                                 &old_contract);
+                                 &req_data->contracts_to_neg->contracts[i]);
                 assert(ret == 0);
+        }
 
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "perform sched analysis\n");
-
-                ret = frescan_sa_sched_test(scenario, &is_schedulable);
+        // RENEG-GROUP
+        scenario->backup_contracts_to_reneg.size =
+                                        req_data->contracts_to_reneg->size;
+
+        for(i=0; i<req_data->contracts_to_reneg->size; i++) {
+                ret = frescan_bwres_sa_update_contract
+                        (scenario,
+                         req_data->ss_to_reneg->ss[i],
+                         req_data->request_node,
+                         &req_data->contracts_to_reneg->contracts[i],
+                         &scenario->backup_contracts_to_reneg.contracts[i]);
                 assert(ret == 0);
+        }
 
-                if (is_schedulable) {
-                        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                              "schedulable! distribute spare capacity\n");
-
-                        ret = frescan_sa_spare_capacity(scenario);
-                        assert(ret == 0);
-
-                        req_data->return_value = FRESCAN_REQ_ACCEPTED;
-
-                        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                              "perform the mode change protocol!\n");
+        // CANCEL-GROUP
+        scenario->backup_contracts_to_cancel.size =
+                                                req_data->ss_to_cancel->size;
 
-                        ret = frescan_bwres_mode_change_protocol(req_data);
-                        assert(ret == 0);
-                } else {
-                        ret = frescan_sa_update_contract
-                                        (scenario,
-                                         req_data->ss,
-                                         req_data->request_node,
-                                         &old_contract,
-                                         NULL);
-                        assert(ret == 0);
-                        req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
-                }
-
-                // signal or reply the results
-                if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
-                        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
-                        ret = frescan_bwres_robjs_signal(req_data->robj);
-                        assert(ret == 0);
-                }
-        } else {
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "send renegotiation request to master\n");
-                ret = frescan_messages_send_request(req_data);
+        for(i=0; i<req_data->ss_to_cancel->size; i++) {
+                ret = frescan_bwres_sa_remove_contract
+                        (scenario,
+                         req_data->ss_to_cancel->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_cancel.contracts[i]);
                 assert(ret == 0);
         }
 }
 
 /**
- * frescan_manager_cancel
+ * frescan_manager_gn_restore_scenario
  */
 
-static void frescan_manager_cancel(frescan_request_data_t *req_data)
+static void frescan_manager_gn_restore_scenario
+                (frescan_bwres_sa_scenario_t  *scenario,
+                 frescan_bwres_request_data_t *req_data)
 {
-        int ret;
-        bool is_schedulable;
-        frescan_sa_scenario_t *scenario;
-
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
+        int ret, i;
 
-        scenario = &the_networks[req_data->net].scenario;
-
-        if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
-                ret = frescan_sa_remove_contract
+        // NEG-GROUP
+        for(i=0; i<req_data->contracts_to_neg->size; i++) {
+                ret = frescan_bwres_sa_remove_contract
                                 (scenario,
-                                 req_data->ss,
-                                 req_data->request_node);
-                assert(ret == 0);
-
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "assign priorities\n");
-
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "perform sched analysis\n");
-
-                ret = frescan_sa_sched_test(scenario, &is_schedulable);
+                                 req_data->ss_new->ss[i],
+                                 req_data->request_node,
+                                 NULL);
                 assert(ret == 0);
 
-                assert(is_schedulable == true);
-
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "redistribute spare capacity\n");
-
-                ret = frescan_sa_spare_capacity(scenario);
+                ret = frescan_servers_destroy(req_data->net,
+                                              req_data->ss_new->ss[i]);
                 assert(ret == 0);
+        }
 
-                ret = frescan_bwres_mode_change_protocol(req_data);
-                assert(ret == 0);
-        } else {
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
-                      "send cancel request to master\n");
-                ret = frescan_messages_send_request(req_data);
+        // RENEG-GROUP
+        for(i=0; i<req_data->contracts_to_reneg->size; i++) {
+                ret = frescan_bwres_sa_update_contract
+                        (scenario,
+                         req_data->ss_to_reneg->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_reneg.contracts[i],
+                         NULL);
                 assert(ret == 0);
         }
 
-        if (req_data->request_node == the_networks[req_data->net].local_node) {
-                ret = frescan_bwres_robjs_signal(req_data->robj);
+        // CANCEL-GROUP
+        for(i=0; i<req_data->ss_to_cancel->size; i++) {
+                ret = frescan_bwres_sa_add_contract
+                        (scenario,
+                         req_data->ss_to_cancel->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_cancel.contracts[i]);
                 assert(ret == 0);
         }
 }
 
 /**
- * frescan_manager_budget_change
+ * frescan_manager_rep_gn
  */
 
-static void frescan_manager_budget_change(frescan_request_data_t *req_data)
+static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data)
 {
         int ret;
-        frescan_sa_mode_change_type_t mode_change_type;
-        frescan_node_t me = the_networks[req_data->net].local_node;
+        frescan_bwres_request_data_t *orig_req_data;
 
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "budget change request\n");
+        ret = frescan_bwres_requests_get_data(req_data->req, &orig_req_data);
+        assert(ret == 0);
 
-        if (req_data->type == FRESCAN_REP_INC_BUDGET) {
-                mode_change_type = FRESCAN_SA_BUDGET_INC;
-        } else {
-                mode_change_type = FRESCAN_SA_BUDGET_DEC;
-        }
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+              "reply to gn, return value:%d\n", req_data->return_value);
+
+        orig_req_data->return_value = req_data->return_value;
 
-        ret = frescan_bwres_budget_change(req_data,
-                                          me,
-                                          mode_change_type);
+        ret = frescan_bwres_robjs_signal(orig_req_data->robj);
         assert(ret == 0);
 }
 
 /**
- * frescan_manager_repneg
+ * frescan_manager_req_mc
  */
 
-static void frescan_manager_repneg(frescan_request_data_t *req_data)
+static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data)
 {
-//         int ret;
-//         frescan_request_data_t *neg_req_data;
-//
-//         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
-//
-//         ret = frescan_requests_get_data(req_data->req, &neg_req_data);
-//         assert(ret == 0);
-//
-//         neg_req_data->return_value = req_data->return_value;
-//         neg_req_data->final_values = req_data->final_values;
-//
-//         ret = frescan_bwres_robjs_signal(neg_req_data->robj);
-//         assert(ret == 0);
+        int ret;
 
+        ret = frescan_bwres_mode_change_local(req_data->net,
+                                              req_data->mode_change_type);
+        assert(ret == 0);
 }