]> rtime.felk.cvut.cz Git - frescor/fna.git/blobdiff - src_frescan/frescan_bwres_threads.c
add group negotiations to frescan and change all the requests and messages to map...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
index 2f78a87ddc257711d9c56bff0abe430c5c6ed069..38bcc21f535b0fd77157962bb3df874a555057d2 100644 (file)
  *
  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
  *
+ * @license
+ *
+ * -----------------------------------------------------------------------
+ *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
+ *
+ *    Universidad de Cantabria,              SPAIN
+ *    University of York,                    UK
+ *    Scuola Superiore Sant'Anna,            ITALY
+ *    Kaiserslautern University,             GERMANY
+ *    Univ. Politécnica  Valencia,           SPAIN
+ *    Czech Technical University in Prague,  CZECH REPUBLIC
+ *    ENEA                                   SWEDEN
+ *    Thales Communication S.A.              FRANCE
+ *    Visual Tools S.A.                      SPAIN
+ *    Rapita Systems Ltd                     UK
+ *    Evidence                               ITALY
+ *
+ *    See http://www.frescor.org for a link to partners' websites
+ *
+ *           FRESCOR project (FP6/2005/IST/5-034026) is funded
+ *        in part by the European Union Sixth Framework Programme
+ *        The European Union is not liable of any use that may be
+ *        made of this code.
+ *
+ *  This file is part of FRESCAN
+ *
+ *  FRESCAN is free software; you can  redistribute it and/or  modify
+ *  it under the terms of  the GNU General Public License as published by
+ *  the Free Software Foundation;  either  version 2, or (at  your option)
+ *  any later version.
+ *
+ *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
+ *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
+ *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
+ *  General Public License for more details.
+ *
+ *  You should have  received a  copy of  the  GNU  General Public License
+ *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
+ *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
+ *  02111-1307, USA.
+ *
+ * As a special exception, including FRESCAN header files in a file,
+ * instantiating FRESCAN generics or templates, or linking other files
+ * with FRESCAN objects to produce an executable application, does not
+ * by itself cause the resulting executable application to be covered
+ * by the GNU General Public License. This exception does not
+ * however invalidate any other reasons why the executable file might be
+ * covered by the GNU Public License.
+ * -----------------------------------------------------------------------
+ *
  */
 
 #include <assert.h>
 #include "frescan_bwres_threads.h"
 #include "frescan_bwres_messages.h"
 #include "frescan_bwres_requests.h"
+#include "frescan_bwres_robjs.h"
+#include "frescan_bwres_analysis.h"
+#include "frescan_bwres_mode_change.h"
 #include "frescan_config.h"
 #include "frescan_debug.h"
 #include "frescan_data.h"
 #include "frescan_servers.h"
 
+static void *frescan_manager_thread(void *arg);
+static void *frescan_acceptor_thread(void *arg);
+
+static void frescan_manager_gn_prepare_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data);
+
+static void frescan_manager_gn_restore_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data);
+
+static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data);
+static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data);
+static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data);
+
 /**
  * frescan_manager_thread_create()
  *
@@ -31,8 +99,6 @@
  * in a request queue for LOCAL or EXTERNAL requests.
  */
 
-static void *frescan_manager_thread(void *arg);
-
 int frescan_manager_thread_create(frescan_network_t net)
 {
         int ret;
@@ -41,10 +107,10 @@ int frescan_manager_thread_create(frescan_network_t net)
         ret = fosa_thread_attr_init(&attr);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_NEG_THREAD_PRIO);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].manager_thread_id,
+        ret = fosa_thread_create(&frescan_data[net].manager_thread_id,
                                  &attr,
                                  frescan_manager_thread,
                                  (void *)(uint32_t)net);
@@ -63,8 +129,6 @@ int frescan_manager_thread_create(frescan_network_t net)
  * messages from the network and converting them into requests.
  */
 
