From 113fb5b90ebd5be03997740e0ab92d40d06ee4dd Mon Sep 17 00:00:00 2001 From: sangorrin Date: Wed, 16 Apr 2008 15:27:03 +0000 Subject: [PATCH] i break the svn for one day to change the name of files, redo the negotiation messages system and implement renegotiations and canceling of contracts git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@1116 35b4ef3e-fd22-0410-ab77-dab3279adceb --- src_frescan/frescan_bandwidth_reservation.c | 156 +++++++++++++++++++- src_frescan/frescan_data.h | 2 + src_frescan/frescan_negotiation_messages.c | 38 +++++ src_frescan/frescan_negotiation_messages.h | 15 +- src_frescan/frescan_queues.c | 19 ++- src_frescan/frescan_requests_queue.c | 13 ++ src_frescan/frescan_requests_queue.h | 3 + 7 files changed, 226 insertions(+), 20 deletions(-) diff --git a/src_frescan/frescan_bandwidth_reservation.c b/src_frescan/frescan_bandwidth_reservation.c index b77f8c5..d112202 100644 --- a/src_frescan/frescan_bandwidth_reservation.c +++ b/src_frescan/frescan_bandwidth_reservation.c @@ -79,8 +79,8 @@ int frescan_bwres_init(frescan_network_t net) * frescan_bwres_negotiate() * * negotiate a contract. For that we allocate a reply object and then - * we enqueue our request in the master's requests queue (which can be - * local or require a network message) + * we enqueue our request in the master's requests queue (if we are in + * the master node) or send it to the master through the network. */ int frescan_bwres_negotiate(frescan_network_t net, @@ -163,8 +163,10 @@ int frescan_bwres_negotiate(frescan_network_t net, params.to = FRESCAN_NEG_MASTER_NODE; params.channel = FRESCAN_NEG_CHANNEL; -// params.flags = FRESCAN_FP | FRESCAN_ASYNC; -// params.prio = 8; + // NOTE: if we sent the negotiation msgs with fp: + // params.flags = FRESCAN_FP | FRESCAN_ASYNC; + // params.prio = 8; + params.flags = FRESCAN_SS | FRESCAN_ASYNC; params.ss = the_networks[net].neg_messages_ss_id; @@ -213,3 +215,149 @@ int frescan_bwres_negotiate(frescan_network_t net, return return_info.error; } + +/** + * frescan_bwres_renegotiate() + * + * renegotiate a contract. For that we allocate a reply object and then + * we enqueue our request in the master's requests queue (if we are in + * the master node) or send it to the master through the network. + */ + +int frescan_bwres_renegotiate(frescan_network_t net, + const frescan_contract_t *contract, + frescan_ss_t id) +{ + int ret; + frescan_robj_id_t reply; + frescan_request_id_t request; + frescan_neg_return_info_t return_info; + uint8_t msg[200]; + int size; + frescan_send_params_t params; + + ret = frescan_replyobject_alloc(&reply, FRESCAN_BWRES_MX_PRIO); + if (ret != 0) { + ERROR("could not allocate reply object\n"); + return ret; + } + + ret = frescan_request_alloc(&request); + if (ret != 0) { + ERROR("could not allocate request\n"); + return ret; + } + + ret = frescan_request_set_return_info(request, + (void *) &return_info); + if (ret != 0) { + ERROR("could not set return_info pointer\n"); + return ret; + } + + ret = frescan_request_set_reply(request, reply); + if (ret != 0) { + ERROR("could not set reply\n"); + return ret; + } + + ret = frescan_request_set_ss(request, id); + if (ret != 0) { + ERROR("could not set server id\n"); + return ret; + } + + if (the_networks[net].local_node == FRESCAN_NEG_MASTER_NODE) { + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "I am master, renegotiation in local node\n"); + + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "set FRESCAN_RENEGOTIATE type: %d\n", + FRESCAN_RENEGOTIATE); + + ret = frescan_request_set_type(request, FRESCAN_RENEGOTIATE); + if (ret != 0) { + ERROR("could not set type\n"); + return ret; + } + + ret = frescan_request_set_contract(request, contract); + if (ret != 0) { + ERROR("could not set contract\n"); + return ret; + } + + ret = frescan_request_set_src(request, FRESCAN_NEG_MASTER_NODE); + if (ret != 0) { + ERROR("could not set src\n"); + return ret; + } + + ret = frescan_requestqueue_enqueue(request); + if (ret != 0) { + ERROR("could not enqueue the request\n"); + return ret; + } + } else { + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "I am slave, renegotiation in master node\n"); + + size = frescan_reneg_message_create(msg, request, contract); + + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "created a reneg message, size: %d\n", size); + + params.net = net; + params.to = FRESCAN_NEG_MASTER_NODE; + params.channel = FRESCAN_NEG_CHANNEL; + + // NOTE: if we sent the negotiation msgs with fp: + // params.flags = FRESCAN_FP | FRESCAN_ASYNC; + // params.prio = 8; + + params.flags = FRESCAN_SS | FRESCAN_ASYNC; + params.ss = the_networks[net].neg_messages_ss_id; + + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "send msg to master, net:%u to:%u ss:%u\n", + params.net, params.to, params.ss); + + ret = frescan_send(¶ms, msg, size); + if (ret != 0) { + ERROR("error while sending neg request to master\n"); + return ret; + } + } + + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, "wait on reply object\n"); + + ret = frescan_replyobject_wait(reply); + if (ret != 0) { + ERROR("error while waiting on the reply object\n"); + return ret; + } + + ret = frescan_replyobject_free(reply); + if (ret != 0) { + ERROR("could not free reply object\n"); + return ret; + } + + if (return_info.error) { + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "renegotiation was not accepted, error:%d\n", + return_info.error); + } else { + DEBUG(FRESCAN_BWRES_ENABLE_DEBUG, + "renegotiation finished succesfully for ss:%d\n", + id); + } + + ret = frescan_request_free(request); + if (ret != 0) { + ERROR("could not free request\n"); + return ret; + } + + return return_info.error; +} diff --git a/src_frescan/frescan_data.h b/src_frescan/frescan_data.h index dbae6cc..a6e150a 100644 --- a/src_frescan/frescan_data.h +++ b/src_frescan/frescan_data.h @@ -134,6 +134,7 @@ typedef struct { * synchronization is done using a semaphore. This is because the queues * are accesed concurrently from user threads and the IRQ handler. * + * @net: the network this priority queue belongs to (mainly for locking) * @fifo_queues: an array of packets for each priority where each packet * is just the head of a fifo_list. The array is allocated * from the heap, using malloc, at initialization with range @@ -143,6 +144,7 @@ typedef struct { */ typedef struct { + frescan_network_t net; frescan_packet_t *fifo_queues; uint32_t max_prio; sem_t sem; diff --git a/src_frescan/frescan_negotiation_messages.c b/src_frescan/frescan_negotiation_messages.c index 1bc8a00..5d22a33 100644 --- a/src_frescan/frescan_negotiation_messages.c +++ b/src_frescan/frescan_negotiation_messages.c @@ -147,6 +147,44 @@ int frescan_repneg_message_create(uint8_t *msg, return (int)bytes_written; } +int frescan_reneg_message_create(uint8_t *msg, + frescan_request_id_t id, + const frescan_contract_t *contract) +{ + size_t num, bytes_written; + uint8_t *tmp = msg; + + DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, + "creating a renegotiation request message\n"); + + *tmp = (uint8_t)FRESCAN_MSG_TYPE_RENEG; + tmp++; + DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, + "type: %d (1 byte)\n", FRESCAN_MSG_TYPE_RENEG); + + num = 2; + memcpy(tmp, &id, num); + tmp = tmp + num; + DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, + "request id: %d (2 bytes)\n", id); + + num = sizeof(frescan_contract_t); + memcpy(tmp, contract, num); + tmp = tmp + num; + + DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, + "contract (%d bytes)\n", num); + + // TODO: ADD server id? or look up the label?? + + bytes_written = tmp - msg; + + DEBUG(FRESCAN_NEG_MESSAGES_ENABLE_DEBUG, + "total bytes_written: %d\n", bytes_written); + + return (int)bytes_written; +} + int frescan_message_parse(frescan_network_t net, const uint8_t *msg, size_t size, diff --git a/src_frescan/frescan_negotiation_messages.h b/src_frescan/frescan_negotiation_messages.h index 886da0d..d869f68 100644 --- a/src_frescan/frescan_negotiation_messages.h +++ b/src_frescan/frescan_negotiation_messages.h @@ -21,18 +21,13 @@ #include "frescan_requests_queue.h" #include "frescan_data.h" -extern int frescan_neg_message_create(uint8_t *msg, +extern int frescan_request_to_message(uint8_t *msg, frescan_request_id_t id, const frescan_contract_t *contract); -extern int frescan_repneg_message_create(uint8_t *msg, - frescan_request_id_t id, - int accepted, - frescan_server_params_t *params); - -extern int frescan_message_parse(frescan_network_t net, - const uint8_t *msg, - size_t size, - frescan_node_t from); +extern int frescan_message_to_request(frescan_network_t net, + const uint8_t *msg, + size_t size, + frescan_node_t from); #endif // _FRESCAN_NEGOTIATION_MESSAGES_H_ diff --git a/src_frescan/frescan_queues.c b/src_frescan/frescan_queues.c index 6d1d150..e00fd83 100644 --- a/src_frescan/frescan_queues.c +++ b/src_frescan/frescan_queues.c @@ -34,7 +34,8 @@ * frescan_pqueue_create() - creates a priority queue */ -static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio) +static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio, + frescan_network_t net) { int ret, prio; frescan_prio_queue_t *pq; // priority queue @@ -46,6 +47,7 @@ static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio) } pq->max_prio = max_prio; + pq->net = net; ret = sem_init (&pq->sem, 0, 0); if (ret != 0) { @@ -80,7 +82,8 @@ int frescan_queues_init(frescan_queues_t *queues, uint32_t max_prio; // create transmission fixed priority queue - queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio); + queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio, + params->net); if (queues->tx_fp_queue == NULL) { ERROR("could not allocate memory for tx fp queue\n"); @@ -108,7 +111,8 @@ int frescan_queues_init(frescan_queues_t *queues, max_prio = params->rx_channel_max_prio[i]; } - queues->rx_channel_queues[i] = frescan_pqueue_create(max_prio); + queues->rx_channel_queues[i] = frescan_pqueue_create + (max_prio, params->net); if (queues->rx_channel_queues[i] == NULL) { ERROR("could not allocate memory for rx pq %d\n", i); @@ -203,8 +207,11 @@ int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue, DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio); - // TODO: the lock is currently hardwired to network 0!!! - FRESCAN_ACQUIRE_LOCK(&the_networks[0].lock); + // NOTE: we only acquire the lock if we block because non-blocking + // calls are made from a context with no interrupts (when updating + // the buffer at 'frescan_hw_buffer_update' which is always called + // with interrupts disabled) + if (blocking) FRESCAN_ACQUIRE_LOCK(&the_networks[pqueue->net].lock); for(prio=pqueue->max_prio-1; prio >= 0; prio--) { if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) { @@ -220,7 +227,7 @@ int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue, } } - FRESCAN_RELEASE_LOCK(&the_networks[0].lock); + if (blocking) FRESCAN_RELEASE_LOCK(&the_networks[pqueue->net].lock); DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio); *packet_prio = prio; diff --git a/src_frescan/frescan_requests_queue.c b/src_frescan/frescan_requests_queue.c index 0d61ad5..d17bb69 100644 --- a/src_frescan/frescan_requests_queue.c +++ b/src_frescan/frescan_requests_queue.c @@ -36,6 +36,7 @@ struct request_t { frescan_req_type_t type; frescan_robj_id_t reply; frescan_contract_t *contract; + frescan_ss_t ss; frescan_node_t src; void *return_info; struct list_head request_list; @@ -171,6 +172,18 @@ int frescan_request_set_contract(frescan_request_id_t id, return 0; } +/** + * frescan_request_set_ss() + * + **/ + +int frescan_request_set_ss(frescan_request_id_t id, + frescan_ss_t ss) +{ + the_requests_pool[id].ss = ss; + return 0; +} + /** * frescan_request_set_src() * diff --git a/src_frescan/frescan_requests_queue.h b/src_frescan/frescan_requests_queue.h index 80970a4..3571201 100644 --- a/src_frescan/frescan_requests_queue.h +++ b/src_frescan/frescan_requests_queue.h @@ -45,6 +45,9 @@ extern int frescan_request_set_reply(frescan_request_id_t id, extern int frescan_request_set_contract(frescan_request_id_t id, const frescan_contract_t *contract); +extern int frescan_request_set_ss(frescan_request_id_t id, + frescan_ss_t ss); + extern int frescan_request_set_src(frescan_request_id_t id, frescan_node_t src); -- 2.39.2