rtime.felk.cvut.cz Git - frescor/fna.git/blobdiff - src_frescan/frescan_bwres_threads.c
bug corrected: when the contract negotiation failed in the master it didn't signal...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
index 9d7aefe06edbcca0235f50b88ec234982cfa1636..b72827f2480f158eb0ad02e1558241133da0135a 100644 (file)
  *
  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
  *
+ * @license
+ *
+ * -----------------------------------------------------------------------
+ *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
+ *
+ *    Universidad de Cantabria,              SPAIN
+ *    University of York,                    UK
+ *    Scuola Superiore Sant'Anna,            ITALY
+ *    Kaiserslautern University,             GERMANY
+ *    Univ. Politécnica  Valencia,           SPAIN
+ *    Czech Technical University in Prague,  CZECH REPUBLIC
+ *    ENEA                                   SWEDEN
+ *    Thales Communication S.A.              FRANCE
+ *    Visual Tools S.A.                      SPAIN
+ *    Rapita Systems Ltd                     UK
+ *    Evidence                               ITALY
+ *
+ *    See http://www.frescor.org for a link to partners' websites
+ *
+ *           FRESCOR project (FP6/2005/IST/5-034026) is funded
+ *        in part by the European Union Sixth Framework Programme
+ *        The European Union is not liable of any use that may be
+ *        made of this code.
+ *
+ *  This file is part of FRESCAN
+ *
+ *  FRESCAN is free software; you can  redistribute it and/or  modify
+ *  it under the terms of  the GNU General Public License as published by
+ *  the Free Software Foundation;  either  version 2, or (at  your option)
+ *  any later version.
+ *
+ *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
+ *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
+ *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
+ *  General Public License for more details.
+ *
+ *  You should have  received a  copy of  the  GNU  General Public License
+ *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
+ *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
+ *  02111-1307, USA.
+ *
+ * As a special exception, including FRESCAN header files in a file,
+ * instantiating FRESCAN generics or templates, or linking other files
+ * with FRESCAN objects to produce an executable application, does not
+ * by itself cause the resulting executable application to be covered
+ * by the GNU General Public License. This exception does not
+ * however invalidate any other reasons why the executable file might be
+ * covered by the GNU Public License.
+ * -----------------------------------------------------------------------
+ *
  */
 
 #include <assert.h>
@@ -19,6 +69,9 @@
 #include "frescan_bwres_threads.h"
 #include "frescan_bwres_messages.h"
 #include "frescan_bwres_requests.h"
+#include "frescan_bwres_robjs.h"
+#include "frescan_bwres_analysis.h"
+#include "frescan_bwres_mode_change.h"
 #include "frescan_config.h"
 #include "frescan_debug.h"
 #include "frescan_data.h"
 static void *frescan_manager_thread(void *arg);
 static void *frescan_acceptor_thread(void *arg);
 
+static void frescan_manager_gn_prepare_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data);
+
+static void frescan_manager_gn_restore_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data);
+
+static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data);
+static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data);
+static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data);
+
 /**
  * frescan_manager_thread_create()
  *
@@ -40,39 +105,28 @@ int frescan_manager_thread_create(frescan_network_t net)
         fosa_thread_attr_t attr;
 
         ret = fosa_thread_attr_init(&attr);
-        if (ret != 0) {
-                ERROR("could not init thread attributes\n");
-                return ret;
-        }
-
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
-        if (ret != 0) {
-                ERROR("could not set neg thread prio %d\n",
-                      FRESCAN_NEG_THREAD_PRIO);
-                return ret;
-        }
+        if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
-                                  &attr,
-                                  frescan_master_neg_thread,
-                                  (void *)(uint32_t)net);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_NEG_THREAD_PRIO);
+        if (ret != 0) return ret;
 
-        if (ret != 0) {
-                ERROR("could not create the negotiator thread\n");
-                return ret;
-        }
+        ret = fosa_thread_create(&frescan_data[net].manager_thread_id,
+                                 &attr,
+                                 frescan_manager_thread,
+                                 (void *)(uint32_t)net);
+        if (ret != 0) return ret;
 
         ret = fosa_thread_attr_destroy(&attr);
-        if (ret != 0) {
-                ERROR("could not destroy thread attributes\n");
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         return 0;
 }
 
 /**
  * frescan_acceptor_thread_create()
+ *
+ * This call creates the acceptor thread, which waits for negotiation
+ * messages from the network and converts them into requests.
  */
 
 int frescan_acceptor_thread_create(frescan_network_t net)