-static void *frescan_acceptor_thread(void *arg);
-
 int frescan_acceptor_thread_create(frescan_network_t net)
 {
         int ret;
@@ -73,10 +137,10 @@ int frescan_acceptor_thread_create(frescan_network_t net)
         ret = fosa_thread_attr_init(&attr);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_ACCEPTOR_PRIO);
         if (ret != 0) return ret;
 
-        ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
+        ret = fosa_thread_create(&frescan_data[net].acceptor_thread_id,
                                  &attr,
                                  frescan_acceptor_thread,
                                  (void *)(uint32_t)net);
@@ -92,53 +156,47 @@ int frescan_acceptor_thread_create(frescan_network_t net)
  * frescan_manager_thread
  */
 
-static int frescan_manager_neg(frescan_request_data_t *req_data);
-static int frescan_manager_reneg(frescan_request_data_t *req_data);
-static int frescan_manager_cancel(frescan_request_data_t *req_data);
-static int frescan_manager_repneg(frescan_request_data_t *req_data);
-
 static void *frescan_manager_thread(void *arg)
 {
         int ret;
-        frescan_request_id_t   req;
-        frescan_request_data_t *req_data;
+        frescan_bwres_request_id_t   req;
+        frescan_bwres_request_data_t *req_data;
         frescan_network_t net = (uint32_t)arg;
 
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
 
         while(1) {
-                DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "wait for a request\n");
 
-                ret = frescan_requests_dequeue(&req);
+                ret = frescan_bwres_requests_dequeue(&req);
                 assert(ret == 0);
 
-                ret = frescan_requests_get_data(req, &req_data);
+                ret = frescan_bwres_requests_get_data(req, &req_data);
                 assert(ret == 0);
 
                 switch(req_data->type) {
-                        case FRESCAN_REQ_NEG:
-                                ret = frescan_manager_neg(req_data);
-                                assert(ret == 0);
-                                break;
-                        case FRESCAN_REQ_RENEG:
-                                ret = frescan_manager_reneg(req_data);
-                                assert(ret == 0);
+                        case FRESCAN_BWRES_REQ_GN:
+                                frescan_manager_req_gn(req_data);
                                 break;
-                        case FRESCAN_REQ_CANCEL:
-                                ret = frescan_manager_cancel(req_data);
-                                assert(ret == 0);
+                        case FRESCAN_BWRES_REP_GN:
+                                frescan_manager_rep_gn(req_data);
                                 break;
-                        case FRESCAN_REP_NEG:
-                                ret = frescan_manager_repneg(req_data);
-                                assert(ret == 0);
+                        case FRESCAN_BWRES_REQ_MC:
+                                frescan_manager_req_mc(req_data);
                                 break;
+                        case FRESCAN_BWRES_REQ_RES:
+                        case FRESCAN_BWRES_REQ_RES_GET:
+                        case FRESCAN_BWRES_REP_RES_GET:
+                        case FRESCAN_BWRES_REQ_RES_SET:
+                        case FRESCAN_BWRES_REQ_RES_COMMIT:
+                        case FRESCAN_BWRES_REQ_RES_CANCEL:
                         default:
-                                ERROR("request type not supported\n");
+                                FRESCAN_ERROR("request type not supported\n");
                                 assert(0);
                 }
 
-                if(req_data->request_node != the_networks[net].local_node) {
-                        ret = frescan_requests_free(req);
+                if(req_data->request_node != frescan_data[net].local_node) {
+                        ret = frescan_bwres_requests_free(req);
                         assert(ret == 0);
                 }
         }
@@ -151,91 +209,217 @@ static void *frescan_manager_thread(void *arg)
 static void *frescan_acceptor_thread(void *arg)
 {
         int ret;
-        frescan_recv_params_t recv_params;
-        uint8_t msg[2000]; // TODO: use a constant with the max neg message size
-        size_t recv_bytes;
-        frescan_node_t from;
-        frescan_prio_t prio;
-        frescan_request_id_t req;
-        frescan_request_data_t *req_data;
-
-        DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG, "acceptor th starts\n");
+        frescan_bwres_request_id_t req;
+        frescan_network_t net = (uint32_t)arg;
 
-        recv_params.net     = (frescan_network_t)(uint32_t)arg;
-        recv_params.channel = FRESCAN_NEG_CHANNEL;
-        recv_params.flags   = FRESCAN_SYNC;
+        DEBUG(FRESCAN_BWRES_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
 
         while(1) {
-                DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-                      "waiting for a msg, net:%u chan:%u flags:%u\n",
-                      recv_params.net, recv_params.channel, recv_params.flags);
+                ret = frescan_messages_recv_request(net, &req);
+                assert(ret == 0);
 
-                ret = frescan_recv(&recv_params,
-                                   msg,
-                                   sizeof(msg),
-                                   &recv_bytes,
-                                   &from,
-                                   &prio);
+                ret = frescan_bwres_requests_enqueue(req);
                 assert(ret == 0);
+        }
 
-                DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
-                      "msg received, from:%u size:%u prio:%u\n",
-                      from, recv_bytes, prio);
+        return NULL;
+}
 
