From: sangorrin Date: Tue, 1 Apr 2008 16:09:15 +0000 (+0000) Subject: negotiator thread and bwres layer X-Git-Url: https://rtime.felk.cvut.cz/gitweb/frescor/fna.git/commitdiff_plain/b8916a2947d44f3a6a97360657a482cd97705d40 negotiator thread and bwres layer git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@1060 35b4ef3e-fd22-0410-ab77-dab3279adceb --- diff --git a/src_frescan/frescan.c b/src_frescan/frescan.c index 30fbc02..8f1b9c4 100644 --- a/src_frescan/frescan.c +++ b/src_frescan/frescan.c @@ -34,6 +34,7 @@ #include "frescan_debug.h" // DEBUG #include "frescan_id.h" // frescan_id_set_field, frescan_id_get_field #include "frescan_hw_buffer.h" // frescan_hw_buffer_update +#include "frescan_reply_objects.h" // frescan_replyobjects_init #include "frescan_servers_replenishments.h" // frescan_replenishments_xxx static int frescan_hook_frame_recv (const struct can_chip_t *chip, @@ -146,6 +147,12 @@ int frescan_init(frescan_init_params_t *params) return -1; } + ret = frescan_replyobjects_init(FRESCAN_REPLY_OBJECTS_MX_CEILING); + if (ret != 0) { + ERROR("could not initialize the reply objects\n"); + return -1; + } + return 0; } diff --git a/src_frescan/frescan_bandwidth_reservation.c b/src_frescan/frescan_bandwidth_reservation.c new file mode 100644 index 0000000..82166eb --- /dev/null +++ b/src_frescan/frescan_bandwidth_reservation.c @@ -0,0 +1,137 @@ +/*! + * @file frescan_bandwidth_reservation.c + * + * @brief FRESCAN bandwidth reservation layer + * + * This module contains function to negotiate contracts and get the + * corresponding frescan sporadic servers. + * + * @version 0.01 + * + * @date 1-Apr-2008 + * + * @author Daniel Sangorrin + * + */ + +#include "frescan_bandwidth_reservation.h" +#include "frescan_data.h" +#include "frescan_requests_queue.h" // frescan_requests_init +#include "frescan_negotiator_thread.h" // frescan_negotiator_thread_create +#include "frescan_debug.h" +#include "frescan_config.h" + +/** + * frescan_bwres_init() + * + * Init the frescan bandwidth reservation layer + */ + +int frescan_bwres_init(frescan_network_t net) +{ + int ret; + + if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) { + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "i am master node\n"); + ret = frescan_requests_init(FRESCAN_REQUESTS_MX_CEILING); + if (ret != 0) { + ERROR("could not initialize the requests\n"); + return ret; + } + + ret = frescan_negotiator_thread_create(net); + if (ret != 0) { + ERROR("could not initialize the negotiator thread\n"); + return ret; + } + } else { + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "i am a slave node\n"); + } + + return 0; +} + +/** + * frescan_bwres_negotiate() + * + * negotiate a contract. For that we allocate a reply object and then + * we enqueue our request in the master's requests queue (which can be + * local or require a network message) + */ + +int frescan_bwres_negotiate(frescan_network_t net, + const frescan_contract_t *contract, + frescan_ss_t *id) +{ + int ret; + frescan_robj_id_t reply; + frescan_request_id_t request; + + ret = frescan_replyobject_alloc(&reply, FRESCAN_BWRES_MX_PRIO); + if (ret != 0) { + ERROR("could not allocate reply object\n"); + return ret; + } + + if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) { + ret = frescan_request_alloc(&request); + if (ret != 0) { + ERROR("could not allocate request\n"); + return ret; + } + + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "set FRESCAN_NEGOTIATE type: %d\n", FRESCAN_NEGOTIATE); + ret = frescan_request_set_type(request, FRESCAN_NEGOTIATE); + if (ret != 0) { + ERROR("could not set type\n"); + return ret; + } + + ret = frescan_request_set_reply(request, reply); + if (ret != 0) { + ERROR("could not set reply\n"); + return ret; + } + + ret = frescan_request_set_contract(request, contract); + if (ret != 0) { + ERROR("could not set contract\n"); + return ret; + } + + ret = frescan_request_set_src(request, FRESCAN_NEG_MASTER_NODE); + if (ret != 0) { + ERROR("could not set src\n"); + return ret; + } + + ret = frescan_requestqueue_enqueue(request); + if (ret != 0) { + ERROR("could not enqueue the request\n"); + return ret; + } + } else { + DEBUG(true, "send the request to the master.. not done\n"); + } + + ret = frescan_replyobject_wait(reply); + if (ret != 0) { + ERROR("error while waiting on the reply object\n"); + return ret; + } + + ret = frescan_replyobject_free(reply); + if (ret != 0) { + ERROR("could not free reply object\n"); + return ret; + } + + ret = frescan_request_free(request); + if (ret != 0) { + ERROR("could not free request\n"); + return ret; + } + + return 0; +} diff --git a/src_frescan/frescan_bandwidth_reservation.h b/src_frescan/frescan_bandwidth_reservation.h new file mode 100644 index 0000000..b40cc01 --- /dev/null +++ b/src_frescan/frescan_bandwidth_reservation.h @@ -0,0 +1,31 @@ +/*! + * @file frescan_bandwidth_reservation.h + * + * @brief FRESCAN bandwidth reservation layer + * + * This module contains function to negotiate contracts and get the + * corresponding frescan sporadic servers. + * + * @version 0.01 + * + * @date 1-Apr-2008 + * + * @author Daniel Sangorrin + * + */ + +#ifndef _FRESCAN_BANDWIDTH_RESERVATION_H_ +#define _FRESCAN_BANDWIDTH_RESERVATION_H_ + +#include "frescan.h" +#include "frescan_data.h" + +extern int frescan_bwres_init(frescan_network_t net); + +extern int frescan_bwres_negotiate(frescan_network_t net, + const frescan_contract_t *contract, + frescan_ss_t *id); + +// TODO: add other functions: renegotiate, cancel... + +#endif // _FRESCAN_BANDWIDTH_RESERVATION_H_ diff --git a/src_frescan/frescan_config.h b/src_frescan/frescan_config.h index 0883220..1da0e44 100644 --- a/src_frescan/frescan_config.h +++ b/src_frescan/frescan_config.h @@ -36,7 +36,12 @@ #define FRESCAN_BACKGROUND_PRIO 0 #define FRESCAN_MX_REPLY_OBJECTS 40 #define FRESCAN_REPL_THREAD_PRIO 60 +#define FRESCAN_NEG_THREAD_PRIO 50 #define FRESCAN_MX_REQUESTS 40 +#define FRESCAN_NEG_MASTER_NODE 0 +#define FRESCAN_REPLY_OBJECTS_MX_CEILING 90 +#define FRESCAN_REQUESTS_MX_CEILING 90 +#define FRESCAN_BWRES_MX_PRIO 60 #define FRESCAN_MLOCK_T unsigned #define FRESCAN_CREATE_LOCK(l) diff --git a/src_frescan/frescan_data.h b/src_frescan/frescan_data.h index 079a4e2..420a626 100644 --- a/src_frescan/frescan_data.h +++ b/src_frescan/frescan_data.h @@ -179,6 +179,7 @@ typedef struct { frescan_node_t local_node; int fd; fosa_thread_id_t repl_thread_id; + fosa_thread_id_t neg_thread_id; frescan_queues_t queues; frescan_packet_t *last_packet; frescan_prio_t last_packet_prio; diff --git a/src_frescan/frescan_debug.h b/src_frescan/frescan_debug.h index 7048c89..740898d 100644 --- a/src_frescan/frescan_debug.h +++ b/src_frescan/frescan_debug.h @@ -51,6 +51,9 @@ #define FRESCAN_QUEUES_ENABLE_DEBUG false #define FRESCAN_HW_BUFFER_ENABLE_DEBUG false #define FRESCAN_REPL_ENABLE_DEBUG false -#define FRESCAN_REPLYOBJ_ENABLE_DEBUG true +#define FRESCAN_REPLYOBJ_ENABLE_DEBUG false +#define FRESCAN_NEG_THREAD_ENABLE_DEBUG true +#define FRESCAN_BWRES_ENABLE_DEBUG true +#define FRESCAN_REQUESTS_ENABLE_DEBUG true #endif // _MARTE_FRESCAN_DEBUG_H_ diff --git a/src_frescan/frescan_negotiator_thread.c b/src_frescan/frescan_negotiator_thread.c new file mode 100644 index 0000000..e260293 --- /dev/null +++ b/src_frescan/frescan_negotiator_thread.c @@ -0,0 +1,115 @@ +/*! + * @file frescan_negotiator_thread.c + * + * @brief FRESCAN negotiator thread + * + * This module contains the negotiator thread, with an operation to create it. + * + * @version 0.01 + * + * @date 1-Apr-2008 + * + * @author Daniel Sangorrin + * + */ + +#include +#include "frescan_negotiator_thread.h" +#include "fosa_threads_and_signals.h" // fosa_thread_attr_init... +#include "frescan_config.h" +#include "frescan_debug.h" +#include "frescan_data.h" +#include "frescan_requests_queue.h" + +static void *frescan_neg_thread(void *arg); + +/** + * frescan_negotiator_thread_create() + */ + +int frescan_negotiator_thread_create(frescan_network_t net) +{ + int ret; + fosa_thread_attr_t attr; + + ret = fosa_thread_attr_init(&attr); + if (ret != 0) { + ERROR("could not init thread attributes\n"); + return ret; + } + + ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO); + if (ret != 0) { + ERROR("could not set neg thread prio %d\n", + FRESCAN_NEG_THREAD_PRIO); + return ret; + } + + ret = fosa_thread_create(&the_networks[net].neg_thread_id, + &attr, + frescan_neg_thread, + (void *)(uint32_t)net); + + if (ret != 0) { + ERROR("could not create the negotiator thread\n"); + return ret; + } + + ret = fosa_thread_attr_destroy(&attr); + if (ret != 0) { + ERROR("could not destroy thread attributes\n"); + return ret; + } + + return 0; +} + +/** + * frescan_neg_thread - the thread that negotiates contracts + */ + +static void *frescan_neg_thread(void *arg) +{ + int err; + frescan_request_id_t request; + frescan_req_type_t type; + frescan_robj_id_t reply; + frescan_contract_t *contract; + + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n"); + + while(1) { + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n"); + + err = frescan_requestqueue_dequeue(&request); + assert(err == 0); + + err = frescan_request_get_type(request, &type); + assert(err == 0); + + switch(type) { + case FRESCAN_NEGOTIATE: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_NEGOTIATE request\n"); + err = frescan_request_get_contract(request, &contract); + assert(err == 0); + err = frescan_request_get_reply(request, &reply); + assert(err == 0); + err = frescan_replyobject_signal(reply); + assert(err == 0); + break; + case FRESCAN_RENEGOTIATE: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_RENEGOTIATE request\n"); + break; + case FRESCAN_CANCEL: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_CANCEL request\n"); + break; + default: + ERROR("wrong request type %d\n", type); + } + } + + return NULL; +} diff --git a/src_frescan/frescan_negotiator_thread.h b/src_frescan/frescan_negotiator_thread.h new file mode 100644 index 0000000..875fe48 --- /dev/null +++ b/src_frescan/frescan_negotiator_thread.h @@ -0,0 +1,23 @@ +/*! + * @file frescan_negotiator_thread.h + * + * @brief FRESCAN negotiator thread + * + * This module contains the negotiator thread, with an operation to create it. + * + * @version 0.01 + * + * @date 1-Apr-2008 + * + * @author Daniel Sangorrin + * + */ + +#ifndef _FRESCAN_NEGOTIATOR_THREAD_H_ +#define _FRESCAN_NEGOTIATOR_THREAD_H_ + +#include "frescan.h" + +extern int frescan_negotiator_thread_create(frescan_network_t net); + +#endif // _FRESCAN_NEGOTIATOR_THREAD_H_ diff --git a/src_frescan/frescan_requests_queue.c b/src_frescan/frescan_requests_queue.c index 2c6f81b..baa2db3 100644 --- a/src_frescan/frescan_requests_queue.c +++ b/src_frescan/frescan_requests_queue.c @@ -20,6 +20,7 @@ #include "frescan.h" #include "frescan_requests_queue.h" #include "frescan_config.h" +#include "frescan_debug.h" #include "fosa_mutexes_and_condvars.h" static bool is_initialized = false; @@ -141,6 +142,7 @@ locked_error: int frescan_request_set_type(frescan_request_id_t id, frescan_req_type_t type) { + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "id:%d, type:%d\n", id, type); the_requests_pool[id].type = type; return 0; } @@ -162,9 +164,9 @@ int frescan_request_set_reply(frescan_request_id_t id, frescan_robj_id_t reply) **/ int frescan_request_set_contract(frescan_request_id_t id, - frescan_contract_t *contract) + const frescan_contract_t *contract) { - the_requests_pool[id].contract = contract; + the_requests_pool[id].contract = (frescan_contract_t *)contract; return 0; } @@ -187,6 +189,7 @@ int frescan_request_set_src(frescan_request_id_t id, frescan_node_t src) int frescan_request_get_type(frescan_request_id_t id, frescan_req_type_t *type) { *type = the_requests_pool[id].type; + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "id:%d, type:%d\n", id, *type); return 0; } @@ -236,8 +239,16 @@ int frescan_requestqueue_enqueue(frescan_request_id_t id) err = fosa_mutex_lock(&requests_mutex); if (err != 0) return err; - list_add_tail(&the_requests_list.request_list, - &the_requests_pool[id].request_list); + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, + "is list empty A? %d\n", + list_empty(&the_requests_list.request_list)); + + list_add_tail(&the_requests_pool[id].request_list, + &the_requests_list.request_list); + + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, + "is list empty B? %d\n", + list_empty(&the_requests_list.request_list)); err = fosa_cond_signal(&requests_cond); if (err != 0) goto locked_error; @@ -266,12 +277,17 @@ int frescan_requestqueue_dequeue(frescan_request_id_t *id) err = fosa_mutex_lock(&requests_mutex); if (err != 0) return err; - while (list_empty(&the_requests_pool[*id].request_list)) { + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "entering\n"); + + while (list_empty(&the_requests_list.request_list)) { err = fosa_cond_wait(&requests_cond, &requests_mutex); if (err != 0) goto locked_error; + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "received signal\n"); } - list_for_each(pos, &the_requests_pool[*id].request_list) { + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "dequeueing a request\n"); + + list_for_each(pos, &the_requests_list.request_list) { request = list_entry(pos, struct request_t, request_list); break; } @@ -280,6 +296,10 @@ int frescan_requestqueue_dequeue(frescan_request_id_t *id) *id = request->pool_pos; + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, + "is list empty now? %d\n", + list_empty(&the_requests_list.request_list)); + err = fosa_mutex_unlock(&requests_mutex); if (err != 0) return err; diff --git a/src_frescan/frescan_requests_queue.h b/src_frescan/frescan_requests_queue.h index 3ae1108..d0d0844 100644 --- a/src_frescan/frescan_requests_queue.h +++ b/src_frescan/frescan_requests_queue.h @@ -42,7 +42,7 @@ extern int frescan_request_set_reply(frescan_request_id_t id, frescan_robj_id_t reply); extern int frescan_request_set_contract(frescan_request_id_t id, - frescan_contract_t *contract); + const frescan_contract_t *contract); extern int frescan_request_set_src(frescan_request_id_t id, frescan_node_t src); diff --git a/src_frescan/frescan_servers_replenishments.c b/src_frescan/frescan_servers_replenishments.c index 78b34f4..b573487 100644 --- a/src_frescan/frescan_servers_replenishments.c +++ b/src_frescan/frescan_servers_replenishments.c @@ -31,6 +31,7 @@ #include "frescan_config.h" // FRESCAN_MX_REPL_OPS #include "frescan_debug.h" // ERROR #include "frescan_data.h" // frescan_repl_op_t +#include "fosa_threads_and_signals.h" // fosa_thread_attr_init... /** * the_repl_op_pool - pool of replenishment operations