static contract_hash_table_t contract_hash_table; /* hash table for the contracts */
static vres_repo_t vres_repo; /* negotiated vres repository */
static disk_t disk[N_DISKS]; /* parameters for the HDs in the system */
-static thread_repo_t thread_repo; /* registered thread repository */
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-static service_th_queue_t service_th_queue; /* job (form clients) queueing for multithreaded implementation */
-static worker_thread_pool_t worker_pool; /* thread pool for multithreaded implementation */
-#endif
-
-/***************************************************************************/
-/* H A R D D I S K H A N D L I N G U T I L I T Y F U N C T I O N S */
-/***************************************************************************/
-
-static bool disk_init()
-{
- int i;
-
- for (i = 0; i < N_DISKS; i++) {
- disk[i].aggregate_bw = DISK_DEFAULT_AGGREGATE_BW;
- disk[i].sched_budget = DISK_DEFAULT_SCHED_BUDGET;
- disk[i].weight_sum = 0;
- disk[i].thread_num = 0;
- }
-
- return true;
-}
-
/*
 * Map a disk scheduling weight onto an I/O priority level.
 *
 * A weight of exactly 100 maps to the highest priority (0), any other
 * weight above 14 maps to priority 1, and the remaining weights are
 * spread over the lower levels, two weight units per level
 * (prio = 8 - 1 - weight/2).
 */
static inline unsigned int disk_sched_ioprio(unsigned int weight)
{
    unsigned int prio;

    if (weight == 100)
        prio = 0;
    else if (weight > 14)
        prio = 1;
    else
        prio = 8 - 1 - (weight / 2);

    return prio;
}
-
/*
 * Compute the disk scheduler weight needed to honour a (budget, period)
 * reservation on the given disk.
 *
 * th is the per-nanosecond throughput estimate of the disk
 * (aggregate_bw / 10E9 -- NOTE(review): 10E9 is 1e10, not 1e9; confirm
 * the intended unit conversion), b is the disk scheduler budget.
 * budget and period are converted to nanoseconds, then the weight is
 * the smallest integer w such that a w/DISK_WEIGHT_MAX share of the
 * disk, net of the scheduler budget latency b/th, serves q_i every p_i.
 *
 * Returns the weight, or -1 when the period cannot even cover the
 * budget latency. NOTE(review): the return type is unsigned int, so
 * the -1 error value is actually UINT_MAX -- callers must compare
 * against (unsigned)-1, not test for a negative value.
 */
static inline unsigned int disk_sched_ioweight(frsh_resource_id_t disk_id, struct timespec budget, struct timespec period) {
    float th, weight;
    int b, w_i;
    unsigned long int q_i, p_i;

    th = disk[disk_id_2_index(disk_id)].aggregate_bw / 10E9;
    b = disk[disk_id_2_index(disk_id)].sched_budget;

    /* convert budget and period to nsec (usec * 1000) */
    q_i = timespec_to_usec(budget);
    p_i = timespec_to_usec(period);
    q_i *= 1000;
    p_i *= 1000;

    /* infeasible: period shorter than the budget latency */
    if (p_i < b / th)
        return -1;

    weight = (DISK_WEIGHT_MAX * q_i ) / (p_i - (b / th));
    /* round the fractional weight up to the next integer */
    w_i = floor(weight);

    if (weight != w_i)
        w_i++;

    return w_i;
}
-
/*
 * Bandwidth share (same unit as aggregate_bw) granted to a vres holding
 * `weight` units on the given disk: weight * (aggregate_bw / weight_sum).
 * NOTE(review): weight_sum starts at 0 in disk_init(), so this divides
 * by zero until at least one weight is accounted -- confirm callers
 * only invoke it with weight_sum > 0.
 */
static inline float disk_sched_iobandwidth(frsh_resource_id_t disk_id, unsigned int weight) {
    return (float) weight * (disk[disk_id_2_index(disk_id)].aggregate_bw / disk[disk_id_2_index(disk_id)].weight_sum);
}
-
-
-/***********************************************************/
-/* H A S H T A B L E U T I L I T Y F U N C T I O N S */
-/***********************************************************/
-
-/*
- * initialize the table
- *
- * sets all pointers of the array table of the hash table to NULL
- * and init the mutex (if a multithreaded service thread has
- * been configured)
- *
- * possible return values:
- * true (all ok)
- * false (something wrong with the mutex)
- */
-static bool contract_hash_init()
-{
- int i;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- if (fosa_mutex_init(&contract_hash_table.mutex, 0) != 0)
- return false;
-#endif
- for (i = 0; i < HASH_N_ELEMENTS; i++)
- contract_hash_table.table[i] = NULL;
-
- return true;
-}
-
-/*
- * hash function
- *
- * obtains the hash (int) value associated to the (string) key provided
- * simply accessing each character as an integer value and summing them all
- *
- * always returns the hash value (no room for errors!)
- */
-static unsigned int contract_hash_hash(const char *key_str)
-{
- int i, step = CHAR_PER_INT;
- unsigned int hash = 0;
- unsigned int key_val = 0;
- char *key_val_ptr = (char*) &key_val;
-
- for (i = 0; i < strlen(key_str); i += step) {
- strncpy(key_val_ptr, (key_str + i), step);
- hash += key_val;
- }
-
- return hash % HASH_N_ELEMENTS;
-}
-
-/*
- * search an element
- *
- * calculates the hash value, accesses the hash table element and
- * follows the linked list if needed
- * data_ptr can be used to access/modify the element data field
- *
- * possible return values:
- * HASH_LABEL_FOUND
- * HASH_ERR_LABEL_NOT_FOUND
- */
-static int contract_hash_find(const char *key, contract_hash_data_t **data_ptr)
-{
- contract_hash_entry_t *curr_entry;
- unsigned int hash_value;
- int result;
-
- /* hash value of the the key */
- hash_value = contract_hash_hash(key);
-#ifdef frsh_enable_service_th_multithread
- /* lock the table */
- fosa_mutex_lock(&contract_hash_table.mutex);
-#endif
- /* start traversing the linked list (if needed) */
- curr_entry = contract_hash_table.table[hash_value];
- while (curr_entry != NULL) {
- if (strncmp(key, curr_entry->contract_label, HASH_KEY_LENGTH) == 0) {
- /* key found! */
- *data_ptr = &(curr_entry->contract_data);
- result = HASH_LABEL_FOUND;
- break;
- }
- /* not found yet, let's try the next element */
- curr_entry = curr_entry->next;
- }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* unlock the table */
- fosa_mutex_unlock(&contract_hash_table.mutex);
-#endif
-
- return result;
-}
-
-/*
- * add an element
- *
- * calculates the hash function and adds the new element at the end
- * of the linked list of the hash table entry, unless one with the same
- * key value already exists
- * data_ptr can be used to access/modify the element data field
- *
- * possible return values:
- * HASH_NO_ERROR
- * HASH_ERR_LABEL_FOUND
- */
-static int contract_hash_add(const char *key, contract_hash_data_t **data_ptr)
-{
- contract_hash_entry_t *new_entry, **curr_entry;
- unsigned int hash_value;
- int result;
-
- /* hash value of the key */
- hash_value = contract_hash_hash(key);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* lock the table */
- fosa_mutex_lock(&contract_hash_table.mutex);
-#endif
- /* traverse the linked list till the end
- * (stop if the key already exists) */
- curr_entry = &contract_hash_table.table[hash_value];
- result = HASH_NO_ERROR;
- while (*curr_entry != NULL) {
- if (strncmp(key, (*curr_entry)->contract_label, HASH_KEY_LENGTH) == 0) {
- result = HASH_ERR_LABEL_FOUND;
- break;
- }
- curr_entry = &(*curr_entry)->next;
- }
- if (result == HASH_NO_ERROR) {
- /* create the new element */
- new_entry = (contract_hash_entry_t*) malloc(sizeof(contract_hash_entry_t));
- strncpy(new_entry->contract_label, key, HASH_KEY_LENGTH);
- /* add it (we're already at the end of the linked list) */
- *data_ptr = &(new_entry->contract_data);
- *curr_entry = new_entry;
- new_entry->next = NULL;
- }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* unlock the table */
- fosa_mutex_unlock(&contract_hash_table.mutex);
-#endif
-
- return result;
-}
-
-/*
- * remove an element
- *
- * calculates the hash function, searches the element in the linked
- * list and remove it
- *
- * possible return values:
- * HASH_NO_ERROR
- * HASH_ERR_LABEL_NOT_FOUND
- */
-static int contract_hash_del(const char *key)
-{
- contract_hash_entry_t *curr_entry, *curr_entry_prev;
- unsigned int hash_value;
- int result;
-
- /* hash value of the key */
- hash_value = contract_hash_hash(key);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* lock the table */
- fosa_mutex_lock(&contract_hash_table.mutex);
-#endif
- /* traverse the list searching for the key */
- curr_entry_prev = NULL;
- curr_entry = contract_hash_table.table[hash_value];
- result = HASH_ERR_LABEL_NOT_FOUND;
- while (curr_entry != NULL) {
- if (strncmp(key, curr_entry->contract_label, HASH_KEY_LENGTH) == 0) {
- /* update the list and remove the element */
- if (curr_entry_prev == NULL)
- contract_hash_table.table[hash_value] = curr_entry->next;
- else
- curr_entry_prev->next = curr_entry->next;
- free((void*) curr_entry);
- result = HASH_NO_ERROR;
- break;
- }
- curr_entry_prev = curr_entry;
- curr_entry = curr_entry->next;
- }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* unlock the table */
- fosa_mutex_unlock(&contract_hash_table.mutex);
-#endif
-
- return result;
-}
-
-/*********************************************************************/
-/* V R E S R E P O S I T O R Y U T I L I T Y F U N C T I O N S */
-/*********************************************************************/
-
-/*
- * access functions to vres repository
- *
- * some simple subsitutions macro which help keep vres_repo_t opaque and
- * improve code cleanness and look & feel (exactly as such as
- * with thread_repo_t a few lines above)
- */
-
/* given the id of a contracted vres stored in the repository returns
 * a pointer to its entry */
#define get_vres_entry(vres_id) \
    ( & ( vres_repo.repo[vres_id_2_index(vres_id)] ) )
/* gives the actual number of threads bound to a vres */
#define get_vres_threads_number(vres_id) \
    ( vres_repo.repo[vres_id_2_index(vres_id)].threads_number )
/* gives the actual number of vres in the repository */
#define get_vres_number() \
    ( vres_repo.vres_number )
/* gives the actual number of dummy vres in the repository */
#define get_dummy_vres_number() \
    ( vres_repo.dummy_vres_number )

