ret = frescan_requests_init(FRESCAN_REQUESTS_MX_CEILING);
if (ret != 0) return ret;
+ ret = frescan_messages_init(net);
+ if (ret != 0) return ret;
+
ret = frescan_manager_thread_create(net);
if (ret != 0) return ret;
req_data->req = req;
req_data->contract = (frescan_contract_t *)contract;
req_data->request_node = the_networks[net].local_node;
+ req_data->net = net;
ret = frescan_bwres_robjs_alloc(&req_data->robj, FRESCAN_BWRES_MX_PRIO);
if (ret != 0) return ret;
#include "frescan_config.h"
#include "frescan_debug.h"
+/**
+ * frescan_messages_init() - prepare the send/receive parameters of a network
+ *
+ * Fills send_params[net] and recv_params[net], the parameter sets that this
+ * module later hands to frescan_send() and frescan_recv().
+ *
+ * @net: network instance to set up; it is used directly as the index into
+ *       both tables and is not range-checked here
+ *
+ * Always returns 0.
+ */
+
+static frescan_send_params_t send_params[FRESCAN_MX_NETWORKS]; // one entry per network, indexed by net
+static frescan_recv_params_t recv_params[FRESCAN_MX_NETWORKS]; // one entry per network, indexed by net
+
+int frescan_messages_init(frescan_network_t net)
+{
+ DEBUG(FRESCAN_MESSAGES_ENABLE_DEBUG, "initialization\n");
+
+ send_params[net].net = net;
+ send_params[net].channel = FRESCAN_NEG_CHANNEL;
+ send_params[net].flags = FRESCAN_SS | FRESCAN_ASYNC;
+ send_params[net].ss = the_networks[net].neg_messages_ss_id;
+ send_params[net].to = FRESCAN_NEG_MASTER_NODE; // everything sent with these params is addressed to the master node
+
+ recv_params[net].net = net;
+ recv_params[net].channel = FRESCAN_NEG_CHANNEL;
+ recv_params[net].flags = FRESCAN_SYNC; // presumably a blocking receive -- TODO confirm against frescan_recv()
+
+ return 0;
+}
+
/**
*
* FRESCAN_REQ_NEG MESSAGE
neg_msg->ss = data->ss;
neg_msg->contract = *(data->contract);
- return 0;
+ return sizeof(struct frescan_req_neg_message_t);
}
static int frescan_neg_message_to_request(const uint8_t *msg,
reneg_msg->ss = data->ss;
reneg_msg->contract = *(data->contract);
- return 0;
+ return sizeof(struct frescan_req_reneg_message_t);
}
static int frescan_reneg_message_to_request(const uint8_t *msg,
cancel_msg->type = FRESCAN_REQ_CANCEL;
cancel_msg->ss = data->ss;
- return 0;
+ return sizeof(struct frescan_req_cancel_message_t);
}
static int frescan_cancel_message_to_request(const uint8_t *msg,
repneg_msg->req = data->req;
repneg_msg->return_value = data->return_value;
- return 0;
+ return sizeof(struct frescan_rep_neg_message_t);
}
static int frescan_repneg_message_to_request(const uint8_t *msg,
}
/**
- * frescan_request_to_message() - converts a request into a network message
+ * frescan_messages_send_request() - serialize a request and send it
*
* this function converts a request with the necessary data into a message
- * that can be sent through the network.
+ * and sends it with frescan_send(), using the send parameters stored for
+ * the network found in req_data->net.
*
- * @req_data: the request data to fill the message bytes (in)
- * @msg: buffer with the bytes that will be sent to the network (out)
+ * @req_data: the request to be sent (NOTE: the network is in req_data)
+ *
+ * Returns 0 on success, -1 when req_data->type is not a supported request
+ * type, or the non-zero value returned by frescan_send().
+ *
+ * NOTE(review): the destination comes from send_params[], which
+ * frescan_messages_init() sets to FRESCAN_NEG_MASTER_NODE, so a
+ * FRESCAN_REP_NEG reply would also be addressed to the master instead of
+ * req_data->request_node -- confirm this is intended.
*
*/
-int frescan_request_to_message(const frescan_request_data_t *req_data,
- uint8_t *msg)
+int frescan_messages_send_request(const frescan_request_data_t *req_data)
{
+ int ret;
+ uint8_t msg[2000]; // TODO: use a constant for max neg message size
+ size_t size; // number of bytes of msg to send, as returned by the converter
+
switch(req_data->type) {
case FRESCAN_REQ_NEG:
- return frescan_request_to_neg_message(req_data, msg);
+ size = frescan_request_to_neg_message(req_data, msg);
+ break;
case FRESCAN_REQ_RENEG:
- return frescan_request_to_reneg_message(req_data, msg);
+ size = frescan_request_to_reneg_message(req_data, msg);
+ break;
case FRESCAN_REQ_CANCEL:
- return frescan_request_to_cancel_message(req_data, msg);
+ size = frescan_request_to_cancel_message(req_data, msg);
+ break;
case FRESCAN_REP_NEG:
- return frescan_request_to_repneg_message(req_data, msg);
+ size = frescan_request_to_repneg_message(req_data, msg);
+ break;
default:
ERROR("request type not supported\n");
return -1;
}
+
+ ret = frescan_send(&send_params[req_data->net], msg, size);
+ if (ret != 0) return ret;
+
+ return 0;
}
/**
- * frescan_message_to_request() - converts a network message into a request
+ * frescan_messages_recv_request()
*
- * this function is the opposite to the previous one. It will be used by
- * the acceptor threads to transform messages received from the network
- * into requests.
+ * this function BLOCKS the calling thread until it receives a message
+ * and then transforms that message into a request.
*
- * @msg: buffer with the bytes received from the network (in)
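- * @req_data: the request data to fill from the message bytes (out)
+ * @net: network on which to wait for messages
+ * @req: the request received (it must be freed)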
*
*/
-int frescan_message_to_request(const uint8_t *msg,
- frescan_request_data_t *req_data)
+int frescan_messages_recv_request(frescan_network_t net,
+ frescan_request_id_t *req)
{
+ int ret;
+ uint8_t msg[2000]; // TODO: use a constant with the max neg message size
+ size_t recv_bytes;
+ frescan_node_t from;
+ frescan_prio_t prio;
+ frescan_request_data_t *req_data;
+
+ ret = frescan_requests_alloc(req);
+ if (ret != 0) return ret;
+
+ ret = frescan_requests_get_data(*req, &req_data);
+ if (ret != 0) return ret;
+
+ DEBUG(FRESCAN_MESSAGES_ENABLE_DEBUG,
+ "wait for a msg, net:%u chan:%u flags:%u\n",
+ net, recv_params[net].channel, recv_params[net].flags);
+
+ ret = frescan_recv(&recv_params[net],
+ msg,
+ sizeof(msg),
+ &recv_bytes,
+ &from,
+ &prio);
+ if (ret != 0) return ret;
+
+ DEBUG(FRESCAN_MESSAGES_ENABLE_DEBUG,
+ "msg received, from:%u size:%u prio:%u\n",
+ from, recv_bytes, prio);
+
+ req_data->request_node = from;
+ req_data->net = net;
+
switch(*((frescan_request_type_t *)msg)) {
case FRESCAN_REQ_NEG:
return frescan_neg_message_to_request(msg, req_data);
*
*/
-#ifndef _FRESCAN_NEGOTIATION_MESSAGES_H_
-#define _FRESCAN_NEGOTIATION_MESSAGES_H_
+#ifndef _FRESCAN_BWRES_MESSAGES_H_
+#define _FRESCAN_BWRES_MESSAGES_H_
#include <stdint.h>
#include "frescan_bwres_requests.h"
#include "frescan_data.h"
/**
- * frescan_request_to_message() - converts a request into a network message
+ * frescan_messages_init() - initialization function for the module
+ */
+
+extern int frescan_messages_init(frescan_network_t net);
+
+/**
+ * frescan_messages_send_request()
*
* this function converts a request with the necessary data into a message
- * that can be sent through the network.
+ * and sends it.
*
- * @req_data: the request data to fill the message bytes (in)
- * @msg: buffer with the bytes that will be sent to the network (out)
+ * @req_data: the request to be sent (NOTE: the network is in req_data)
*
*/
-extern int frescan_request_to_message(const frescan_request_data_t *req_data,
- uint8_t *msg);
+extern int frescan_messages_send_request(const frescan_request_data_t *req_data);
/**
- * frescan_message_to_request() - converts a network message into a request
+ * frescan_messages_recv_request()
*
- * this function is the opposite to the previous one. It will be used by
- * the acceptor threads to transform messages received from the network
- * into requests.
+ * this function BLOCKS the calling thread until it receives a message
+ * and then transforms that message into a request.
*
- * @msg: buffer with the bytes received from the network (in)
- * @req_data: the request data to fill from the message bytes (out)
+ * @net: network on which to wait for messages
+ * @req: the request received (the caller must free it)
*
*/
-extern int frescan_message_to_request(const uint8_t *msg,
- frescan_request_data_t *req_data);
+extern int frescan_messages_recv_request(frescan_network_t net,
+ frescan_request_id_t *req);
-#endif // _FRESCAN_NEGOTIATION_MESSAGES_H_
+#endif // _FRESCAN_BWRES_MESSAGES_H_
* @request_node: the node that performed the request
* @req: the request id of the SLAVE to identify the request in the reply
* @return_value: the value returned in a Reply
+ * @net: the network instance where this request belongs to
* @robj: a reply object to wait until a negotiation is completed
*
*/
frescan_node_t request_node;
frescan_request_id_t req;
frescan_request_retval_t return_value;
+ frescan_network_t net;
frescan_robj_id_t robj;
} frescan_request_data_t;
{
int err, pos;
- DEBUG(FRESCAN_REPLYOBJ_ENABLE_DEBUG,
+ DEBUG(FRESCAN_ROBJS_ENABLE_DEBUG,
"allocating reply object, is_initialized=%d\n", is_initialized);
if (is_initialized == false) return -1;
err = fosa_mutex_init(&the_reply_objects[pos].mutex, ceiling);
if (err != 0) return err;
- DEBUG(FRESCAN_REPLYOBJ_ENABLE_DEBUG,
+ DEBUG(FRESCAN_ROBJS_ENABLE_DEBUG,
"reply object allocated, id=%d\n", *id);
return 0;
{
int err;
- DEBUG(FRESCAN_REPLYOBJ_ENABLE_DEBUG,
+ DEBUG(FRESCAN_ROBJS_ENABLE_DEBUG,
"free reply id=%d, is_initialized=%d\n", id, is_initialized);
if (is_initialized == false) return -1;
{
int err;
- DEBUG(FRESCAN_REPLYOBJ_ENABLE_DEBUG,
+ DEBUG(FRESCAN_ROBJS_ENABLE_DEBUG,
"is_initialized=%d\n", is_initialized);
if (is_initialized == false) return -1;
- DEBUG(FRESCAN_REPLYOBJ_ENABLE_DEBUG,
+ DEBUG(FRESCAN_ROBJS_ENABLE_DEBUG,
"taking mutex of the reply id=%d\n", id);
err = fosa_mutex_lock(&the_reply_objects[id].mutex);
if (err != 0) return err;
the_reply_objects[id].is_work_done = true;
- DEBUG(FRESCAN_REPLYOBJ_ENABLE_DEBUG,
+ DEBUG(FRESCAN_ROBJS_ENABLE_DEBUG,
"signal the cond variable\n");
err = fosa_cond_signal(&the_reply_objects[id].cond);
if (err != 0) goto locked_error;
* frescan_manager_thread
*/
-static int frescan_manager_neg(frescan_request_data_t *req_data);
-static int frescan_manager_reneg(frescan_request_data_t *req_data);
-static int frescan_manager_cancel(frescan_request_data_t *req_data);
-static int frescan_manager_repneg(frescan_request_data_t *req_data);
+static void frescan_manager_neg(frescan_request_data_t *req_data);
+static void frescan_manager_reneg(frescan_request_data_t *req_data);
+static void frescan_manager_cancel(frescan_request_data_t *req_data);
+static void frescan_manager_repneg(frescan_request_data_t *req_data);
static void *frescan_manager_thread(void *arg)
{
switch(req_data->type) {
case FRESCAN_REQ_NEG:
- ret = frescan_manager_neg(req_data);
- assert(ret == 0);
+ frescan_manager_neg(req_data);
break;
case FRESCAN_REQ_RENEG:
- ret = frescan_manager_reneg(req_data);
- assert(ret == 0);
+ frescan_manager_reneg(req_data);
break;
case FRESCAN_REQ_CANCEL:
- ret = frescan_manager_cancel(req_data);
- assert(ret == 0);
+ frescan_manager_cancel(req_data);
break;
case FRESCAN_REP_NEG:
- ret = frescan_manager_repneg(req_data);
- assert(ret == 0);
+ frescan_manager_repneg(req_data);
break;
default:
ERROR("request type not supported\n");
static void *frescan_acceptor_thread(void *arg)
{
int ret;
- frescan_recv_params_t recv_params;
- uint8_t msg[2000]; // TODO: use a constant with the max neg message size
- size_t recv_bytes;
- frescan_node_t from;
- frescan_prio_t prio;
frescan_request_id_t req;
- frescan_request_data_t *req_data;
-
- DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG, "acceptor th starts\n");
+ frescan_network_t net = (uint32_t)arg;
- recv_params.net = (frescan_network_t)(uint32_t)arg;
- recv_params.channel = FRESCAN_NEG_CHANNEL;
- recv_params.flags = FRESCAN_SYNC;
+ DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
while(1) {
- DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
- "waiting for a msg, net:%u chan:%u flags:%u\n",
- recv_params.net, recv_params.channel, recv_params.flags);
-
- ret = frescan_recv(&recv_params,
- msg,
- sizeof(msg),
- &recv_bytes,
- &from,
- &prio);
- assert(ret == 0);
-
- DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
- "msg received, from:%u size:%u prio:%u\n",
- from, recv_bytes, prio);
-
- ret = frescan_requests_alloc(&req);
+ ret = frescan_messages_recv_request(net, &req);
assert(ret == 0);
- ret = frescan_requests_get_data(req, &req_data);
- assert(ret == 0);
-
- ret = frescan_message_to_request(msg, req_data);
- assert(ret == 0);
-
- req_data->request_node = from;
-
ret = frescan_requests_enqueue(req);
assert(ret == 0);
}
* frescan_manager_neg
*/
-static int frescan_manager_neg(frescan_request_data_t *req_data)
+static void frescan_manager_neg(frescan_request_data_t *req_data)
{
+ int ret; // result of the signal/send calls below; only checked through assert()
+ bool accepted; // decision on the request; hard-coded until the analysis exists
+
DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
- return 0;
+ // master node: decide here and deliver the result; any other node: send the request out unchanged
+ if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
+
+ accepted = true; // TODO: sched analysis
+ ret = 0; // TODO: sched analysis
+
+ if (accepted) {
+ req_data->return_value = FRESCAN_REQ_ACCEPTED;
+ } else { // not reachable while accepted is hard-coded to true
+ req_data->return_value = (ret == 0) ?
+ FRESCAN_REQ_NOT_ACCEPTED :
+ FRESCAN_REQ_ERROR;
+ }
+
+ if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) { // the request was made on the master itself
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "local\n");
+ ret = frescan_bwres_robjs_signal(req_data->robj); // signal the request's reply object
+ assert(ret == 0);
+ } else {
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "external\n");
+ req_data->type = FRESCAN_REP_NEG; // the same request data is reused as the reply
+
+ ret = frescan_messages_send_request(req_data); // NOTE(review): send_params[].to is FRESCAN_NEG_MASTER_NODE (see frescan_messages_init), so this reply may not reach request_node -- confirm
+ assert(ret == 0);
+ }
+ } else { // this node is not the master
+ ret = frescan_messages_send_request(req_data); // type is left as received
+ assert(ret == 0);
+ }
}
/**
- * frescan_manager_neg
+ * frescan_manager_reneg - handler for FRESCAN_REQ_RENEG requests
+ *
+ * placeholder: it only emits a debug message; req_data is not used yet
*/
-static int frescan_manager_reneg(frescan_request_data_t *req_data)
+static void frescan_manager_reneg(frescan_request_data_t *req_data)
{
DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
- return 0;
}
/**
- * frescan_manager_neg
+ * frescan_manager_cancel - handler for FRESCAN_REQ_CANCEL requests
+ *
+ * placeholder: it only emits a debug message; req_data is not used yet
*/
-static int frescan_manager_cancel(frescan_request_data_t *req_data)
+static void frescan_manager_cancel(frescan_request_data_t *req_data)
{
DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
- return 0;
}
/**
- * frescan_manager_neg
+ * frescan_manager_repneg - handler for FRESCAN_REP_NEG (reply) requests
+ *
+ * placeholder: it only emits a debug message; req_data is not used yet
*/
-static int frescan_manager_repneg(frescan_request_data_t *req_data)
+static void frescan_manager_repneg(frescan_request_data_t *req_data)
{
DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
- return 0;
}
* DEBUGGING FLAGS to enable/disable debugging messages
**/
-#define FRESCAN_SERVERS_ENABLE_DEBUG false
+#define FRESCAN_SERVERS_ENABLE_DEBUG false
#define FRESCAN_PACKETPOOL_ENABLE_DEBUG false
-#define FRESCAN_FRAG_ENABLE_DEBUG false
-#define FRESCAN_INIT_ENABLE_DEBUG false
-#define FRESCAN_SEND_ENABLE_DEBUG false
-#define FRESCAN_RECV_ENABLE_DEBUG false
-#define FRESCAN_RX_HOOK_ENABLE_DEBUG false
-#define FRESCAN_SENT_HOOK_ENABLE_DEBUG false
-#define FRESCAN_QUEUES_ENABLE_DEBUG false
-#define FRESCAN_HW_BUFFER_ENABLE_DEBUG false
-#define FRESCAN_REPL_ENABLE_DEBUG false
-#define FRESCAN_REPLYOBJ_ENABLE_DEBUG false
-#define FRESCAN_BWRES_ENABLE_DEBUG false
-#define FRESCAN_REQUESTS_ENABLE_DEBUG false
-#define FRESCAN_MANAGER_ENABLE_DEBUG false
-#define FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG false
-#define FRESCAN_NEG_MESSAGES_ENABLE_DEBUG false
-#define FRESCAN_FNA_ENABLE_DEBUG false
+#define FRESCAN_FRAG_ENABLE_DEBUG false
+#define FRESCAN_INIT_ENABLE_DEBUG false
+#define FRESCAN_SEND_ENABLE_DEBUG false
+#define FRESCAN_RECV_ENABLE_DEBUG false
+#define FRESCAN_RX_HOOK_ENABLE_DEBUG false
+#define FRESCAN_SENT_HOOK_ENABLE_DEBUG false
+#define FRESCAN_QUEUES_ENABLE_DEBUG false
+#define FRESCAN_HW_BUFFER_ENABLE_DEBUG false
+#define FRESCAN_REPLENSH_ENABLE_DEBUG false
+#define FRESCAN_ROBJS_ENABLE_DEBUG false
+#define FRESCAN_BWRES_ENABLE_DEBUG true
+#define FRESCAN_REQUESTS_ENABLE_DEBUG false
+#define FRESCAN_MANAGER_ENABLE_DEBUG true
+#define FRESCAN_ACCEPTOR_ENABLE_DEBUG true
+#define FRESCAN_FNA_ENABLE_DEBUG false
+#define FRESCAN_MESSAGES_ENABLE_DEBUG false
#endif // _MARTE_FRESCAN_DEBUG_H_
if (siginfo.si_signo != FRESCAN_REPL_SIGNAL_NUM) continue;
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG,
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG,
"net:%u signal:%d code:%d value(server_id):%d\n",
net,
siginfo.si_signo, // FRESCAN_REPL_SIGNAL_NUM
id = siginfo.si_value.sival_int;
server = &the_servers_pool[net][id];
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG,
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG,
"id:%u, current_budget:%u, budget:%u, current_prio:%u\n",
id,
server->current_budget,
server->current_priority = server->params.prio;
}
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG,
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG,
"now... current_budget:%u, current_prio:%u\n",
server->current_budget,
server->current_priority);
timerdata.it_value = repl->when;
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG,
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG,
"set timer to %d sec, %d nsec\n",
repl->when.tv_sec, repl->when.tv_nsec);
server = &the_servers_pool[net][ss];
if (server->current_priority == FRESCAN_BACKGROUND_PRIO) {
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG, "ss in background\n");
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG, "ss in background\n");
return 0;
}
repl->amount = 1;
empty = list_empty(&server->replenishments.repl_list);
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG, "ss:%u, empty:%u\n", ss, empty);
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG, "ss:%u, empty:%u\n", ss, empty);
list_add_tail(&repl->repl_list,
&server->replenishments.repl_list);
timerdata.it_interval.tv_nsec = 0;
timerdata.it_value = repl->when;
- DEBUG(FRESCAN_REPL_ENABLE_DEBUG,
+ DEBUG(FRESCAN_REPLENSH_ENABLE_DEBUG,
"set timer to %d sec, %d nsec\n",
repl->when.tv_sec, repl->when.tv_nsec);