]> rtime.felk.cvut.cz Git - frescor/fna.git/commitdiff
negotiator thread and bwres layer
authorsangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Tue, 1 Apr 2008 16:09:15 +0000 (16:09 +0000)
committersangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Tue, 1 Apr 2008 16:09:15 +0000 (16:09 +0000)
git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@1060 35b4ef3e-fd22-0410-ab77-dab3279adceb

src_frescan/frescan.c
src_frescan/frescan_bandwidth_reservation.c [new file with mode: 0644]
src_frescan/frescan_bandwidth_reservation.h [new file with mode: 0644]
src_frescan/frescan_config.h
src_frescan/frescan_data.h
src_frescan/frescan_debug.h
src_frescan/frescan_negotiator_thread.c [new file with mode: 0644]
src_frescan/frescan_negotiator_thread.h [new file with mode: 0644]
src_frescan/frescan_requests_queue.c
src_frescan/frescan_requests_queue.h
src_frescan/frescan_servers_replenishments.c

index 30fbc02b3e724a6f8d092fbd6d0584f28c382aef..8f1b9c485bf23e34f7a1618191182f47f2deeec9 100644 (file)
@@ -34,6 +34,7 @@
 #include "frescan_debug.h"     // DEBUG
 #include "frescan_id.h"        // frescan_id_set_field, frescan_id_get_field
 #include "frescan_hw_buffer.h" // frescan_hw_buffer_update
+#include "frescan_reply_objects.h"     // frescan_replyobjects_init
 #include "frescan_servers_replenishments.h" // frescan_replenishments_xxx
 
 static int frescan_hook_frame_recv (const struct can_chip_t *chip,
@@ -146,6 +147,12 @@ int frescan_init(frescan_init_params_t *params)
                 return -1;
         }
 
+        ret = frescan_replyobjects_init(FRESCAN_REPLY_OBJECTS_MX_CEILING);
+        if (ret != 0) {
+                ERROR("could not initialize the reply objects\n");
+                return -1;
+        }
+
         return 0;
 }
 