/*
 * consistency and status checks
 *
 * commonly used checks about accesses to the vres repository (is the
 * index value in the correct range?) and the status of each entry
 * (is it free?, non-free?, etc.).
 *
 * Fixes over the previous version: the index argument is now fully
 * parenthesized in the range check (it was expanded bare, inviting
 * operator-precedence surprises with expression arguments), and the
 * stray trailing backslash after the last macro, which silently glued
 * it to the following source line, has been removed.
 */

/* are we accessing the repository with a valid index value ? */
#define check_vres_repo_ind(ind) \
    ( ( (ind) >= 0 && (ind) < 2 * FRSH_MAX_N_VRES ) ? true : false )
/* is the vres repository entry (correctly accessed and) free ? */
#define check_vres_repo_entry_free(ind) \
    ( check_vres_repo_ind(ind) && \
      ( vres_repo.repo[(ind)].processor.sid == FRSH_QRES_NOT_VALID_SID ) && \
      ( vres_repo.repo[(ind)].disk.weight == FRSH_QRES_NOT_VALID_DISK ) )
/* is the vres repository entry (correctly accessed and) non free ? */
#define check_vres_repo_entry_nonfree(ind) \
    ( check_vres_repo_ind(ind) && \
      ( ( vres_repo.repo[(ind)].processor.sid != FRSH_QRES_NOT_VALID_SID ) || \
        ( vres_repo.repo[(ind)].disk.weight != FRSH_QRES_NOT_VALID_DISK ) ) )
-/*
- * initialize the repository
- *
- * sets all entries to invalid (that is, free) and init the mutex
- * (if a multithreaded service thread has been configured)
- *
- * possible return values:
- * true (all ok)
- * false (something wrong with the mutex)
- */
-static inline bool vres_repo_init()
-{
- int i;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- if (fosa_mutex_init(&vres_repo.mutex, 0) != 0)
- return false;
-#endif
- vres_repo.vres_number = 0;
- vres_repo.dummy_vres_number = 0;
- for (i = 0; i < 2 * FRSH_MAX_N_VRES; i++) {
- /* vres_repo.repo[i].contract untouched */
- vres_repo.repo[i].processor.sid = FRSH_QRES_NOT_VALID_SID;
- vres_repo.repo[i].processor.threads_number = 0;
- vres_repo.repo[i].processor.first_thread_index = -1;
- vres_repo.repo[i].disk.weight = FRSH_QRES_NOT_VALID_DISK;
- vres_repo.repo[i].renegotiation_status = FRSH_RS_NOT_REQUESTED;
- }
-
- return true;
-}
-
-/*
- * add an entry
- *
- * adds an entry into the repository. If successful the index of the entry is
- * returned in vres_ind.
- *
- * We (again) "accept suggestions" and we implement this exaclty as previously
- * described for the thread repository utility functions
- *
- * possible return values:
- * true (entry correctly added)
- * false (entry not added, maybe no more room in the repository!)
- */
-static bool vres_repo_put(const frsh_contract_t *contract,
- const unsigned int vres_data,
- int *vres_ind)
-{
- int i;
- int start;
- bool result;
-
- /* depending by the contract type we must place the new entry
- * if the "first half" (0 <= vres_ind < FRSH_MAX_N_VRES) of in the
- * second half (FRSH_MAX_N_VRES <= vres_ind < 2*FRSH_MAX_N_VRES) of
- * the repository */
- switch (contract->contract_type) {
- default:
- case FRSH_CT_REGULAR:
- case FRSH_CT_BACKGROUND:
- {
- start = 0;
- break;
- }
- case FRSH_CT_DUMMY:
- {
- start = FRSH_MAX_N_VRES;
- break;
- }
- }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* lock the repository */
- fosa_mutex_lock(&vres_repo.mutex);
-#endif
- /* try to allocate the vres entry in the caller provided
- * position (if any) and, only if it's not free, scan all
- * the repository for a different suitable place */
- if (vres_ind != NULL &&
- check_vres_repo_entry_free((start + *vres_ind) % FRSH_MAX_N_VRES))
- i = (start + *vres_ind) % FRSH_MAX_N_VRES;
- else {
- i = start;
- while (i < FRSH_MAX_N_VRES && !check_vres_repo_entry_free(i))
- i++;
- }
- /* if we're sure it's valid and free we place the
- * vres entry into the repository in position 'i' */
- result = false;
- if (check_vres_repo_entry_free(i)) {
- vres_repo.repo[i].contract = *contract;
- switch (contract->resource_type) {
- case FRSH_RT_PROCESSOR:
- {
- vres_repo.repo[i].processor.sid = (qres_sid_t) vres_data;
- vres_repo.repo[i].processor.threads_number = 0;
- vres_repo.repo[i].processor.first_thread_index = -1;
- break;
- }
- case FRSH_RT_DISK:
- {
- vres_repo.repo[i].disk.weight = (unsigned int) vres_data;
- break;
- }
- default:
- return false;
- }
- vres_repo.repo[i].renegotiation_status = FRSH_RS_NOT_REQUESTED;
- if (vres_ind != NULL)
- *vres_ind = i;
- if (start == 0)
- vres_repo.vres_number++;
- else
- vres_repo.dummy_vres_number++;
- result = true;
- }
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* unlock the repository */
- fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-
- return result;
-}
-
-/*
- * remove an entry
- *
- * sets the entry as free (if it is not) and decrement the (dummy?) vres count
- *
- * possible return values:
- * true (all ok, entry war non-free and is now free)
- * false (something wrong, entry was already free or index is out of renge)
- */
/*
 * remove an entry
 *
 * Frees the vres repository entry at vres_ind (if it is non-free):
 * every thread still bound to the vres is detached from its scheduler
 * and its thread repository entry released, then the vres entry itself
 * is marked free and the proper (regular or dummy) vres count is
 * decremented.
 *
 * possible return values:
 *  true (all ok, entry was non-free and is now free)
 *  false (something wrong, entry was already free or index is out of range)
 */
static bool vres_repo_free(const int vres_ind)
{
    thread_repo_entry_t *curr_thread;
    int next_thread_ind;
    bool result;

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
    /* lock the repository */
    fosa_mutex_lock(&vres_repo.mutex);
#endif
    result = false;
    if (check_vres_repo_entry_nonfree(vres_ind)) {
        /* detach all the bound threads, walking the per-vres chain.
         * NOTE(review): assumes first_thread_index != -1 whenever
         * threads_number > 0 -- confirm all writers preserve this
         * invariant, otherwise thread_repo.repo[-1] is accessed.
         * The post-decrement runs the loop threads_number times and
         * leaves the counter one past zero; it is reset to 0 below. */
        curr_thread = &thread_repo.repo[vres_repo.repo[vres_ind].processor.first_thread_index];
        while (vres_repo.repo[vres_ind].processor.threads_number--) {
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
            /* we need to lock the thread repository too */
            fosa_mutex_lock(&thread_repo.mutex);
#endif
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
            /* if no hierarchical scheduling only one thread per vres is correct */
            assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
                   vres_repo.repo[vres_ind].processor.threads_number == 0);
            /* no threads in DUMMY contracts */
            assert(vres_repo.repo[vres_ind].contract.contract_type != FRSH_CT_DUMMY);
#endif
            /* only the REGULAR contract type needs detaching: dummy
             * vres never have bound threads and BACKGROUND threads are
             * not attached to any server */
            if (vres_repo.repo[vres_ind].contract.contract_type == FRSH_CT_REGULAR)
                /* the return value is deliberately ignored: the thread
                 * may no longer exist, so this call can fail quite
                 * easily and frequently */
                qres_detach_thread(vres_repo.repo[vres_ind].processor.sid,
                                   curr_thread->thread.linux_pid,
                                   curr_thread->thread.linux_tid);
            /* free the thread repository entry too */
            curr_thread->thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
            curr_thread->thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
            curr_thread->thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
            curr_thread->vres = FRSH_NOT_VALID_VRES_ID;
            next_thread_ind = curr_thread->next_in_vres;
            curr_thread->next_in_vres = -1;
            thread_repo.threads_number--;
            /* move along the chain */
            if (next_thread_ind != -1)
                curr_thread = &thread_repo.repo[next_thread_ind];
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
            /* unlock the thread repository */
            fosa_mutex_unlock(&thread_repo.mutex);
#endif
        }
        /* set the repository entry as free */
        /* vres_repo.repo[vres_ind].contract untouched */
        vres_repo.repo[vres_ind].processor.sid = FRSH_QRES_NOT_VALID_SID;
        vres_repo.repo[vres_ind].processor.threads_number = 0;
        vres_repo.repo[vres_ind].processor.first_thread_index = -1;
        vres_repo.repo[vres_ind].renegotiation_status = FRSH_RS_NOT_REQUESTED;
        /* first half holds regular vres, second half dummy vres */
        if (vres_ind < FRSH_MAX_N_VRES)
            vres_repo.vres_number--;
        else
            vres_repo.dummy_vres_number--;
        result = true;
    }
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
    /* unlock the repository */
    fosa_mutex_unlock(&vres_repo.mutex);
#endif

    return result;
}
-
-/*
- * an entry is free ?
- *
- * check if a specific entry of the repository can be considered free.
- * The needed check is very simple and reduces itself to one of the
- * test macro defined above... Note if a multithreaded service thread is
- * configured we do it with an atomic lock
- *
- * possible return values:
- * true (entry is free)
- * false (entry is not free or index is out of range)
- */
-static inline bool vres_repo_isfree(const int vres_ind)
-{
- bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- fosa_mutex_lock(&vres_repo.mutex);
-#endif
- result = check_vres_repo_entry_free(vres_ind);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- fosa_mutex_unlock(&vres_repo.mutex);
-#endif
- return result;
-}
-
-/*
- * an entry is "dummy" ?
- *
- * check if a specific entry of the repository (is not free!) and contains
- * a dummy vres, created as a consequence of the negotiation of a CT_DUMMY
- * contract.
- * If a multithreaded service thread is configured the needed checkins are
- * performed holding an atomic lock
- *
- * possible return values:
- * true (entry contains a dummy vres)
- * false (entry does not contains a dummy vres or index is out of range)
- */
-static inline bool vres_repo_isdummy(const int vres_ind)
-{
- bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- fosa_mutex_lock(&vres_repo.mutex);
-#endif
- result = (check_vres_repo_ind(vres_ind) && vres_ind >= FRSH_MAX_N_VRES);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- fosa_mutex_unlock(&vres_repo.mutex);
-#endif
- return result;
-}
-
-/***************************************************************************/
-/* T H R E A D S R E P O S I T O R Y U T I L I T Y F U N C T I O N S */
-/***************************************************************************/
-
-/*
- * access functions to thread repository
- *
- * some simple subsitutions macro which help keep thread_repo_t opaque and
- * improve code cleanness and look & feel
- */
-
/* given the index of a registered thread in the repository returns
 * a pointer to its entry */