-                ret = frescan_requests_alloc(&req);
-                assert(ret == 0);
+/**
+ * frescan_manager_req_gn
+ */
 
-                ret = frescan_requests_get_data(req, &req_data);
-                assert(ret == 0);
+static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data)
+{
+        int ret;
+        frescan_node_t me;
+        bool accepted;
+        frescan_bwres_sa_scenario_t *scenario;
+
+        me = frescan_data[req_data->net].local_node;
 
-                ret = frescan_message_to_request(msg, req_data);
+        if (me != FRESCAN_BWRES_MASTER_NODE) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "send gn req to master\n");
+                ret = frescan_messages_send_request(req_data);
                 assert(ret == 0);
+                return;
+        }
+
+        scenario = &frescan_data[req_data->net].scenario;
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "prepare new scenario\n");
+        frescan_manager_gn_prepare_scenario(scenario, req_data);
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "perform sched analysis\n");
+        ret = frescan_bwres_sa_sched_test(scenario, &accepted);
+        assert(ret == 0);
 
-                req_data->request_node = from;
+        if (accepted) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "ACCEPTED!\n");
+                req_data->return_value = FRESCAN_BWRES_REQ_ACCEPTED;
 
-                ret = frescan_requests_enqueue(req);
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+                      "spare capacity and mode change\n");
+
+                ret = frescan_bwres_sa_spare_capacity(scenario);
+                assert(ret == 0);
+
+                ret = frescan_bwres_mode_change_protocol(req_data);
                 assert(ret == 0);
+        } else {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "FAILED!\n");
+                req_data->return_value = FRESCAN_BWRES_REQ_NOT_ACCEPTED;
+                frescan_manager_gn_restore_scenario(scenario, req_data);
         }
 
-        return NULL;
+        if (req_data->request_node == me) {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "signal robj\n");
+                ret = frescan_bwres_robjs_signal(req_data->robj);
+                assert(ret == 0);
+        } else {
+                DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "send reply\n");
+                req_data->type = FRESCAN_BWRES_REP_GN;
+                ret = frescan_messages_send_request(req_data);
+                assert(ret == 0);
+        }
 }
 
 /**
- * frescan_manager_neg
+ * frescan_manager_gn_prepare_scenario
  */
 