diff --git a/src_frescan/frescan_bandwidth_reservation.c b/src_frescan/frescan_bandwidth_reservation.c
new file mode 100644 (file)
index 0000000..82166eb
--- /dev/null
@@ -0,0 +1,137 @@
+/*!
+ * @file frescan_bandwidth_reservation.c
+ *
+ * @brief FRESCAN bandwidth reservation layer
+ *
+ * This module contains function to negotiate contracts and get the
+ * corresponding frescan sporadic servers.
+ *
+ * @version 0.01
+ *
+ * @date 1-Apr-2008
+ *
+ * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ */
+
+#include "frescan_bandwidth_reservation.h"
+#include "frescan_data.h"
+#include "frescan_requests_queue.h"    // frescan_requests_init
+#include "frescan_negotiator_thread.h" // frescan_negotiator_thread_create
+#include "frescan_debug.h"
+#include "frescan_config.h"
+
+/**
+ * frescan_bwres_init()
+ *
+ * Init the frescan bandwidth reservation layer
+ */
+
+int frescan_bwres_init(frescan_network_t net)
+{
+        int ret;
+
+        if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) {
+                DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "i am master node\n");
+                ret = frescan_requests_init(FRESCAN_REQUESTS_MX_CEILING);
+                if (ret != 0) {
+                        ERROR("could not initialize the requests\n");
+                        return ret;
+                }
+
+                ret = frescan_negotiator_thread_create(net);
+                if (ret != 0) {
+                        ERROR("could not initialize the negotiator thread\n");
+                        return ret;
+                }
+        } else {
+                DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "i am a slave node\n");
+        }
+
+        return 0;
+}
+
+/**
+ * frescan_bwres_negotiate()
+ *
+ * negotiate a contract. For that we allocate a reply object and then
+ * we enqueue our request in the master's requests queue (which can be
+ * local or require a network message)
+ */
+
+int frescan_bwres_negotiate(frescan_network_t net,
+                            const frescan_contract_t *contract,
+                            frescan_ss_t *id)
+{
+        int ret;
+        frescan_robj_id_t reply;
+        frescan_request_id_t request;
+
+        ret = frescan_replyobject_alloc(&reply, FRESCAN_BWRES_MX_PRIO);
+        if (ret != 0) {
+                ERROR("could not allocate reply object\n");
+                return ret;
+        }
+
+        if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) {
+                ret = frescan_request_alloc(&request);
+                if (ret != 0) {
+                        ERROR("could not allocate request\n");
+                        return ret;
+                }
+
+                DEBUG(FRESCAN_BWRES_ENABLE_DEBUG,
+                      "set FRESCAN_NEGOTIATE type: %d\n", FRESCAN_NEGOTIATE);
+                ret = frescan_request_set_type(request, FRESCAN_NEGOTIATE);
+                if (ret != 0) {
+                        ERROR("could not set type\n");
+                        return ret;
+                }
+
+                ret = frescan_request_set_reply(request, reply);
+                if (ret != 0) {
+                        ERROR("could not set reply\n");
+                        return ret;
+                }
+
+                ret = frescan_request_set_contract(request, contract);
+                if (ret != 0) {
+                        ERROR("could not set contract\n");
+                        return ret;
+                }
+
+                ret = frescan_request_set_src(request, FRESCAN_NEG_MASTER_NODE);
+                if (ret != 0) {
+                        ERROR("could not set src\n");
+                        return ret;
+                }
+
+                ret = frescan_requestqueue_enqueue(request);
+                if (ret != 0) {
+                        ERROR("could not enqueue the request\n");
+                        return ret;
+                }
+        } else {
+                DEBUG(true, "send the request to the master.. not done\n");
+        }
+
+        ret = frescan_replyobject_wait(reply);
+        if (ret != 0) {
+                ERROR("error while waiting on the reply object\n");
+                return ret;
+        }
+
+        ret = frescan_replyobject_free(reply);
+        if (ret != 0) {
+                ERROR("could not free reply object\n");
+                return ret;
+        }
+
+        ret = frescan_request_free(request);
+        if (ret != 0) {
+                ERROR("could not free request\n");
+                return ret;
+        }
+
+        return 0;
+}
diff --git a/src_frescan/frescan_bandwidth_reservation.h b/src_frescan/frescan_bandwidth_reservation.h
new file mode 100644 (file)
index 0000000..b40cc01
--- /dev/null
@@ -0,0 +1,31 @@
+/*!
+ * @file frescan_bandwidth_reservation.h
+ *
+ * @brief FRESCAN bandwidth reservation layer
+ *
+ * This module contains function to negotiate contracts and get the
+ * corresponding frescan sporadic servers.
+ *
+ * @version 0.01
+ *
+ * @date 1-Apr-2008
+ *
+ * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ */
+
+#ifndef _FRESCAN_BANDWIDTH_RESERVATION_H_
+#define _FRESCAN_BANDWIDTH_RESERVATION_H_
+
+#include "frescan.h"
+#include "frescan_data.h"
+
+extern int frescan_bwres_init(frescan_network_t net);
+
+extern int frescan_bwres_negotiate(frescan_network_t net,
+                                   const frescan_contract_t *contract,
+                                   frescan_ss_t *id);
+
+// TODO: add other functions: renegotiate, cancel...
+
+#endif // _FRESCAN_BANDWIDTH_RESERVATION_H_
index 088322062be3d8f47b83e2d78b7c8d8354fb24dc..1da0e44bb5f8b6b92892aaf68e2ff404629646b7 100644 (file)
 #define FRESCAN_BACKGROUND_PRIO   0
 #define FRESCAN_MX_REPLY_OBJECTS  40
 #define FRESCAN_REPL_THREAD_PRIO  60
+#define FRESCAN_NEG_THREAD_PRIO   50
 #define FRESCAN_MX_REQUESTS       40
+#define FRESCAN_NEG_MASTER_NODE   0
+#define FRESCAN_REPLY_OBJECTS_MX_CEILING 90
+#define FRESCAN_REQUESTS_MX_CEILING 90
+#define FRESCAN_BWRES_MX_PRIO     60
 
 #define FRESCAN_MLOCK_T            unsigned
 #define FRESCAN_CREATE_LOCK(l)
