From 5ac29d6d0804a77ef8f795b2006723b6b71d3566 Mon Sep 17 00:00:00 2001 From: sangorrin Date: Thu, 17 Apr 2008 10:49:47 +0000 Subject: [PATCH] renamings... redo the request and messages part... also now there will be two threads in each node like in the dtm... still not compile.. i have to finish it in the afternoon git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@1118 35b4ef3e-fd22-0410-ab77-dab3279adceb --- src_frescan/frescan.c | 8 +- src_frescan/frescan_bwres.c | 346 +++-------------- src_frescan/frescan_bwres_messages.c | 351 +++++++++++------- src_frescan/frescan_bwres_messages.h | 33 +- ...an_requests.c => frescan_bwres_requests.c} | 167 ++------- src_frescan/frescan_bwres_requests.h | 112 ++++++ ...can_reply_objs.c => frescan_bwres_robjs.c} | 30 +- ...can_reply_objs.h => frescan_bwres_robjs.h} | 22 +- src_frescan/frescan_bwres_threads.c | 185 +++++---- src_frescan/frescan_bwres_threads.h | 26 +- src_frescan/frescan_data.h | 9 - src_frescan/frescan_requests.h | 76 ---- 12 files changed, 580 insertions(+), 785 deletions(-) rename src_frescan/{frescan_requests.c => frescan_bwres_requests.c} (56%) create mode 100644 src_frescan/frescan_bwres_requests.h rename src_frescan/{frescan_reply_objs.c => frescan_bwres_robjs.c} (91%) rename src_frescan/{frescan_reply_objs.h => frescan_bwres_robjs.h} (52%) delete mode 100644 src_frescan/frescan_requests.h diff --git a/src_frescan/frescan.c b/src_frescan/frescan.c index df672f9..4084c8c 100644 --- a/src_frescan/frescan.c +++ b/src_frescan/frescan.c @@ -34,7 +34,7 @@ #include "frescan_debug.h" // DEBUG #include "frescan_id.h" // frescan_id_set_field, frescan_id_get_field #include "frescan_hw_buffer.h" // frescan_hw_buffer_update -#include "frescan_reply_objs.h" // frescan_replyobjects_init +#include "frescan_bwres_robjs.h" // frescan_replyobjects_init #include "frescan_servers_replenishments.h" // frescan_replenishments_xxx #include "frescan_packets.h" @@ -157,12 +157,6 @@ int frescan_init(frescan_init_params_t *params) return -1; } - ret = frescan_replyobjects_init(FRESCAN_REPLY_OBJECTS_MX_CEILING); - if (ret != 0) { - ERROR("could not initialize the reply objects\n"); - return -1; - } - return 0; } diff --git a/src_frescan/frescan_bwres.c b/src_frescan/frescan_bwres.c index a69de04..848ae3d 100644 --- a/src_frescan/frescan_bwres.c +++ b/src_frescan/frescan_bwres.c @@ -15,13 +15,13 @@ */ #include "frescan_bwres.h" +#include "frescan_bwres_requests.h" +#include "frescan_bwres_messages.h" +#include "frescan_bwres_threads.h" #include "frescan_data.h" -#include "frescan_requests.h" // frescan_requests_init #include "frescan_debug.h" #include "frescan_config.h" #include "frescan_servers.h" -#include "frescan_bwres_messages.h" -#include "frescan_bwres_threads.h" /** * frescan_bwres_init() @@ -35,6 +35,7 @@ int frescan_bwres_init(frescan_network_t net) frescan_server_params_t params; // TODO: server params must be configurable + // TODO: initialization tree like in DTM params.values.budget = 5; params.values.period.tv_sec = 1; params.values.period.tv_nsec = 0; @@ -42,35 +43,19 @@ int frescan_bwres_init(frescan_network_t net) ret = frescan_servers_create(net, ¶ms, &the_networks[net].neg_messages_ss_id); - if (ret != 0) { - ERROR("could not create server for negotiation messages\n"); - return ret; - } + if (ret != 0) return ret; + + ret = frescan_bwres_robjs_init(FRESCAN_REPLY_OBJECTS_MX_CEILING); + if (ret != 0) return ret; ret = frescan_requests_init(FRESCAN_REQUESTS_MX_CEILING); - if (ret != 0) { - ERROR("could not initialize the requests\n"); - return ret; - } + if (ret != 0) return ret; - if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "i am master node (%u)\n", - the_networks[net].local_node); - ret = frescan_master_neg_thread_create(net); - if (ret != 0) { - ERROR("could not initialize the negotiator thread\n"); - return ret; - } - } else { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "i am a slave node (%u)\n", - the_networks[net].local_node); - } + ret = frescan_manager_thread_create(net); + if (ret != 0) return ret; ret = frescan_acceptor_thread_create(net); - if (ret != 0) { - ERROR("could not create acceptor thread\n"); - return ret; - } + if (ret != 0) return ret; return 0; } @@ -78,286 +63,75 @@ int frescan_bwres_init(frescan_network_t net) /** * frescan_bwres_negotiate() * - * negotiate a contract. For that we allocate a reply object and then - * we enqueue our request in the master's requests queue (if we are in - * the master node) or send it to the master through the network. + * to negotiate a contract we follow the next steps: + * + * 1.- prepare a request + * 2.- enqueue the request + * 3.- wait in the reply object for a reply + * 4.- return the final values + * */ int frescan_bwres_negotiate(frescan_network_t net, const frescan_contract_t *contract, frescan_ss_t *id) { - int ret; - frescan_robj_id_t reply; - frescan_request_id_t request; - frescan_neg_return_info_t return_info; - uint8_t msg[200]; - int size; - frescan_send_params_t params; - - ret = frescan_replyobject_alloc(&reply, FRESCAN_BWRES_MX_PRIO); - if (ret != 0) { - ERROR("could not allocate reply object\n"); - return ret; - } - - ret = frescan_request_alloc(&request); - if (ret != 0) { - ERROR("could not allocate request\n"); - return ret; - } - - ret = frescan_request_set_return_info(request, - (void *) &return_info); - if (ret != 0) { - ERROR("could not set return_info pointer\n"); - return ret; - } - - ret = frescan_request_set_reply(request, reply); - if (ret != 0) { - ERROR("could not set reply\n"); - return ret; - } - - if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "I am master, negotiation in local node\n"); - - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "set FRESCAN_NEGOTIATE type: %d\n", FRESCAN_NEGOTIATE); - - ret = frescan_request_set_type(request, FRESCAN_NEGOTIATE); - if (ret != 0) { - ERROR("could not set type\n"); - return ret; - } - - ret = frescan_request_set_contract(request, contract); - if (ret != 0) { - ERROR("could not set contract\n"); - return ret; - } - - ret = frescan_request_set_src(request, FRESCAN_NEG_MASTER_NODE); - if (ret != 0) { - ERROR("could not set src\n"); - return ret; - } - - ret = frescan_requestqueue_enqueue(request); - if (ret != 0) { - ERROR("could not enqueue the request\n"); - return ret; - } - } else { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "I am slave, negotiation in master node\n"); - - size = frescan_neg_message_create(msg, request, contract); - - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "created a neg message, size: %d\n", size); - - params.net = net; - params.to = FRESCAN_NEG_MASTER_NODE; - params.channel = FRESCAN_NEG_CHANNEL; - - // NOTE: if we sent the negotiation msgs with fp: - // params.flags = FRESCAN_FP | FRESCAN_ASYNC; - // params.prio = 8; - - params.flags = FRESCAN_SS | FRESCAN_ASYNC; - params.ss = the_networks[net].neg_messages_ss_id; - - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "send msg to master, net:%u to:%u ss:%u\n", - params.net, params.to, params.ss); - - ret = frescan_send(¶ms, msg, size); - if (ret != 0) { - ERROR("error while sending neg request to master\n"); - return ret; - } - } - - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "wait on reply object\n"); - - ret = frescan_replyobject_wait(reply); - if (ret != 0) { - ERROR("error while waiting on the reply object\n"); - return ret; - } - - ret = frescan_replyobject_free(reply); - if (ret != 0) { - ERROR("could not free reply object\n"); - return ret; - } - - if (return_info.error) { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "negotiation was not accepted, error:%d\n", - return_info.error); - } else { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "negotiation finished succesfully ss:%d\n", - return_info.id); - } - - *id = return_info.id; - - ret = frescan_request_free(request); - if (ret != 0) { - ERROR("could not free request\n"); - return ret; - } - - return return_info.error; -} - -/** - * frescan_bwres_renegotiate() - * - * renegotiate a contract. For that we allocate a reply object and then - * we enqueue our request in the master's requests queue (if we are in - * the master node) or send it to the master through the network. - */ - -int frescan_bwres_renegotiate(frescan_network_t net, - const frescan_contract_t *contract, - frescan_ss_t id) -{ - int ret; - frescan_robj_id_t reply; - frescan_request_id_t request; - frescan_neg_return_info_t return_info; - uint8_t msg[200]; - int size; - frescan_send_params_t params; + int ret, negotiation_ret; + frescan_request_id_t req; + frescan_request_data_t *req_data; - ret = frescan_replyobject_alloc(&reply, FRESCAN_BWRES_MX_PRIO); - if (ret != 0) { - ERROR("could not allocate reply object\n"); - return ret; - } - - ret = frescan_request_alloc(&request); - if (ret != 0) { - ERROR("could not allocate request\n"); - return ret; - } - - ret = frescan_request_set_return_info(request, - (void *) &return_info); - if (ret != 0) { - ERROR("could not set return_info pointer\n"); - return ret; - } - - ret = frescan_request_set_reply(request, reply); - if (ret != 0) { - ERROR("could not set reply\n"); - return ret; - } - - ret = frescan_request_set_ss(request, id); - if (ret != 0) { - ERROR("could not set server id\n"); - return ret; - } + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "preparing a negotiation request\n"); - if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "I am master, renegotiation in local node\n"); + ret = frescan_request_alloc(&req); + if (ret != 0) return ret; - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "set FRESCAN_RENEGOTIATE type: %d\n", - FRESCAN_RENEGOTIATE); + ret = frescan_requests_get_data(req, &req_data); + if (ret != 0) return ret; - ret = frescan_request_set_type(request, FRESCAN_RENEGOTIATE); - if (ret != 0) { - ERROR("could not set type\n"); - return ret; - } + req_data->type = FRESCAN_REQ_NEG; + req_data->req = req; + req_data->contract = (frescan_contract_t *)contract; + req_data->request_node = the_networks[net].local_node; - ret = frescan_request_set_contract(request, contract); - if (ret != 0) { - ERROR("could not set contract\n"); - return ret; - } + ret = frescan_bwres_robjs_alloc(&req_data->robj, FRESCAN_BWRES_MX_PRIO); + if (ret != 0) return ret; - ret = frescan_request_set_src(request, FRESCAN_NEG_MASTER_NODE); - if (ret != 0) { - ERROR("could not set src\n"); - return ret; - } + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "enqueue the negotiation request\n"); - ret = frescan_requestqueue_enqueue(request); - if (ret != 0) { - ERROR("could not enqueue the request\n"); - return ret; - } - } else { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "I am slave, renegotiation in master node\n"); + ret = frescan_requests_enqueue(req); + if (ret != 0) return ret; - size = frescan_reneg_message_create(msg, request, contract); + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "wait for a reply\n"); - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "created a reneg message, size: %d\n", size); + ret = frescan_replyobject_wait(req_data->robj); + if (ret != 0) return ret; - params.net = net; - params.to = FRESCAN_NEG_MASTER_NODE; - params.channel = FRESCAN_NEG_CHANNEL; + ret = frescan_bwres_robjs_free(req_data->robj); + if (ret != 0) return ret; - // NOTE: if we sent the negotiation msgs with fp: - // params.flags = FRESCAN_FP | FRESCAN_ASYNC; - // params.prio = 8; + switch (req_data->return_value) { + case FRESCAN_REQ_ACCEPTED: + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "negotiation OK\n"); + *id = req_data->ss; + ret = 0; + break; - params.flags = FRESCAN_SS | FRESCAN_ASYNC; - params.ss = the_networks[net].neg_messages_ss_id; + case FRESCAN_REQ_NOT_ACCEPTED: + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "negotiation FAIL\n"); + ret = -1; + break; - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "send msg to master, net:%u to:%u ss:%u\n", - params.net, params.to, params.ss); + case FRESCAN_REQ_ERROR: + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "negotiation ERROR\n"); + ret = -1; + break; - ret = frescan_send(¶ms, msg, size); - if (ret != 0) { - ERROR("error while sending neg request to master\n"); - return ret; - } + default: + ERROR("return_value unknown\n"); + return -1; } - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "wait on reply object\n"); - - ret = frescan_replyobject_wait(reply); - if (ret != 0) { - ERROR("error while waiting on the reply object\n"); - return ret; - } - - ret = frescan_replyobject_free(reply); - if (ret != 0) { - ERROR("could not free reply object\n"); - return ret; - } - - if (return_info.error) { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "renegotiation was not accepted, error:%d\n", - return_info.error); - } else { - DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, - "renegotiation finished succesfully for ss:%d\n", - id); - } - - ret = frescan_request_free(request); - if (ret != 0) { - ERROR("could not free request\n"); - return ret; - } + frescan_request_free(req); - return return_info.error; + return ret; } diff --git a/src_frescan/frescan_bwres_messages.c b/src_frescan/frescan_bwres_messages.c index 5e5225d..2d1cd8d 100644 --- a/src_frescan/frescan_bwres_messages.c +++ b/src_frescan/frescan_bwres_messages.c @@ -17,175 +17,278 @@ #include #include "frescan_bwres_messages.h" +#include "frescan_bwres_requests.h" +#include "frescan.h" #include "frescan_config.h" #include "frescan_debug.h" -#include "frescan_requests.h" -#include "frescan_servers.h" /** * - * NEG MESSAGE + * FRESCAN_REQ_NEG MESSAGE + * ======================= + * This message is sent from a SLAVE to the MASTER when the slave wants + * to negotiate a new contract. It contains the type (a negotiation 'NEG' + * request), the LOCAL request id (so the MASTER can use it in the reply + * to identify to which request is replying), a preallocated sporadic + * server id (the MASTER will store it in its table together with the + * node so we can perform renegotiations and spare capacity distribution + * in the future) and the contract. * - * 1 2 N - * +-----------------------+ - * | 'NEG' | ID | CONTRACT | - * +-----------------------+ - * - * REP_NEG MESSAGE - * - * +----------------------------------------+ - * | 'REPNEG' | ID | ACCEPTED | SC_CONTRACT | - * +----------------------------------------+ - * - * RENEGOTIATE = NEG but with different type - * - * CANCEL MESSAGE - * - * +--------------------------------+ - * | 'CANCEL' | ID | CONTRACT_LABEL | - * +--------------------------------+ - * - * REP_CANCEL MESSAGE - * - * +---------------------------+ - * | 'REP_CANCEL' | ID | ERROR | - * +---------------------------+ - * - * ID: the id of the request + * 1 2 2 N + * +-----------------------------+ + * | 'NEG' | REQ | SS | CONTRACT | + * +-----------------------------+ * */ -typedef enum { - FRESCAN_MSG_TYPE_NEG = 0, - FRESCAN_MSG_TYPE_RENEG = 1, - FRESCAN_MSG_TYPE_CANCEL = 2, - FRESCAN_MSG_TYPE_REP_NEG = 3, - FRESCAN_MSG_TYPE_REP_RENEG = 4, - FRESCAN_MSG_TYPE_REP_CANCEL = 5 -} frescan_msg_type_t; - -int frescan_neg_message_create(uint8_t *msg, - frescan_request_id_t id, - const frescan_contract_t *contract) +struct frescan_req_neg_message_t { + frescan_request_type_t type; + frescan_request_id_t req; + frescan_ss_t ss; + frescan_contract_t contract; +} __attribute__ ((packed)); + +static int frescan_request_to_neg_message(const frescan_request_data_t *data, + uint8_t *msg) { - size_t num, bytes_written; - uint8_t *tmp = msg; + struct frescan_req_neg_message_t *neg_msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "creating a negotiation request message\n"); + neg_msg = (struct frescan_req_neg_message_t *)msg; - *tmp = (uint8_t)FRESCAN_MSG_TYPE_NEG; - tmp++; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "type: %d (1 byte)\n", FRESCAN_MSG_TYPE_NEG); + neg_msg->type = FRESCAN_REQ_NEG; + neg_msg->req = data->req; + neg_msg->ss = data->ss; + neg_msg->contract = *(data->contract); - num = 2; - memcpy(tmp, &id, num); - tmp = tmp + num; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "request id: %d (2 bytes)\n", id); + return 0; +} - num = sizeof(frescan_contract_t); - memcpy(tmp, contract, num); - tmp = tmp + num; +static int frescan_neg_message_to_request(const uint8_t *msg, + frescan_request_data_t *data) +{ + struct frescan_req_neg_message_t *neg_msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "contract (%d bytes)\n", num); + neg_msg = (struct frescan_req_neg_message_t *)msg; - bytes_written = tmp - msg; + data->type = FRESCAN_REQ_NEG; + data->req = neg_msg->req; + data->ss = neg_msg->ss; + *(data->contract) = neg_msg->contract; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "total bytes_written: %d\n", bytes_written); + return 0; +} - return (int)bytes_written; +/** + * + * FRESCAN_REQ_RENEG MESSAGE + * ========================= + * This message is sent from a SLAVE to the MASTER when the slave wants + * to renegotiate a contract. It contains the type (a renegotiation 'RENEG' + * request), the LOCAL request id (so the MASTER can use it in the reply + * to identify to which request is replying), the sporadic server id + * (the MASTER will look up its table together with the node to find the + * appropiate entry) and the contract. + * + * 1 2 2 N + * +-------------------------------+ + * | 'RENEG' | REQ | SS | CONTRACT | + * +-------------------------------+ + * + */ + +struct frescan_req_reneg_message_t { + frescan_request_type_t type; + frescan_request_id_t req; + frescan_ss_t ss; + frescan_contract_t contract; +} __attribute__ ((packed)); + +static int frescan_request_to_reneg_message(const frescan_request_data_t *data, + uint8_t *msg) +{ + struct frescan_req_reneg_message_t *reneg_msg; + + reneg_msg = (struct frescan_req_reneg_message_t *)msg; + + reneg_msg->type = FRESCAN_REQ_RENEG; + reneg_msg->req = data->req; + reneg_msg->ss = data->ss; + reneg_msg->contract = *(data->contract); + + return 0; } -int frescan_repneg_message_create(uint8_t *msg, - frescan_request_id_t id, - int accepted, - frescan_server_params_t *params) +static int frescan_reneg_message_to_request(const uint8_t *msg, + frescan_request_data_t *data) { - size_t num, bytes_written; - uint8_t *tmp = msg; + struct frescan_req_reneg_message_t *reneg_msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "creating a negotiation reply message\n"); + reneg_msg = (struct frescan_req_reneg_message_t *)msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "type: %d (1 byte)\n", FRESCAN_MSG_TYPE_REP_NEG); - *tmp = (uint8_t)FRESCAN_MSG_TYPE_REP_NEG; - tmp++; + data->type = FRESCAN_REQ_RENEG; + data->req = reneg_msg->req; + data->ss = reneg_msg->ss; + *(data->contract) = reneg_msg->contract; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "request id: %d (2 bytes)\n", id); - num = 2; - memcpy(tmp, &id, num); - tmp = tmp + num; + return 0; +} - *tmp = (uint8_t)accepted; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, "accepted:%u\n", *tmp); - tmp++; +/** + * FRESCAN_REQ_CANCEL MESSAGE + * ========================== + * This message is sent from a SLAVE to the MASTER to cancel a contract. + * It contains the type, 'CANCEL' and the sporadic server id (the MASTER will + * have to look it up in the table). The MASTER doesnt need to reply this + * message. + * + * +---------------+ + * | 'CANCEL' | SS | + * +---------------+ + * + */ - num = sizeof(frescan_budget_period_t); - memcpy(tmp, ¶ms->values, num); - tmp = tmp + num; +struct frescan_req_cancel_message_t { + frescan_request_type_t type; + frescan_ss_t ss; +} __attribute__ ((packed)); - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "budget_period (%d bytes)\n", num); +static int frescan_request_to_cancel_message(const frescan_request_data_t *data, + uint8_t *msg) +{ + struct frescan_req_cancel_message_t *cancel_msg; - num = 1; - memcpy(tmp, ¶ms->prio, num); - tmp = tmp + num; + cancel_msg = (struct frescan_req_cancel_message_t *)msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "server prio:%u\n", params->prio); + cancel_msg->type = FRESCAN_REQ_CANCEL; + cancel_msg->ss = data->ss; + + return 0; +} + +static int frescan_cancel_message_to_request(const uint8_t *msg, + frescan_request_data_t *data) +{ + struct frescan_req_cancel_message_t *cancel_msg; - bytes_written = tmp - msg; + cancel_msg = (struct frescan_req_cancel_message_t *)msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "total bytes_written: %d\n", bytes_written); + data->type = FRESCAN_REQ_CANCEL; + data->ss = cancel_msg->ss; - return (int)bytes_written; + return 0; } -int frescan_reneg_message_create(uint8_t *msg, - frescan_request_id_t id, - const frescan_contract_t *contract) +/** + * + * FRESCAN_REP_NEG MESSAGE + * ======================= + * This message is sent from the MASTER to a slave as a reply to a + * FRESCAN_REQ_NEG or a FRESCAN_REQ_RENEG, to say if they were admited. + * It contains the type 'REPNEG', the request ID of the slave and a + * return value to say if the contract is admited, not admited or if + * there was an error. + * + * +-------------------------------+ + * | 'REPNEG' | REQ | RETURN_VALUE | + * +-------------------------------+ + * + */ + +struct frescan_rep_neg_message_t { + frescan_request_type_t type; + frescan_request_id_t req; + frescan_request_retval_t return_value; +} __attribute__ ((packed)); + +static int frescan_request_to_repneg_message(const frescan_request_data_t *data, + uint8_t *msg) { - size_t num, bytes_written; - uint8_t *tmp = msg; + struct frescan_rep_neg_message_t *repneg_msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "creating a renegotiation request message\n"); + repneg_msg = (struct frescan_rep_neg_message_t *)msg; - *tmp = (uint8_t)FRESCAN_MSG_TYPE_RENEG; - tmp++; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "type: %d (1 byte)\n", FRESCAN_MSG_TYPE_RENEG); + repneg_msg->type = FRESCAN_REP_NEG; + repneg_msg->req = data->req; + repneg_msg->return_value = data->return_value; - num = 2; - memcpy(tmp, &id, num); - tmp = tmp + num; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "request id: %d (2 bytes)\n", id); + return 0; +} - num = sizeof(frescan_contract_t); - memcpy(tmp, contract, num); - tmp = tmp + num; +static int frescan_repneg_message_to_request(const uint8_t *msg, + frescan_request_data_t *data) +{ + struct frescan_rep_neg_message_t *repneg_msg; - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "contract (%d bytes)\n", num); + repneg_msg = (struct frescan_rep_neg_message_t *)msg; - // TODO: ADD server id? or look up the label?? + data->type = FRESCAN_REP_NEG; + data->req = repneg_msg->req; + data->return_value = repneg_msg->return_value; - bytes_written = tmp - msg; + return 0; +} - DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, - "total bytes_written: %d\n", bytes_written); +/** + * frescan_request_to_message() - converts a request into a network message + * + * this function converts a request with the necessary data into a message + * that can be sent through the network. + * + * @req_data: the request data to fill the message bytes (in) + * @msg: buffer with the bytes that will be sent to the network (out) + * + */ - return (int)bytes_written; +int frescan_request_to_message(const frescan_request_data_t *req_data, + uint8_t *msg) +{ + switch(req_data->type) { + case FRESCAN_REQ_NEG: + return frescan_request_to_neg_message(req_data, msg); + case FRESCAN_REQ_RENEG: + return frescan_request_to_reneg_message(req_data, msg); + case FRESCAN_REQ_CANCEL: + return frescan_request_to_cancel_message(req_data, msg); + case FRESCAN_REP_NEG: + return frescan_request_to_repneg_message(req_data, msg); + default: + ERROR("request type not supported\n"); + return -1; + } } +/** + * frescan_message_to_request() - converts a network message into a request + * + * this function is the opposite to the previous one. It will be used by + * the acceptor threads to transform messages received from the network + * into requests. + * + * @msg: buffer with the bytes received from the network (in) + * @req_data: the request data to fill from the message bytes (out) + * + */ + +int frescan_message_to_request(const uint8_t *msg, + frescan_request_data_t *req_data) +{ + switch(*((frescan_request_type_t *)msg)) { + case FRESCAN_REQ_NEG: + return frescan_neg_message_to_request(msg, req_data); + case FRESCAN_REQ_RENEG: + return frescan_reneg_message_to_request(msg, req_data); + case FRESCAN_REQ_CANCEL: + return frescan_cancel_message_to_request(msg, req_data); + case FRESCAN_REP_NEG: + return frescan_repneg_message_to_request(msg, req_data); + default: + ERROR("request type not supported\n"); + return -1; + } +} + +/* + int frescan_message_parse(frescan_network_t net, const uint8_t *msg, size_t size, @@ -372,4 +475,4 @@ int frescan_message_parse(frescan_network_t net, } return 0; -} +}*/ diff --git a/src_frescan/frescan_bwres_messages.h b/src_frescan/frescan_bwres_messages.h index c54d234..74d3ee2 100644 --- a/src_frescan/frescan_bwres_messages.h +++ b/src_frescan/frescan_bwres_messages.h @@ -19,13 +19,36 @@ #define _FRESCAN_NEGOTIATION_MESSAGES_H_ #include -#include "frescan_requests.h" +#include "frescan_bwres_requests.h" #include "frescan_data.h" -extern int frescan_request_to_message(frescan_request_id_t id, // in - uint8_t *msg); // out +/** + * frescan_request_to_message() - converts a request into a network message + * + * this function converts a request with the necessary data into a message + * that can be sent through the network. + * + * @req_data: the request data to fill the message bytes (in) + * @msg: buffer with the bytes that will be sent to the network (out) + * + */ + +extern int frescan_request_to_message(const frescan_request_data_t *req_data, + uint8_t *msg); + +/** + * frescan_message_to_request() - converts a network message into a request + * + * this function is the opposite to the previous one. It will be used by + * the acceptor threads to transform messages received from the network + * into requests. + * + * @msg: buffer with the bytes received from the network (in) + * @req_data: the request data to fill from the message bytes (out) + * + */ -extern int frescan_message_to_request(const uint8_t *msg, // in - frescan_request_id_t id); // in out +extern int frescan_message_to_request(const uint8_t *msg, + frescan_request_data_t *req_data); #endif // _FRESCAN_NEGOTIATION_MESSAGES_H_ diff --git a/src_frescan/frescan_requests.c b/src_frescan/frescan_bwres_requests.c similarity index 56% rename from src_frescan/frescan_requests.c rename to src_frescan/frescan_bwres_requests.c index 0ac95f3..b21db88 100644 --- a/src_frescan/frescan_requests.c +++ b/src_frescan/frescan_bwres_requests.c @@ -1,7 +1,7 @@ /*! - * @file frescan_requests.c + * @file frescan_bwres_requests.c * - * @brief FRESCAN requests + * @brief FRESCAN bandwith reservation layer: requests * * This module contains an operation to create the queue, an operation to * enqueue a message (with a request), and an operation to @@ -18,7 +18,7 @@ #include #include #include "frescan.h" -#include "frescan_requests.h" +#include "frescan_bwres_requests.h" #include "frescan_config.h" #include "frescan_debug.h" #include "fosa_mutexes_and_condvars.h" @@ -33,13 +33,8 @@ static bool is_initialized = false; **/ struct request_t { - frescan_req_type_t type; - frescan_robj_id_t reply; - frescan_contract_t *contract; - frescan_ss_t ss; - frescan_node_t src; - void *return_info; - struct list_head request_list; + frescan_request_data_t request_data; + struct list_head request_list; int pool_pos; }; @@ -48,6 +43,7 @@ static fosa_cond_t requests_cond; static struct request_t the_requests_pool[FRESCAN_MX_REQUESTS]; static freelist_t freelist; + static struct request_t the_requests_list; /** @@ -77,13 +73,13 @@ int frescan_requests_init(int max_ceiling) } /** - * frescan_request_alloc() + * frescan_requests_alloc() * * Allocate a request with the mutex locked * **/ -int frescan_request_alloc(frescan_request_id_t *id) +int frescan_requests_alloc(frescan_request_id_t *req) { int err, pos; @@ -98,8 +94,8 @@ int frescan_request_alloc(frescan_request_id_t *id) err = fosa_mutex_unlock(&requests_mutex); if (err != 0) return err; - *id = (unsigned int)pos; - the_requests_pool[*id].pool_pos = pos; + *req = (unsigned int)pos; + the_requests_pool[*req].pool_pos = pos; return 0; @@ -109,13 +105,13 @@ locked_error: } /** - * frescan_request_free() + * frescan_requests_free() * * free the request from the pool * **/ -int frescan_request_free(frescan_request_id_t id) +int frescan_requests_free(frescan_request_id_t req) { int err; @@ -124,7 +120,7 @@ int frescan_request_free(frescan_request_id_t id) err = fosa_mutex_lock(&requests_mutex); if (err != 0) return err; - err = freelist_free(&freelist, id); + err = freelist_free(&freelist, req); if (err != 0) goto locked_error; err = fosa_mutex_unlock(&requests_mutex); @@ -138,139 +134,24 @@ locked_error: } /** - * frescan_request_set_type() - * - **/ - -int frescan_request_set_type(frescan_request_id_t id, frescan_req_type_t type) -{ - DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "id:%d, type:%d\n", id, type); - the_requests_pool[id].type = type; - return 0; -} - -/** - * frescan_request_set_reply() - * - **/ - -int frescan_request_set_reply(frescan_request_id_t id, frescan_robj_id_t reply) -{ - the_requests_pool[id].reply = reply; - return 0; -} - -/** - * frescan_request_set_contract() - * - **/ - -int frescan_request_set_contract(frescan_request_id_t id, - const frescan_contract_t *contract) -{ - the_requests_pool[id].contract = (frescan_contract_t *)contract; - return 0; -} - -/** - * frescan_request_set_ss() + * frescan_requests_get_data() - gets the data of the request * - **/ - -int frescan_request_set_ss(frescan_request_id_t id, - frescan_ss_t ss) -{ - the_requests_pool[id].ss = ss; - return 0; -} - -/** - * frescan_request_set_src() - * - **/ - -int frescan_request_set_src(frescan_request_id_t id, frescan_node_t src) -{ - the_requests_pool[id].src = src; - return 0; -} - -/** - * frescan_request_set_return_info() - * - **/ - -int frescan_request_set_return_info(frescan_request_id_t id, - void *return_info) -{ - the_requests_pool[id].return_info = return_info; - return 0; -} - -/** - * frescan_request_get_type() - * - **/ - -int frescan_request_get_type(frescan_request_id_t id, frescan_req_type_t *type) -{ - *type = the_requests_pool[id].type; - DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "id:%d, type:%d\n", id, *type); - return 0; -} - -/** - * frescan_request_get_reply() - * - **/ - -int frescan_request_get_reply(frescan_request_id_t id, frescan_robj_id_t *reply) -{ - *reply = the_requests_pool[id].reply; - return 0; -} - -/** - * frescan_request_get_contract() - * - **/ - -int frescan_request_get_contract(frescan_request_id_t id, - frescan_contract_t **contract) -{ - *contract = the_requests_pool[id].contract; - return 0; -} - -/** - * frescan_request_get_src() - * - **/ - -int frescan_request_get_src(frescan_request_id_t id, frescan_node_t *src) -{ - *src = the_requests_pool[id].src; - return 0; -} - -/** - * frescan_request_get_return_info() - * - **/ + */ -int frescan_request_get_return_info(frescan_request_id_t id, - void **return_info) +int frescan_requests_get_data(frescan_request_id_t req, + frescan_request_data_t **data) { - *return_info = the_requests_pool[id].return_info; + DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "request id:%d\n", req); + *data = &the_requests_pool[req].request_data; return 0; } /** - * frescan_requestqueue_enqueue() + * frescan_requests_enqueue() * **/ -int frescan_requestqueue_enqueue(frescan_request_id_t id) +int frescan_requests_enqueue(frescan_request_id_t req) { int err; @@ -281,7 +162,7 @@ int frescan_requestqueue_enqueue(frescan_request_id_t id) "is list empty A? %d\n", list_empty(&the_requests_list.request_list)); - list_add_tail(&the_requests_pool[id].request_list, + list_add_tail(&the_requests_pool[req].request_list, &the_requests_list.request_list); DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, @@ -306,7 +187,7 @@ locked_error: * **/ -int frescan_requestqueue_dequeue(frescan_request_id_t *id) +int frescan_requests_dequeue(frescan_request_id_t *req) { int err; struct list_head *pos; @@ -332,7 +213,7 @@ int frescan_requestqueue_dequeue(frescan_request_id_t *id) list_del(&request->request_list); - *id = request->pool_pos; + *req = request->pool_pos; DEBUG(FRESCAN_REQUESTS_ENABLE_DEBUG, "is list empty now? %d\n", diff --git a/src_frescan/frescan_bwres_requests.h b/src_frescan/frescan_bwres_requests.h new file mode 100644 index 0000000..2d442e3 --- /dev/null +++ b/src_frescan/frescan_bwres_requests.h @@ -0,0 +1,112 @@ +/*! + * @file frescan_bwres_requests.h + * + * @brief FRESCAN bandwith reservation layer: requests + * + * This module contains the request type definition and queues. They are used + * to ask negotiation, renegotiation or cancel contracts. + * + * @version 0.01 + * + * @date 1-Apr-2008 + * + * @author Daniel Sangorrin + * + */ + +#ifndef _FRESCAN_BWRES_REQUESTS_H_ +#define _FRESCAN_BWRES_REQUESTS_H_ + +#include +#include "frescan_data.h" // frescan_contract_t +#include "frescan_bwres_robjs.h" // frescan_robj_id_t + +typedef uint16_t frescan_request_id_t; /* 0 .. MX_REQUESTS */ + +/** + * frescan_request_data_t + * + * This are the data contained in a request to perform the negotiation of + * contracts. + * + * @type: indicates the type of the request + * @contract: a pointer to the contract to (re)negotiate + * @ss: the local sporadic server ID + * @request_node: the node that performed the request + * @req: the request id of the SLAVE to identify the request in the reply + * @return_value: the value returned in a Reply + * @robj: a reply object to wait until a negotiation is completed + * + */ + +typedef enum { + FRESCAN_REQ_NEG = 0, // Negotiate a contract + FRESCAN_REQ_RENEG = 1, // Renegotiate a contract + FRESCAN_REQ_CANCEL = 2, // Cancel a contract + FRESCAN_REP_NEG = 3, // Reply to (Re)Negotiate a contract +} frescan_request_type_t; + +typedef enum { + FRESCAN_REQ_ACCEPTED = 0, // the (re)negotiation is accepted + FRESCAN_REQ_NOT_ACCEPTED = 1, // the (re)negotiation is not accepted + FRESCAN_REQ_ERROR = 2, // error during the (re)negotiation +} frescan_request_retval_t; + +typedef struct { + frescan_request_type_t type; + frescan_contract_t *contract; + frescan_ss_t ss; + frescan_node_t request_node; + frescan_request_id_t req; + frescan_request_retval_t return_value; + frescan_robj_id_t robj; +} frescan_request_data_t; + +/** + * frescan_requests_init() - initializes the requests + * + * This function must be called at initialization time, before the rest of + * functions of this module. + * + * @max_ceiling: the max priority of the threads using this module + */ + +extern int frescan_requests_init(int max_ceiling); + +/** + * frescan_requests_alloc() - allocates a request + */ + +extern int frescan_requests_alloc(frescan_request_id_t *req); + +/** + * frescan_requests_free() - frees a request + */ + +extern int frescan_requests_free(frescan_request_id_t req); + +/** + * frescan_requests_get_data() - gets the data of the request + * + * @data: the data is obtained as a pointer an manipulated directly + * accesing to the members of the structure. Note that this is + * just an internal API so instead of using get/set functions we + * will just access to the members of the struc directly + */ + +extern int frescan_requests_get_data(frescan_request_id_t req, + frescan_request_data_t **data); + +/** + * frescan_requests_enqueue() - enqueue a request + */ + +extern int frescan_requests_enqueue(frescan_request_id_t req); + +/** + * frescan_requests_dequeue() - dequeue a request + */ + +extern int frescan_requests_dequeue(frescan_request_id_t *req); + +#endif // _FRESCAN_BWRES_REQUESTS_H_ diff --git a/src_frescan/frescan_reply_objs.c b/src_frescan/frescan_bwres_robjs.c similarity index 91% rename from src_frescan/frescan_reply_objs.c rename to src_frescan/frescan_bwres_robjs.c index 5d8a991..01e051b 100644 --- a/src_frescan/frescan_reply_objs.c +++ b/src_frescan/frescan_bwres_robjs.c @@ -1,7 +1,7 @@ /*! - * @file frescan_reply_objs.h + * @file frescan_bwres_robjs.c * - * @brief FRESCAN reply objects + * @brief FRESCAN bandwith reservation layer: reply objects * * This module contains the definition of the data object and operations to * create a pool of objects, obtain the id of an unused object, wait upon it, @@ -16,7 +16,7 @@ */ #include -#include "frescan_reply_objs.h" +#include "frescan_bwres_robjs.h" #include "fosa_mutexes_and_condvars.h" #include "frescan_config.h" #include "frescan_debug.h" @@ -47,7 +47,7 @@ static struct replyobj_t the_reply_objects[FRESCAN_MX_REPLY_OBJECTS]; static freelist_t freelist; /** - * frescan_replyobjects_init() + * frescan_bwres_robjs_init() * * Init the freelist and its mutex. The conditional variables are not * initialized here but when allocating a reply object. This can be more @@ -55,7 +55,7 @@ static freelist_t freelist; * **/ -int frescan_replyobjects_init(int max_ceiling) +int frescan_bwres_robjs_init(int max_ceiling) { int err; @@ -70,7 +70,7 @@ int frescan_replyobjects_init(int max_ceiling) } /** - * frescan_replyobject_alloc() + * frescan_bwres_robjs_alloc() * * Allocate an reply object with the freelist_mutex locked and then initialize * its cond variable, condition (predicate) and mutex. The ID of the allocated @@ -78,7 +78,7 @@ int frescan_replyobjects_init(int max_ceiling) * **/ -int frescan_replyobject_alloc(frescan_robj_id_t *id, int ceiling) +int frescan_bwres_robjs_alloc(frescan_robj_id_t *id, int ceiling) { int err, pos; @@ -116,13 +116,13 @@ locked_error: } /** - * frescan_replyobject_free() + * frescan_bwres_robjs_free() * * Destroy the cond variable and then free the replyobject * **/ -int frescan_replyobject_free(frescan_robj_id_t id) +int frescan_bwres_robjs_free(frescan_robj_id_t id) { int err; @@ -156,13 +156,13 @@ locked_error: } /** - * frescan_replyobject_signal() + * frescan_bwres_robjs_signal() * * Signal the cond variable * **/ -int frescan_replyobject_signal(frescan_robj_id_t id) +int frescan_bwres_robjs_signal(frescan_robj_id_t id) { int err; @@ -194,13 +194,13 @@ locked_error: } /** - * frescan_replyobject_wait() + * frescan_bwres_robjs_wait() * * Wait on the cond variable. * **/ -int frescan_replyobject_wait(frescan_robj_id_t id) +int frescan_bwres_robjs_wait(frescan_robj_id_t id) { int err; @@ -228,13 +228,13 @@ locked_error: } /** - * frescan_replyobject_timedwait() + * frescan_bwres_robjs_timedwait() * * Wait on the cond variable with a timeout. * **/ -int frescan_replyobject_timedwait(frescan_robj_id_t id, +int frescan_bwres_robjs_timedwait(frescan_robj_id_t id, const struct timespec *abstime) { int err; diff --git a/src_frescan/frescan_reply_objs.h b/src_frescan/frescan_bwres_robjs.h similarity index 52% rename from src_frescan/frescan_reply_objs.h rename to src_frescan/frescan_bwres_robjs.h index 07bcdcd..687be55 100644 --- a/src_frescan/frescan_reply_objs.h +++ b/src_frescan/frescan_bwres_robjs.h @@ -1,7 +1,7 @@ /*! - * @file frescan_reply_objs.h + * @file frescan_bwres_robjs.h * - * @brief FRESCAN reply objects + * @brief FRESCAN bandwith reservation layer: reply objects * * This module contains the definition of the data object and operations to * create a pool of objects, obtain the id of an unused object, wait upon it, @@ -15,8 +15,8 @@ * */ -#ifndef _FRESCAN_REPLY_OBJECTS_H_ -#define _FRESCAN_REPLY_OBJECTS_H_ +#ifndef _FRESCAN_BWRES_ROBJS_H_ +#define _FRESCAN_BWRES_ROBJS_H_ #include /* for timespec */ #include "fosa_opaque_types.h" /* for FOSA_ETIMEDOUT */ @@ -24,12 +24,12 @@ typedef unsigned int frescan_robj_id_t; /* 0 .. MX_REPLY_OBJECTS-1 */ #define FRESCAN_ETIMEDOUT FOSA_ETIMEDOUT -extern int frescan_replyobjects_init(int max_ceiling); -extern int frescan_replyobject_alloc(frescan_robj_id_t *id, int ceiling); -extern int frescan_replyobject_free(frescan_robj_id_t id); -extern int frescan_replyobject_signal(frescan_robj_id_t id); -extern int frescan_replyobject_wait(frescan_robj_id_t id); -extern int frescan_replyobject_timedwait(frescan_robj_id_t id, +extern int frescan_bwres_robjs_init(int max_ceiling); +extern int frescan_bwres_robjs_alloc(frescan_robj_id_t *id, int ceiling); +extern int frescan_bwres_robjs_free(frescan_robj_id_t id); +extern int frescan_bwres_robjs_signal(frescan_robj_id_t id); +extern int frescan_bwres_robjs_wait(frescan_robj_id_t id); +extern int frescan_bwres_robjs_timedwait(frescan_robj_id_t id, const struct timespec *abstime); -#endif // _FRESCAN_REPLY_OBJECTS_H_ +#endif // _FRESCAN_BWRES_ROBJS_H_ diff --git a/src_frescan/frescan_bwres_threads.c b/src_frescan/frescan_bwres_threads.c index da7623e..9d7aefe 100644 --- a/src_frescan/frescan_bwres_threads.c +++ b/src_frescan/frescan_bwres_threads.c @@ -17,21 +17,24 @@ #include #include "fosa_threads_and_signals.h" // fosa_thread_attr_init... #include "frescan_bwres_threads.h" +#include "frescan_bwres_messages.h" +#include "frescan_bwres_requests.h" #include "frescan_config.h" #include "frescan_debug.h" #include "frescan_data.h" -#include "frescan_bwres_messages.h" -#include "frescan_requests.h" #include "frescan_servers.h" +static void *frescan_manager_thread(void *arg); static void *frescan_acceptor_thread(void *arg); -static void *frescan_master_neg_thread(void *arg); /** - * frescan_acceptor_thread_create() + * frescan_manager_thread_create() + * + * This call creates the manager thread at each node which will be waiting + * in a request queue for LOCAL or EXTERNAL requests. */ -int frescan_acceptor_thread_create(frescan_network_t net) +int frescan_manager_thread_create(frescan_network_t net) { int ret; fosa_thread_attr_t attr; @@ -42,16 +45,16 @@ int frescan_acceptor_thread_create(frescan_network_t net) return ret; } - ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO); + ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO); if (ret != 0) { - ERROR("could not set acceptor thread prio %d\n", - FRESCAN_ACCEPTOR_THREAD_PRIO); + ERROR("could not set neg thread prio %d\n", + FRESCAN_NEG_THREAD_PRIO); return ret; } - ret = fosa_thread_create(&the_networks[net].acceptor_thread_id, + ret = fosa_thread_create(&the_networks[net].neg_thread_id, &attr, - frescan_acceptor_thread, + frescan_master_neg_thread, (void *)(uint32_t)net); if (ret != 0) { @@ -69,10 +72,10 @@ int frescan_acceptor_thread_create(frescan_network_t net) } /** - * frescan_master_neg_thread_create() + * frescan_acceptor_thread_create() */ -int frescan_master_neg_thread_create(frescan_network_t net) +int frescan_acceptor_thread_create(frescan_network_t net) { int ret; fosa_thread_attr_t attr; @@ -83,16 +86,16 @@ int frescan_master_neg_thread_create(frescan_network_t net) return ret; } - ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO); + ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO); if (ret != 0) { - ERROR("could not set neg thread prio %d\n", - FRESCAN_NEG_THREAD_PRIO); + ERROR("could not set acceptor thread prio %d\n", + FRESCAN_ACCEPTOR_THREAD_PRIO); return ret; } - ret = fosa_thread_create(&the_networks[net].neg_thread_id, + ret = fosa_thread_create(&the_networks[net].acceptor_thread_id, &attr, - frescan_master_neg_thread, + frescan_acceptor_thread, (void *)(uint32_t)net); if (ret != 0) { @@ -109,10 +112,80 @@ int frescan_master_neg_thread_create(frescan_network_t net) return 0; } +/** + * frescan_manager_thread + */ + +static void *frescan_manager_thread(void *arg) +{ + int err; + frescan_request_id_t request; + frescan_req_type_t type; + frescan_robj_id_t reply; + frescan_contract_t *contract; + frescan_neg_return_info_t *neg_return_info; + frescan_server_params_t server_params; + frescan_network_t net = (uint32_t)arg; + + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n"); + + while(1) { + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n"); + + err = frescan_requestqueue_dequeue(&request); + assert(err == 0); + + err = frescan_request_get_type(request, &type); + assert(err == 0); + + switch(type) { + case FRESCAN_NEGOTIATE: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_NEGOTIATE request\n"); + err = frescan_request_get_contract(request, &contract); + assert(err == 0); + err = frescan_request_get_reply(request, &reply); + assert(err == 0); + err = frescan_request_get_return_info + (request, (void *)&neg_return_info); + assert(err == 0); + + // TODO: sched test + add contract to table + // so far always accepted witht he min values + neg_return_info->error = 0; + server_params.values = contract->min_values; + server_params.prio = contract->prio; + + err = frescan_servers_create + (net, + &server_params, + &neg_return_info->id); + assert(err == 0); + + err = frescan_replyobject_signal(reply); + assert(err == 0); + break; + + case FRESCAN_RENEGOTIATE: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_RENEGOTIATE request\n"); + break; + + case FRESCAN_CANCEL: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_CANCEL request\n"); + break; + + default: + ERROR("wrong request type %d\n", type); + } + } + + return NULL; +} + /** * frescan_acceptor_thread() - * - * a loop waiting for messages on the network and parsing them. */ static void *frescan_acceptor_thread(void *arg) @@ -150,77 +223,3 @@ static void *frescan_acceptor_thread(void *arg) return NULL; } - -/** - * frescan_master_neg_thread - * - * the thread that negotiates LOCAL contracts in the MASTER node. - */ - -static void *frescan_master_neg_thread(void *arg) -{ - int err; - frescan_request_id_t request; - frescan_req_type_t type; - frescan_robj_id_t reply; - frescan_contract_t *contract; - frescan_neg_return_info_t *neg_return_info; - frescan_server_params_t server_params; - frescan_network_t net = (uint32_t)arg; - - DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n"); - - while(1) { - DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n"); - - err = frescan_requestqueue_dequeue(&request); - assert(err == 0); - - err = frescan_request_get_type(request, &type); - assert(err == 0); - - switch(type) { - case FRESCAN_NEGOTIATE: - DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, - "FRESCAN_NEGOTIATE request\n"); - err = frescan_request_get_contract(request, &contract); - assert(err == 0); - err = frescan_request_get_reply(request, &reply); - assert(err == 0); - err = frescan_request_get_return_info - (request, (void *)&neg_return_info); - assert(err == 0); - - // TODO: sched test + add contract to table - // so far always accepted witht he min values - neg_return_info->error = 0; - server_params.values = contract->min_values; - server_params.prio = contract->prio; - - err = frescan_servers_create - (net, - &server_params, - &neg_return_info->id); - assert(err == 0); - - err = frescan_replyobject_signal(reply); - assert(err == 0); - break; - - case FRESCAN_RENEGOTIATE: - DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, - "FRESCAN_RENEGOTIATE request\n"); - break; - - case FRESCAN_CANCEL: - DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, - "FRESCAN_CANCEL request\n"); - break; - - default: - ERROR("wrong request type %d\n", type); - } - } - - return NULL; -} diff --git a/src_frescan/frescan_bwres_threads.h b/src_frescan/frescan_bwres_threads.h index c3e78de..565314d 100644 --- a/src_frescan/frescan_bwres_threads.h +++ b/src_frescan/frescan_bwres_threads.h @@ -14,33 +14,27 @@ * */ -#ifndef _FRESCAN_NEGOTIATION_THREADS_H_ -#define _FRESCAN_NEGOTIATION_THREADS_H_ +#ifndef _FRESCAN_BWRES_THREADS_H_ +#define _FRESCAN_BWRES_THREADS_H_ #include "frescan.h" /** - * frescan_master_neg_thread_create() - * - * This call creates the thread in charge of LOCAL negotiations at the - * MASTER node, so in the rest of nodes it doesnt have to be called at - * initialization. This thread will await in a local request queue for - * LOCAL negotiation requests from threads in the same CPU. - * In the case of SLAVE nodes, the negotiation requests are simply performed - * by sending an appropiate message to the MASTER node and then awaiting - * in a reply object until an acceptor thread signals it. + * frescan_manager_thread_create() + * + * This call creates the manager thread at each node which will be waiting + * in a request queue for LOCAL or EXTERNAL requests. */ -extern int frescan_master_neg_thread_create(frescan_network_t net); +extern int frescan_manager_thread_create(frescan_network_t net); /** * frescan_acceptor_thread_create() * - * This call is called in every node (including the MASTER node) conforming - * a set of threads, one at each node, that awaits negotiation messages - * from a receive endpoint and perform the corresponding operations. + * This call creates the acceptor thread which will be waiting negotiation + * messages from the network and converting them into requests. */ extern int frescan_acceptor_thread_create(frescan_network_t net); -#endif // _FRESCAN_NEGOTIATION_THREADS_H_ +#endif // _FRESCAN_BWRES_THREADS_H_ diff --git a/src_frescan/frescan_data.h b/src_frescan/frescan_data.h index a6e150a..88ca6c3 100644 --- a/src_frescan/frescan_data.h +++ b/src_frescan/frescan_data.h @@ -114,15 +114,6 @@ typedef struct { frescan_prio_t prio; } frescan_contract_t; -/** - * return info - */ - -typedef struct { - int error; - frescan_ss_t id; -} frescan_neg_return_info_t; - /** * frescan_prio_queue_t - priority queue * diff --git a/src_frescan/frescan_requests.h b/src_frescan/frescan_requests.h deleted file mode 100644 index 7bbb6b5..0000000 --- a/src_frescan/frescan_requests.h +++ /dev/null @@ -1,76 +0,0 @@ -/*! - * @file frescan_requests.h - * - * @brief FRESCAN requests - * - * This module contains an operation to create the queue, an operation to - * enqueue a message (with a request), and an operation to - * dequeue a message. - * - * @version 0.01 - * - * @date 1-Apr-2008 - * - * @author Daniel Sangorrin - * - */ - -#ifndef _FRESCAN_REQUESTS_QUEUE_H_ -#define _FRESCAN_REQUESTS_QUEUE_H_ - -#include -#include "frescan_data.h" // frescan_contract_t -#include "frescan_reply_objs.h" // frescan_robj_id_t - -typedef uint16_t frescan_request_id_t; /* 0 .. MX_REQUESTS */ - -typedef enum { - FRESCAN_NEGOTIATE = 0, // Negotiate a contract - FRESCAN_RENEGOTIATE = 1, // Renegotiate a contract - FRESCAN_CANCEL = 2, // Cancel a contract - } frescan_req_type_t; - -extern int frescan_requests_init(int max_ceiling); - -extern int frescan_request_alloc(frescan_request_id_t *id); - -extern int frescan_request_free(frescan_request_id_t id); - -extern int frescan_request_set_type(frescan_request_id_t id, - frescan_req_type_t type); - -extern int frescan_request_set_reply(frescan_request_id_t id, - frescan_robj_id_t reply); - -extern int frescan_request_set_contract(frescan_request_id_t id, - const frescan_contract_t *contract); - -extern int frescan_request_set_ss(frescan_request_id_t id, - frescan_ss_t ss); - -extern int frescan_request_set_src(frescan_request_id_t id, - frescan_node_t src); - -extern int frescan_request_set_return_info(frescan_request_id_t id, - void *return_info); - -extern int frescan_request_get_type(frescan_request_id_t id, - frescan_req_type_t *type); - -extern int frescan_request_get_reply(frescan_request_id_t id, - frescan_robj_id_t *reply); - -extern int frescan_request_get_contract(frescan_request_id_t id, - frescan_contract_t **contract); - -extern int frescan_request_get_src(frescan_request_id_t id, - frescan_node_t *src); - -extern int frescan_request_get_return_info(frescan_request_id_t id, - void **return_info); - -extern int frescan_requestqueue_enqueue(frescan_request_id_t id); - -extern int frescan_requestqueue_dequeue(frescan_request_id_t *id); - -#endif // _FRESCAN_REQUESTS_QUEUE_H_ -- 2.39.2