From 4df94c066c706b7145e70759db83e749ef324e87 Mon Sep 17 00:00:00 2001 From: sangorrin Date: Tue, 15 Apr 2008 07:43:54 +0000 Subject: [PATCH] joining threads module git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@1106 35b4ef3e-fd22-0410-ab77-dab3279adceb --- src_frescan/frescan_acceptor_threads.c | 132 +++++++++++++++++++++++-- src_frescan/frescan_acceptor_threads.h | 35 +++++-- 2 files changed, 152 insertions(+), 15 deletions(-) diff --git a/src_frescan/frescan_acceptor_threads.c b/src_frescan/frescan_acceptor_threads.c index 0115321..65118b4 100644 --- a/src_frescan/frescan_acceptor_threads.c +++ b/src_frescan/frescan_acceptor_threads.c @@ -1,9 +1,10 @@ /*! - * @file frescan_acceptor_threads.c + * @file frescan_negotiation_threads.h * - * @brief FRESCAN acceptor threads + * @brief FRESCAN negotiation threads * - * This module contains the acceptor threads, with an operation to create them. + * This module contains the acceptor threads and the master thread for local + * negotiations, with functions to create them. * * @version 0.01 * @@ -15,13 +16,16 @@ #include #include "fosa_threads_and_signals.h" // fosa_thread_attr_init... -#include "frescan_acceptor_threads.h" +#include "frescan_negotiation_threads.h" #include "frescan_config.h" #include "frescan_debug.h" #include "frescan_data.h" #include "frescan_negotiation_messages.h" +#include "frescan_requests_queue.h" +#include "frescan_servers.h" static void *frescan_acceptor_thread(void *arg); +static void *frescan_master_neg_thread(void *arg); /** * frescan_acceptor_thread_create() @@ -45,7 +49,7 @@ int frescan_acceptor_thread_create(frescan_network_t net) return ret; } - ret = fosa_thread_create(&the_networks[net].neg_thread_id, + ret = fosa_thread_create(&the_networks[net].acceptor_thread_id, &attr, frescan_acceptor_thread, (void *)(uint32_t)net); @@ -64,13 +68,51 @@ int frescan_acceptor_thread_create(frescan_network_t net) return 0; } +/** + * frescan_master_neg_thread_create() + */ + +int frescan_master_neg_thread_create(frescan_network_t net) +{ + int ret; + fosa_thread_attr_t attr; + + ret = fosa_thread_attr_init(&attr); + if (ret != 0) { + ERROR("could not init thread attributes\n"); + return ret; + } + + ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO); + if (ret != 0) { + ERROR("could not set neg thread prio %d\n", + FRESCAN_NEG_THREAD_PRIO); + return ret; + } + + ret = fosa_thread_create(&the_networks[net].neg_thread_id, + &attr, + frescan_master_neg_thread, + (void *)(uint32_t)net); + + if (ret != 0) { + ERROR("could not create the negotiator thread\n"); + return ret; + } + + ret = fosa_thread_attr_destroy(&attr); + if (ret != 0) { + ERROR("could not destroy thread attributes\n"); + return ret; + } + + return 0; +} + /** * frescan_acceptor_thread() * - * a loop waiting for negotiation requests on the network. When it receives - * a request it acts like a normal thread requesting a negotiation to the - * negotiator thread. Once it gets the results it sends them back to the - * node that asked the negotiation. + * a loop waiting for messages on the network and parsing them. */ static void *frescan_acceptor_thread(void *arg) @@ -108,3 +150,75 @@ static void *frescan_acceptor_thread(void *arg) return NULL; } + +/** + * frescan_master_neg_thread + * + * the thread that negotiates LOCAL contracts in the MASTER node. + */ + +static void *frescan_master_neg_thread(void *arg) +{ + int err; + frescan_request_id_t request; + frescan_req_type_t type; + frescan_robj_id_t reply; + frescan_contract_t *contract; + frescan_neg_return_info_t *neg_return_info; + frescan_server_params_t server_params; + + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n"); + + while(1) { + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n"); + + err = frescan_requestqueue_dequeue(&request); + assert(err == 0); + + err = frescan_request_get_type(request, &type); + assert(err == 0); + + switch(type) { + case FRESCAN_NEGOTIATE: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_NEGOTIATE request\n"); + err = frescan_request_get_contract(request, &contract); + assert(err == 0); + err = frescan_request_get_reply(request, &reply); + assert(err == 0); + err = frescan_request_get_return_info + (request, (void *)&neg_return_info); + assert(err == 0); + + // TODO: sched test + add contract to table + // so far always accepted witht he min values + neg_return_info->error = 0; + server_params.values = contract->min_values; + server_params.prio = contract->prio; + err = frescan_servers_create + ((frescan_network_t)(uint32_t)arg, + &server_params, + &neg_return_info->id); + assert(err == 0); + + err = frescan_replyobject_signal(reply); + assert(err == 0); + break; + + case FRESCAN_RENEGOTIATE: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_RENEGOTIATE request\n"); + break; + + case FRESCAN_CANCEL: + DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, + "FRESCAN_CANCEL request\n"); + break; + + default: + ERROR("wrong request type %d\n", type); + } + } + + return NULL; +} diff --git a/src_frescan/frescan_acceptor_threads.h b/src_frescan/frescan_acceptor_threads.h index d9aa1a6..d4ba225 100644 --- a/src_frescan/frescan_acceptor_threads.h +++ b/src_frescan/frescan_acceptor_threads.h @@ -1,9 +1,10 @@ /*! - * @file frescan_acceptor_threads.h + * @file frescan_negotiation_threads.h * - * @brief FRESCAN acceptor threads + * @brief FRESCAN negotiation threads * - * This module contains the acceptor threads, with an operation to create them. + * This module contains the acceptor threads and the master thread for local + * negotiations, with functions to create them. * * @version 0.01 * @@ -13,11 +14,33 @@ * */ -#ifndef _FRESCAN_ACCEPTOR_THREADS_H_ -#define _FRESCAN_ACCEPTOR_THREADS_H_ +#ifndef _FRESCAN_NEGOTIATION_THREADS_H_ +#define _FRESCAN_NEGOTIATION_THREADS_H_ #include "frescan.h" +/** + * frescan_master_neg_thread_create() + * + * This call creates the thread in charge of LOCAL negotiations at the + * MASTER node, so in the rest of nodes it doesnt have to be called at + * initialization. This thread will await in a local request queue for + * LOCAL negotiation requests from threads in the same CPU. + * In the case of SLAVE nodes, the negotiation requests are simply performed + * by sending an appropiate message to the MASTER node and then awaiting + * in a reply object until an acceptor thread signals it. + */ + +extern int frescan_master_neg_thread_create(frescan_network_t net); + +/** + * frescan_acceptor_thread_create() + * + * This call is called in every node (including the MASTER node) conforming + * a set of threads, one at each node, that awaits negotiation messages + * from a receive endpoint and perform the corresponding operations. + */ + extern int frescan_acceptor_thread_create(frescan_network_t net); -#endif // _FRESCAN_ACCEPTOR_THREADS_H_ +#endif // _FRESCAN_NEGOTIATION_THREADS_H_ -- 2.39.2