#define get_thread_entry(thread_ind) \
    ( & ( thread_repo.repo[(thread_ind)] ) )
/* gives the actual number of threads in the repository */
#define get_threads_number() \
    ( thread_repo.threads_number )

/*
 * consistency and status checks
 *
 * commonly used checks about accesses to the thread repository (is the
 * index value in the correct range?) and the status of each entry
 * (is it free?, non-free?, equal to an external one?, etc.).
 *
 * Fixes over the previous version: the index argument is now fully
 * parenthesized in the range check (it was expanded bare), and the
 * stray trailing backslash after the last macro, which silently glued
 * it to the following source line, has been removed.
 */

/* are we accessing the repository with a valid index value ? */
#define check_thread_repo_ind(ind) \
    ( ( (ind) >= 0 && (ind) < FRSH_MAX_N_THREADS ) ? true : false )
/* is the thread repository entry (correctly accessed and) free ? */
#define check_thread_repo_entry_free(ind) \
    ( ( ( check_thread_repo_ind(ind) ) && \
        thread_repo.repo[(ind)].thread.pthread_id == FRSH_NOT_VALID_THREAD_ID && \
        thread_repo.repo[(ind)].thread.linux_pid == FRSH_NOT_VALID_THREAD_ID && \
        thread_repo.repo[(ind)].thread.linux_tid == FRSH_NOT_VALID_THREAD_ID ) ? true : false )
/* is the thread repository entry (correctly accessed and) non free ? */
#define check_thread_repo_entry_nonfree(ind) \
    ( ( ( check_thread_repo_ind(ind) ) && \
        thread_repo.repo[(ind)].thread.pthread_id != FRSH_NOT_VALID_THREAD_ID && \
        thread_repo.repo[(ind)].thread.linux_pid != FRSH_NOT_VALID_THREAD_ID && \
        thread_repo.repo[(ind)].thread.linux_tid != FRSH_NOT_VALID_THREAD_ID ) ? true : false )
/* does the thread repository entry contain a thread equal to the one provided ? */
#define check_thread_repo_entry_equal(th, th_ind) \
    ( ( ( check_thread_repo_ind(th_ind) ) && \
        ( thread_repo.repo[(th_ind)].thread.pthread_id == (th).pthread_id ) && \
        ( thread_repo.repo[(th_ind)].thread.linux_pid == (th).linux_pid ) && \
        ( thread_repo.repo[(th_ind)].thread.linux_tid == (th).linux_tid ) ) ? true : false )
-
-/*
- * initialize the repository
- *
- * sets all entries to invalid (that is, free) and init the mutex
- * (if a multithreaded service thread has been configured)
- *
- * possible return values:
- * true (all ok)
- * false (something wrong with the mutex)
- */
-static inline bool thread_repo_init()
-{
- int i;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- if (fosa_mutex_init(&thread_repo.mutex, 0) != 0)
- return false;
-#endif
- thread_repo.threads_number = 0;
- for (i = 0; i < FRSH_MAX_N_THREADS; i++) {
- thread_repo.repo[i].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
- thread_repo.repo[i].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
- thread_repo.repo[i].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
- thread_repo.repo[i].vres = FRSH_NOT_VALID_VRES_ID;
- thread_repo.repo[i].next_in_vres = -1; /* no other thread in vres */
- }
-
- return true;
-}
-
-/*
- * add an entry
- *
- * adds an entry into the repository. If successful the index of the entry is
- * returned in thread_ind.
- *
- * We "accept suggestions", i.e. we suppose the caller knows the location of
- * a free entry in the repository and ask us to insert the new entry exactly
- * there, at thread_ind position.
- * So we check if that location is really free and, if yes, we can add the
- * entry right there. If, on the other way, the entry is not free we need to
- * search the repository for a different place.
- *
- * Note we also keep updated the repository entry (see below) of the vres the
- * thread is bound to.
- *
- * possible return values:
- * true (entry correctly added)
- * false (entry not added, maybe no more room in the repository!)
- */
/*
 * add an entry
 *
 * Adds a thread entry into the repository and binds it to the given
 * vres. If successful the index of the entry is returned in thread_ind.
 *
 * The caller may "suggest" a position via thread_ind: if that slot is
 * free the entry is placed there, otherwise the whole repository is
 * scanned for a free slot.
 *
 * The vres repository entry of vres_id is updated as well: its thread
 * counter is incremented and the new thread is pushed at the head of
 * the vres' thread chain.
 *
 * possible return values:
 *  true (entry correctly added)
 *  false (entry not added, maybe no more room in the repository!)
 */
static bool thread_repo_put(const frsh_thread_id_t *thread,
                            const frsh_vres_id_t vres_id,
                            int *thread_ind)
{
    int i;
    bool result;

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
    /* lock the repository */
    fosa_mutex_lock(&thread_repo.mutex);
#endif
    /* try to place the new entry in the caller provided position (if
     * any) and, only if it's not free, scan all the repository for a
     * different suitable place.
     * (a negative *thread_ind makes the modulo negative, but the range
     * check inside check_thread_repo_entry_free rejects it safely) */
    if (thread_ind != NULL &&
        check_thread_repo_entry_free(*thread_ind % FRSH_MAX_N_THREADS))
        i = *thread_ind % FRSH_MAX_N_THREADS;
    else {
        i = 0;
        while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_free(i))
            i++;
    }
    /* if we're sure it's valid and free we place the
     * thread entry into the repository in position 'i' */
    result = false;
    if (check_thread_repo_entry_free(i)) {
        /* update the provided vres repository entry */
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* we need to lock the vres repository too.
         * NOTE(review): locks are taken in thread-repo -> vres-repo
         * order here, while vres_repo_free() takes vres-repo ->
         * thread-repo -- confirm the two paths cannot run concurrently
         * or a deadlock is possible */
        fosa_mutex_lock(&vres_repo.mutex);
#endif
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
        /* without hierarchical scheduling a vres holds one thread at most */
        assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
               get_vres_entry(vres_id)->processor.threads_number == 0);
#endif
        get_vres_entry(vres_id)->processor.threads_number++;
        /* push the new thread at the head of the vres' thread chain */
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
        assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
               get_vres_entry(vres_id)->processor.first_thread_index == -1);
#endif
        thread_repo.repo[i].next_in_vres = get_vres_entry(vres_id)->processor.first_thread_index;
        get_vres_entry(vres_id)->processor.first_thread_index = i;
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* unlock the vres repository */
        fosa_mutex_unlock(&vres_repo.mutex);
#endif
        thread_repo.repo[i].thread = *thread;
        thread_repo.repo[i].vres = vres_id;
        if (thread_ind != NULL)
            *thread_ind = i;
        thread_repo.threads_number++;
        result = true;
    }

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
    /* unlock the repository */
    fosa_mutex_unlock(&thread_repo.mutex);
#endif

    return result;
}
-
-/*
- * remove an entry
- *
- * sets the entry as free (if it is not) and decrement the thread count.
- *
- * Note we also keep updated the repository entry (see below) of the vres the
- * thread is bound to.
- *
- * possible return values:
- * true (all ok, entry war non-free and is now free)
- * false (something wrong, entry was already free or index is out of renge)
- */
/*
 * remove an entry
 *
 * Sets the entry as free (if it is not), unlinks the thread from the
 * chain of its bound vres and decrements the thread count.
 *
 * possible return values:
 *  true (all ok, entry was non-free and is now free)
 *  false (something wrong, entry was already free or index is out of range)
 */
static bool thread_repo_free(const int thread_ind)
{
    vres_repo_entry_t *thread_vres;
    thread_repo_entry_t *thread_p;
    bool result;

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
    /* lock the repository */
    fosa_mutex_lock(&thread_repo.mutex);
#endif
    result = false;
    if (check_thread_repo_entry_nonfree(thread_ind)) {
        thread_p = &thread_repo.repo[thread_ind];
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
        assert(!vres_repo_isfree(vres_id_2_index(thread_p->vres)));
#endif
        thread_vres = get_vres_entry(thread_p->vres);
        /* update the bound vres repository entry */
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* we need to lock the vres repository too.
         * NOTE(review): thread-repo -> vres-repo order; vres_repo_free()
         * takes the same two locks in the opposite order -- confirm the
         * two paths cannot run concurrently or a deadlock is possible */
        fosa_mutex_lock(&vres_repo.mutex);
#endif
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
        assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
               get_vres_entry(thread_p->vres)->processor.threads_number == 1);
#endif
        thread_vres->processor.threads_number--;
        /* unlink thread_ind from the vres' thread chain.
         * Note the unbraced else: the assignment below the loop runs in
         * BOTH branches; when the chain head itself is removed it
         * rewrites the freed entry's own next_in_vres, which is
         * harmless because that field is reset to -1 a few lines down.
         * NOTE(review): the while loop assumes thread_ind is present in
         * the chain -- a corrupted chain would loop forever. */
        if (thread_vres->processor.first_thread_index == thread_ind)
            thread_vres->processor.first_thread_index = thread_p->next_in_vres;
        else
            while (thread_p->next_in_vres != thread_ind)
                thread_p = &thread_repo.repo[thread_p->next_in_vres];
        thread_p->next_in_vres = thread_repo.repo[thread_ind].next_in_vres;
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* unlock the vres repository */
        fosa_mutex_unlock(&vres_repo.mutex);
#endif
        /* set the repository entry as free */
        thread_repo.repo[thread_ind].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
        thread_repo.repo[thread_ind].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
        thread_repo.repo[thread_ind].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
        thread_repo.repo[thread_ind].vres = FRSH_NOT_VALID_VRES_ID;
        thread_repo.repo[thread_ind].next_in_vres = -1;
        thread_repo.threads_number--;
        result = true;
    }
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
    /* unlock the repository */
    fosa_mutex_unlock(&thread_repo.mutex);
