X-Git-Url: http://rtime.felk.cvut.cz/gitweb/frescor/fna.git/blobdiff_plain/ff14c3a38ad29e8617c11fc8f3868cc144eee318..134806f7924a468e3ebef661f39ae7bc50aa4ad3:/src_frescan/frescan_bwres_threads.c diff --git a/src_frescan/frescan_bwres_threads.c b/src_frescan/frescan_bwres_threads.c index fed6212..38bcc21 100644 --- a/src_frescan/frescan_bwres_threads.c +++ b/src_frescan/frescan_bwres_threads.c @@ -69,6 +69,7 @@ #include "frescan_bwres_threads.h" #include "frescan_bwres_messages.h" #include "frescan_bwres_requests.h" +#include "frescan_bwres_robjs.h" #include "frescan_bwres_analysis.h" #include "frescan_bwres_mode_change.h" #include "frescan_config.h" @@ -76,6 +77,21 @@ #include "frescan_data.h" #include "frescan_servers.h" +static void *frescan_manager_thread(void *arg); +static void *frescan_acceptor_thread(void *arg); + +static void frescan_manager_gn_prepare_scenario + (frescan_bwres_sa_scenario_t *scenario, + frescan_bwres_request_data_t *req_data); + +static void frescan_manager_gn_restore_scenario + (frescan_bwres_sa_scenario_t *scenario, + frescan_bwres_request_data_t *req_data); + +static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data); +static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data); +static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data); + /** * frescan_manager_thread_create() * @@ -83,8 +99,6 @@ * in a request queue for LOCAL or EXTERNAL requests. */ -static void *frescan_manager_thread(void *arg); - int frescan_manager_thread_create(frescan_network_t net) { int ret; @@ -93,10 +107,10 @@ int frescan_manager_thread_create(frescan_network_t net) ret = fosa_thread_attr_init(&attr); if (ret != 0) return ret; - ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO); + ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_NEG_THREAD_PRIO); if (ret != 0) return ret; - ret = fosa_thread_create(&the_networks[net].manager_thread_id, + ret = fosa_thread_create(&frescan_data[net].manager_thread_id, &attr, frescan_manager_thread, (void *)(uint32_t)net); @@ -115,8 +129,6 @@ int frescan_manager_thread_create(frescan_network_t net) * messages from the network and converting them into requests. */ -static void *frescan_acceptor_thread(void *arg); - int frescan_acceptor_thread_create(frescan_network_t net) { int ret; @@ -125,10 +137,10 @@ int frescan_acceptor_thread_create(frescan_network_t net) ret = fosa_thread_attr_init(&attr); if (ret != 0) return ret; - ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO); + ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_ACCEPTOR_PRIO); if (ret != 0) return ret; - ret = fosa_thread_create(&the_networks[net].acceptor_thread_id, + ret = fosa_thread_create(&frescan_data[net].acceptor_thread_id, &attr, frescan_acceptor_thread, (void *)(uint32_t)net); @@ -144,54 +156,47 @@ int frescan_acceptor_thread_create(frescan_network_t net) * frescan_manager_thread */ -static void frescan_manager_neg(frescan_request_data_t *req_data); -static void frescan_manager_reneg(frescan_request_data_t *req_data); -static void frescan_manager_cancel(frescan_request_data_t *req_data); -static void frescan_manager_repneg(frescan_request_data_t *req_data); -static void frescan_manager_budget_change(frescan_request_data_t *req_data); - static void *frescan_manager_thread(void *arg) { int ret; - frescan_request_id_t req; - frescan_request_data_t *req_data; + frescan_bwres_request_id_t req; + frescan_bwres_request_data_t *req_data; frescan_network_t net = (uint32_t)arg; - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n"); + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "manager thread starts\n"); while(1) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n"); + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "wait for a request\n"); - ret = frescan_requests_dequeue(&req); + ret = frescan_bwres_requests_dequeue(&req); assert(ret == 0); - ret = frescan_requests_get_data(req, &req_data); + ret = frescan_bwres_requests_get_data(req, &req_data); assert(ret == 0); switch(req_data->type) { - case FRESCAN_REQ_NEG: - frescan_manager_neg(req_data); - break; - case FRESCAN_REQ_RENEG: - frescan_manager_reneg(req_data); - break; - case FRESCAN_REQ_CANCEL: - frescan_manager_cancel(req_data); + case FRESCAN_BWRES_REQ_GN: + frescan_manager_req_gn(req_data); break; - case FRESCAN_REP_DEC_BUDGET: - case FRESCAN_REP_INC_BUDGET: - frescan_manager_budget_change(req_data); + case FRESCAN_BWRES_REP_GN: + frescan_manager_rep_gn(req_data); break; - case FRESCAN_REP_NEG: - frescan_manager_repneg(req_data); + case FRESCAN_BWRES_REQ_MC: + frescan_manager_req_mc(req_data); break; + case FRESCAN_BWRES_REQ_RES: + case FRESCAN_BWRES_REQ_RES_GET: + case FRESCAN_BWRES_REP_RES_GET: + case FRESCAN_BWRES_REQ_RES_SET: + case FRESCAN_BWRES_REQ_RES_COMMIT: + case FRESCAN_BWRES_REQ_RES_CANCEL: default: FRESCAN_ERROR("request type not supported\n"); assert(0); } - if(req_data->request_node != the_networks[net].local_node) { - ret = frescan_requests_free(req); // acceptor request + if(req_data->request_node != frescan_data[net].local_node) { + ret = frescan_bwres_requests_free(req); assert(ret == 0); } } @@ -204,16 +209,16 @@ static void *frescan_manager_thread(void *arg) static void *frescan_acceptor_thread(void *arg) { int ret; - frescan_request_id_t req; + frescan_bwres_request_id_t req; frescan_network_t net = (uint32_t)arg; - DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n"); + DEBUG(FRESCAN_BWRES_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n"); while(1) { ret = frescan_messages_recv_request(net, &req); assert(ret == 0); - ret = frescan_requests_enqueue(req); + ret = frescan_bwres_requests_enqueue(req); assert(ret == 0); } @@ -221,256 +226,200 @@ static void *frescan_acceptor_thread(void *arg) } /** - * frescan_manager_neg + * frescan_manager_req_gn */ -static void frescan_manager_neg(frescan_request_data_t *req_data) +static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data) { int ret; + frescan_node_t me; bool accepted; - frescan_sa_scenario_t *scenario; - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n"); - - scenario = &the_networks[req_data->net].scenario; + frescan_bwres_sa_scenario_t *scenario; - if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "add contract to scenario\n"); + me = frescan_data[req_data->net].local_node; - ret = frescan_sa_add_contract - (scenario, - req_data->ss, - req_data->request_node, - req_data->contract); + if (me != FRESCAN_BWRES_MASTER_NODE) { + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, + "send gn req to master\n"); + ret = frescan_messages_send_request(req_data); assert(ret == 0); + return; + } - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "perform sched analysis\n"); - - ret = frescan_sa_sched_test(scenario, &accepted); - assert(ret == 0); + scenario = &frescan_data[req_data->net].scenario; - if (accepted) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "schedulable! distribute spare capacity\n"); + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "prepare new scenario\n"); + frescan_manager_gn_prepare_scenario(scenario, req_data); - ret = frescan_sa_spare_capacity(scenario); - assert(ret == 0); + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "perform sched analysis\n"); + ret = frescan_bwres_sa_sched_test(scenario, &accepted); + assert(ret == 0); - req_data->return_value = FRESCAN_REQ_ACCEPTED; + if (accepted) { + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "ACCEPTED!\n"); + req_data->return_value = FRESCAN_BWRES_REQ_ACCEPTED; - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "perform the mode change protocol!\n"); + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, + "spare capacity and mode change\n"); - ret = frescan_bwres_mode_change_protocol(req_data); - assert(ret == 0); + ret = frescan_bwres_sa_spare_capacity(scenario); + assert(ret == 0); - if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "signal local request\n"); - ret = frescan_bwres_robjs_signal - (req_data->robj); - assert(ret == 0); - } - } else { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "not schedulable!\n"); - - ret = frescan_sa_remove_contract - (scenario, - req_data->ss, - req_data->request_node); - assert(ret == 0); + ret = frescan_bwres_mode_change_protocol(req_data); + assert(ret == 0); + } else { + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "FAILED!\n"); + req_data->return_value = FRESCAN_BWRES_REQ_NOT_ACCEPTED; + frescan_manager_gn_restore_scenario(scenario, req_data); + } - req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED; - - if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "signal local request\n"); - ret = frescan_bwres_robjs_signal(req_data->robj); - assert(ret == 0); - } else { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "sending reply\n"); - req_data->type = FRESCAN_REP_NEG; - ret = frescan_messages_send_request(req_data); - assert(ret == 0); - } - } + if (req_data->request_node == me) { + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "signal robj\n"); + ret = frescan_bwres_robjs_signal(req_data->robj); + assert(ret == 0); } else { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "send negotiation request to master\n"); + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "send reply\n"); + req_data->type = FRESCAN_BWRES_REP_GN; ret = frescan_messages_send_request(req_data); assert(ret == 0); } } /** - * frescan_manager_reneg + * frescan_manager_gn_prepare_scenario */ -static void frescan_manager_reneg(frescan_request_data_t *req_data) +static void frescan_manager_gn_prepare_scenario + (frescan_bwres_sa_scenario_t *scenario, + frescan_bwres_request_data_t *req_data) { - int ret; - bool is_schedulable; - frsh_contract_t old_contract; - frescan_sa_scenario_t *scenario; - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n"); - - scenario = &the_networks[req_data->net].scenario; + int ret, i; + frescan_server_params_t server_params; + + // NEG-GROUP + server_params.budget = 0; + server_params.period.tv_sec = 0; + server_params.period.tv_nsec = 0; + server_params.prio = 0; + + for(i=0; icontracts_to_neg->size; i++) { + ret = frescan_servers_create(req_data->net, + &server_params, + &req_data->ss_new->ss[i]); + assert(ret == 0); - if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) { - // scheduling analysis - ret = frescan_sa_update_contract + ret = frescan_bwres_sa_add_contract (scenario, - req_data->ss, + req_data->ss_new->ss[i], req_data->request_node, - req_data->contract, - &old_contract); + &req_data->contracts_to_neg->contracts[i]); assert(ret == 0); + } - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "perform sched analysis\n"); - - ret = frescan_sa_sched_test(scenario, &is_schedulable); + // RENEG-GROUP + scenario->backup_contracts_to_reneg.size = + req_data->contracts_to_reneg->size; + + for(i=0; icontracts_to_reneg->size; i++) { + ret = frescan_bwres_sa_update_contract + (scenario, + req_data->ss_to_reneg->ss[i], + req_data->request_node, + &req_data->contracts_to_reneg->contracts[i], + &scenario->backup_contracts_to_reneg.contracts[i]); assert(ret == 0); + } - if (is_schedulable) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "schedulable! distribute spare capacity\n"); - - ret = frescan_sa_spare_capacity(scenario); - assert(ret == 0); - - req_data->return_value = FRESCAN_REQ_ACCEPTED; - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "perform the mode change protocol!\n"); + // CANCEL-GROUP + scenario->backup_contracts_to_cancel.size = + req_data->ss_to_cancel->size; - ret = frescan_bwres_mode_change_protocol(req_data); - assert(ret == 0); - } else { - ret = frescan_sa_update_contract - (scenario, - req_data->ss, - req_data->request_node, - &old_contract, - NULL); - assert(ret == 0); - req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED; - } - - // signal or reply the results - if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n"); - ret = frescan_bwres_robjs_signal(req_data->robj); - assert(ret == 0); - } - } else { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "send renegotiation request to master\n"); - ret = frescan_messages_send_request(req_data); + for(i=0; iss_to_cancel->size; i++) { + ret = frescan_bwres_sa_remove_contract + (scenario, + req_data->ss_to_cancel->ss[i], + req_data->request_node, + &scenario->backup_contracts_to_cancel.contracts[i]); assert(ret == 0); } } /** - * frescan_manager_cancel + * frescan_manager_gn_restore_scenario */ -static void frescan_manager_cancel(frescan_request_data_t *req_data) +static void frescan_manager_gn_restore_scenario + (frescan_bwres_sa_scenario_t *scenario, + frescan_bwres_request_data_t *req_data) { - int ret; - bool is_schedulable; - frescan_sa_scenario_t *scenario; - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n"); + int ret, i; - scenario = &the_networks[req_data->net].scenario; - - if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) { - ret = frescan_sa_remove_contract + // NEG-GROUP + for(i=0; icontracts_to_neg->size; i++) { + ret = frescan_bwres_sa_remove_contract (scenario, - req_data->ss, - req_data->request_node); - assert(ret == 0); - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "assign priorities\n"); - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "perform sched analysis\n"); - - ret = frescan_sa_sched_test(scenario, &is_schedulable); + req_data->ss_new->ss[i], + req_data->request_node, + NULL); assert(ret == 0); - assert(is_schedulable == true); - - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "redistribute spare capacity\n"); - - ret = frescan_sa_spare_capacity(scenario); + ret = frescan_servers_destroy(req_data->net, + req_data->ss_new->ss[i]); assert(ret == 0); + } - ret = frescan_bwres_mode_change_protocol(req_data); - assert(ret == 0); - } else { - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, - "send cancel request to master\n"); - ret = frescan_messages_send_request(req_data); + // RENEG-GROUP + for(i=0; icontracts_to_reneg->size; i++) { + ret = frescan_bwres_sa_update_contract + (scenario, + req_data->ss_to_reneg->ss[i], + req_data->request_node, + &scenario->backup_contracts_to_reneg.contracts[i], + NULL); assert(ret == 0); } - if (req_data->request_node == the_networks[req_data->net].local_node) { - ret = frescan_bwres_robjs_signal(req_data->robj); + // CANCEL-GROUP + for(i=0; iss_to_cancel->size; i++) { + ret = frescan_bwres_sa_add_contract + (scenario, + req_data->ss_to_cancel->ss[i], + req_data->request_node, + &scenario->backup_contracts_to_cancel.contracts[i]); assert(ret == 0); } } /** - * frescan_manager_budget_change + * frescan_manager_rep_gn */ -static void frescan_manager_budget_change(frescan_request_data_t *req_data) +static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data) { int ret; - frescan_sa_mode_change_type_t mode_change_type; - frescan_node_t me = the_networks[req_data->net].local_node; + frescan_bwres_request_data_t *orig_req_data; - DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "budget change request\n"); + ret = frescan_bwres_requests_get_data(req_data->req, &orig_req_data); + assert(ret == 0); - if (req_data->type == FRESCAN_REP_INC_BUDGET) { - mode_change_type = FRESCAN_SA_BUDGET_INC; - } else { - mode_change_type = FRESCAN_SA_BUDGET_DEC; - } + DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, + "reply to gn, return value:%d\n", req_data->return_value); + + orig_req_data->return_value = req_data->return_value; - ret = frescan_bwres_budget_change(req_data, - me, - mode_change_type); + ret = frescan_bwres_robjs_signal(orig_req_data->robj); assert(ret == 0); } /** - * frescan_manager_repneg + * frescan_manager_req_mc */ -static void frescan_manager_repneg(frescan_request_data_t *req_data) +static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data) { -// int ret; -// frescan_request_data_t *neg_req_data; -// -// DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n"); -// -// ret = frescan_requests_get_data(req_data->req, &neg_req_data); -// assert(ret == 0); -// -// neg_req_data->return_value = req_data->return_value; -// neg_req_data->final_values = req_data->final_values; -// -// ret = frescan_bwres_robjs_signal(neg_req_data->robj); -// assert(ret == 0); + int ret; + ret = frescan_bwres_mode_change_local(req_data->net, + req_data->mode_change_type); + assert(ret == 0); }