#include "frescan_data.h"
#include "frescan_servers.h"
-static void *frescan_manager_thread(void *arg);
-static void *frescan_acceptor_thread(void *arg);
-
/**
* frescan_manager_thread_create()
*
* Creates the manager thread, which waits in a request queue for LOCAL or
* EXTERNAL requests.
*/
+static void *frescan_manager_thread(void *arg);
+
int frescan_manager_thread_create(frescan_network_t net)
{
int ret;
fosa_thread_attr_t attr;
ret = fosa_thread_attr_init(&attr);
- if (ret != 0) {
- ERROR("could not init thread attributes\n");
- return ret;
- }
+ if (ret != 0) return ret;
ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
- if (ret != 0) {
- ERROR("could not set neg thread prio %d\n",
- FRESCAN_NEG_THREAD_PRIO);
- return ret;
- }
+ if (ret != 0) return ret;
- ret = fosa_thread_create(&the_networks[net].neg_thread_id,
- &attr,
- frescan_master_neg_thread,
- (void *)(uint32_t)net);
-
- if (ret != 0) {
- ERROR("could not create the negotiator thread\n");
- return ret;
- }
+ ret = fosa_thread_create(&the_networks[net].manager_thread_id,
+ &attr,
+ frescan_manager_thread,
+ (void *)(uint32_t)net);
+ if (ret != 0) return ret;
ret = fosa_thread_attr_destroy(&attr);
- if (ret != 0) {
- ERROR("could not destroy thread attributes\n");
- return ret;
- }
+ if (ret != 0) return ret;
return 0;
}
/**
* frescan_acceptor_thread_create()
+ *
+ * This call creates the acceptor thread, which waits for negotiation
+ * messages from the network and converts them into requests.
*/
+static void *frescan_acceptor_thread(void *arg);
+
int frescan_acceptor_thread_create(frescan_network_t net)
{
int ret;
fosa_thread_attr_t attr;
ret = fosa_thread_attr_init(&attr);
- if (ret != 0) {
- ERROR("could not init thread attributes\n");
- return ret;
- }
+ if (ret != 0) return ret;
ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
- if (ret != 0) {
- ERROR("could not set acceptor thread prio %d\n",
- FRESCAN_ACCEPTOR_THREAD_PRIO);
- return ret;
- }
+ if (ret != 0) return ret;
ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
- &attr,
- frescan_acceptor_thread,
- (void *)(uint32_t)net);
-
- if (ret != 0) {
- ERROR("could not create the negotiator thread\n");
- return ret;
- }
+ &attr,
+ frescan_acceptor_thread,
+ (void *)(uint32_t)net);
+ if (ret != 0) return ret;
ret = fosa_thread_attr_destroy(&attr);
- if (ret != 0) {
- ERROR("could not destroy thread attributes\n");
- return ret;
- }
+ if (ret != 0) return ret;
return 0;
}
* frescan_manager_thread
*/
+static int frescan_manager_neg(frescan_request_data_t *req_data);
+static int frescan_manager_reneg(frescan_request_data_t *req_data);
+static int frescan_manager_cancel(frescan_request_data_t *req_data);
+static int frescan_manager_repneg(frescan_request_data_t *req_data);
+
static void *frescan_manager_thread(void *arg)
{
- int err;
- frescan_request_id_t request;
- frescan_req_type_t type;
- frescan_robj_id_t reply;
- frescan_contract_t *contract;
- frescan_neg_return_info_t *neg_return_info;
- frescan_server_params_t server_params;
+ int ret;
+ frescan_request_id_t req;
+ frescan_request_data_t *req_data;
frescan_network_t net = (uint32_t)arg;
- DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
while(1) {
- DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
-
- err = frescan_requestqueue_dequeue(&request);
- assert(err == 0);
-
- err = frescan_request_get_type(request, &type);
- assert(err == 0);
-
- switch(type) {
- case FRESCAN_NEGOTIATE:
- DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
- "FRESCAN_NEGOTIATE request\n");
- err = frescan_request_get_contract(request, &contract);
- assert(err == 0);
- err = frescan_request_get_reply(request, &reply);
- assert(err == 0);
- err = frescan_request_get_return_info
- (request, (void *)&neg_return_info);
- assert(err == 0);
-
- // TODO: sched test + add contract to table
- // so far always accepted witht he min values
- neg_return_info->error = 0;
- server_params.values = contract->min_values;
- server_params.prio = contract->prio;
-
- err = frescan_servers_create
- (net,
- &server_params,
- &neg_return_info->id);
- assert(err == 0);
-
- err = frescan_replyobject_signal(reply);
- assert(err == 0);
- break;
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
- case FRESCAN_RENEGOTIATE:
- DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
- "FRESCAN_RENEGOTIATE request\n");
- break;
+ ret = frescan_requests_dequeue(&req);
+ assert(ret == 0);
- case FRESCAN_CANCEL:
- DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
- "FRESCAN_CANCEL request\n");
- break;
+ ret = frescan_requests_get_data(req, &req_data);
+ assert(ret == 0);
+ switch(req_data->type) {
+ case FRESCAN_REQ_NEG:
+ ret = frescan_manager_neg(req_data);
+ assert(ret == 0);
+ break;
+ case FRESCAN_REQ_RENEG:
+ ret = frescan_manager_reneg(req_data);
+ assert(ret == 0);
+ break;
+ case FRESCAN_REQ_CANCEL:
+ ret = frescan_manager_cancel(req_data);
+ assert(ret == 0);
+ break;
+ case FRESCAN_REP_NEG:
+ ret = frescan_manager_repneg(req_data);
+ assert(ret == 0);
+ break;
default:
- ERROR("wrong request type %d\n", type);
+ ERROR("request type not supported\n");
+ assert(0);
}
- }
- return NULL;
+ if(req_data->request_node != the_networks[net].local_node) {
+ ret = frescan_requests_free(req);
+ assert(ret == 0);
+ }
+ }
}
/**
static void *frescan_acceptor_thread(void *arg)
{
int ret;
- frescan_recv_params_t params;
- uint8_t msg[200];
+ frescan_recv_params_t recv_params;
+ uint8_t msg[2000]; // TODO: use a constant with the max neg message size
size_t recv_bytes;
frescan_node_t from;
frescan_prio_t prio;
+ frescan_request_id_t req;
+ frescan_request_data_t *req_data;
- DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
- "master acceptor thread starts\n");
+ DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG, "acceptor th starts\n");
- params.net = (frescan_network_t)(uint32_t)arg;
- params.channel = FRESCAN_NEG_CHANNEL;
- params.flags = FRESCAN_SYNC;
+ recv_params.net = (frescan_network_t)(uint32_t)arg;
+ recv_params.channel = FRESCAN_NEG_CHANNEL;
+ recv_params.flags = FRESCAN_SYNC;
while(1) {
DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
- "waiting for msg, net:%u chan:%u flags:%u\n",
- params.net, params.channel, params.flags);
-
- ret = frescan_recv(¶ms, msg, 200,
- &recv_bytes, &from, &prio);
+ "waiting for a msg, net:%u chan:%u flags:%u\n",
+ recv_params.net, recv_params.channel, recv_params.flags);
+
+ ret = frescan_recv(&recv_params,
+ msg,
+ sizeof(msg),
+ &recv_bytes,
+ &from,
+ &prio);
assert(ret == 0);
DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
"msg received, from:%u size:%u prio:%u\n",
from, recv_bytes, prio);
- ret = frescan_message_parse(params.net, msg, recv_bytes, from);
+ ret = frescan_requests_alloc(&req);
+ assert(ret == 0);
+
+ ret = frescan_requests_get_data(req, &req_data);
+ assert(ret == 0);
+
+ ret = frescan_message_to_request(msg, req_data);
+ assert(ret == 0);
+
+ req_data->request_node = from;
+
+ ret = frescan_requests_enqueue(req);
assert(ret == 0);
}
return NULL;
}
+
+/**
+ * frescan_manager_neg
+ */
+
+static int frescan_manager_neg(frescan_request_data_t *req_data)
+{
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
+ return 0;
+}
+
+/**
+ * frescan_manager_reneg
+ */
+
+static int frescan_manager_reneg(frescan_request_data_t *req_data)
+{
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
+ return 0;
+}
+
+/**
+ * frescan_manager_cancel
+ */
+
+static int frescan_manager_cancel(frescan_request_data_t *req_data)
+{
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
+ return 0;
+}
+
+/**
+ * frescan_manager_repneg
+ */
+
+static int frescan_manager_repneg(frescan_request_data_t *req_data)
+{
+ DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
+ return 0;
+}