#endif

    return result;
}
-
-/*
- * is an entry free ?
- *
- * check if a specific entry of the repository can be considered free.
- * The needed check is very simple and reduces itself to one of the
- * test macro define d above... Note if a multithreaded service thread is
- * configured we do it with an atomic lock
- *
- * possible return values:
- * true (entry is free)
- * false (entry is not free or index is out of range)
- */
-static bool thread_repo_isfree(const int thread_ind)
-{
- bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* lock the repository */
- fosa_mutex_lock(&thread_repo.mutex);
-#endif
- result = check_thread_repo_entry_free(thread_ind);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* unlock the repository */
- fosa_mutex_unlock(&thread_repo.mutex);
-#endif
- return result;
-}
-
-/*
- * search for an entry
- *
- * tries to find an entry in the repository containing a the thread descriptor
- * provided as (first) argument. If successful the index of the entry is
- * returned in thread_ind.
- *
- * We "accept suggestions" (as in the entry add function), i.e. we suppose the
- * caller knows the entry he's searching for is in the repository and its
- * index is exactly thread_ind, so we check if this is true and, if yes,
- * we're done! If, on the other way, we can't find the searched entry at
- * thread_ind position we start a complete scan of the repository
- *
- * possible return values:
- * true (entry found)
- * false (entry not found)
- */
-static bool thread_repo_find(const frsh_thread_id_t *thread, int *thread_ind)
-{
- int i;
- bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* lock the repository */
- fosa_mutex_lock(&thread_repo.mutex);
-#endif
- /* check if the caller provides us (we hope _intentionally_!) the
- * correct position in the repository of the entry he's searching */
- if (thread_ind != NULL &&
- check_thread_repo_entry_equal(*thread, (*thread_ind) % FRSH_MAX_N_THREADS))
- /* if yes we've finished our search before starting it! */
- i = (*thread_ind) % FRSH_MAX_N_THREADS;
- else {
- /* if no we scan the repository till we find the entry (or till its end) */
- i = 0;
- while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_equal(*thread, i))
- i++;
- }
- /* check again and definitively decide the entry is in the repository or not */
- result = false;
- if (check_thread_repo_entry_equal(*thread, i)) {
- if (thread_ind != NULL)
- *thread_ind = i;
- result = true;
- }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
- /* unlock the repository */
- fosa_mutex_unlock(&thread_repo.mutex);
-#endif
-
- return result;
-}
-
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-
-/***********************************************************************************************/
-/* S E R V I C E T H R E A D R E Q U E S T Q U E U E U T I L I T Y F U N C T I O N S */
-/***********************************************************************************************/
-
-/*
- * access functions to service thread jobs queue
- */
-
/* gives the actual number of queued jobs */
#define get_queued_jobs_number() \
    ( service_th_queue.jobs_number )

/*
 * consistency and status checks
 *
 * commonly used checks about accesses to the queue (is the index in
 * range?), the status of each element (free?, non-free?) and of the
 * whole queue (full?, empty?).
 *
 * Fixes over the previous version of the per-element checks:
 *  - both expanded to a call of the non-existent
 *    check_service_th_queue() instead of check_service_th_queue_ind();
 *  - check_service_th_queue_isfree() had unbalanced parentheses;
 *  - the valid-flag tests were inverted: enqueue/dequeue mark an
 *    occupied element with valid == 1, so a free element has
 *    valid == 0.
 */

/* are we accessing the queue with a valid index value ? */
#define check_service_th_queue_ind(ind) \
    ( ( (ind) >= 0 && (ind) < FRSH_MAX_N_SERVICE_JOBS ) ? true : false )
/* is the queue element (correctly accessed and) free ? */
#define check_service_th_queue_isfree(ind) \
    ( ( check_service_th_queue_ind(ind) && service_th_queue.queue[(ind)].valid == 0 ) ? true : false )
/* is the queue element (correctly accessed and) non free ? */
#define check_service_th_queue_nonfree(ind) \
    ( ( check_service_th_queue_ind(ind) && service_th_queue.queue[(ind)].valid == 1 ) ? true : false )
/* is the queue still empty ? */
#define check_service_th_queue_empty() \
    ( (service_th_queue.jobs_number == 0) ? true : false )
/* is the queue already full ? */
#define check_service_th_queue_full() \
    ( (service_th_queue.jobs_number >= FRSH_MAX_N_SERVICE_JOBS) ? true : false )