index 079a4e26e9e41f6fc9c3aec4a6f46736d1155ce5..420a626335fec4c2bab73ad2aaa9fc72056105b2 100644 (file)
@@ -179,6 +179,7 @@ typedef struct {
         frescan_node_t local_node;
         int fd;
         fosa_thread_id_t repl_thread_id;
+        fosa_thread_id_t neg_thread_id;
         frescan_queues_t queues;
         frescan_packet_t *last_packet;
         frescan_prio_t last_packet_prio;
index 7048c893e598ef84f552d7298db4671419fbe979..740898d19c31b0a7b68c9c56289580444024d920 100644 (file)
@@ -51,6 +51,9 @@
 #define FRESCAN_QUEUES_ENABLE_DEBUG false
 #define FRESCAN_HW_BUFFER_ENABLE_DEBUG false
 #define FRESCAN_REPL_ENABLE_DEBUG false
-#define FRESCAN_REPLYOBJ_ENABLE_DEBUG true
+#define FRESCAN_REPLYOBJ_ENABLE_DEBUG false
+#define FRESCAN_NEG_THREAD_ENABLE_DEBUG true
+#define FRESCAN_BWRES_ENABLE_DEBUG true
+#define FRESCAN_REQUESTS_ENABLE_DEBUG true
 
 #endif // _MARTE_FRESCAN_DEBUG_H_
diff --git a/src_frescan/frescan_negotiator_thread.c b/src_frescan/frescan_negotiator_thread.c
new file mode 100644 (file)
index 0000000..e260293
--- /dev/null
@@ -0,0 +1,115 @@
+/*!
+ * @file frescan_negotiator_thread.c
+ *
+ * @brief FRESCAN negotiator thread
+ *
+ * This module contains the negotiator thread, with an operation to create it.
+ *
+ * @version 0.01
+ *
+ * @date 1-Apr-2008
+ *
+ * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ */
+
+#include <assert.h>
+#include "frescan_negotiator_thread.h"
+#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
+#include "frescan_config.h"
+#include "frescan_debug.h"
+#include "frescan_data.h"
+#include "frescan_requests_queue.h"
+
+static void *frescan_neg_thread(void *arg);
+
+/**
+ * frescan_negotiator_thread_create()
+ */
+
+int frescan_negotiator_thread_create(frescan_network_t net)
+{
+        int ret;
+        fosa_thread_attr_t attr;
+
+        ret = fosa_thread_attr_init(&attr);
+        if (ret != 0) {
+                ERROR("could not init thread attributes\n");
+                return ret;
+        }
+
+        ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
+        if (ret != 0) {
+                ERROR("could not set neg thread prio %d\n",
+                      FRESCAN_NEG_THREAD_PRIO);
+                return ret;
+        }
+
+        ret = fosa_thread_create(&the_networks[net].neg_thread_id,
+                                 &attr,
+                                 frescan_neg_thread,
+                                 (void *)(uint32_t)net);
+
+        if (ret != 0) {
+                ERROR("could not create the negotiator thread\n");
+                return ret;
+        }
+
+        ret = fosa_thread_attr_destroy(&attr);
+        if (ret != 0) {
+                ERROR("could not destroy thread attributes\n");
+                return ret;
+        }
+
+        return 0;
+}
+
+/**
+ * frescan_neg_thread - the thread that negotiates contracts
+ */
+
+static void *frescan_neg_thread(void *arg)
+{
+        int err;
+        frescan_request_id_t request;
+        frescan_req_type_t   type;
+        frescan_robj_id_t    reply;
+        frescan_contract_t   *contract;
+
+        DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
+
+        while(1) {
+                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
+
+                err = frescan_requestqueue_dequeue(&request);
+                assert(err == 0);
+
+                err = frescan_request_get_type(request, &type);
+                assert(err == 0);
+
+                switch(type) {
+                        case FRESCAN_NEGOTIATE:
+                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                      "FRESCAN_NEGOTIATE request\n");
+                                err = frescan_request_get_contract(request, &contract);
+                                assert(err == 0);
+                                err = frescan_request_get_reply(request, &reply);
+                                assert(err == 0);
+                                err = frescan_replyobject_signal(reply);
+                                assert(err == 0);
+                                break;
+                        case FRESCAN_RENEGOTIATE:
+                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                      "FRESCAN_RENEGOTIATE request\n");
+                                break;
+                        case FRESCAN_CANCEL:
+                                DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
+                                      "FRESCAN_CANCEL request\n");
+                                break;
+                        default:
+                                ERROR("wrong request type %d\n", type);
+                }
+        }
+
+        return NULL;
+}
diff --git a/src_frescan/frescan_negotiator_thread.h b/src_frescan/frescan_negotiator_thread.h
new file mode 100644 (file)
index 0000000..875fe48
--- /dev/null
@@ -0,0 +1,23 @@
+/*!
+ * @file frescan_negotiator_thread.h
+ *
+ * @brief FRESCAN negotiator thread
+ *
+ * This module contains the negotiator thread, with an operation to create it.
+ *
+ * @version 0.01
+ *
+ * @date 1-Apr-2008
+ *
+ * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
+ *
+ */
+
+#ifndef _FRESCAN_NEGOTIATOR_THREAD_H_
+#define _FRESCAN_NEGOTIATOR_THREAD_H_
+
+#include "frescan.h"
+
+extern int frescan_negotiator_thread_create(frescan_network_t net);
+
+#endif // _FRESCAN_NEGOTIATOR_THREAD_H_
index 2c6f81bb5b46a6f2425a8dd83e031949fb9e78af..baa2db3eb0057c268afc21149bc3dda8fde245d6 100644 (file)
@@ -20,6 +20,7 @@
 #include "frescan.h"
 #include "frescan_requests_queue.h"
 #include "frescan_config.h"