-static int frescan_manager_neg(frescan_request_data_t *req_data)
+static void frescan_manager_gn_prepare_scenario
+                                (frescan_bwres_sa_scenario_t  *scenario,
+                                 frescan_bwres_request_data_t *req_data)
 {
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
-        return 0;
+        int ret, i;
+        frescan_server_params_t server_params;
+
+        // NEG-GROUP
+        server_params.budget         = 0;
+        server_params.period.tv_sec  = 0;
+        server_params.period.tv_nsec = 0;
+        server_params.prio           = 0;
+
+        for(i=0; i<req_data->contracts_to_neg->size; i++) {
+                ret = frescan_servers_create(req_data->net,
+                                             &server_params,
+                                             &req_data->ss_new->ss[i]);
+                assert(ret == 0);
+
+                ret = frescan_bwres_sa_add_contract
+                                (scenario,
+                                 req_data->ss_new->ss[i],
+                                 req_data->request_node,
+                                 &req_data->contracts_to_neg->contracts[i]);
+                assert(ret == 0);
+        }
+
+        // RENEG-GROUP
+        scenario->backup_contracts_to_reneg.size =
+                                        req_data->contracts_to_reneg->size;
+
+        for(i=0; i<req_data->contracts_to_reneg->size; i++) {
+                ret = frescan_bwres_sa_update_contract
+                        (scenario,
+                         req_data->ss_to_reneg->ss[i],
+                         req_data->request_node,
+                         &req_data->contracts_to_reneg->contracts[i],
+                         &scenario->backup_contracts_to_reneg.contracts[i]);
+                assert(ret == 0);
+        }
+
+        // CANCEL-GROUP
+        scenario->backup_contracts_to_cancel.size =
+                                                req_data->ss_to_cancel->size;
+
+        for(i=0; i<req_data->ss_to_cancel->size; i++) {
+                ret = frescan_bwres_sa_remove_contract
+                        (scenario,
+                         req_data->ss_to_cancel->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_cancel.contracts[i]);
+                assert(ret == 0);
+        }
 }
 
 /**
- * frescan_manager_neg
+ * frescan_manager_gn_restore_scenario
  */
 
-static int frescan_manager_reneg(frescan_request_data_t *req_data)
+static void frescan_manager_gn_restore_scenario
+                (frescan_bwres_sa_scenario_t  *scenario,
+                 frescan_bwres_request_data_t *req_data)
 {
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
-        return 0;
+        int ret, i;
+
+        // NEG-GROUP
+        for(i=0; i<req_data->contracts_to_neg->size; i++) {
+                ret = frescan_bwres_sa_remove_contract
+                                (scenario,
+                                 req_data->ss_new->ss[i],
+                                 req_data->request_node,
+                                 NULL);
+                assert(ret == 0);
+
+                ret = frescan_servers_destroy(req_data->net,
+                                              req_data->ss_new->ss[i]);
+                assert(ret == 0);
+        }
+
+        // RENEG-GROUP
+        for(i=0; i<req_data->contracts_to_reneg->size; i++) {
+                ret = frescan_bwres_sa_update_contract
+                        (scenario,
+                         req_data->ss_to_reneg->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_reneg.contracts[i],
+                         NULL);
+                assert(ret == 0);
+        }
+
+        // CANCEL-GROUP
+        for(i=0; i<req_data->ss_to_cancel->size; i++) {
+                ret = frescan_bwres_sa_add_contract
+                        (scenario,
+                         req_data->ss_to_cancel->ss[i],
+                         req_data->request_node,
+                         &scenario->backup_contracts_to_cancel.contracts[i]);
+                assert(ret == 0);
+        }
 }
 
 /**
- * frescan_manager_neg
+ * frescan_manager_rep_gn
  */
 
-static int frescan_manager_cancel(frescan_request_data_t *req_data)
+static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data)
 {
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
-        return 0;
+        int ret;
+        frescan_bwres_request_data_t *orig_req_data;
+
+        ret = frescan_bwres_requests_get_data(req_data->req, &orig_req_data);
+        assert(ret == 0);
+
+        DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
+              "reply to gn, return value:%d\n", req_data->return_value);
+
+        orig_req_data->return_value = req_data->return_value;
+
+        ret = frescan_bwres_robjs_signal(orig_req_data->robj);
+        assert(ret == 0);
 }
 
 /**
- * frescan_manager_neg
+ * frescan_manager_req_mc
  */
 
-static int frescan_manager_repneg(frescan_request_data_t *req_data)
+static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data)
 {
-        DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
-        return 0;
+        int ret;
+
+        ret = frescan_bwres_mode_change_local(req_data->net,
+                                              req_data->mode_change_type);
+        assert(ret == 0);
 }