@@ -81,33 +135,19 @@ int frescan_acceptor_thread_create(frescan_network_t net)
         fosa_thread_attr_t attr;
 
         ret = fosa_thread_attr_init(&attr);
-        if (ret != 0) {
-                ERROR("could not init thread attributes\n");
-                return ret;
-        }
-
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
-        if (ret != 0) {
-                ERROR("could not set acceptor thread prio %d\n",
-                      FRESCAN_ACCEPTOR_THREAD_PRIO);
-                return ret;
-        }
+        if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
-                                  &attr,
-                                  frescan_acceptor_thread,
-                                  (void *)(uint32_t)net);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_ACCEPTOR_PRIO);
+        if (ret != 0) return ret;
 
-        if (ret != 0) {
-                ERROR("could not create the negotiator thread\n");
-                return ret;
-        }
+        ret = fosa_thread_create(&frescan_data[net].acceptor_thread_id,
+                                 &attr,
+                                 frescan_acceptor_thread,
+                                 (void *)(uint32_t)net);
+        if (ret != 0) return ret;
 
         ret = fosa_thread_attr_destroy(&attr);
-        if (ret != 0) {
-                ERROR("could not destroy thread attributes\n");
-                return ret;
-        }
+        if (ret != 0) return ret;
 
         return 0;
 }
@@ -118,70 +158,48 @@ int frescan_acceptor_thread_create(frescan_network_t net)
 
 static void *frescan_manager_thread(void *arg)
 {
-        int err;
-        frescan_request_id_t request;
-        frescan_req_type_t   type;
-        frescan_robj_id_t    reply;
-        frescan_contract_t   *contract;
-        frescan_neg_return_info_t *neg_return_info;
-        frescan_server_params_t server_params;
+        int ret;
+        frescan_bwres_request_id_t   req;
+        frescan_bwres_request_data_t *req_data;
         frescan_network_t net = (uint32_t)arg;
 
-        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
 
         while(1) {
-                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
-
-                err = frescan_requestqueue_dequeue(&request);
-                assert(err == 0);
-
-                err = frescan_request_get_type(request, &type);
-                assert(err == 0);
-
-                switch(type) {
-                        case FRESCAN_NEGOTIATE:
-                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                      "FRESCAN_NEGOTIATE request\n");
-                                err = frescan_request_get_contract(request, &contract);
-                                assert(err == 0);
-                                err = frescan_request_get_reply(request, &reply);
-                                assert(err == 0);
-                                err = frescan_request_get_return_info
-                                                (request, (void *)&neg_return_info);
-                                assert(err == 0);
-
-                        // TODO: sched test + add contract to table
-                        // so far always accepted witht he min values
-                                neg_return_info->error = 0;
-                                server_params.values = contract->min_values;
-                                server_params.prio   = contract->prio;
-
-                                err = frescan_servers_create
-                                                (net,
-                                                &server_params,
-                                                &neg_return_info->id);
-                                assert(err == 0);
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "wait for a request\n");
 
-                                err = frescan_replyobject_signal(reply);
-                                assert(err == 0);
-                                break;
+                ret = frescan_bwres_requests_dequeue(&req);
+                assert(ret == 0);
 
-                        case FRESCAN_RENEGOTIATE:
-                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                      "FRESCAN_RENEGOTIATE request\n");
-                                break;
+                ret = frescan_bwres_requests_get_data(req, &req_data);
+                assert(ret == 0);
 
-                        case FRESCAN_CANCEL:
-                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
-                                      "FRESCAN_CANCEL request\n");
+                switch(req_data->type) {
+                        case FRESCAN_BWRES_REQ_GN:
+                                frescan_manager_req_gn(req_data);
                                 break;
-
+                        case FRESCAN_BWRES_REP_GN:
+                                frescan_manager_rep_gn(req_data);
+                                break;
+                        case FRESCAN_BWRES_REQ_MC:
+                                frescan_manager_req_mc(req_data);
+                                break;
+                        case FRESCAN_BWRES_REQ_RES:
+                        case FRESCAN_BWRES_REQ_RES_GET:
+                        case FRESCAN_BWRES_REP_RES_GET:
+                        case FRESCAN_BWRES_REQ_RES_SET:
+                        case FRESCAN_BWRES_REQ_RES_COMMIT:
+                        case FRESCAN_BWRES_REQ_RES_CANCEL:
                         default:
-                                ERROR("wrong request type %d\n", type);
+                                FRESCAN_ERROR("request type not supported\n");
+                                assert(0);
                 }
-        }
 
-        return NULL;
+                if(req_data->request_node != frescan_data[net].local_node) {
+                        ret = frescan_bwres_requests_free(req);
+                        assert(ret == 0);
+                }
+        }
 }
 
 /**
@@ -191,35 +209,261 @@ static void *frescan_manager_thread(void *arg)
 static void *frescan_acceptor_thread(void *arg)
 {
         int ret;
-        frescan_recv_params_t params;
-        uint8_t msg[200];
-        size_t recv_bytes;
-        frescan_node_t from;
-        frescan_prio_t prio;
-
-        DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-              "master acceptor thread starts\n");
+        frescan_bwres_request_id_t req;
+        frescan_network_t net = (uint32_t)arg;
 
-        params.net = (frescan_network_t)(uint32_t)arg;
-        params.channel = FRESCAN_NEG_CHANNEL;
-        params.flags = FRESCAN_SYNC;
+        DEBUG(FRESCAN_BWRES_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
 
         while(1) {
-                DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-                      "waiting for msg, net:%u chan:%u flags:%u\n",
-                      params.net, params.channel, params.flags);
+                ret = frescan_messages_recv_request(net, &req);
+                assert(ret == 0);
 
-                ret = frescan_recv(&params, msg, 200,
-                                   &recv_bytes, &from, &prio);
+                ret = frescan_bwres_requests_enqueue(req);
                 assert(ret == 0);
+        }
 
-                DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-                      "msg received, from:%u size:%u prio:%u\n",
-                      from, recv_bytes, prio);
+        return NULL;
+}
+
+/**
+ * frescan_manager_req_gn
+ */
+
+static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data)
+{
+        int ret, i;
+        frescan_node_t me;
+        bool accepted;
+        frescan_bwres_sa_scenario_t *scenario;
+        frescan_ss_t ss;
+        frescan_server_params_t server_params;
+        frescan_bwres_vres_t *vres;
 
-                ret = frescan_message_parse(params.net, msg, recv_bytes, from);
+        me = frescan_data[req_data->net].local_node;
+
+        if (me != FRESCAN_BWRES_MASTER_NODE) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "send gn req to master\n");
+                ret = frescan_messages_send_request(req_data);
                 assert(ret == 0);
+                return;
         }
 
-        return NULL;
+        scenario = &frescan_data[req_data->net].scenario;
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "prepare new scenario\n");
+        frescan_manager_gn_prepare_scenario(scenario, req_data);
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "perform sched analysis\n");
+        ret = frescan_bwres_sa_sched_test(scenario, &accepted);
+        assert(ret == 0);
+
+        if (accepted) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "ACCEPTED!\n");
+                req_data->return_value = FRESCAN_BWRES_REQ_ACCEPTED;
+
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "spare capacity and mode change\n");
+
+                ret = frescan_bwres_sa_spare_capacity(scenario);
+                assert(ret == 0);
+
+                ret = frescan_bwres_mode_change_protocol(req_data);
+                assert(ret == 0);
+        } else {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "FAILED!\n");
+                req_data->return_value = FRESCAN_BWRES_REQ_NOT_ACCEPTED;
+                frescan_manager_gn_restore_scenario(scenario, req_data);
+        }
+
+        if (req_data->request_node != me) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "send reply\n");
+                req_data->type = FRESCAN_BWRES_REP_GN;
+
+                ret = frescan_messages_send_request(req_data);
+                assert(ret == 0);
+                return;
+        }
+
+        if (req_data->return_value == FRESCAN_BWRES_REQ_ACCEPTED) {
+                // create servers for new contracts
+                req_data->ss_new->size = req_data->contracts_to_neg->size;
+                for(i=0; i<req_data->ss_new->size; i++) {
+                        vres = &frescan_data[req_data->net].scenario.
+                                        vres_pool[me]
+                                                [req_data->ss_new->ss[i]];
+
+                        server_params.budget = frsh_rel_time_to_usec(
+                                        frsh_sa_time_to_rel_time(vres->old_c)) /
+                                        FRESCAN_FRAME_TX_TIME_US;
+
+                        server_params.period = frsh_sa_time_to_rel_time
+                                                                (vres->old_t);
+                        server_params.prio   = vres->old_p;
+
+                        // Create server
+                        ret = frescan_servers_create(req_data->net,
+                                                &server_params,
+                                                &ss);
+                        assert(ret == 0);
+                        assert (req_data->ss_new->ss[i] == ss);
+                }
+        }
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "signal robj\n");
+        ret = frescan_bwres_robjs_signal(req_data->robj);
+        assert(ret == 0);
+}
+
+/**
+ * frescan_manager_gn_prepare_scenario
+ */
+
+static void frescan_manager_gn_prepare_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data)
+{
+        int ret, i;
+
+        // NEG-GROUP
+        for(i=0; i<req_data->contracts_to_neg->size; i++) {
+                ret = freelist_alloc(&frescan_data[req_data->net].scenario.
+                                      ss_id_freelist[req_data->request_node]);
+                assert(ret >= 0);
+
+                req_data->ss_new->ss[i] = (frescan_ss_t)ret;
+
+                ret = frescan_bwres_sa_add_contract
+                                (scenario,
+                                 req_data->ss_new->ss[i],
+                                 req_data->request_node,
+                                 &req_data->contracts_to_neg->contracts[i]);
+                assert(ret == 0);
+        }
+
+        req_data->ss_new->size = req_data->contracts_to_neg->size;
+
+        // RENEG-GROUP
+        scenario->backup_contracts_to_reneg.size =
+                                        req_data->contracts_to_reneg->size;
+
+        for(i=0; i<req_data->contracts_to_reneg->size; i++) {
+                ret = frescan_bwres_sa_update_contract
+                        (scenario,
+                         req_data->ss_to_reneg->ss[i],
+                         req_data->request_node,
+                         &req_data->contracts_to_reneg->contracts[i],
+                         &scenario->backup_contracts_to_reneg.contracts[i]);
+                assert(ret == 0);
+        }
+
+        // CANCEL-GROUP
+        scenario->backup_contracts_to_cancel.size =
+                                                req_data->ss_to_cancel->size;
+
+        for(i=0; i<req_data->ss_to_cancel->size; i++) {
+                ret = frescan_bwres_sa_remove_contract
+                        (scenario,
+                         req_data->ss_to_cancel->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_cancel.contracts[i]);
+                assert(ret == 0);
+        }
+}
+
+/**
+ * frescan_manager_gn_restore_scenario
+ */
+
+static void frescan_manager_gn_restore_scenario
+                (frescan_bwres_sa_scenario_t  *scenario,
+                 frescan_bwres_request_data_t *req_data)
+{
+        int ret, i;
+
+        // NEG-GROUP
+        for(i=0; i<req_data->contracts_to_neg->size; i++) {
+                ret = frescan_bwres_sa_remove_contract
+                                (scenario,
+                                 req_data->ss_new->ss[i],
+                                 req_data->request_node,
+                                 NULL);
+                assert(ret == 0);
+
+                ret = freelist_free(&frescan_data[req_data->net].scenario.
+                                        ss_id_freelist[req_data->request_node],
+                                    req_data->ss_new->ss[i]);
+                assert(ret == 0);
+        }
+
+        // RENEG-GROUP
+        for(i=0; i<req_data->contracts_to_reneg->size; i++) {
+                ret = frescan_bwres_sa_update_contract
+                        (scenario,
+                         req_data->ss_to_reneg->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_reneg.contracts[i],
+                         NULL);
+                assert(ret == 0);
+        }
+
+        // CANCEL-GROUP
+        for(i=0; i<req_data->ss_to_cancel->size; i++) {
+                ret = frescan_bwres_sa_add_contract
+                        (scenario,
+                         req_data->ss_to_cancel->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_cancel.contracts[i]);
+                assert(ret == 0);
+        }
+}
+
+/**
+ * frescan_manager_rep_gn
+ */
+
+static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data)
+{
+        int ret, i;
+        frescan_bwres_request_data_t *caller_req;
+
+        ret = frescan_bwres_requests_get_data(req_data->req, &caller_req);
+        assert(ret == 0);
+
+        caller_req->return_value = req_data->return_value;
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+              "ret:%d -> %s\n", req_data->return_value,
+              (req_data->return_value ==
+               FRESCAN_BWRES_REQ_ACCEPTED) ? "OK" : "FAIL");
+
+        if (req_data->return_value == FRESCAN_BWRES_REQ_ACCEPTED) {
+                assert (req_data->ss_new->size ==
+                        caller_req->contracts_to_neg->size);
+                caller_req->ss_new->size = req_data->ss_new->size;
+
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "ss_new->size:%u\n", caller_req->ss_new->size);
+
+                for (i=0; i<caller_req->ss_new->size; i++) {
+                        caller_req->ss_new->ss[i] = req_data->ss_new->ss[i];
+                }
+        }
+
+        ret = frescan_bwres_robjs_signal(caller_req->robj);
+        assert(ret == 0);
+}
+
+/**
+ * frescan_manager_req_mc
+ */
+
+static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data)
+{
+        int ret;
+
+        ret = frescan_bwres_mode_change_local(req_data->net,
+                                              req_data->mode_change_type,
+                                              req_data->ss_to_cancel);
+        assert(ret == 0);
 }