+#include "frescan_debug.h"
 #include "fosa_mutexes_and_condvars.h"
 
 static bool is_initialized = false;
@@ -141,6 +142,7 @@ locked_error:
 
 int frescan_request_set_type(frescan_request_id_t id, frescan_req_type_t type)
 {
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "id:%d, type:%d\n", id, type);
         the_requests_pool[id].type = type;
         return 0;
 }
@@ -162,9 +164,9 @@ int frescan_request_set_reply(frescan_request_id_t id, frescan_robj_id_t reply)
  **/
 
 int frescan_request_set_contract(frescan_request_id_t id,
-                                 frescan_contract_t *contract)
+                                 const frescan_contract_t *contract)
 {
-        the_requests_pool[id].contract = contract;
+        the_requests_pool[id].contract = (frescan_contract_t *)contract;
         return 0;
 }
 
@@ -187,6 +189,7 @@ int frescan_request_set_src(frescan_request_id_t id, frescan_node_t src)
 int frescan_request_get_type(frescan_request_id_t id, frescan_req_type_t *type)
 {
         *type = the_requests_pool[id].type;
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "id:%d, type:%d\n", id, *type);
         return 0;
 }
 
@@ -236,8 +239,16 @@ int frescan_requestqueue_enqueue(frescan_request_id_t id)
         err = fosa_mutex_lock(&requests_mutex);
         if (err != 0) return err;
 
-        list_add_tail(&the_requests_list.request_list,
-                      &the_requests_pool[id].request_list);
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG,
+              "is list empty A? %d\n",
+              list_empty(&the_requests_list.request_list));
+
+        list_add_tail(&the_requests_pool[id].request_list,
+                      &the_requests_list.request_list);
+
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG,
+              "is list empty B? %d\n",
+              list_empty(&the_requests_list.request_list));
 
         err = fosa_cond_signal(&requests_cond);
         if (err != 0) goto locked_error;
@@ -266,12 +277,17 @@ int frescan_requestqueue_dequeue(frescan_request_id_t *id)
         err = fosa_mutex_lock(&requests_mutex);
         if (err != 0) return err;
 
-        while (list_empty(&the_requests_pool[*id].request_list)) {
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "entering\n");
+
+        while (list_empty(&the_requests_list.request_list)) {
                 err = fosa_cond_wait(&requests_cond, &requests_mutex);
                 if (err != 0) goto locked_error;
+                DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "received signal\n");
         }
 
-        list_for_each(pos, &the_requests_pool[*id].request_list) {
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "dequeueing a request\n");
+
+        list_for_each(pos, &the_requests_list.request_list) {
                 request = list_entry(pos, struct request_t, request_list);
                 break;
         }
@@ -280,6 +296,10 @@ int frescan_requestqueue_dequeue(frescan_request_id_t *id)
 
         *id = request->pool_pos;
 
+        DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG,
+              "is list empty now? %d\n",
+              list_empty(&the_requests_list.request_list));
+
         err = fosa_mutex_unlock(&requests_mutex);
         if (err != 0) return err;
 
index 3ae1108927e7586ee60552cec0e5294208ffcb5d..d0d08442f83a491510fdf2d16232f7d347cdafe4 100644 (file)
@@ -42,7 +42,7 @@ extern int frescan_request_set_reply(frescan_request_id_t id,
                                      frescan_robj_id_t reply);
 
 extern int frescan_request_set_contract(frescan_request_id_t id,
-                                        frescan_contract_t *contract);
+                                        const frescan_contract_t *contract);
 
 extern int frescan_request_set_src(frescan_request_id_t id,
                                    frescan_node_t src);
index 78b34f4b845c9e72cda3a82d0e15f15d284b2c33..b573487ecc94b336fc172b0825cb146beb632007 100644 (file)
@@ -31,6 +31,7 @@
 #include "frescan_config.h"  // FRESCAN_MX_REPL_OPS
 #include "frescan_debug.h"   // ERROR
 #include "frescan_data.h"    // frescan_repl_op_t
+#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
 
 /**
  * the_repl_op_pool - pool of replenishment operations