-
-/*
- * initialize the queue
- *
- * sets all elements to free and init the mutex
- *
- * possible return values:
- * true (all ok)
- * false (something wrong with the mutex)
- */
-static inline bool service_th_queue_init()
-{
- int i;
-
- service_th_queue.jobs_number = 0;
- service_th_queue.first = service_th_queue.last = 0;
- if (fosa_mutex_init(&service_th_queue.mutex, 0))
- return false;
- for (i = 0; i < FRSH_MAX_N_SERVICE_JOBS; i++) {
- service_th_queue.queue[i].conn = 0;
- service_th_queue.queue[i].valid = 0;
- }
-
- return true;
-}
-
-/*
- * add an element into the queue
- *
- * if possible (there is enough free space) adds a new element into the queue
- * according to the FIFO policy (so the new element it's inserted after the
- * actual last one).
- * Note all is done while holding an atomic lock on the whole queue
- *
- * possible return values:
- * true (all ok, element added)
- * false (element not added, no more room in the queue!)
- */
-static bool enqueue_service_th_request(int conn, frsh_in_msg_t *request)
-{
- int last;
-
- /* lock the queue */
- fosa_mutex_lock(&service_th_queue.mutex);
- if (check_service_th_queue_full()) {
- fosa_mutex_unlock(&service_th_queue.mutex);
- return false;
- }
- last = service_th_queue.last;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
- /* if we need to be sure the choosen element is really free... */
- assert(service_th_queue.queue[last].valid == 0);
-#endif
- /* fill the new element with provided data and mark it as no longer free */
- service_th_queue.queue[last].msg = *request;
- service_th_queue.queue[last].conn = conn;
- service_th_queue.queue[last].valid = 1;
- /* update queue (last) pointer and counter */
- service_th_queue.last = (service_th_queue.last + 1) % FRSH_MAX_N_SERVICE_JOBS;
- service_th_queue.jobs_number++;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
- /* if we need to be sure there was at least a free element in the queue... */
- assert(service_th_queue.jobs_number > 0 && service_th_queue.jobs_number <= FRSH_MAX_N_SERVICE_JOBS);
-#endif
- /* unlock the queue */
- fosa_mutex_unlock(&service_th_queue.mutex);
-
- return true;
-}
-
-/*
- * remove an element from the queue
- *
- * if possible (there is at least one element) remove an (the first, FIFO
- * policy) element from the queue.
- * Note all is done while holding an atomic lock on the whole queue
- *
- * possible return values:
- * true (all ok, element removed)
- * false (element not removed, the queue is empty!)
- */
-static bool dequeue_service_th_request(int *conn, frsh_in_msg_t *request)
-{
- int first;
-
- /* lock the queue */
- fosa_mutex_lock(&service_th_queue.mutex);
- if (check_service_th_queue_empty()) {
- fosa_mutex_unlock(&service_th_queue.mutex);
- return false;
- }
- first = service_th_queue.first;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
- /* if we need to be sure the element is really not free... */
- assert(service_th_queue.queue[first].valid == 1);
-#endif
- /* provide the caller with the element data and mark it as free again */
- *request = service_th_queue.queue[first].msg;
- *conn = service_th_queue.queue[first].conn;
- service_th_queue.queue[first].valid = 0;
- /* update queue (first) pointer and counter */
- service_th_queue.first = (service_th_queue.first + 1) % FRSH_MAX_N_SERVICE_JOBS;
- service_th_queue.jobs_number--;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
- /* if we need to be sure there was at least one element in the queue... */
- assert(service_th_queue.jobs_number >= 0 && service_th_queue.jobs_number <= FRSH_MAX_N_SERVICE_JOBS);
- /* if we need to be sure the pointers have always been correctly updated... */
- if (!check_service_th_queue_empty())
- assert(service_th_queue.first != service_th_queue.last);
- else
- assert(service_th_queue.first == service_th_queue.last);
-#endif
- /* unlock the queue */
- fosa_mutex_unlock(&service_th_queue.mutex);
-
- return true;
-}
-
-/***************************************************************************************************/
-/* W O R K E R T H R E A D S I N I T I A L I Z A T I O N A N D M A I N F U N C T I O N S */
-/***************************************************************************************************/
-
-/*
- * access functions to worker thread pool
- */
-
-/* gives the actual number of active threads in the pool */
-#define get_active_thread_number() \
- ( worker_pool.active_thread_number )
-/* gives the actual number of busy threads in the pool */
-#define get_busy_thread_number() \
- ( worker_pool.busy_thread_number )
-/* gives the actual number of sleeping threads in the pool */
-#define get_sleeping_thread_number() \
- ( worker_pool.active_thread_number - worker_pool.busy_thread_number )
-
-/*
- * FRSH utility function prototypes
- *
- * declared here to be visible to the worker thread code, see each definition
- * (below in the code) for specific infos about each one
- */
-static int negotiate_contract(const frsh_contract_t *contract, frsh_vres_id_t *vres);
-
-static int bind_thread(const frsh_thread_id_t *thread,const frsh_vres_id_t vres_id);
-
-static int unbind_thread(const frsh_thread_id_t *thread);
-
-static int get_thread_vres_id(const frsh_thread_id_t *thread, frsh_vres_id_t *vres);
-
-static int get_contract(const frsh_vres_id_t vres_id, frsh_contract_t *contract);
-
-static int get_label_vres_id(const frsh_contract_label_t contract_label,frsh_vres_id_t *vres);
-
-static int cancel_contract(const frsh_vres_id_t vres);
-
-static int renegotiate_contract(const frsh_contract_t *new_contract, const frsh_vres_id_t vres_id);
-
-static int get_renegotiation_status(const frsh_vres_id_t vres_id,frsh_renegotiation_status_t *renegotiation_status);
-
-static int negotiate_group(int vres_down_number, frsh_vres_id_t vres_down[], int vres_down_status[],
- int contracts_up_number, frsh_contract_t contract_up[], frsh_vres_id_t vres_up[], int contracts_up_status[]);
-
-static int change_mode(int vres_down_number, frsh_vres_id_t vres_down[], int vres_down_status[],
- int contracts_touch_number, frsh_contract_t contracts_touch[], frsh_vres_id_t vres_touch[], int vres_touch_status[],
- int contracts_up_number, frsh_contract_t contracts_up[], frsh_vres_id_t vres_up[], int contracts_up_status[]);
-
-static int get_cputime(const frsh_vres_id_t vres_id, struct timespec *cputime);
-
-static int get_current_budget(const frsh_vres_id_t vres_id, struct timespec *current_budget);
-
-static int get_budget_and_period(const frsh_vres_id_t vres_id,struct timespec *budget,struct timespec *period);
-
-static int reserve_feedback(frsh_contract_t *spare_contract);
-
-/*
- * thread pool initialization
- *
- * initialize the worker thread pool (a couple of counters, a mutex and a
- * condition variable) and create the minimum number of worker thread
- * configured
- *
- * possible return values:
- * true (all ok, pool initialized and all thread created)
- * false (something wrong in mutex or condition initialization or in thread creation)
- */
-
-static void* worker_thread_code();
-
-static inline bool worker_thread_pool_init()
-{
- int i;
- frsh_thread_id_t new_thread;
-
- worker_pool.active_thread_number = 0;
- worker_pool.busy_thread_number = 0;
- if (fosa_mutex_init(&worker_pool.mutex, 0) != 0)
- return false;
- if (fosa_cond_init(&worker_pool.sleep) != 0)
- return false;
- for (i = 0; i < MIN_N_WORKER_THREAD; i++)
- if (fosa_thread_create(&new_thread, NULL, worker_thread_code, NULL) != 0)
- return false;
-
- return true;
-}
-
-/*
- * worker thread code
- *
- * code executed by all the worker thread activated by the service thread to
- * effectively serve the client(s) requests.
- * Basically we check if there are queued requests and if the service thread
- * is useful or, if not, it has to die.
- * If "in service" a worker thread processes a request from a client and sends
- * back the results (if requested) then check again if it's time to sleep or
- * to die!
- *
- * possible exit status:
- * EXIT_SUCCESS (normal exit, the thread is no longer useful)
- * EXIT_FAILURE (abnormal exit, something has gone severely wrong!)
- */
-static void* worker_thread_code(void *p)
-{
- frsh_in_msg_t request;
- frsh_out_msg_t reply;
- int conn;
- frsh_thread_id_t thread_self;
-
- /* lock the pool and say everyone we're a new
- * living (active) and working (busy) thread */
- fosa_mutex_lock(&worker_pool.mutex);
- worker_pool.active_thread_number++;
- worker_pool.busy_thread_number++;
- /* done, unlock the pool */
- fosa_mutex_unlock(&worker_pool.mutex);
- thread_self = fosa_thread_self();
- /* directly attach ourselves to the service contract vres
- * (no need for negotiation or any other FRSH operation!) */
- if (qres_attach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
- thread_self.linux_pid,
- thread_self.linux_tid) != QOS_OK) {
- syslog(LOG_ERR, "can't attach the worker thread to the vres");
-
- return (void*)EXIT_FAILURE;
- }
- /* main worker thread cycle */
- while (1) {
- /* check if there are new requests to serve */
- while (!dequeue_service_th_request(&conn, &request)) {
- fosa_mutex_lock(&worker_pool.mutex);
- /* sleeping or exiting depends on how many worker
- * thread the system has been configured to keep alive */
- if (worker_pool.active_thread_number <= MIN_N_WORKER_THREAD) {
- /* we're still alive, we only go to sleep */
- worker_pool.busy_thread_number--;
- fosa_cond_wait(&worker_pool.sleep, &worker_pool.mutex);
- /* the service thread woke us up,
- * go and check again for queued requests */
- worker_pool.busy_thread_number++;
- fosa_mutex_unlock(&worker_pool.mutex);
- } else {
- /* our work is no longer needed, we're dying! */
- worker_pool.busy_thread_number--;
- worker_pool.active_thread_number--;
- fosa_mutex_unlock(&worker_pool.mutex);
- if (qres_detach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
- thread_self.linux_pid,
- thread_self.linux_tid) != QOS_OK) {
- syslog(LOG_ERR, "can't detach the worker thread from the vres");
-
- return (void*)EXIT_FAILURE;
- }
-
- return (void*)EXIT_SUCCESS;
- }
- }
- switch (request.type) {
- /* process client(s) requests according to the operation, by
- * (simply?) calling one (or a few) of the FRSH utility
- * functions defined below in the code */
- case FRSH_MT_NEGOTIATE_CONTRACT:
- {
- syslog(LOG_DEBUG, "operation requested: NEGOTIATE_CONTRACT");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error = negotiate_contract(&request.val.negotiate_contract.contract,
- &reply.val.negotiate_contract.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
- goto answer_and_close;
- }
- case FRSH_MT_BIND_THREAD:
- {
- syslog(LOG_DEBUG, "operation requested: BIND_THREAD");
- reply.error = bind_thread(&request.val.bind_thread.thread_id,
- request.val.bind_thread.vres_id);
- goto answer_and_close;
- }
- case FRSH_MT_UNBIND_THREAD:
- {
- syslog(LOG_DEBUG, "operation requested: UNBIND_THREAD");
- reply.error = unbind_thread(&request.val.unbind_thread.thread_id);
- goto answer_and_close;
- }
- case FRSH_MT_GET_THREAD_VRES_ID:
- {
- syslog(LOG_DEBUG, "operation requested: GET_THREAD_VRES_ID");
- reply.error = get_thread_vres_id(&request.val.get_thread_vres_id.thread_id,
- &reply.val.get_thread_vres_id.vres_id);
- goto answer_and_close;
- }
- case FRSH_MT_GET_CONTRACT:
- {
- syslog(LOG_DEBUG, "operation requested: GET_CONTRACT");
- reply.error = get_contract(request.val.get_contract.vres_id,
- &reply.val.get_contract.contract);
- goto answer_and_close;
- }
- case FRSH_MT_GET_LABEL_VRES_ID:
- {
- syslog(LOG_DEBUG, "operation requested: GET_LABEL_VRES_ID");
- reply.error = get_label_vres_id(request.val.get_label_vres_id.contract_label,
- &reply.val.get_label_vres_id.vres_id);
- goto answer_and_close;
- }
- case FRSH_MT_CANCEL_CONTRACT:
- {
- syslog(LOG_DEBUG, "operation requested: CANCEL_CONTRACT");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error = cancel_contract(request.val.cancel_contract.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
- goto answer_and_close;
- }
- case FRSH_MT_RENEGOTIATE_CONTRACT:
- {
- syslog(LOG_DEBUG, "operation requested: RENEGOTIATE_CONTRACT");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error =
- renegotiate_contract(&request.val.renegotiate_contract.new_contract,
- request.val.renegotiate_contract.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
- goto answer_and_close;
- }
- case FRSH_MT_REQUEST_CONTRACT_RENEGOTIATION:
- {
- syslog(LOG_DEBUG, "operation requested: REQUEST_CONTRACT_RENEGOTIATION");
- close(conn);
- syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
- syslog(LOG_INFO, "connection closed");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error =
- renegotiate_contract(&request.val.request_contract_renegotiation.new_contract,
- request.val.request_contract_renegotiation.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
-#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
- /* signals the completion of the operation with a signal */
- if (request.val.request_contract_renegotiation.signal != FRSH_NULL_SIGNAL)
- if (fosa_signal_queue(request.val.request_contract_renegotiation.signal,
- request.val.request_contract_renegotiation.siginfo,
- request.val.request_contract_renegotiation.thread_to_signal) != 0)
- syslog(LOG_ERR,
- "can't signal process %d with signal %d to notify the completion of the renegotiation",
- request.val.request_contract_renegotiation.thread_to_signal.linux_pid,
- request.val.request_contract_renegotiation.signal);
-#endif
- goto end_cycle;
- }
- case FRSH_MT_GET_RENEGOTIATION_STATUS:
- {
- syslog(LOG_DEBUG, "operation requested: GET_RENEGOTIATION_STATUS");
- reply.error = get_renegotiation_status(request.val.get_renegotiation_status.vres_id,
- &reply.val.get_renegotiation_status.renegotiation_status);
- goto answer_and_close;
- }
- case FRSH_MT_NEGOTIATE_GROUP:
- {
- syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error =
- negotiate_group(request.val.negotiate_group.vres_down_number,
- request.val.negotiate_group.vres_down,
- reply.val.negotiate_group.vres_down_status,
- request.val.negotiate_group.contracts_up_number,
- request.val.negotiate_group.contracts_up,
- reply.val.negotiate_group.vres_up,
- reply.val.negotiate_group.contracts_up_status);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
-
- //reply.error = FRSH_NO_ERROR;
- //for (i = 0; i < request.val.negotiate_group.vres_down_number; i++)
- // reply.error = reply.val.negotiate_group.vres_down_status[i] =
- // cancel_contract(request.val.negotiate_group.vres_down[i]);
- //for (i = 0; i < request.val.negotiate_group.contracts_up_number; i++)
- // reply.error = reply.val.negotiate_group.contracts_up_status[i] =
- // negotiate_contract(&request.val.negotiate_group.contracts_up[i],
- // &reply.val.negotiate_group.vres_up[i]);
-
- goto answer_and_close;
- }
- case FRSH_MT_CHANGE_MODE_SYNC:
- case FRSH_MT_CHANGE_MODE_ASYNC:
- {
- syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
- reply.error = FRSH_NO_ERROR;
- if (request.type == FRSH_MT_CHANGE_MODE_ASYNC) {
- close(conn);
- syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
- syslog(LOG_INFO, "connection closed");
-
- }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error =
- change_mode(request.val.change_mode.vres_down_number,
- request.val.change_mode.vres_down,
- reply.val.change_mode.vres_down_status,
- request.val.change_mode.contracts_touch_number,
- request.val.change_mode.contracts_touch,
- request.val.change_mode.vres_touch,
- reply.val.change_mode.vres_touch_status,
- request.val.change_mode.contracts_up_number,
- request.val.change_mode.contracts_up,
- reply.val.change_mode.vres_up,
- reply.val.change_mode.contracts_up_status);
-
- //for (i = 0; i < request.val.change_mode.vres_down_number; i++)
- // reply.error = reply.val.change_mode.vres_down_status[i] =
- // cancel_contract(request.val.change_mode.vres_down[i]);
- //for (i = 0; i < request.val.change_mode.contracts_touch_number; i++)
- // reply.error = reply.val.change_mode.vres_touch_status[i] =
- // renegotiate_contract(&request.val.change_mode.contracts_touch[i],
- // request.val.change_mode.vres_touch[i]);
- //for (i = 0; i < request.val.change_mode.contracts_up_number; i++)
- // reply.error = reply.val.change_mode.contracts_up_status[i] =
- // negotiate_contract(&request.val.change_mode.contracts_up[i],
- // &reply.val.change_mode.vres_up[i]);
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
-#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
- /* signals the completion of the operation with a signal */
- if (request.val.change_mode.signal != FRSH_NULL_SIGNAL)
- if (fosa_signal_queue(request.val.change_mode.signal,
- request.val.change_mode.siginfo,
- request.val.change_mode.thread_to_signal) != 0)
- syslog(LOG_ERR,
- "can't signal process %d with signal %d to notify the completion of the renegotiation",
- request.val.change_mode.thread_to_signal.linux_pid,
- request.val.change_mode.signal);
-#endif
- if (request.type == FRSH_MT_CHANGE_MODE_SYNC)
- goto answer_and_close;
- else
- goto end_cycle;
- }
- case FRSH_MT_GET_CPUTIME:
- {
- syslog(LOG_DEBUG, "operation requested: GET_CPUTIME");
- reply.error = get_cputime(request.val.get_cputime.vres_id,
- &reply.val.get_cputime.cputime);
- goto answer_and_close;
- }
- case FRSH_MT_GET_CURRENTBUDGET:
- {
- syslog(LOG_DEBUG, "operation requested: GET_CURRENTBUDGET");
- reply.error = get_current_budget(request.val.get_currentbudget.vres_id,
- &reply.val.get_currentbudget.currentbudget);
- goto answer_and_close;
- }
- case FRSH_MT_GET_BUDGET_AND_PERIOD:
- {
- syslog(LOG_DEBUG, "operation requested: GET_BUDGET_AND_PERIOD");
- reply.error = get_budget_and_period(request.val.get_budget_and_period.vres_id,
- &reply.val.get_budget_and_period.budget,
- &reply.val.get_budget_and_period.period);
- goto answer_and_close;
- }
- case FRSH_MT_GET_SERVICE_THREAD_DATA:
- {
- syslog(LOG_DEBUG, "operation requested: GET_SERVICE_THREAD_DATA");
- reply.error = get_budget_and_period(service_th_vres_id,
- &reply.val.get_service_thread_data.budget,
- &reply.val.get_service_thread_data.period);
- goto answer_and_close;
- }
- case FRSH_MT_SET_SERVICE_THREAD_DATA:
- {
- struct timespec old_budget, old_period;
- syslog(LOG_DEBUG, "operation requested: SET_SERVICE_THREAD_DATA");
- old_budget = service_th_contract.budget_min;
- old_period = service_th_contract.period_max;
- service_th_contract.budget_min =
- request.val.set_service_thread_data.budget;
- service_th_contract.period_max =
- request.val.set_service_thread_data.period;
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_lock(&contract_mutex);
-#endif
- reply.error =
- renegotiate_contract(&service_th_contract, service_th_vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
- fosa_mutex_unlock(&contract_mutex);
-#endif
- if (reply.error == FRSH_NO_ERROR)
- reply.val.set_service_thread_data.accepted = true;
- else {
- reply.val.set_service_thread_data.accepted = false;
- service_th_contract.budget_min = old_budget;
- service_th_contract.period_max = old_period;
- }
- goto answer_and_close;
- }
- case FRSH_MT_FEEDBACK_SET_SPARE:
- {
- syslog(LOG_DEBUG, "operation requested: FEEDBACK_SET_SPARE");
- reply.error = reserve_feedback(&request.val.set_feedback_spare.spare_contract);
- goto answer_and_close;
- }
- case FRSH_MT_FEEDBACK_GET_SPARE:
- {
- syslog(LOG_DEBUG, "operation requested: FEEDBACK_GET_SPARE");
- if (feedback_spare_contract == NULL)
- reply.error = FRSH_ERR_NOT_CONTRACTED_VRES;
- else {
- reply.val.get_feedback_spare.spare_contract = *feedback_spare_contract;
- reply.error = FRSH_NO_ERROR;
- }
- goto answer_and_close;
- }
- default:
- syslog(LOG_ERR, "operation requested: unknown !!");
- reply.error = FRSH_SERVICE_TH_ERR_SOCKET;
- goto answer_and_close;
- }
-answer_and_close:
- if (send(conn, (void*) &reply, sizeof(frsh_out_msg_t), 0) < sizeof(frsh_out_msg_t))
- syslog(LOG_ERR, "can't send reply message (ERROR 0x%x)", FRSH_SERVICE_TH_ERR_SOCKET);
- else
- syslog(LOG_DEBUG, "command reply sent (exit status 0x%x)", reply.error);
- close(conn);
- syslog(LOG_INFO, "connection closed");
-end_cycle:
- ;
- }
-
- return EXIT_SUCCESS;
-}
-#endif /* FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD */
/***********************************************/
/* F R S H U T I L I T Y F U N C T I O N S */
--- /dev/null
+static thread_repo_t thread_repo; /* registered thread repository */
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+static worker_thread_pool_t worker_pool; /* thread pool for multithreaded implementation */
+#endif
+
+/***************************************************************************/
+/* T H R E A D S R E P O S I T O R Y U T I L I T Y F U N C T I O N S */
+/***************************************************************************/
+
+/*
+ * access functions to thread repository
+ *
+ * some simple substitution macros which help keep thread_repo_t opaque and
+ * improve code cleanness and look & feel
+ */
+
+/* given the index of a registered thread in the repository returns
+ * a pointer to its entry */
+#define get_thread_entry(thread_ind) \
+ ( & ( thread_repo.repo[(thread_ind)] ) )
+/* gives the actual number of threads in the repository */
+#define get_threads_number() \
+ ( thread_repo.threads_number )
+
+/*
+ * consistency and status checks
+ *
+ * common used checkings and tests about the access to the thread repository
+ * (is the index value in the correct range?) and the status of each entry
+ * (is it free? non-free? equal to an external one? etc.)
+ */
+
+/* are we accessing the repository with a valid index value ? */
+#define check_thread_repo_ind(ind) \
+ ( ( ind >= 0 && ind < FRSH_MAX_N_THREADS ) ? true : false )
+/* is the thread repository entry (correctly accessed and) free ? */
+#define check_thread_repo_entry_free(ind) \
+ ( ( ( check_thread_repo_ind(ind) ) && \
+ thread_repo.repo[(ind)].thread.pthread_id == FRSH_NOT_VALID_THREAD_ID && \
+ thread_repo.repo[(ind)].thread.linux_pid == FRSH_NOT_VALID_THREAD_ID && \
+ thread_repo.repo[(ind)].thread.linux_tid == FRSH_NOT_VALID_THREAD_ID ) ? true : false )
+/* is the thread repository entry (correctly accessed and) non free ? */
+#define check_thread_repo_entry_nonfree(ind) \
+ ( ( ( check_thread_repo_ind(ind) ) && \
+ thread_repo.repo[(ind)].thread.pthread_id != FRSH_NOT_VALID_THREAD_ID && \
+ thread_repo.repo[(ind)].thread.linux_pid != FRSH_NOT_VALID_THREAD_ID && \
+ thread_repo.repo[(ind)].thread.linux_tid != FRSH_NOT_VALID_THREAD_ID ) ? true : false )
+/* does the thread repository entry contain a thread equal to the one provided ? */
+#define check_thread_repo_entry_equal(th, th_ind) \
+ ( ( ( check_thread_repo_ind(th_ind) ) && \
+ ( thread_repo.repo[(th_ind)].thread.pthread_id == (th).pthread_id ) && \
+ ( thread_repo.repo[(th_ind)].thread.linux_pid == (th).linux_pid ) && \
+ ( thread_repo.repo[(th_ind)].thread.linux_tid == (th).linux_tid ) ) ? true : false ) \
+
+/*
+ * initialize the repository
+ *
+ * sets all entries to invalid (that is, free) and initializes the mutex
+ * (if a multithreaded service thread has been configured)
+ *
+ * possible return values:
+ * true (all ok)
+ * false (something wrong with the mutex)
+ */
+static inline bool thread_repo_init()
+{
+ int i;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ if (fosa_mutex_init(&thread_repo.mutex, 0) != 0)
+ return false;
+#endif
+ thread_repo.threads_number = 0;
+ for (i = 0; i < FRSH_MAX_N_THREADS; i++) {
+ thread_repo.repo[i].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
+ thread_repo.repo[i].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
+ thread_repo.repo[i].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
+ thread_repo.repo[i].vres = FRSH_NOT_VALID_VRES_ID;
+ thread_repo.repo[i].next_in_vres = -1; /* no other thread in vres */
+ }
+
+ return true;
+}
+
+/*
+ * add an entry
+ *
+ * adds an entry into the repository. If successful the index of the entry is
+ * returned in thread_ind.
+ *
+ * We "accept suggestions", i.e. we suppose the caller knows the location of
+ * a free entry in the repository and ask us to insert the new entry exactly
+ * there, at thread_ind position.
+ * So we check if that location is really free and, if yes, we can add the
+ * entry right there. If, on the other way, the entry is not free we need to
+ * search the repository for a different place.
+ *
+ * Note we also keep updated the repository entry (see below) of the vres the
+ * thread is bound to.
+ *
+ * possible return values:
+ * true (entry correctly added)
+ * false (entry not added, maybe no more room in the repository!)
+ */
+static bool thread_repo_put(const frsh_thread_id_t *thread,
+ const frsh_vres_id_t vres_id,
+ int *thread_ind)
+{
+ int i;
+ bool result;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* lock the repository */
+ fosa_mutex_lock(&thread_repo.mutex);
+#endif
+ /* try to place the new entry in the caller provided position (if any)
+ * and, only if it's not free, scan all the repository for
+ * a different and suitable place */
+ if (thread_ind != NULL &&
+ check_thread_repo_entry_free(*thread_ind % FRSH_MAX_N_THREADS))
+ i = *thread_ind % FRSH_MAX_N_THREADS;
+ else {
+ i = 0;
+ while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_free(i))
+ i++;
+ }
+ /* if we're sure it's valid and free we place the
+ * thread entry into the repository in position 'i' */
+ result = false;
+ if (check_thread_repo_entry_free(i)) {
+ /* update the provided vres repository entry */
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* we need to lock the vres repository too */
+ fosa_mutex_lock(&vres_repo.mutex);
+#endif
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+ assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+ get_vres_entry(vres_id)->processor.threads_number == 0);
+#endif
+ get_vres_entry(vres_id)->processor.threads_number++;
+ /* keep updated the chain of thread of the vres repository entry */
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+ assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+ get_vres_entry(vres_id)->processor.first_thread_index == -1);
+#endif
+ thread_repo.repo[i].next_in_vres = get_vres_entry(vres_id)->processor.first_thread_index;
+ get_vres_entry(vres_id)->processor.first_thread_index = i;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* unlock the vres repository */
+ fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+ thread_repo.repo[i].thread = *thread;
+ thread_repo.repo[i].vres = vres_id;
+ if (thread_ind != NULL)
+ *thread_ind = i;
+ thread_repo.threads_number++;
+ result = true;
+ }
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* unlock the repository */
+ fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+ return result;
+}
+
+/*
+ * remove an entry
+ *
+ * sets the entry as free (if it is not) and decrement the thread count.
+ *
+ * Note we also keep updated the repository entry (see below) of the vres the
+ * thread is bound to.
+ *
+ * possible return values:
+ * true (all ok, entry was non-free and is now free)
+ * false (something wrong, entry was already free or index is out of range)
+ */
+static bool thread_repo_free(const int thread_ind)
+{
+ vres_repo_entry_t *thread_vres;
+ thread_repo_entry_t *thread_p;
+ bool result;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* lock the repository */
+ fosa_mutex_lock(&thread_repo.mutex);
+#endif
+ result = false;
+ if (check_thread_repo_entry_nonfree(thread_ind)) {
+ thread_p = &thread_repo.repo[thread_ind];
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+ assert(!vres_repo_isfree(vres_id_2_index(thread_p->vres)));
+#endif
+ thread_vres = get_vres_entry(thread_p->vres);
+ /* update the bound vres repository entry */
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* we need to lock the vres repository too */
+ fosa_mutex_lock(&vres_repo.mutex);
+#endif
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+ assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+ get_vres_entry(thread_p->vres)->processor.threads_number == 1);
+#endif
+ thread_vres->processor.threads_number--;
+ /* keep the chain of thread in the vres updated */
+ if (thread_vres->processor.first_thread_index == thread_ind)
+ thread_vres->processor.first_thread_index = thread_p->next_in_vres;
+ else
+ while (thread_p->next_in_vres != thread_ind)
+ thread_p = &thread_repo.repo[thread_p->next_in_vres];
+ thread_p->next_in_vres = thread_repo.repo[thread_ind].next_in_vres;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* unlock the vres repository */
+ fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+ /* set the repository entry as free */
+ thread_repo.repo[thread_ind].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
+ thread_repo.repo[thread_ind].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
+ thread_repo.repo[thread_ind].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
+ thread_repo.repo[thread_ind].vres = FRSH_NOT_VALID_VRES_ID;
+ thread_repo.repo[thread_ind].next_in_vres = -1;
+ thread_repo.threads_number--;
+ result = true;
+ }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* unlock the repository */
+ fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+ return result;
+}
+
+/*
+ * is an entry free ?
+ *
+ * check if a specific entry of the repository can be considered free.
+ * The needed check is very simple and reduces itself to one of the
+ * test macros defined above... Note if a multithreaded service thread is
+ * configured we do it with an atomic lock
+ *
+ * possible return values:
+ * true (entry is free)
+ * false (entry is not free or index is out of range)
+ */
+static bool thread_repo_isfree(const int thread_ind)
+{
+ bool result;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* lock the repository */
+ fosa_mutex_lock(&thread_repo.mutex);
+#endif
+ result = check_thread_repo_entry_free(thread_ind);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* unlock the repository */
+ fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+ return result;
+}
+
+/*
+ * search for an entry
+ *
+ * tries to find an entry in the repository containing the thread descriptor
+ * provided as (first) argument. If successful the index of the entry is
+ * returned in thread_ind.
+ *
+ * We "accept suggestions" (as in the entry add function), i.e. we suppose the
+ * caller knows the entry he's searching for is in the repository and its
+ * index is exactly thread_ind, so we check if this is true and, if yes,
+ * we're done! If, on the other way, we can't find the searched entry at
+ * thread_ind position we start a complete scan of the repository
+ *
+ * possible return values:
+ * true (entry found)
+ * false (entry not found)
+ */
+static bool thread_repo_find(const frsh_thread_id_t *thread, int *thread_ind)
+{
+ int i;
+ bool result;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* lock the repository */
+ fosa_mutex_lock(&thread_repo.mutex);
+#endif
+ /* check if the caller provides us (we hope _intentionally_!) the
+ * correct position in the repository of the entry he's searching */
+ if (thread_ind != NULL &&
+ check_thread_repo_entry_equal(*thread, (*thread_ind) % FRSH_MAX_N_THREADS))
+ /* if yes we've finished our search before starting it! */
+ i = (*thread_ind) % FRSH_MAX_N_THREADS;
+ else {
+ /* if no we scan the repository till we find the entry (or till its end) */
+ i = 0;
+ while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_equal(*thread, i))
+ i++;
+ }
+ /* check again and definitively decide the entry is in the repository or not */
+ result = false;
+ if (check_thread_repo_entry_equal(*thread, i)) {
+ if (thread_ind != NULL)
+ *thread_ind = i;
+ result = true;
+ }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+ /* unlock the repository */
+ fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+ return result;
+}
+
+/*
+ * thread pool initialization
+ *
+ * initialize the worker thread pool (a couple of counters, a mutex and a
+ * condition variable) and create the minimum number of worker thread
+ * configured
+ *
+ * possible return values:
+ * true (all ok, pool initialized and all thread created)
+ * false (something wrong in mutex or condition initialization or in thread creation)
+ */
+
+static void* worker_thread_code();
+
+static inline bool worker_thread_pool_init()
+{
+ int i;
+ frsh_thread_id_t new_thread;
+
+ worker_pool.active_thread_number = 0;
+ worker_pool.busy_thread_number = 0;
+ if (fosa_mutex_init(&worker_pool.mutex, 0) != 0)
+ return false;
+ if (fosa_cond_init(&worker_pool.sleep) != 0)
+ return false;
+ for (i = 0; i < MIN_N_WORKER_THREAD; i++)
+ if (fosa_thread_create(&new_thread, NULL, worker_thread_code, NULL) != 0)
+ return false;
+
+ return true;
+}
+
+/*
+ * worker thread code
+ *
 * code executed by all the worker threads activated by the service thread to
 * effectively serve the client(s) requests.
 * Basically we check if there are queued requests and whether this worker
 * thread is still useful or, if not, it has to die.
 * If "in service" a worker thread processes a request from a client and sends
 * back the results (if requested), then checks again whether it is time to
 * sleep or to die!
 *
 * possible exit status:
 * EXIT_SUCCESS (normal exit, the thread is no longer useful)
 * EXIT_FAILURE (abnormal exit, something has gone severely wrong!)
+ */
static void* worker_thread_code(void *p)   /* 'p' (thread start argument) is unused */
{
    frsh_in_msg_t request;          /* incoming client request message */
    frsh_out_msg_t reply;           /* reply sent back for synchronous operations */
    int conn;                       /* connection descriptor of the request being served */
    frsh_thread_id_t thread_self;   /* our own id, needed for vres attach/detach */

    /* lock the pool and say everyone we're a new
     * living (active) and working (busy) thread */
    fosa_mutex_lock(&worker_pool.mutex);
    worker_pool.active_thread_number++;
    worker_pool.busy_thread_number++;
    /* done, unlock the pool */
    fosa_mutex_unlock(&worker_pool.mutex);
    thread_self = fosa_thread_self();
    /* directly attach ourself to the service contract vres
     * (no need for negotiation or any other FRSH operation!) */
    if (qres_attach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
                           thread_self.linux_pid,
                           thread_self.linux_tid) != QOS_OK) {
        syslog(LOG_ERR, "can't attach the worker thread to the vres");

        return (void*)EXIT_FAILURE;
    }
    /* main worker thread cycle: dequeue a request, dispatch it, repeat */
    while (1) {
        /* check if there are new requests to serve; while the queue is
         * empty we either sleep (pool at or below its minimum size) or
         * shut this worker down (pool above the minimum) */
        while (!dequeue_service_th_request(&conn, &request)) {
            fosa_mutex_lock(&worker_pool.mutex);
            /* sleeping or exiting depends on how many worker
             * thread the system has been configured to keep alive */
            if (worker_pool.active_thread_number <= MIN_N_WORKER_THREAD) {
                /* we're still alive, we only go to sleep
                 * (the pool mutex is released by fosa_cond_wait and
                 * re-acquired on wakeup, so the counters stay guarded) */
                worker_pool.busy_thread_number--;
                fosa_cond_wait(&worker_pool.sleep, &worker_pool.mutex);
                /* the service thread woke us up,
                 * go and check again for queued requests */
                worker_pool.busy_thread_number++;
                fosa_mutex_unlock(&worker_pool.mutex);
            } else {
                /* our work is no longer needed, we're dying!
                 * (leave the pool counters first, then detach from the
                 * service vres before exiting) */
                worker_pool.busy_thread_number--;
                worker_pool.active_thread_number--;
                fosa_mutex_unlock(&worker_pool.mutex);
                if (qres_detach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
                                       thread_self.linux_pid,
                                       thread_self.linux_tid) != QOS_OK) {
                    syslog(LOG_ERR, "can't detach the worker thread from the vres");

                    return (void*)EXIT_FAILURE;
                }

                return (void*)EXIT_SUCCESS;
            }
        }
        switch (request.type) {
            /* process client(s) requests according to the operation by
             * (simply?) calling one (or a little more) of the FRSH utility
             * functions defined down in the code */
            case FRSH_MT_NEGOTIATE_CONTRACT:
            {
                syslog(LOG_DEBUG, "operation requested: NEGOTIATE_CONTRACT");
/* NOTE(review): the contract_mutex guards in this switch are compiled under
 * FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD, while the pool/repository code in
 * this file uses FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD -- confirm both
 * macro names are intended and one is not a typo of the other. */
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error = negotiate_contract(&request.val.negotiate_contract.contract,
                                                 &reply.val.negotiate_contract.vres_id);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif
                goto answer_and_close;
            }
            case FRSH_MT_BIND_THREAD:
            {
                syslog(LOG_DEBUG, "operation requested: BIND_THREAD");
                reply.error = bind_thread(&request.val.bind_thread.thread_id,
                                          request.val.bind_thread.vres_id);
                goto answer_and_close;
            }
            case FRSH_MT_UNBIND_THREAD:
            {
                syslog(LOG_DEBUG, "operation requested: UNBIND_THREAD");
                reply.error = unbind_thread(&request.val.unbind_thread.thread_id);
                goto answer_and_close;
            }
            case FRSH_MT_GET_THREAD_VRES_ID:
            {
                syslog(LOG_DEBUG, "operation requested: GET_THREAD_VRES_ID");
                reply.error = get_thread_vres_id(&request.val.get_thread_vres_id.thread_id,
                                                 &reply.val.get_thread_vres_id.vres_id);
                goto answer_and_close;
            }
            case FRSH_MT_GET_CONTRACT:
            {
                syslog(LOG_DEBUG, "operation requested: GET_CONTRACT");
                reply.error = get_contract(request.val.get_contract.vres_id,
                                           &reply.val.get_contract.contract);
                goto answer_and_close;
            }
            case FRSH_MT_GET_LABEL_VRES_ID:
            {
                syslog(LOG_DEBUG, "operation requested: GET_LABEL_VRES_ID");
                reply.error = get_label_vres_id(request.val.get_label_vres_id.contract_label,
                                                &reply.val.get_label_vres_id.vres_id);
                goto answer_and_close;
            }
            case FRSH_MT_CANCEL_CONTRACT:
            {
                syslog(LOG_DEBUG, "operation requested: CANCEL_CONTRACT");
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error = cancel_contract(request.val.cancel_contract.vres_id);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif
                goto answer_and_close;
            }
            case FRSH_MT_RENEGOTIATE_CONTRACT:
            {
                syslog(LOG_DEBUG, "operation requested: RENEGOTIATE_CONTRACT");
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error =
                    renegotiate_contract(&request.val.renegotiate_contract.new_contract,
                                         request.val.renegotiate_contract.vres_id);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif
                goto answer_and_close;
            }
            case FRSH_MT_REQUEST_CONTRACT_RENEGOTIATION:
            {
                /* asynchronous operation: the connection is closed up front,
                 * the renegotiation runs afterwards and (on the local machine
                 * build) completion is signalled back to the caller */
                syslog(LOG_DEBUG, "operation requested: REQUEST_CONTRACT_RENEGOTIATION");
                close(conn);
                syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
                syslog(LOG_INFO, "connection closed");
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error =
                    renegotiate_contract(&request.val.request_contract_renegotiation.new_contract,
                                         request.val.request_contract_renegotiation.vres_id);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif
#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
                /* signals the completion of the operation with a signal */
                if (request.val.request_contract_renegotiation.signal != FRSH_NULL_SIGNAL)
                    if (fosa_signal_queue(request.val.request_contract_renegotiation.signal,
                                          request.val.request_contract_renegotiation.siginfo,
                                          request.val.request_contract_renegotiation.thread_to_signal) != 0)
                        syslog(LOG_ERR,
                               "can't signal process %d with signal %d to notify the completion of the renegotiation",
                               request.val.request_contract_renegotiation.thread_to_signal.linux_pid,
                               request.val.request_contract_renegotiation.signal);
#endif
                goto end_cycle;
            }
            case FRSH_MT_GET_RENEGOTIATION_STATUS:
            {
                syslog(LOG_DEBUG, "operation requested: GET_RENEGOTIATION_STATUS");
                reply.error = get_renegotiation_status(request.val.get_renegotiation_status.vres_id,
                                                       &reply.val.get_renegotiation_status.renegotiation_status);
                goto answer_and_close;
            }
            case FRSH_MT_NEGOTIATE_GROUP:
            {
                syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error =
                    negotiate_group(request.val.negotiate_group.vres_down_number,
                                    request.val.negotiate_group.vres_down,
                                    reply.val.negotiate_group.vres_down_status,
                                    request.val.negotiate_group.contracts_up_number,
                                    request.val.negotiate_group.contracts_up,
                                    reply.val.negotiate_group.vres_up,
                                    reply.val.negotiate_group.contracts_up_status);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif

                /* old per-contract implementation, kept for reference:
                 * it cancelled/negotiated each contract one by one instead of
                 * going through negotiate_group() */
                //reply.error = FRSH_NO_ERROR;
                //for (i = 0; i < request.val.negotiate_group.vres_down_number; i++)
                //	reply.error = reply.val.negotiate_group.vres_down_status[i] =
                //		cancel_contract(request.val.negotiate_group.vres_down[i]);
                //for (i = 0; i < request.val.negotiate_group.contracts_up_number; i++)
                //	reply.error = reply.val.negotiate_group.contracts_up_status[i] =
                //		negotiate_contract(&request.val.negotiate_group.contracts_up[i],
                //			&reply.val.negotiate_group.vres_up[i]);

                goto answer_and_close;
            }
            case FRSH_MT_CHANGE_MODE_SYNC:
            case FRSH_MT_CHANGE_MODE_ASYNC:
            {
                /* NOTE(review): this log message says NEGOTIATE_GROUP but the
                 * operation is CHANGE_MODE -- looks like a copy/paste slip in
                 * the runtime string; left untouched here. */
                syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
                reply.error = FRSH_NO_ERROR;
                /* for the async flavour the connection is closed right away:
                 * the reply computed below is never sent in that case */
                if (request.type == FRSH_MT_CHANGE_MODE_ASYNC) {
                    close(conn);
                    syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
                    syslog(LOG_INFO, "connection closed");

                }
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error =
                    change_mode(request.val.change_mode.vres_down_number,
                                request.val.change_mode.vres_down,
                                reply.val.change_mode.vres_down_status,
                                request.val.change_mode.contracts_touch_number,
                                request.val.change_mode.contracts_touch,
                                request.val.change_mode.vres_touch,
                                reply.val.change_mode.vres_touch_status,
                                request.val.change_mode.contracts_up_number,
                                request.val.change_mode.contracts_up,
                                reply.val.change_mode.vres_up,
                                reply.val.change_mode.contracts_up_status);

                /* old per-contract implementation, kept for reference */
                //for (i = 0; i < request.val.change_mode.vres_down_number; i++)
                //	reply.error = reply.val.change_mode.vres_down_status[i] =
                //		cancel_contract(request.val.change_mode.vres_down[i]);
                //for (i = 0; i < request.val.change_mode.contracts_touch_number; i++)
                //	reply.error = reply.val.change_mode.vres_touch_status[i] =
                //		renegotiate_contract(&request.val.change_mode.contracts_touch[i],
                //			request.val.change_mode.vres_touch[i]);
                //for (i = 0; i < request.val.change_mode.contracts_up_number; i++)
                //	reply.error = reply.val.change_mode.contracts_up_status[i] =
                //		negotiate_contract(&request.val.change_mode.contracts_up[i],
                //			&reply.val.change_mode.vres_up[i]);

#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif
#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
                /* signals the completion of the operation with a signal */
                if (request.val.change_mode.signal != FRSH_NULL_SIGNAL)
                    if (fosa_signal_queue(request.val.change_mode.signal,
                                          request.val.change_mode.siginfo,
                                          request.val.change_mode.thread_to_signal) != 0)
                        syslog(LOG_ERR,
                               "can't signal process %d with signal %d to notify the completion of the renegotiation",
                               request.val.change_mode.thread_to_signal.linux_pid,
                               request.val.change_mode.signal);
#endif
                if (request.type == FRSH_MT_CHANGE_MODE_SYNC)
                    goto answer_and_close;
                else
                    goto end_cycle;
            }
            case FRSH_MT_GET_CPUTIME:
            {
                syslog(LOG_DEBUG, "operation requested: GET_CPUTIME");
                reply.error = get_cputime(request.val.get_cputime.vres_id,
                                          &reply.val.get_cputime.cputime);
                goto answer_and_close;
            }
            case FRSH_MT_GET_CURRENTBUDGET:
            {
                syslog(LOG_DEBUG, "operation requested: GET_CURRENTBUDGET");
                reply.error = get_current_budget(request.val.get_currentbudget.vres_id,
                                                 &reply.val.get_currentbudget.currentbudget);
                goto answer_and_close;
            }
            case FRSH_MT_GET_BUDGET_AND_PERIOD:
            {
                syslog(LOG_DEBUG, "operation requested: GET_BUDGET_AND_PERIOD");
                reply.error = get_budget_and_period(request.val.get_budget_and_period.vres_id,
                                                    &reply.val.get_budget_and_period.budget,
                                                    &reply.val.get_budget_and_period.period);
                goto answer_and_close;
            }
            case FRSH_MT_GET_SERVICE_THREAD_DATA:
            {
                /* same as GET_BUDGET_AND_PERIOD but fixed on the service
                 * thread's own vres */
                syslog(LOG_DEBUG, "operation requested: GET_SERVICE_THREAD_DATA");
                reply.error = get_budget_and_period(service_th_vres_id,
                                                    &reply.val.get_service_thread_data.budget,
                                                    &reply.val.get_service_thread_data.period);
                goto answer_and_close;
            }
            case FRSH_MT_SET_SERVICE_THREAD_DATA:
            {
                /* tentatively install the new budget/period in the service
                 * thread contract, try to renegotiate it, and roll back the
                 * saved values if the renegotiation is rejected */
                struct timespec old_budget, old_period;
                syslog(LOG_DEBUG, "operation requested: SET_SERVICE_THREAD_DATA");
                old_budget = service_th_contract.budget_min;
                old_period = service_th_contract.period_max;
                service_th_contract.budget_min =
                    request.val.set_service_thread_data.budget;
                service_th_contract.period_max =
                    request.val.set_service_thread_data.period;
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_lock(&contract_mutex);
#endif
                reply.error =
                    renegotiate_contract(&service_th_contract, service_th_vres_id);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
                fosa_mutex_unlock(&contract_mutex);
#endif
                if (reply.error == FRSH_NO_ERROR)
                    reply.val.set_service_thread_data.accepted = true;
                else {
                    reply.val.set_service_thread_data.accepted = false;
                    service_th_contract.budget_min = old_budget;
                    service_th_contract.period_max = old_period;
                }
                goto answer_and_close;
            }
            case FRSH_MT_FEEDBACK_SET_SPARE:
            {
                syslog(LOG_DEBUG, "operation requested: FEEDBACK_SET_SPARE");
                reply.error = reserve_feedback(&request.val.set_feedback_spare.spare_contract);
                goto answer_and_close;
            }
            case FRSH_MT_FEEDBACK_GET_SPARE:
            {
                syslog(LOG_DEBUG, "operation requested: FEEDBACK_GET_SPARE");
                if (feedback_spare_contract == NULL)
                    reply.error = FRSH_ERR_NOT_CONTRACTED_VRES;
                else {
                    reply.val.get_feedback_spare.spare_contract = *feedback_spare_contract;
                    reply.error = FRSH_NO_ERROR;
                }
                goto answer_and_close;
            }
            default:
                syslog(LOG_ERR, "operation requested: unknown !!");
                reply.error = FRSH_SERVICE_TH_ERR_SOCKET;
                goto answer_and_close;
        }
answer_and_close:
        /* NOTE(review): send() returns ssize_t; comparing it against the
         * unsigned sizeof value converts a -1 failure to a huge unsigned
         * number, so an outright send() error is logged as a success --
         * confirm, and consider an explicit `< 0` check plus a separate
         * short-write check. */
        if (send(conn, (void*) &reply, sizeof(frsh_out_msg_t), 0) < sizeof(frsh_out_msg_t))
            syslog(LOG_ERR, "can't send reply message (ERROR 0x%x)", FRSH_SERVICE_TH_ERR_SOCKET);
        else
            syslog(LOG_DEBUG, "command reply sent (exit status 0x%x)", reply.error);
        close(conn);
        syslog(LOG_INFO, "connection closed");
end_cycle:
        ;
    }

    /* NOTE(review): unreachable unless the while(1) loop is ever broken;
     * also, unlike the other exit paths this returns the plain int
     * EXIT_SUCCESS (0, a null pointer constant) rather than
     * (void*)EXIT_SUCCESS -- the cast would be more consistent. */
    return EXIT_SUCCESS;
}
+
+#endif /* FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD */