]> rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Split frsh_service_th.c
authorMartin Molnar <molnam1@fel.cvut.cz>
Sun, 14 Sep 2008 14:15:23 +0000 (16:15 +0200)
committerMartin Molnar <molnam1@fel.cvut.cz>
Sun, 14 Sep 2008 14:15:23 +0000 (16:15 +0200)
frsh_aquosa/core.c
frsh_aquosa/mngr/frsh_service_th.c
frsh_aquosa/mngr/mngr.h [new file with mode: 0644]
frsh_aquosa/mngr/request.c [new file with mode: 0644]
frsh_aquosa/mngr/thread_repo.c [new file with mode: 0644]
frsh_aquosa/mngr/vres_repo.c [new file with mode: 0644]

index 08356ac5e884c98bab58b8ec33af951d04645a64..c1cfe116a8f0503166f1a135cec2a75c4a5e79a5 100644 (file)
@@ -129,12 +129,6 @@ int frsh_initialized = 0;          /* framework initialization flag */
 pid_t frsh_service_th_pid;             /* service thread pid cache */
 //#endif
 
-
-/*********************************************************/
-/* U T I L I T Y   F U N C T I O N   A N D   M A C R O S */
-/*********************************************************/
-
-
 /****************************************/
 /*     CORE API IMPLEMENTATION         */
 /****************************************/
@@ -165,8 +159,8 @@ pid_t frsh_service_th_pid;          /* service thread pid cache */
  */
 
 /* forward declarations, see below ... */
-static void frsh_scheduler_signal_handler(int signum, siginfo_t *siginfo, void *cntx);
-
+static void frsh_scheduler_signal_handler(int signum, siginfo_t *siginfo, 
+                                               void *cntx);
 static void frsh_qres_cleanup_wrapper();
 
 int frsh_aquosa_init()
index 97c67afb5d09ef62c71a60df00f9e722c7ec9a56..267c7a0cd1d34265a5df1001f97af267851c9570 100644 (file)
@@ -171,1489 +171,7 @@ frsh_contract_t *feedback_spare_contract;
 static contract_hash_table_t contract_hash_table;      /* hash table for the contracts */
 static vres_repo_t vres_repo;                          /* negotiated vres repository */
 static disk_t disk[N_DISKS];                           /* parameters for the HDs in the system */
-static thread_repo_t thread_repo;                      /* registered thread repository */
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-static service_th_queue_t service_th_queue;            /* job (form clients) queueing for multithreaded implementation */
-static worker_thread_pool_t worker_pool;               /* thread pool for multithreaded implementation */
-#endif
-
-/***************************************************************************/
-/* H A R D   D I S K   H A N D L I N G   U T I L I T Y   F U N C T I O N S */
-/***************************************************************************/
-
-static bool disk_init()
-{
-       int i;
-
-       for (i = 0; i < N_DISKS; i++) {
-               disk[i].aggregate_bw = DISK_DEFAULT_AGGREGATE_BW;
-               disk[i].sched_budget = DISK_DEFAULT_SCHED_BUDGET;
-               disk[i].weight_sum = 0;
-               disk[i].thread_num = 0;
-       }
-
-       return true;
-}
-
-static inline unsigned int disk_sched_ioprio(unsigned int weight) {
-
-       if (weight == 100)
-               return 0;
-       if (weight > 14)
-               return 1;
-       return 8 - 1 - (weight / 2);
-}
-
-static inline unsigned int disk_sched_ioweight(frsh_resource_id_t disk_id, struct timespec budget, struct timespec period) {
-       float th, weight;
-       int b, w_i;
-       unsigned long int q_i, p_i;
-
-       th = disk[disk_id_2_index(disk_id)].aggregate_bw / 10E9;
-       b = disk[disk_id_2_index(disk_id)].sched_budget;
-
-       /* nsec */
-       q_i = timespec_to_usec(budget);
-       p_i = timespec_to_usec(period);
-       q_i *= 1000;
-       p_i *= 1000;
-
-       if (p_i < b / th)
-               return -1;
-
-       weight = (DISK_WEIGHT_MAX * q_i ) / (p_i - (b / th));
-       w_i = floor(weight);
-       
-       if (weight != w_i)
-               w_i++;
-
-       return w_i;
-}
-
-static inline float disk_sched_iobandwidth(frsh_resource_id_t disk_id, unsigned int weight) {
-       return (float) weight * (disk[disk_id_2_index(disk_id)].aggregate_bw / disk[disk_id_2_index(disk_id)].weight_sum);
-}
-
-
-/***********************************************************/
-/* H A S H   T A B L E   U T I L I T Y   F U N C T I O N S */
-/***********************************************************/
-
-/*
- * initialize the table
- *
- * sets all pointers of the array table of the hash table to NULL
- * and init the mutex (if a multithreaded service thread has
- * been configured)
- *
- * possible return values:
- *  true (all ok)
- *  false (something wrong with the mutex)
- */
-static bool contract_hash_init()
-{
-       int i;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       if (fosa_mutex_init(&contract_hash_table.mutex, 0) != 0)
-               return false;
-#endif
-       for (i = 0; i < HASH_N_ELEMENTS; i++)
-               contract_hash_table.table[i] = NULL;
-
-       return true;
-}
-
-/* 
- * hash function
- *
- * obtains the hash (int) value associated to the (string) key provided
- * simply accessing each character as an integer value and summing them all
- *
- * always returns the hash value (no room for errors!)
- */
-static unsigned int contract_hash_hash(const char *key_str)
-{
-       int i, step = CHAR_PER_INT;
-       unsigned int hash = 0;
-       unsigned int key_val = 0;
-       char *key_val_ptr = (char*) &key_val;
-
-       for (i = 0; i < strlen(key_str); i += step) {
-               strncpy(key_val_ptr, (key_str + i), step);
-               hash += key_val;
-       }
-
-       return hash % HASH_N_ELEMENTS;
-}
-
-/* 
- * search an element
- *
- * calculates the hash value, accesses the hash table element and
- * follows the linked list if needed
- * data_ptr can be used to access/modify the element data field
- *
- * possible return values:
- *  HASH_LABEL_FOUND
- *  HASH_ERR_LABEL_NOT_FOUND
- */
-static int contract_hash_find(const char *key, contract_hash_data_t **data_ptr)
-{
-       contract_hash_entry_t *curr_entry;
-       unsigned int hash_value;
-       int result;
-
-       /* hash value of the the key */
-       hash_value = contract_hash_hash(key);
-#ifdef frsh_enable_service_th_multithread
-       /* lock the table */
-       fosa_mutex_lock(&contract_hash_table.mutex);
-#endif
-       /* start traversing the linked list (if needed) */
-       curr_entry = contract_hash_table.table[hash_value];
-       while (curr_entry != NULL) {
-               if (strncmp(key, curr_entry->contract_label, HASH_KEY_LENGTH) == 0) {
-                       /* key found! */
-                       *data_ptr = &(curr_entry->contract_data);
-                       result =  HASH_LABEL_FOUND;
-                       break;
-               }
-               /* not found yet, let's try the next element */
-               curr_entry = curr_entry->next;
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the table */
-       fosa_mutex_unlock(&contract_hash_table.mutex);
-#endif
-
-       return result;
-}
-
-/*
- * add an element
- *
- * calculates the hash function and adds the new element at the end
- * of the linked list of the hash table entry, unless one with the same
- * key value already exists
- * data_ptr can be used to access/modify the element data field
- *
- * possible return values:
- *  HASH_NO_ERROR
- *  HASH_ERR_LABEL_FOUND
- */
-static int contract_hash_add(const char *key, contract_hash_data_t **data_ptr)
-{
-       contract_hash_entry_t *new_entry, **curr_entry;
-       unsigned int hash_value;
-       int result;
-
-       /* hash value of the key */
-       hash_value = contract_hash_hash(key);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the table */
-       fosa_mutex_lock(&contract_hash_table.mutex);
-#endif
-       /* traverse the linked list till the end
-        * (stop if the key already exists) */
-       curr_entry = &contract_hash_table.table[hash_value];
-       result = HASH_NO_ERROR;
-       while (*curr_entry != NULL) {
-               if (strncmp(key, (*curr_entry)->contract_label, HASH_KEY_LENGTH) == 0) {
-                       result = HASH_ERR_LABEL_FOUND;
-                       break;
-               }
-               curr_entry = &(*curr_entry)->next;
-       }
-       if (result == HASH_NO_ERROR) {
-               /* create the new element */
-               new_entry = (contract_hash_entry_t*) malloc(sizeof(contract_hash_entry_t));
-               strncpy(new_entry->contract_label, key, HASH_KEY_LENGTH);
-               /* add it (we're already at the end of the linked list) */
-               *data_ptr = &(new_entry->contract_data);
-               *curr_entry = new_entry;
-               new_entry->next = NULL;
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the table */
-       fosa_mutex_unlock(&contract_hash_table.mutex);
-#endif
-
-       return result;
-}
-
-/*
- * remove an element
- *
- * calculates the hash function, searches the element in the linked
- * list and remove it
- *
- * possible return values:
- *  HASH_NO_ERROR
- *  HASH_ERR_LABEL_NOT_FOUND
- */
-static int contract_hash_del(const char *key)
-{
-       contract_hash_entry_t *curr_entry, *curr_entry_prev;
-       unsigned int hash_value;
-       int result;
-
-       /* hash value of the key */
-       hash_value = contract_hash_hash(key);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the table */
-       fosa_mutex_lock(&contract_hash_table.mutex);
-#endif
-       /* traverse the list searching for the key */
-       curr_entry_prev = NULL;
-       curr_entry = contract_hash_table.table[hash_value];
-       result = HASH_ERR_LABEL_NOT_FOUND;
-       while (curr_entry != NULL) {
-               if (strncmp(key, curr_entry->contract_label, HASH_KEY_LENGTH) == 0) {
-                       /* update the list and remove the element */
-                       if (curr_entry_prev == NULL)
-                               contract_hash_table.table[hash_value] = curr_entry->next;
-                       else
-                               curr_entry_prev->next = curr_entry->next;
-                       free((void*) curr_entry);
-                       result = HASH_NO_ERROR;
-                       break;
-               }
-               curr_entry_prev = curr_entry;
-               curr_entry = curr_entry->next;
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the table */
-       fosa_mutex_unlock(&contract_hash_table.mutex);
-#endif
-
-       return result;  
-}
-
-/*********************************************************************/
-/* V R E S   R E P O S I T O R Y   U T I L I T Y   F U N C T I O N S */
-/*********************************************************************/
-
-/*
- * access functions to vres repository
- *
- * some simple substitution macros which help keep vres_repo_t opaque and
- * improve code cleanliness and look & feel (exactly as is done
- * with thread_repo_t further below)
- */
-
-/* given the id of a contracted vres stored in the repository returns
- * a pointer to its entry */
-#define get_vres_entry(vres_id)                                \
- ( & ( vres_repo.repo[vres_id_2_index(vres_id)] ) )
-/* gives the actual number of thread bound to a vres */
-#define get_vres_threads_number(vres_id)                       \
- ( vres_repo.repo[vres_id_2_index(vres_id)].threads_number )
-/* gives the actual number of vres in the repository */
-#define get_vres_number()      \
- ( vres_repo.vres_number )
-/* gives the actual number of dummy vres in the repository */
-#define get_dummy_vres_number()        \
- ( vres_repo.dummy_vres_number )
-
-/*
- * consistency and status checks
- *
- * commonly used checks and tests about the access to the vres repository
- * (is the index value in the correct range?) and the status of each entry
- * (is it free?, non-free?, etc.)
- */
-
-/* are we accessing the repository with a valid index value ? */
-#define check_vres_repo_ind(ind) \
- ( (ind >= 0 && ind < 2 * FRSH_MAX_N_VRES) ? true : false )
-/* is the vres repository entry (correctly accessed and) free ? */
-#define check_vres_repo_entry_free(ind)                                \
- ( check_vres_repo_ind(ind) && ( vres_repo.repo[(ind)].processor.sid == FRSH_QRES_NOT_VALID_SID ) && ( vres_repo.repo[(ind)].disk.weight == FRSH_QRES_NOT_VALID_DISK ) )
-
-/* is the vres repository entry (correctly accessed and) non free ? */
-#define check_vres_repo_entry_nonfree(ind)                                     \
- ( check_vres_repo_ind(ind) &&                                                 \
-  ( ( vres_repo.repo[(ind)].processor.sid != FRSH_QRES_NOT_VALID_SID ) ||      \
-  ( vres_repo.repo[(ind)].disk.weight != FRSH_QRES_NOT_VALID_DISK ) ) )                \
-
-/*
- * initialize the repository
- *
- * sets all entries to invalid (that is, free) and init the mutex
- * (if a multithreaded service thread has been configured)
- *
- * possible return values:
- *  true (all ok)
- *  false (something wrong with the mutex)
- */
-static inline bool vres_repo_init()
-{
-       int i;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       if (fosa_mutex_init(&vres_repo.mutex, 0) != 0)
-               return false;
-#endif
-       vres_repo.vres_number = 0;
-       vres_repo.dummy_vres_number = 0;
-       for (i = 0; i < 2 * FRSH_MAX_N_VRES; i++) {
-               /* vres_repo.repo[i].contract untouched */
-               vres_repo.repo[i].processor.sid = FRSH_QRES_NOT_VALID_SID;
-               vres_repo.repo[i].processor.threads_number = 0;
-               vres_repo.repo[i].processor.first_thread_index = -1;
-               vres_repo.repo[i].disk.weight = FRSH_QRES_NOT_VALID_DISK;
-               vres_repo.repo[i].renegotiation_status = FRSH_RS_NOT_REQUESTED;
-       }
-
-       return true;
-}
-
-/*
- * add an entry
- *
- * adds an entry into the repository. If successful the index of the entry is
- * returned in vres_ind.
- *
- * We (again) "accept suggestions" and we implement this exactly as
- * described for the thread repository utility functions (see thread_repo_put)
- *
- * possible return values:
- *  true (entry correctly added)
- *  false (entry not added, maybe no more room in the repository!)
- */
-static bool vres_repo_put(const frsh_contract_t *contract,
-       const unsigned int vres_data,
-       int *vres_ind)
-{
-       int i;
-       int start;
-       bool result;
-
-       /* depending by the contract type we must place the new entry
-        * if the "first half" (0 <= vres_ind < FRSH_MAX_N_VRES) of in the
-        * second half (FRSH_MAX_N_VRES <= vres_ind < 2*FRSH_MAX_N_VRES) of
-        * the repository */
-       switch (contract->contract_type) {
-               default:
-               case FRSH_CT_REGULAR:
-               case FRSH_CT_BACKGROUND:
-               {
-                       start = 0;
-                       break;
-               }
-               case FRSH_CT_DUMMY:
-               {
-                       start = FRSH_MAX_N_VRES;
-                       break;
-               }
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the repository */
-       fosa_mutex_lock(&vres_repo.mutex);
-#endif
-       /* try to allocate the vres entry in the caller provided
-        * position (if any) and, only if it's not free, scan all
-        * the repository for a different suitable place */
-       if (vres_ind != NULL &&
-                       check_vres_repo_entry_free((start + *vres_ind) % FRSH_MAX_N_VRES))
-               i = (start + *vres_ind) % FRSH_MAX_N_VRES;
-       else {
-               i = start;
-               while (i < FRSH_MAX_N_VRES && !check_vres_repo_entry_free(i))
-                       i++;
-       }
-       /* if we're sure it's valid and free we place the
-        * vres entry into the repository in position 'i' */
-       result = false;
-       if (check_vres_repo_entry_free(i)) {
-               vres_repo.repo[i].contract = *contract;
-               switch (contract->resource_type) {
-                       case FRSH_RT_PROCESSOR:
-                       {
-                               vres_repo.repo[i].processor.sid = (qres_sid_t) vres_data;
-                               vres_repo.repo[i].processor.threads_number = 0;
-                               vres_repo.repo[i].processor.first_thread_index = -1;
-                               break;
-                       }
-                       case FRSH_RT_DISK:
-                       {
-                               vres_repo.repo[i].disk.weight = (unsigned int) vres_data;
-                               break;
-                       }
-                       default:
-                               return false;
-               }
-               vres_repo.repo[i].renegotiation_status = FRSH_RS_NOT_REQUESTED;
-               if (vres_ind != NULL)
-                       *vres_ind = i;
-               if (start == 0)
-                       vres_repo.vres_number++;
-               else
-                       vres_repo.dummy_vres_number++;
-               result = true;
-       }
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the repository */
-       fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-
-       return result;
-}
-
-/*
- * remove an entry
- *
- * sets the entry as free (if it is not) and decrement the (dummy?) vres count
- *
- * possible return values:
- *  true (all ok, entry was non-free and is now free)
- *  false (something wrong, entry was already free or index is out of range)
- */
-static bool vres_repo_free(const int vres_ind)
-{
-       thread_repo_entry_t *curr_thread;
-       int next_thread_ind;
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the repository */
-       fosa_mutex_lock(&vres_repo.mutex);
-#endif
-       result = false;
-       if (check_vres_repo_entry_nonfree(vres_ind)) {
-               /* detach all the bound threads */
-               curr_thread = &thread_repo.repo[vres_repo.repo[vres_ind].processor.first_thread_index];
-               while (vres_repo.repo[vres_ind].processor.threads_number--) {
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-                       /* we need to lock the thread repository too */
-                       fosa_mutex_lock(&thread_repo.mutex);
-#endif
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-                       /* if no hierarchical scheduling only one thread per vres is correct */
-                       assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
-                               vres_repo.repo[vres_ind].processor.threads_number == 0);
-                       /* no threads in DUMMY contracts */
-                       assert(vres_repo.repo[vres_ind].contract.contract_type != FRSH_CT_DUMMY);
-#endif
-                       /* only REGULAR contract type case handled since
-                        * dummy vres _always_ has no bound thread and BACKGROUND
-                        * thread need no detaching (they're not attachet to any server) */
-                       if (vres_repo.repo[vres_ind].contract.contract_type == FRSH_CT_REGULAR)
-                               /* do not care the return value since the thread
-                                * could exist no more and the call could fail
-                                * quite easily and frequently! */
-                               qres_detach_thread(vres_repo.repo[vres_ind].processor.sid,
-                                               curr_thread->thread.linux_pid,
-                                               curr_thread->thread.linux_tid);
-                       /* free the thread repository entry too */
-                       curr_thread->thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
-                       curr_thread->thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
-                       curr_thread->thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
-                       curr_thread->vres = FRSH_NOT_VALID_VRES_ID;
-                       next_thread_ind = curr_thread->next_in_vres;
-                       curr_thread->next_in_vres = -1;
-                       thread_repo.threads_number--;
-                       /* move along the chain */
-                       if (next_thread_ind != -1)
-                               curr_thread = &thread_repo.repo[next_thread_ind];
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-                       /* unlock the thread repository */
-                       fosa_mutex_unlock(&thread_repo.mutex);
-#endif
-               }
-               /* set the repository entry as free */
-               /* vres_repo.repo[vres_id].contract untouched */
-               vres_repo.repo[vres_ind].processor.sid = FRSH_QRES_NOT_VALID_SID;
-               vres_repo.repo[vres_ind].processor.threads_number = 0;
-               vres_repo.repo[vres_ind].processor.first_thread_index = -1;
-               vres_repo.repo[vres_ind].renegotiation_status = FRSH_RS_NOT_REQUESTED;
-               if (vres_ind < FRSH_MAX_N_VRES)
-                       vres_repo.vres_number--;
-               else
-                       vres_repo.dummy_vres_number--;
-               result = true;
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the repository */
-       fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-
-       return result;
-}
-
-/*
- * an entry is free ?
- *
- * check if a specific entry of the repository can be considered free.
- * The needed check is very simple and reduces itself to one of the
- * test macro defined above... Note if a multithreaded service thread is
- * configured we do it with an atomic lock
- *
- * possible return values:
- *  true (entry is free)
- *  false (entry is not free or index is out of range)
- */
-static inline bool vres_repo_isfree(const int vres_ind)
-{
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       fosa_mutex_lock(&vres_repo.mutex);
-#endif
-       result = check_vres_repo_entry_free(vres_ind);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-       return result;
-}
-
-/*
- * an entry is "dummy" ?
- *
- * check if a specific entry of the repository (is not free!) and contains
- * a dummy vres, created as a consequence of the negotiation of a CT_DUMMY
- * contract.
- * If a multithreaded service thread is configured the needed checks are
- * performed holding an atomic lock
- *
- * possible return values:
- *  true (entry contains a dummy vres)
- *  false (entry does not contain a dummy vres or index is out of range)
- */
-static inline bool vres_repo_isdummy(const int vres_ind)
-{
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       fosa_mutex_lock(&vres_repo.mutex);
-#endif
-       result = (check_vres_repo_ind(vres_ind) && vres_ind >= FRSH_MAX_N_VRES);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-       return result;
-}
-
-/***************************************************************************/
-/* T H R E A D S   R E P O S I T O R Y   U T I L I T Y   F U N C T I O N S */
-/***************************************************************************/
-
-/*
- * access functions to thread repository
- *
- * some simple substitution macros which help keep thread_repo_t opaque and
- * improve code cleanliness and look & feel
- */
-
-/* given the index of a registered thread in the repository returns
- * a pointer to its entry () */
-#define get_thread_entry(thread_ind)   \
- ( & ( thread_repo.repo[(thread_ind)] ) )
-/* gives the actual number of threads in the repository */
-#define get_threads_number()   \
- ( thread_repo.threads_number )
-
-/*
- * consistency and status checks
- *
- * commonly used checks and tests about the access to the thread repository
- * (is the index value in the correct range?) and the status of each entry
- * (is it free?, non-free?, equal to an external one?, etc.)
- */
-
-/* are we accessing the repository with a valid index value ? */
-#define check_thread_repo_ind(ind)                                     \
- ( ( ind >= 0 && ind < FRSH_MAX_N_THREADS ) ? true : false )
-/* is the thread repository entry (correctly accessed and) free ? */
-#define check_thread_repo_entry_free(ind)                                              \
- ( ( ( check_thread_repo_ind(ind) ) &&                                                 \
-  thread_repo.repo[(ind)].thread.pthread_id == FRSH_NOT_VALID_THREAD_ID &&             \
-  thread_repo.repo[(ind)].thread.linux_pid == FRSH_NOT_VALID_THREAD_ID &&              \
-  thread_repo.repo[(ind)].thread.linux_tid == FRSH_NOT_VALID_THREAD_ID ) ? true : false )
-/* is the thread repository entry (correctly accessed and) non free ? */
-#define check_thread_repo_entry_nonfree(ind)                                           \
- ( ( ( check_thread_repo_ind(ind) ) &&                                                 \
-  thread_repo.repo[(ind)].thread.pthread_id != FRSH_NOT_VALID_THREAD_ID &&             \
-  thread_repo.repo[(ind)].thread.linux_pid != FRSH_NOT_VALID_THREAD_ID &&              \
-  thread_repo.repo[(ind)].thread.linux_tid != FRSH_NOT_VALID_THREAD_ID ) ? true : false )
-/* does the thread repository entry contain a thread equal to the one provided ? */
-#define check_thread_repo_entry_equal(th, th_ind)                                      \
- ( ( ( check_thread_repo_ind(th_ind) ) &&                                              \
-  ( thread_repo.repo[(th_ind)].thread.pthread_id == (th).pthread_id ) &&               \
-  ( thread_repo.repo[(th_ind)].thread.linux_pid == (th).linux_pid ) &&                 \
-  ( thread_repo.repo[(th_ind)].thread.linux_tid == (th).linux_tid ) ) ? true : false ) \
-
-/*
- * initialize the repository
- *
- * sets all entries to invalid (that is, free) and init the mutex
- * (if a multithreaded service thread has been configured)
- *
- * possible return values:
- *  true (all ok)
- *  false (something wrong with the mutex)
- */
-static inline bool thread_repo_init()
-{
-       int i;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       if (fosa_mutex_init(&thread_repo.mutex, 0) != 0)
-               return false;
-#endif
-       thread_repo.threads_number = 0;
-       for (i = 0; i < FRSH_MAX_N_THREADS; i++) {
-               thread_repo.repo[i].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
-               thread_repo.repo[i].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
-               thread_repo.repo[i].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
-               thread_repo.repo[i].vres = FRSH_NOT_VALID_VRES_ID;
-               thread_repo.repo[i].next_in_vres = -1;  /* no other thread in vres */
-       }
-
-       return true;
-}
-
-/*
- * add an entry
- *
- * adds an entry into the repository. If successful the index of the entry is
- * returned in thread_ind.
- *
- * We "accept suggestions", i.e. we suppose the caller knows the location of
- * a free entry in the repository and ask us to insert the new entry exactly
- * there, at thread_ind position.
- * So we check if that location is really free and, if yes, we can add the
- * entry right there. If, on the other way, the entry is not free we need to
- * search the repository for a different place.
- *
- * Note we also keep updated the repository entry (see below) of the vres the
- * thread is bound to.
- *
- * possible return values:
- *  true (entry correctly added)
- *  false (entry not added, maybe no more room in the repository!)
- */
-static bool thread_repo_put(const frsh_thread_id_t *thread,
-       const frsh_vres_id_t vres_id,
-       int *thread_ind)
-{
-       int i;
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the repository */
-       fosa_mutex_lock(&thread_repo.mutex);
-#endif
-       /* try to place the new entry in the caller provided position (if any)
-        * and, only if it's not free, scan all the repository for
-        * a different and suitable place */
-       if (thread_ind != NULL &&
-                       check_thread_repo_entry_free(*thread_ind % FRSH_MAX_N_THREADS))
-               i = *thread_ind % FRSH_MAX_N_THREADS;
-       else {
-               i = 0;
-               while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_free(i))
-                       i++;
-       }
-       /* if we're sure it's valid and free we place the
-        * thread entry into the repository in position 'i' */
-       result = false;
-       if (check_thread_repo_entry_free(i)) {
-               /* update the provided vres repository entry */
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-               /* we need to lock the vres repository too */
-               fosa_mutex_lock(&vres_repo.mutex);
-#endif
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-               assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
-                       get_vres_entry(vres_id)->processor.threads_number == 0);
-#endif
-               get_vres_entry(vres_id)->processor.threads_number++;
-               /* keep updated the chain of thread of the vres repository entry */
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-               assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
-                       get_vres_entry(vres_id)->processor.first_thread_index == -1);
-#endif
-               thread_repo.repo[i].next_in_vres = get_vres_entry(vres_id)->processor.first_thread_index;
-               get_vres_entry(vres_id)->processor.first_thread_index = i;
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-               /* unlock the vres repository */
-               fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-               thread_repo.repo[i].thread = *thread;
-               thread_repo.repo[i].vres = vres_id;
-               if (thread_ind != NULL)
-                       *thread_ind = i;
-               thread_repo.threads_number++;
-               result = true;
-       }
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the repository */
-       fosa_mutex_unlock(&thread_repo.mutex);
-#endif
-
-       return result;
-}
-
-/*
- * remove an entry
- *
- * sets the entry as free (if it is not) and decrement the thread count.
- *
- * Note we also keep updated the repository entry (see below) of the vres the
- * thread is bound to.
- *
- * possible return values:
- *  true (all ok, entry was non-free and is now free)
- *  false (something wrong, entry was already free or index is out of range)
- */
-static bool thread_repo_free(const int thread_ind)
-{
-       vres_repo_entry_t *thread_vres;
-       thread_repo_entry_t *thread_p;
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the repository */
-       fosa_mutex_lock(&thread_repo.mutex);
-#endif
-       result = false;
-       if (check_thread_repo_entry_nonfree(thread_ind)) {
-               thread_p = &thread_repo.repo[thread_ind];
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-               assert(!vres_repo_isfree(vres_id_2_index(thread_p->vres)));
-#endif
-               thread_vres = get_vres_entry(thread_p->vres);
-               /* update the bound vres repository entry */
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-               /* we need to lock the vres repository too */
-               fosa_mutex_lock(&vres_repo.mutex);
-#endif
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-               assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
-                       get_vres_entry(thread_p->vres)->processor.threads_number == 1);
-#endif
-               thread_vres->processor.threads_number--;
-               /* keep the chain of thread in the vres updated */
-               if (thread_vres->processor.first_thread_index == thread_ind)
-                       thread_vres->processor.first_thread_index = thread_p->next_in_vres;
-               else
-                       while (thread_p->next_in_vres != thread_ind)
-                               thread_p = &thread_repo.repo[thread_p->next_in_vres];
-               thread_p->next_in_vres = thread_repo.repo[thread_ind].next_in_vres;
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-               /* unlock the vres repository */
-               fosa_mutex_unlock(&vres_repo.mutex);
-#endif
-               /* set the repository entry as free */
-               thread_repo.repo[thread_ind].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
-               thread_repo.repo[thread_ind].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
-               thread_repo.repo[thread_ind].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
-               thread_repo.repo[thread_ind].vres = FRSH_NOT_VALID_VRES_ID;
-               thread_repo.repo[thread_ind].next_in_vres = -1;
-               thread_repo.threads_number--;
-               result = true;
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the repository */
-       fosa_mutex_unlock(&thread_repo.mutex);
-#endif
-
-       return result;
-}
-
-/*
- * is an entry free ?
- *
- * check if a specific entry of the repository can be considered free.
- * The needed check is very simple and reduces itself to one of the
- * test macros defined above... Note if a multithreaded service thread is
- * configured we do it with an atomic lock
- *
- * possible return values:
- *  true (entry is free)
- *  false (entry is not free or index is out of range)
- */
-static bool thread_repo_isfree(const int thread_ind)
-{
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the repository */
-       fosa_mutex_lock(&thread_repo.mutex);
-#endif
-       result = check_thread_repo_entry_free(thread_ind);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the repository */
-       fosa_mutex_unlock(&thread_repo.mutex);
-#endif
-       return result;
-}
-
-/* 
- * search for an entry
- *
- * tries to find an entry in the repository containing the thread descriptor
- * provided as (first) argument. If successful the index of the entry is
- * returned in thread_ind.
- *
- * We "accept suggestions" (as in the entry add function), i.e. we suppose the
- * caller knows the entry he's searching for is in the repository and its
- * index is exactly thread_ind, so we check if this is true and, if yes,
- * we're done! If, on the other hand, we can't find the searched entry at
- * thread_ind position we start a complete scan of the repository
- *
- * possible return values:
- *  true (entry found)
- *  false (entry not found)
- */
-static bool thread_repo_find(const frsh_thread_id_t *thread, int *thread_ind)
-{
-       int i;
-       bool result;
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* lock the repository */
-       fosa_mutex_lock(&thread_repo.mutex);
-#endif
-       /* check if the caller provides us (we hope _intentionally_!) the
-        * correct position in the repository of the entry he's searching */
-       if (thread_ind != NULL &&
-                       check_thread_repo_entry_equal(*thread, (*thread_ind) % FRSH_MAX_N_THREADS))
-               /* if yes we've finished our search before starting it! */
-               i = (*thread_ind) % FRSH_MAX_N_THREADS;
-       else {
-               /* if no we scan the repository till we find the entry (or till its end) */
-               i = 0;
-               while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_equal(*thread, i))
-                       i++;
-       }
-       /* check again and definitively decide the entry is in the repository or not */
-       result = false;
-       if (check_thread_repo_entry_equal(*thread, i)) {
-               if (thread_ind != NULL)
-                       *thread_ind = i;
-               result = true;
-       }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-       /* unlock the repository */
-       fosa_mutex_unlock(&thread_repo.mutex);
-#endif
-
-       return result;
-}
-
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
-
-/***********************************************************************************************/
-/* S E R V I C E   T H R E A D   R E Q U E S T   Q U E U E   U T I L I T Y   F U N C T I O N S */
-/***********************************************************************************************/
-
-/*
- * access functions to service thread jobs queue
- */
-
-/* gives the actual number of queued jobs */
-#define get_queued_jobs_number()       \
- ( service_th_queue.jobs_number )
-
-/*
- * consistency and status checks
- *
- * common used checkings and tests about the accesses to the queue (is the
- * index value in the correct range?), the status of each entry (is it free?,
- * non-free?) and of the whole queue (is it full?, is it empty?)
- */
-
-/* are we accessing the queue with a valid index value ? */
-#define check_service_th_queue_ind(ind) \
- ( (ind >=0 && ind < FRSH_MAX_N_SERVICE_JOBS) ? true : false )
-/* is the queue element (correctly accessed and) free ? */
-#define check_service_th_queue_isfree(ind) \
- ( ((check_service_th_queue(ind) && service_th_queue.queue[ind].valid == 1) ? true : false )
-/* is the queue element (correctly accessed and) non free ? */
-#define check_service_th_queue_nonfree(ind) \
- ( (check_service_th_queue(ind) && service_th_queue.queue[ind].valid == 0) ? true : false )
-/* is the queue still empty ? */
-#define check_service_th_queue_empty() \
- ( (service_th_queue.jobs_number == 0) ? true : false )
-/* is the queue already full ? */
-#define check_service_th_queue_full() \
- ( (service_th_queue.jobs_number >= FRSH_MAX_N_SERVICE_JOBS) ? true: false )
-
-/*
- * initialize the queue
- *
- * sets all elements to free and init the mutex
- *
- * possible return values:
- *  true (all ok)
- *  false (something wrong with the mutex)
- */
-static inline bool service_th_queue_init()
-{
-       int i;
-
-       service_th_queue.jobs_number = 0;
-       service_th_queue.first = service_th_queue.last = 0;
-       if (fosa_mutex_init(&service_th_queue.mutex, 0))
-               return false;
-       for (i = 0; i < FRSH_MAX_N_SERVICE_JOBS; i++) {
-               service_th_queue.queue[i].conn = 0;
-               service_th_queue.queue[i].valid = 0;
-       }
-
-       return true;
-}
-
-/*
- * add an element into the queue
- *
- * if possible (there is enough free space) adds a new element into the queue
- * according to the FIFO policy (so the new element it's inserted after the
- * actual last one).
- * Note all is done while holding an atomic lock on the whole queue
- *
- * possible return values:
- *  true (all ok, element added)
- *  false (element not added, no more room in the queue!)
- */
-static bool enqueue_service_th_request(int conn, frsh_in_msg_t *request)
-{
-       int last;
-
-       /* lock the queue */
-       fosa_mutex_lock(&service_th_queue.mutex);
-       if (check_service_th_queue_full()) {
-               fosa_mutex_unlock(&service_th_queue.mutex);
-               return false;
-       }
-       last = service_th_queue.last;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-       /* if we need to be sure the chosen element is really free... */
-       assert(service_th_queue.queue[last].valid == 0);
-#endif
-       /* fill the new element with provided data and mark it as no longer free */
-       service_th_queue.queue[last].msg = *request;
-       service_th_queue.queue[last].conn = conn;
-       service_th_queue.queue[last].valid = 1;
-       /* update queue (last) pointer and counter */
-       service_th_queue.last = (service_th_queue.last + 1) % FRSH_MAX_N_SERVICE_JOBS;
-       service_th_queue.jobs_number++;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-       /* if we need to be sure there was at least a free element in the queue... */
-       assert(service_th_queue.jobs_number > 0 && service_th_queue.jobs_number <= FRSH_MAX_N_SERVICE_JOBS);
-#endif
-       /* unlock the queue */
-       fosa_mutex_unlock(&service_th_queue.mutex);
-
-       return true;
-}
-
-/*
- * remove an element from the queue
- *
- * if possible (there is at least one element) remove an (the first, FIFO
- * policy) element from the queue.
- * Note all is done while holding an atomic lock on the whole queue
- *
- * possible return values:
- *  true (all ok, element removed)
- *  false (element not removed, the queue is empty!)
- */
-static bool dequeue_service_th_request(int *conn, frsh_in_msg_t *request)
-{
-       int first;
-
-       /* lock the queue */
-       fosa_mutex_lock(&service_th_queue.mutex);
-       if (check_service_th_queue_empty()) {
-               fosa_mutex_unlock(&service_th_queue.mutex);
-               return false;
-       }
-       first = service_th_queue.first;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-       /* if we need to be sure the element is really not free... */
-       assert(service_th_queue.queue[first].valid == 1);
-#endif
-       /* hand the element data to the caller and mark it as free again */
-       *request = service_th_queue.queue[first].msg;
-       *conn = service_th_queue.queue[first].conn;
-       service_th_queue.queue[first].valid = 0;
-       /* update queue (first) pointer and counter */
-       service_th_queue.first = (service_th_queue.first + 1) % FRSH_MAX_N_SERVICE_JOBS;
-       service_th_queue.jobs_number--;
-#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
-       /* if we need to be sure there was at least one element in the queue... */
-       assert(service_th_queue.jobs_number >= 0 && service_th_queue.jobs_number <= FRSH_MAX_N_SERVICE_JOBS);
-       /* if we need to be sure the pointers have always been correctly updated... */
-       if (!check_service_th_queue_empty())
-               assert(service_th_queue.first != service_th_queue.last);
-       else
-               assert(service_th_queue.first == service_th_queue.last);
-#endif
-       /* unlock the queue */
-       fosa_mutex_unlock(&service_th_queue.mutex);
-
-       return true;
-}
-
-/***************************************************************************************************/
-/* W O R K E R   T H R E A D S   I N I T I A L I Z A T I O N   A N D   M A I N   F U N C T I O N S */
-/***************************************************************************************************/
-
-/*
- * access functions to worker thread pool
- */
-
-/* gives the actual number of active threads in the pool */
-#define get_active_thread_number()     \
- ( worker_pool.active_thread_number )
-/* gives the actual number of busy threads in the pool */
-#define get_busy_thread_number()       \
- ( worker_pool.busy_thread_number )
-/* gives the actual number of sleeping threads in the pool */
-#define get_sleeping_thread_number()   \
- ( worker_pool.active_thread_number - worker_pool.busy_thread_number )
-
-/* 
- * FRSH utility function prototypes
- *
- * declared here to be visible to the worker thread code, see each definition
- * (below in the code) for specific infos about each one
- */
-static int negotiate_contract(const frsh_contract_t *contract, frsh_vres_id_t *vres);
-
-static int bind_thread(const frsh_thread_id_t *thread,const frsh_vres_id_t vres_id);
-
-static int unbind_thread(const frsh_thread_id_t *thread);
-
-static int get_thread_vres_id(const frsh_thread_id_t *thread, frsh_vres_id_t *vres);
-
-static int get_contract(const frsh_vres_id_t vres_id, frsh_contract_t *contract);
-
-static int get_label_vres_id(const frsh_contract_label_t contract_label,frsh_vres_id_t *vres);
-
-static int cancel_contract(const frsh_vres_id_t vres);
-
-static int renegotiate_contract(const frsh_contract_t *new_contract, const frsh_vres_id_t vres_id);
-
-static int get_renegotiation_status(const frsh_vres_id_t vres_id,frsh_renegotiation_status_t *renegotiation_status);
-
-static int negotiate_group(int vres_down_number, frsh_vres_id_t vres_down[], int vres_down_status[],
-                               int contracts_up_number, frsh_contract_t contract_up[], frsh_vres_id_t vres_up[], int contracts_up_status[]);
-
-static int change_mode(int vres_down_number, frsh_vres_id_t vres_down[], int vres_down_status[],
-                       int contracts_touch_number, frsh_contract_t contracts_touch[], frsh_vres_id_t vres_touch[], int vres_touch_status[],
-                       int contracts_up_number, frsh_contract_t contracts_up[], frsh_vres_id_t vres_up[], int contracts_up_status[]);
-
-static int get_cputime(const frsh_vres_id_t vres_id, struct timespec *cputime);
-
-static int get_current_budget(const frsh_vres_id_t vres_id, struct timespec *current_budget);
-
-static int get_budget_and_period(const frsh_vres_id_t vres_id,struct timespec *budget,struct timespec *period);
-
-static int reserve_feedback(frsh_contract_t *spare_contract);
-
-/*
- * thread pool initialization
- *
- * initialize the worker thread pool (a couple of counters, a mutex and a
- * condition variable) and create the minimum number of worker thread
- * configured
- *
- * possible return values:
- *  true (all ok, pool initialized and all thread created)
- *  false (something wrong in mutex or condition initialization or in thread creation)
- */
-
-static void* worker_thread_code();
-
-static inline bool worker_thread_pool_init()
-{
-       int i;
-       frsh_thread_id_t new_thread;
-
-       worker_pool.active_thread_number = 0;
-       worker_pool.busy_thread_number = 0;
-       if (fosa_mutex_init(&worker_pool.mutex, 0) != 0)
-               return false;
-       if (fosa_cond_init(&worker_pool.sleep) != 0)
-               return false;
-       for (i = 0; i < MIN_N_WORKER_THREAD; i++)
-               if (fosa_thread_create(&new_thread, NULL, worker_thread_code, NULL) != 0)
-                       return false;
-
-       return true;
-}
-
-/*
- * worker thread code
- *
- * code executed by all the worker thread activated by the service thread to
- * effectively serve the client(s) requests.
- * Basically we check if there are queued requests and whether this worker
- * thread is still useful or, if not, has to die.
- * If "in service" a worker thread processes a request from a client and sends
- * back the results (if requested) then check again if it's time to sleep or
- * to die!
- *
- * possible exit status:
- *  EXIT_SUCCESS (normal exit, the thread is no longer useful)
- *  EXIT_FAILURE (abnormal exit, something has gone severely wrong!)
- */
-static void* worker_thread_code(void *p)
-{
-       frsh_in_msg_t request;
-       frsh_out_msg_t reply;
-       int conn;
-       frsh_thread_id_t thread_self;
-
-       /* lock the pool and tell everyone we're a new
-        * living (active) and working (busy) thread */
-       fosa_mutex_lock(&worker_pool.mutex);
-       worker_pool.active_thread_number++;
-       worker_pool.busy_thread_number++;
-       /* done, unlock the pool */
-       fosa_mutex_unlock(&worker_pool.mutex);
-       thread_self = fosa_thread_self();
-       /* directly attach ourselves to the service contract vres
-        * (no need for negotiation or any other FRSH operation!)*/
-       if (qres_attach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
-                       thread_self.linux_pid,
-                       thread_self.linux_tid) != QOS_OK) {
-               syslog(LOG_ERR, "can't attach the worker thread to the vres");
-
-               return (void*)EXIT_FAILURE;
-       }
-       /* main worker thread cycle */
-       while (1) {
-               /* check if there are new requests to serve */
-               while (!dequeue_service_th_request(&conn, &request)) {
-                       fosa_mutex_lock(&worker_pool.mutex);
-                       /* sleeping or exiting depends on how many worker
-                        * thread the system has been configured to keep alive */
-                       if (worker_pool.active_thread_number <= MIN_N_WORKER_THREAD) {
-                               /* we're still alive, we only go to sleep */
-                               worker_pool.busy_thread_number--;
-                               fosa_cond_wait(&worker_pool.sleep, &worker_pool.mutex);
-                               /* the service thread woke us up,
-                                * go and check again for queued requests */
-                               worker_pool.busy_thread_number++;
-                               fosa_mutex_unlock(&worker_pool.mutex);
-                       } else {
-                               /* our work is no longer needed, we're dying! */
-                               worker_pool.busy_thread_number--;
-                               worker_pool.active_thread_number--;
-                               fosa_mutex_unlock(&worker_pool.mutex);
-                               if (qres_detach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
-                                               thread_self.linux_pid,
-                                               thread_self.linux_tid) != QOS_OK) {
-                                       syslog(LOG_ERR, "can't detach the worker thread from the vres");
-
-                                       return (void*)EXIT_FAILURE;
-                               }
-
-                               return (void*)EXIT_SUCCESS;
-                       }
-               }
-               switch (request.type) {
-                       /* process client(s) requests according to the operation and
-                        * (simply?) calling one (or a little more) of the FRSH utility
-                        * functions defined further down in the code */
-                       case FRSH_MT_NEGOTIATE_CONTRACT:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: NEGOTIATE_CONTRACT");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error = negotiate_contract(&request.val.negotiate_contract.contract,
-                                       &reply.val.negotiate_contract.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_BIND_THREAD:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: BIND_THREAD");
-                               reply.error = bind_thread(&request.val.bind_thread.thread_id,
-                                       request.val.bind_thread.vres_id);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_UNBIND_THREAD:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: UNBIND_THREAD");
-                               reply.error = unbind_thread(&request.val.unbind_thread.thread_id);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_GET_THREAD_VRES_ID:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_THREAD_VRES_ID");
-                               reply.error = get_thread_vres_id(&request.val.get_thread_vres_id.thread_id,
-                                       &reply.val.get_thread_vres_id.vres_id);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_GET_CONTRACT:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_CONTRACT");
-                               reply.error = get_contract(request.val.get_contract.vres_id,
-                                       &reply.val.get_contract.contract);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_GET_LABEL_VRES_ID:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_LABEL_VRES_ID");
-                               reply.error = get_label_vres_id(request.val.get_label_vres_id.contract_label,
-                                       &reply.val.get_label_vres_id.vres_id);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_CANCEL_CONTRACT:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: CANCEL_CONTRACT");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error = cancel_contract(request.val.cancel_contract.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_RENEGOTIATE_CONTRACT:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: RENEGOTIATE_CONTRACT");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error =
-                                       renegotiate_contract(&request.val.renegotiate_contract.new_contract,
-                                               request.val.renegotiate_contract.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_REQUEST_CONTRACT_RENEGOTIATION:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: REQUEST_CONTRACT_RENEGOTIATION");
-                               close(conn);
-                               syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
-                               syslog(LOG_INFO, "connection closed");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error =
-                                       renegotiate_contract(&request.val.request_contract_renegotiation.new_contract,
-                                               request.val.request_contract_renegotiation.vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
-                               /* signals the completion of the operation with a signal */
-                               if (request.val.request_contract_renegotiation.signal != FRSH_NULL_SIGNAL)
-                                       if (fosa_signal_queue(request.val.request_contract_renegotiation.signal,
-                                                       request.val.request_contract_renegotiation.siginfo,
-                                                       request.val.request_contract_renegotiation.thread_to_signal) != 0)
-                                               syslog(LOG_ERR,
-                                                       "can't signal process %d with signal %d to notify the completion of the renegotiation",
-                                                       request.val.request_contract_renegotiation.thread_to_signal.linux_pid,
-                                                       request.val.request_contract_renegotiation.signal);
-#endif
-                               goto end_cycle;
-                       }
-                       case FRSH_MT_GET_RENEGOTIATION_STATUS:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_RENEGOTIATION_STATUS");
-                               reply.error = get_renegotiation_status(request.val.get_renegotiation_status.vres_id,
-                                       &reply.val.get_renegotiation_status.renegotiation_status);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_NEGOTIATE_GROUP:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error =
-                                       negotiate_group(request.val.negotiate_group.vres_down_number,
-                                                       request.val.negotiate_group.vres_down,
-                                                       reply.val.negotiate_group.vres_down_status,
-                                                       request.val.negotiate_group.contracts_up_number,
-                                                       request.val.negotiate_group.contracts_up,
-                                                       reply.val.negotiate_group.vres_up,
-                                                       reply.val.negotiate_group.contracts_up_status);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-
-                               //reply.error = FRSH_NO_ERROR;
-                               //for (i = 0; i < request.val.negotiate_group.vres_down_number; i++)
-                               //      reply.error = reply.val.negotiate_group.vres_down_status[i] =
-                               //                      cancel_contract(request.val.negotiate_group.vres_down[i]);
-                               //for (i = 0; i < request.val.negotiate_group.contracts_up_number; i++)
-                               //      reply.error = reply.val.negotiate_group.contracts_up_status[i] =
-                               //                      negotiate_contract(&request.val.negotiate_group.contracts_up[i],
-                               //                                      &reply.val.negotiate_group.vres_up[i]);
-
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_CHANGE_MODE_SYNC:
-                       case FRSH_MT_CHANGE_MODE_ASYNC:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
-                               reply.error = FRSH_NO_ERROR;
-                               if (request.type == FRSH_MT_CHANGE_MODE_ASYNC) {
-                                       close(conn);
-                                       syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
-                                       syslog(LOG_INFO, "connection closed");
-
-                               }
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error =
-                                       change_mode(request.val.change_mode.vres_down_number,
-                                                       request.val.change_mode.vres_down,
-                                                       reply.val.change_mode.vres_down_status,
-                                                       request.val.change_mode.contracts_touch_number,
-                                                       request.val.change_mode.contracts_touch,
-                                                       request.val.change_mode.vres_touch,
-                                                       reply.val.change_mode.vres_touch_status,
-                                                       request.val.change_mode.contracts_up_number,
-                                                       request.val.change_mode.contracts_up,
-                                                       reply.val.change_mode.vres_up,
-                                                       reply.val.change_mode.contracts_up_status);
-
-                               //for (i = 0; i < request.val.change_mode.vres_down_number; i++)
-                               //      reply.error = reply.val.change_mode.vres_down_status[i] =
-                               //                      cancel_contract(request.val.change_mode.vres_down[i]);
-                               //for (i = 0; i < request.val.change_mode.contracts_touch_number; i++)
-                               //      reply.error = reply.val.change_mode.vres_touch_status[i] =
-                               //                      renegotiate_contract(&request.val.change_mode.contracts_touch[i],
-                               //                                      request.val.change_mode.vres_touch[i]);
-                               //for (i = 0; i < request.val.change_mode.contracts_up_number; i++)
-                               //      reply.error = reply.val.change_mode.contracts_up_status[i] =
-                               //                      negotiate_contract(&request.val.change_mode.contracts_up[i],
-                               //                                      &reply.val.change_mode.vres_up[i]);
-
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
-                               /* signals the completion of the operation with a signal */
-                               if (request.val.change_mode.signal != FRSH_NULL_SIGNAL)
-                                       if (fosa_signal_queue(request.val.change_mode.signal,
-                                                       request.val.change_mode.siginfo,
-                                                       request.val.change_mode.thread_to_signal) != 0)
-                                               syslog(LOG_ERR,
-                                                       "can't signal process %d with signal %d to notify the completion of the renegotiation",
-                                                       request.val.change_mode.thread_to_signal.linux_pid,
-                                                       request.val.change_mode.signal);
-#endif
-                               if (request.type == FRSH_MT_CHANGE_MODE_SYNC)
-                                       goto answer_and_close;
-                               else
-                                       goto end_cycle;
-                       }
-                       case FRSH_MT_GET_CPUTIME:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_CPUTIME");
-                               reply.error = get_cputime(request.val.get_cputime.vres_id,
-                                       &reply.val.get_cputime.cputime);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_GET_CURRENTBUDGET:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_CURRENTBUDGET");
-                               reply.error = get_current_budget(request.val.get_currentbudget.vres_id,
-                                       &reply.val.get_currentbudget.currentbudget);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_GET_BUDGET_AND_PERIOD:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_BUDGET_AND_PERIOD");
-                               reply.error = get_budget_and_period(request.val.get_budget_and_period.vres_id,
-                                       &reply.val.get_budget_and_period.budget,
-                                       &reply.val.get_budget_and_period.period);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_GET_SERVICE_THREAD_DATA:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: GET_SERVICE_THREAD_DATA");
-                               reply.error = get_budget_and_period(service_th_vres_id,
-                                       &reply.val.get_service_thread_data.budget,
-                                       &reply.val.get_service_thread_data.period);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_SET_SERVICE_THREAD_DATA:
-                       {
-                               struct timespec old_budget, old_period;
-                                       syslog(LOG_DEBUG, "operation requested: SET_SERVICE_THREAD_DATA");
-                               old_budget = service_th_contract.budget_min;
-                               old_period = service_th_contract.period_max;
-                               service_th_contract.budget_min =
-                                       request.val.set_service_thread_data.budget;
-                               service_th_contract.period_max =
-                                       request.val.set_service_thread_data.period;
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_lock(&contract_mutex);
-#endif
-                               reply.error =
-                                       renegotiate_contract(&service_th_contract, service_th_vres_id);
-#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
-                               fosa_mutex_unlock(&contract_mutex);
-#endif
-                               if (reply.error == FRSH_NO_ERROR)
-                                       reply.val.set_service_thread_data.accepted = true;
-                               else {
-                                       reply.val.set_service_thread_data.accepted = false;
-                                       service_th_contract.budget_min = old_budget;
-                                       service_th_contract.period_max = old_period;
-                               }
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_FEEDBACK_SET_SPARE:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: FEEDBACK_SET_SPARE");
-                               reply.error = reserve_feedback(&request.val.set_feedback_spare.spare_contract);
-                               goto answer_and_close;
-                       }
-                       case FRSH_MT_FEEDBACK_GET_SPARE:
-                       {
-                               syslog(LOG_DEBUG, "operation requested: FEEDBACK_GET_SPARE");
-                               if (feedback_spare_contract == NULL)
-                                       reply.error = FRSH_ERR_NOT_CONTRACTED_VRES;
-                               else {
-                                       reply.val.get_feedback_spare.spare_contract = *feedback_spare_contract;
-                                       reply.error = FRSH_NO_ERROR;
-                               }       
-                               goto answer_and_close;
-                       }
-                       default:
-                               syslog(LOG_ERR, "operation requested: unknown !!");
-                               reply.error = FRSH_SERVICE_TH_ERR_SOCKET;
-                               goto answer_and_close;
-               }
-answer_and_close:
-               if (send(conn, (void*) &reply, sizeof(frsh_out_msg_t), 0) < sizeof(frsh_out_msg_t))
-                       syslog(LOG_ERR, "can't send reply message (ERROR 0x%x)", FRSH_SERVICE_TH_ERR_SOCKET);
-               else
-                       syslog(LOG_DEBUG, "command reply sent (exit status 0x%x)", reply.error);
-               close(conn);
-               syslog(LOG_INFO, "connection closed");
-end_cycle:
-               ;
-       }
-
-       return EXIT_SUCCESS;
-}
 
-#endif         /* FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD */
 
 /***********************************************/
 /* F R S H   U T I L I T Y   F U N C T I O N S */
diff --git a/frsh_aquosa/mngr/mngr.h b/frsh_aquosa/mngr/mngr.h
new file mode 100644 (file)
index 0000000..6fe0923
--- /dev/null
@@ -0,0 +1,60 @@
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+/***************************************************************************************************/
+/* W O R K E R   T H R E A D S   I N I T I A L I Z A T I O N   A N D   M A I N   F U N C T I O N S */
+/***************************************************************************************************/
+
+/*
+ * access functions to worker thread pool
+ */
+
+/* current number of active threads in the pool
+ * (reads worker_pool.active_thread_number) */
+#define get_active_thread_number()     \
+ ( worker_pool.active_thread_number )
+/* current number of busy threads in the pool
+ * (reads worker_pool.busy_thread_number) */
+#define get_busy_thread_number()       \
+ ( worker_pool.busy_thread_number )
+/* current number of sleeping threads in the pool, computed as
+ * active threads minus busy threads */
+#define get_sleeping_thread_number()   \
+ ( worker_pool.active_thread_number - worker_pool.busy_thread_number )
+
+/* 
+ * FRSH utility function prototypes
+ *
+ * declared here to be visible to the worker thread code, see each definition
+ * (below in the code) for specific infos about each one
+ */
+static int negotiate_contract(const frsh_contract_t *contract, frsh_vres_id_t *vres);
+
+static int bind_thread(const frsh_thread_id_t *thread,const frsh_vres_id_t vres_id);
+
+static int unbind_thread(const frsh_thread_id_t *thread);
+
+static int get_thread_vres_id(const frsh_thread_id_t *thread, frsh_vres_id_t *vres);
+
+static int get_contract(const frsh_vres_id_t vres_id, frsh_contract_t *contract);
+
+static int get_label_vres_id(const frsh_contract_label_t contract_label,frsh_vres_id_t *vres);
+
+static int cancel_contract(const frsh_vres_id_t vres);
+
+static int renegotiate_contract(const frsh_contract_t *new_contract, const frsh_vres_id_t vres_id);
+
+static int get_renegotiation_status(const frsh_vres_id_t vres_id,frsh_renegotiation_status_t *renegotiation_status);
+
+static int negotiate_group(int vres_down_number, frsh_vres_id_t vres_down[], int vres_down_status[],
+                               int contracts_up_number, frsh_contract_t contract_up[], frsh_vres_id_t vres_up[], int contracts_up_status[]);
+
+static int change_mode(int vres_down_number, frsh_vres_id_t vres_down[], int vres_down_status[],
+                       int contracts_touch_number, frsh_contract_t contracts_touch[], frsh_vres_id_t vres_touch[], int vres_touch_status[],
+                       int contracts_up_number, frsh_contract_t contracts_up[], frsh_vres_id_t vres_up[], int contracts_up_status[]);
+
+static int get_cputime(const frsh_vres_id_t vres_id, struct timespec *cputime);
+
+static int get_current_budget(const frsh_vres_id_t vres_id, struct timespec *current_budget);
+
+static int get_budget_and_period(const frsh_vres_id_t vres_id,struct timespec *budget,struct timespec *period);
+
+static int reserve_feedback(frsh_contract_t *spare_contract);
+
+#endif
diff --git a/frsh_aquosa/mngr/request.c b/frsh_aquosa/mngr/request.c
new file mode 100644 (file)
index 0000000..7d125ab
--- /dev/null
@@ -0,0 +1,162 @@
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+static service_th_queue_t service_th_queue;            /* queue of jobs (from clients) for the multithreaded implementation */
+#endif
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+
+/***********************************************************************************************/
+/* S E R V I C E   T H R E A D   R E Q U E S T   Q U E U E   U T I L I T Y   F U N C T I O N S */
+/***********************************************************************************************/
+
+/*
+ * access functions to service thread jobs queue
+ */
+
+/* current number of queued jobs (reads service_th_queue.jobs_number) */
+#define get_queued_jobs_number()       \
+ ( service_th_queue.jobs_number )
+
+/*
+ * consistency and status checks
+ *
+ * common used checkings and tests about the accesses to the queue (is the
+ * index value in the correct range?), the status of each entry (is it free?,
+ * non-free?) and of the whole queue (is it full?, is it empty?)
+ */
+
+/* are we accessing the queue with a valid index value ?
+ * NOTE: like every macro below, the argument is evaluated more than once,
+ * so do not pass expressions with side effects */
+#define check_service_th_queue_ind(ind) \
+ ( ((ind) >= 0 && (ind) < FRSH_MAX_N_SERVICE_JOBS) ? true : false )
+/* is the queue element (correctly accessed and) free ?
+ * a free slot has valid == 0: that is the value service_th_queue_init()
+ * writes and the one enqueue_service_th_request() expects before filling */
+#define check_service_th_queue_isfree(ind) \
+ ( (check_service_th_queue_ind(ind) && service_th_queue.queue[(ind)].valid == 0) ? true : false )
+/* is the queue element (correctly accessed and) non free ?
+ * a slot holding a queued job has valid == 1 (set by
+ * enqueue_service_th_request(), cleared by dequeue_service_th_request()) */
+#define check_service_th_queue_nonfree(ind) \
+ ( (check_service_th_queue_ind(ind) && service_th_queue.queue[(ind)].valid == 1) ? true : false )
+/* is the queue still empty ? */
+#define check_service_th_queue_empty() \
+ ( (service_th_queue.jobs_number == 0) ? true : false )
+/* is the queue already full ? */
+#define check_service_th_queue_full() \
+ ( (service_th_queue.jobs_number >= FRSH_MAX_N_SERVICE_JOBS) ? true : false )
+
+/*
+ * queue initialization
+ *
+ * resets the job counter and both queue indexes, initializes the queue
+ * mutex and then marks every slot as free (conn = 0, valid = 0).
+ * If the mutex can not be initialized the slots are left untouched.
+ *
+ * possible return values:
+ *  true (all ok)
+ *  false (something wrong with the mutex)
+ */
+static inline bool service_th_queue_init()
+{
+       int slot = 0;
+
+       /* no queued jobs, insertion and extraction both start from slot 0 */
+       service_th_queue.jobs_number = 0;
+       service_th_queue.last = 0;
+       service_th_queue.first = 0;
+       if (fosa_mutex_init(&service_th_queue.mutex, 0) != 0)
+               return false;
+       /* every slot starts as free */
+       while (slot < FRSH_MAX_N_SERVICE_JOBS) {
+               service_th_queue.queue[slot].conn = 0;
+               service_th_queue.queue[slot].valid = 0;
+               slot++;
+       }
+
+       return true;
+}
+
+/*
+ * add an element into the queue
+ *
+ * if possible (there is enough free space) adds a new element into the queue
+ * according to the FIFO policy (so the new element it's inserted after the
+ * actual last one).
+ * Note all is done while holding an atomic lock on the whole queue
+ *
+ * possible return values:
+ *  true (all ok, element added)
+ *  false (element not added, no more room in the queue!)
+ */
+static bool enqueue_service_th_request(int conn, frsh_in_msg_t *request)
+{
+       int last;
+
+       /* lock the queue */
+       fosa_mutex_lock(&service_th_queue.mutex);
+       if (check_service_th_queue_full()) {
+               fosa_mutex_unlock(&service_th_queue.mutex);
+               return false;
+       }
+       last = service_th_queue.last;
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+       /* if we need to be sure the choosen element is really free... */
+       assert(service_th_queue.queue[last].valid == 0);
+#endif
+       /* fill the new element with provided data and mark it as no longer free */
+       service_th_queue.queue[last].msg = *request;
+       service_th_queue.queue[last].conn = conn;
+       service_th_queue.queue[last].valid = 1;
+       /* update queue (last) pointer and counter */
+       service_th_queue.last = (service_th_queue.last + 1) % FRSH_MAX_N_SERVICE_JOBS;
+       service_th_queue.jobs_number++;
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+       /* if we need to be sure there was at least a free element in the queue... */
+       assert(service_th_queue.jobs_number > 0 && service_th_queue.jobs_number <= FRSH_MAX_N_SERVICE_JOBS);
+#endif
+       /* unlock the queue */
+       fosa_mutex_unlock(&service_th_queue.mutex);
+
+       return true;
+}
+
+/*
+ * remove an element from the queue
+ *
+ * if possible (there is at least one element) remove an (the first, FIFO
+ * policy) element from the queue.
+ * Note all is done while holding an atomic lock on the whole queue
+ *
+ * possible return values:
+ *  true (all ok, element added)
+ *  false (element not added, no more room in the queue!)
+ */
+static bool dequeue_service_th_request(int *conn, frsh_in_msg_t *request)
+{
+       int first;
+
+       /* lock the queue */
+       fosa_mutex_lock(&service_th_queue.mutex);
+       if (check_service_th_queue_empty()) {
+               fosa_mutex_unlock(&service_th_queue.mutex);
+               return false;
+       }
+       first = service_th_queue.first;
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+       /* if we need to be sure the element is really not free... */
+       assert(service_th_queue.queue[first].valid == 1);
+#endif
+       /* provide the user for the element datas and mark it as again free */
+       *request = service_th_queue.queue[first].msg;
+       *conn = service_th_queue.queue[first].conn;
+       service_th_queue.queue[first].valid = 0;
+       /* update queue (first) pointer and counter */
+       service_th_queue.first = (service_th_queue.first + 1) % FRSH_MAX_N_SERVICE_JOBS;
+       service_th_queue.jobs_number--;
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+       /* if we need to be sure there was at least one element in the queue... */
+       assert(service_th_queue.jobs_number >= 0 && service_th_queue.jobs_number <= FRSH_MAX_N_SERVICE_JOBS);
+       /* if we need to be sure the pointers have always been correctly updated... */
+       if (!check_service_th_queue_empty())
+               assert(service_th_queue.first != service_th_queue.last);
+       else
+               assert(service_th_queue.first == service_th_queue.last);
+#endif
+       /* unlock the queue */
+       fosa_mutex_unlock(&service_th_queue.mutex);
+
+       return true;
+}
+#endif
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+
diff --git a/frsh_aquosa/mngr/thread_repo.c b/frsh_aquosa/mngr/thread_repo.c
new file mode 100644 (file)
index 0000000..0b999b0
--- /dev/null
@@ -0,0 +1,710 @@
+static thread_repo_t thread_repo;                      /* registered thread repository */
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+static worker_thread_pool_t worker_pool;               /* thread pool for multithreaded implementation */
+#endif
+
+/***************************************************************************/
+/* T H R E A D S   R E P O S I T O R Y   U T I L I T Y   F U N C T I O N S */
+/***************************************************************************/
+
+/*
+ * access functions to thread repository
+ *
+ * some simple subsitutions macro which help keep thread_repo_t opaque and
+ * improve code cleanness and look & feel
+ */
+
+/* given the index of a registered thread in the repository, yields
+ * a pointer to its entry (the index is not range-checked here) */
+#define get_thread_entry(thread_ind)   \
+ ( & ( thread_repo.repo[(thread_ind)] ) )
+/* current number of threads in the repository
+ * (reads thread_repo.threads_number) */
+#define get_threads_number()   \
+ ( thread_repo.threads_number )
+
+/*
+ * consistency and status checks
+ *
+ * common used checkings and tests about the access to the thread repository
+ * (is the index value in the correct range?) and the status of each entry
+ * (is it free?, non-free?, equal to an external one?, ecc.)
+ */
+
+/* are we accessing the repository with a valid index value ?
+ * NOTE: like every macro below, the arguments are evaluated more than once,
+ * so do not pass expressions with side effects */
+#define check_thread_repo_ind(ind)                                     \
+ ( ( (ind) >= 0 && (ind) < FRSH_MAX_N_THREADS ) ? true : false )
+/* is the thread repository entry (correctly accessed and) free ?
+ * an entry is free when all its three identifiers are invalid */
+#define check_thread_repo_entry_free(ind)                                              \
+ ( ( ( check_thread_repo_ind(ind) ) &&                                                 \
+  thread_repo.repo[(ind)].thread.pthread_id == FRSH_NOT_VALID_THREAD_ID &&             \
+  thread_repo.repo[(ind)].thread.linux_pid == FRSH_NOT_VALID_THREAD_ID &&              \
+  thread_repo.repo[(ind)].thread.linux_tid == FRSH_NOT_VALID_THREAD_ID ) ? true : false )
+/* is the thread repository entry (correctly accessed and) non free ?
+ * an entry is non free when all its three identifiers are valid */
+#define check_thread_repo_entry_nonfree(ind)                                           \
+ ( ( ( check_thread_repo_ind(ind) ) &&                                                 \
+  thread_repo.repo[(ind)].thread.pthread_id != FRSH_NOT_VALID_THREAD_ID &&             \
+  thread_repo.repo[(ind)].thread.linux_pid != FRSH_NOT_VALID_THREAD_ID &&              \
+  thread_repo.repo[(ind)].thread.linux_tid != FRSH_NOT_VALID_THREAD_ID ) ? true : false )
+/* does the thread repository entry contain a thread equal to the one provided ?
+ * (th is a frsh_thread_id_t object, not a pointer to it) */
+#define check_thread_repo_entry_equal(th, th_ind)                                      \
+ ( ( ( check_thread_repo_ind(th_ind) ) &&                                              \
+  ( thread_repo.repo[(th_ind)].thread.pthread_id == (th).pthread_id ) &&               \
+  ( thread_repo.repo[(th_ind)].thread.linux_pid == (th).linux_pid ) &&                 \
+  ( thread_repo.repo[(th_ind)].thread.linux_tid == (th).linux_tid ) ) ? true : false )
+
+/*
+ * repository initialization
+ *
+ * initializes the repository mutex (only if a multithreaded service thread
+ * has been configured), resets the thread counter and marks every entry as
+ * free: invalid thread identifiers, no vres bound, no successor in a vres
+ * chain of threads.
+ *
+ * possible return values:
+ *  true (all ok)
+ *  false (something wrong with the mutex)
+ */
+static inline bool thread_repo_init()
+{
+       int slot = 0;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       if (fosa_mutex_init(&thread_repo.mutex, 0) != 0)
+               return false;
+#endif
+       thread_repo.threads_number = 0;
+       while (slot < FRSH_MAX_N_THREADS) {
+               /* not bound to any vres and not chained to any other thread */
+               thread_repo.repo[slot].next_in_vres = -1;
+               thread_repo.repo[slot].vres = FRSH_NOT_VALID_VRES_ID;
+               /* all the identifiers invalid, i.e. the entry is free */
+               thread_repo.repo[slot].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
+               thread_repo.repo[slot].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
+               thread_repo.repo[slot].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
+               slot++;
+       }
+
+       return true;
+}
+
+/*
+ * register a thread
+ *
+ * stores the thread descriptor, bound to vres_id, in a free entry of the
+ * repository. If successful (and thread_ind is not NULL) the index of the
+ * chosen entry is written back in *thread_ind.
+ *
+ * *thread_ind is also read as a hint: the entry at position
+ * (*thread_ind % FRSH_MAX_N_THREADS) is tried first and the repository is
+ * scanned from the beginning only if that entry is not free (or if no hint
+ * has been provided).
+ *
+ * The repository entry of the vres is kept updated too: its thread counter
+ * is incremented and the new thread becomes the head of its chain of
+ * threads.
+ *
+ * possible return values:
+ *  true (entry correctly added)
+ *  false (entry not added, maybe no more room in the repository!)
+ */
+static bool thread_repo_put(const frsh_thread_id_t *thread,
+       const frsh_vres_id_t vres_id,
+       int *thread_ind)
+{
+       int slot, hint;
+       bool placed = false;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* lock the repository */
+       fosa_mutex_lock(&thread_repo.mutex);
+#endif
+       /* FRSH_MAX_N_THREADS means "no slot chosen yet": the (wrapped)
+        * hint can never be equal to it */
+       slot = FRSH_MAX_N_THREADS;
+       if (thread_ind != NULL) {
+               hint = *thread_ind % FRSH_MAX_N_THREADS;
+               if (check_thread_repo_entry_free(hint))
+                       slot = hint;
+       }
+       /* no usable hint, look for the first free entry */
+       if (slot == FRSH_MAX_N_THREADS) {
+               for (slot = 0; slot < FRSH_MAX_N_THREADS; slot++) {
+                       if (check_thread_repo_entry_free(slot))
+                               break;
+               }
+       }
+       /* the check fails (index out of range) if the scan found nothing */
+       if (check_thread_repo_entry_free(slot)) {
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+               /* the vres repository is going to be modified, lock it too */
+               fosa_mutex_lock(&vres_repo.mutex);
+#endif
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+               /* without the hierarchical module the vres must have no thread yet */
+               assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+                       get_vres_entry(vres_id)->processor.threads_number == 0);
+               assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+                       get_vres_entry(vres_id)->processor.first_thread_index == -1);
+#endif
+               /* push the new thread in front of the vres chain of threads */
+               thread_repo.repo[slot].next_in_vres = get_vres_entry(vres_id)->processor.first_thread_index;
+               get_vres_entry(vres_id)->processor.first_thread_index = slot;
+               get_vres_entry(vres_id)->processor.threads_number++;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+               /* unlock the vres repository */
+               fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+               /* fill the entry and tell the caller where it is */
+               thread_repo.repo[slot].vres = vres_id;
+               thread_repo.repo[slot].thread = *thread;
+               thread_repo.threads_number++;
+               if (thread_ind != NULL)
+                       *thread_ind = slot;
+               placed = true;
+       }
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* unlock the repository */
+       fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+       return placed;
+}
+
+/*
+ * remove an entry
+ *
+ * sets the entry at thread_ind as free (if it is not already) and decrements
+ * the thread count of the repository.
+ *
+ * The repository entry of the vres the thread is bound to is kept updated
+ * too: its thread counter is decremented and the thread is unlinked from the
+ * vres chain of threads (first_thread_index / next_in_vres).
+ *
+ * possible return values:
+ *  true (all ok, entry was non-free and is now free)
+ *  false (something wrong, entry was already free or index is out of range)
+ */
+static bool thread_repo_free(const int thread_ind)
+{
+       vres_repo_entry_t *thread_vres;
+       thread_repo_entry_t *thread_p;
+       int prev_ind;
+       bool result;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* lock the repository */
+       fosa_mutex_lock(&thread_repo.mutex);
+#endif
+       result = false;
+       if (check_thread_repo_entry_nonfree(thread_ind)) {
+               thread_p = &thread_repo.repo[thread_ind];
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+               assert(!vres_repo_isfree(vres_id_2_index(thread_p->vres)));
+#endif
+               thread_vres = get_vres_entry(thread_p->vres);
+               /* update the bound vres repository entry */
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+               /* we need to lock the vres repository too */
+               fosa_mutex_lock(&vres_repo.mutex);
+#endif
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+               assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+                       get_vres_entry(thread_p->vres)->processor.threads_number == 1);
+#endif
+               thread_vres->processor.threads_number--;
+               /* keep the chain of threads in the vres updated */
+               if (thread_vres->processor.first_thread_index == thread_ind) {
+                       /* removing the head: the successor becomes the new head */
+                       thread_vres->processor.first_thread_index = thread_p->next_in_vres;
+               } else {
+                       /* removing an inner element: the predecessor has to be
+                        * searched starting from the head of the chain (starting
+                        * from the removed entry itself would never find it and
+                        * would end up following the -1 end-of-chain marker) */
+                       prev_ind = thread_vres->processor.first_thread_index;
+                       while (check_thread_repo_ind(prev_ind) &&
+                                       thread_repo.repo[prev_ind].next_in_vres != thread_ind)
+                               prev_ind = thread_repo.repo[prev_ind].next_in_vres;
+                       /* relink only if the predecessor has really been found,
+                        * i.e. we did not run off the end of the chain */
+                       if (check_thread_repo_ind(prev_ind))
+                               thread_repo.repo[prev_ind].next_in_vres = thread_p->next_in_vres;
+               }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+               /* unlock the vres repository */
+               fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+               /* set the repository entry as free */
+               thread_repo.repo[thread_ind].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
+               thread_repo.repo[thread_ind].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
+               thread_repo.repo[thread_ind].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
+               thread_repo.repo[thread_ind].vres = FRSH_NOT_VALID_VRES_ID;
+               thread_repo.repo[thread_ind].next_in_vres = -1;
+               thread_repo.threads_number--;
+               result = true;
+       }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* unlock the repository */
+       fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+       return result;
+}
+
+/*
+ * free entry test
+ *
+ * tells if the entry at thread_ind can be considered free. This is just the
+ * check_thread_repo_entry_free() test, evaluated while holding the
+ * repository mutex when a multithreaded service thread is configured.
+ *
+ * possible return values:
+ *  true (entry is free)
+ *  false (entry is not free or index is out of range)
+ */
+static bool thread_repo_isfree(const int thread_ind)
+{
+       bool is_free;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* lock the repository */
+       fosa_mutex_lock(&thread_repo.mutex);
+#endif
+       if (check_thread_repo_entry_free(thread_ind))
+               is_free = true;
+       else
+               is_free = false;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* unlock the repository */
+       fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+       return is_free;
+}
+
+/*
+ * look up a thread
+ *
+ * searches the repository for an entry whose thread descriptor is equal to
+ * the one provided as first argument. If successful (and thread_ind is not
+ * NULL) the index of the entry is written back in *thread_ind.
+ *
+ * *thread_ind is also read as a hint (as in the entry add function): the
+ * entry at position (*thread_ind % FRSH_MAX_N_THREADS) is compared first and
+ * the repository is scanned from the beginning only if that entry does not
+ * match (or if no hint has been provided).
+ *
+ * possible return values:
+ *  true (entry found)
+ *  false (entry not found)
+ */
+static bool thread_repo_find(const frsh_thread_id_t *thread, int *thread_ind)
+{
+       int slot, hint;
+       bool found = false;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* lock the repository */
+       fosa_mutex_lock(&thread_repo.mutex);
+#endif
+       /* FRSH_MAX_N_THREADS means "not located yet": the (wrapped) hint
+        * can never be equal to it */
+       slot = FRSH_MAX_N_THREADS;
+       if (thread_ind != NULL) {
+               hint = (*thread_ind) % FRSH_MAX_N_THREADS;
+               /* a correct hint ends the search before it starts */
+               if (check_thread_repo_entry_equal(*thread, hint))
+                       slot = hint;
+       }
+       /* no usable hint, scan until a match or the end of the repository */
+       if (slot == FRSH_MAX_N_THREADS) {
+               for (slot = 0; slot < FRSH_MAX_N_THREADS; slot++) {
+                       if (check_thread_repo_entry_equal(*thread, slot))
+                               break;
+               }
+       }
+       /* the check fails (index out of range) if the scan found nothing */
+       if (check_thread_repo_entry_equal(*thread, slot)) {
+               if (thread_ind != NULL)
+                       *thread_ind = slot;
+               found = true;
+       }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* unlock the repository */
+       fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+
+       return found;
+}
+
+/*
+ * worker thread pool setup
+ *
+ * resets the active and busy thread counters, initializes the pool mutex and
+ * the condition variable the workers sleep on, then creates
+ * MIN_N_WORKER_THREAD threads running worker_thread_code().
+ * The function gives up at the first failing step.
+ *
+ * possible return values:
+ *  true (all ok, pool initialized and all thread created)
+ *  false (something wrong in mutex or condition initialization or in thread creation)
+ */
+
+static void* worker_thread_code();
+
+static inline bool worker_thread_pool_init()
+{
+       frsh_thread_id_t spawned;
+       int created;
+
+       worker_pool.busy_thread_number = 0;
+       worker_pool.active_thread_number = 0;
+       /* the condition variable is initialized only if the mutex is ok */
+       if (fosa_mutex_init(&worker_pool.mutex, 0) != 0 ||
+                       fosa_cond_init(&worker_pool.sleep) != 0)
+               return false;
+       created = 0;
+       while (created < MIN_N_WORKER_THREAD) {
+               if (fosa_thread_create(&spawned, NULL, worker_thread_code, NULL) != 0)
+                       return false;
+               created++;
+       }
+
+       return true;
+}
+
+/*
+ * worker thread code
+ *
+ * code executed by all the worker thread activated by the service thread to
+ * effectively serve the client(s) requests.
+ * Basically we check if there are queued requests and if the service thread
+ * is useful or, if not, it has to die.
+ * If "in service" a worker thread processes a request from a client and sends
+ * back the results (if requested) then check again if it's time to sleep or
+ * to die!
+ *
+ * possible exit status:
+ *  EXIT_SUCCESS (normal exit, the thread is no longer useful)
+ *  EXIT_FAILURE (abnormal exit, something has gone severely wrong!)
+ */
+static void* worker_thread_code(void *p)
+{
+       frsh_in_msg_t request;
+       frsh_out_msg_t reply;
+       int conn;
+       frsh_thread_id_t thread_self;
+
+       /* lock the pool and say everyone we're a new
+        * living (active) and working (busy) thread */
+       fosa_mutex_lock(&worker_pool.mutex);
+       worker_pool.active_thread_number++;
+       worker_pool.busy_thread_number++;
+       /* done, unlock the pool */
+       fosa_mutex_unlock(&worker_pool.mutex);
+       thread_self = fosa_thread_self();
+       /* directly attach ourself to the service contract vres
+        * (no need fo negotiation or any other FRSH operation!)*/
+       if (qres_attach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
+                       thread_self.linux_pid,
+                       thread_self.linux_tid) != QOS_OK) {
+               syslog(LOG_ERR, "can't attach the worker thread to the vres");
+
+               return (void*)EXIT_FAILURE;
+       }
+       /* main worker thread cycle */
+       while (1) {
+               /* check if there are new requests to serve */
+               while (!dequeue_service_th_request(&conn, &request)) {
+                       fosa_mutex_lock(&worker_pool.mutex);
+                       /* sleeping or exiting depends on how many worker
+                        * thread the system has been configured to keep alive */
+                       if (worker_pool.active_thread_number <= MIN_N_WORKER_THREAD) {
+                               /* we're still alive, we only go to sleep */
+                               worker_pool.busy_thread_number--;
+                               fosa_cond_wait(&worker_pool.sleep, &worker_pool.mutex);
+                               /* the service thread wake up us,
+                                * go and check again for queued requests */
+                               worker_pool.busy_thread_number++;
+                               fosa_mutex_unlock(&worker_pool.mutex);
+                       } else {
+                               /* our work is no longer needed, we're dying! */
+                               worker_pool.busy_thread_number--;
+                               worker_pool.active_thread_number--;
+                               fosa_mutex_unlock(&worker_pool.mutex);
+                               if (qres_detach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
+                                               thread_self.linux_pid,
+                                               thread_self.linux_tid) != QOS_OK) {
+                                       syslog(LOG_ERR, "can't detach the worker thread from the vres");
+
+                                       return (void*)EXIT_FAILURE;
+                               }
+
+                               return (void*)EXIT_SUCCESS;
+                       }
+               }
+               switch (request.type) {
+                       /* process client(s) requests according to the operation,
+                        * (simply?) calling one (or a few more) of the FRSH utility
+                        * functions defined further down in the code */
+                       case FRSH_MT_NEGOTIATE_CONTRACT:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: NEGOTIATE_CONTRACT");
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error = negotiate_contract(&request.val.negotiate_contract.contract,
+                                       &reply.val.negotiate_contract.vres_id);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_BIND_THREAD:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: BIND_THREAD");
+                               reply.error = bind_thread(&request.val.bind_thread.thread_id,
+                                       request.val.bind_thread.vres_id);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_UNBIND_THREAD:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: UNBIND_THREAD");
+                               reply.error = unbind_thread(&request.val.unbind_thread.thread_id);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_GET_THREAD_VRES_ID:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_THREAD_VRES_ID");
+                               reply.error = get_thread_vres_id(&request.val.get_thread_vres_id.thread_id,
+                                       &reply.val.get_thread_vres_id.vres_id);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_GET_CONTRACT:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_CONTRACT");
+                               reply.error = get_contract(request.val.get_contract.vres_id,
+                                       &reply.val.get_contract.contract);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_GET_LABEL_VRES_ID:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_LABEL_VRES_ID");
+                               reply.error = get_label_vres_id(request.val.get_label_vres_id.contract_label,
+                                       &reply.val.get_label_vres_id.vres_id);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_CANCEL_CONTRACT:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: CANCEL_CONTRACT");
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error = cancel_contract(request.val.cancel_contract.vres_id);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_RENEGOTIATE_CONTRACT:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: RENEGOTIATE_CONTRACT");
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error =
+                                       renegotiate_contract(&request.val.renegotiate_contract.new_contract,
+                                               request.val.renegotiate_contract.vres_id);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_REQUEST_CONTRACT_RENEGOTIATION:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: REQUEST_CONTRACT_RENEGOTIATION");
+                               close(conn);
+                               syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
+                               syslog(LOG_INFO, "connection closed");
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error =
+                                       renegotiate_contract(&request.val.request_contract_renegotiation.new_contract,
+                                               request.val.request_contract_renegotiation.vres_id);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
+                               /* signals the completion of the operation with a signal */
+                               if (request.val.request_contract_renegotiation.signal != FRSH_NULL_SIGNAL)
+                                       if (fosa_signal_queue(request.val.request_contract_renegotiation.signal,
+                                                       request.val.request_contract_renegotiation.siginfo,
+                                                       request.val.request_contract_renegotiation.thread_to_signal) != 0)
+                                               syslog(LOG_ERR,
+                                                       "can't signal process %d with signal %d to notify the completion of the renegotiation",
+                                                       request.val.request_contract_renegotiation.thread_to_signal.linux_pid,
+                                                       request.val.request_contract_renegotiation.signal);
+#endif
+                               goto end_cycle;
+                       }
+                       case FRSH_MT_GET_RENEGOTIATION_STATUS:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_RENEGOTIATION_STATUS");
+                               reply.error = get_renegotiation_status(request.val.get_renegotiation_status.vres_id,
+                                       &reply.val.get_renegotiation_status.renegotiation_status);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_NEGOTIATE_GROUP:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error =
+                                       negotiate_group(request.val.negotiate_group.vres_down_number,
+                                                       request.val.negotiate_group.vres_down,
+                                                       reply.val.negotiate_group.vres_down_status,
+                                                       request.val.negotiate_group.contracts_up_number,
+                                                       request.val.negotiate_group.contracts_up,
+                                                       reply.val.negotiate_group.vres_up,
+                                                       reply.val.negotiate_group.contracts_up_status);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+
+                               //reply.error = FRSH_NO_ERROR;
+                               //for (i = 0; i < request.val.negotiate_group.vres_down_number; i++)
+                               //      reply.error = reply.val.negotiate_group.vres_down_status[i] =
+                               //                      cancel_contract(request.val.negotiate_group.vres_down[i]);
+                               //for (i = 0; i < request.val.negotiate_group.contracts_up_number; i++)
+                               //      reply.error = reply.val.negotiate_group.contracts_up_status[i] =
+                               //                      negotiate_contract(&request.val.negotiate_group.contracts_up[i],
+                               //                                      &reply.val.negotiate_group.vres_up[i]);
+
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_CHANGE_MODE_SYNC:
+                       case FRSH_MT_CHANGE_MODE_ASYNC:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
+                               reply.error = FRSH_NO_ERROR;
+                               if (request.type == FRSH_MT_CHANGE_MODE_ASYNC) {
+                                       close(conn);
+                                       syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
+                                       syslog(LOG_INFO, "connection closed");
+
+                               }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error =
+                                       change_mode(request.val.change_mode.vres_down_number,
+                                                       request.val.change_mode.vres_down,
+                                                       reply.val.change_mode.vres_down_status,
+                                                       request.val.change_mode.contracts_touch_number,
+                                                       request.val.change_mode.contracts_touch,
+                                                       request.val.change_mode.vres_touch,
+                                                       reply.val.change_mode.vres_touch_status,
+                                                       request.val.change_mode.contracts_up_number,
+                                                       request.val.change_mode.contracts_up,
+                                                       reply.val.change_mode.vres_up,
+                                                       reply.val.change_mode.contracts_up_status);
+
+                               //for (i = 0; i < request.val.change_mode.vres_down_number; i++)
+                               //      reply.error = reply.val.change_mode.vres_down_status[i] =
+                               //                      cancel_contract(request.val.change_mode.vres_down[i]);
+                               //for (i = 0; i < request.val.change_mode.contracts_touch_number; i++)
+                               //      reply.error = reply.val.change_mode.vres_touch_status[i] =
+                               //                      renegotiate_contract(&request.val.change_mode.contracts_touch[i],
+                               //                                      request.val.change_mode.vres_touch[i]);
+                               //for (i = 0; i < request.val.change_mode.contracts_up_number; i++)
+                               //      reply.error = reply.val.change_mode.contracts_up_status[i] =
+                               //                      negotiate_contract(&request.val.change_mode.contracts_up[i],
+                               //                                      &reply.val.change_mode.vres_up[i]);
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+#ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
+                               /* signals the completion of the operation with a signal */
+                               if (request.val.change_mode.signal != FRSH_NULL_SIGNAL)
+                                       if (fosa_signal_queue(request.val.change_mode.signal,
+                                                       request.val.change_mode.siginfo,
+                                                       request.val.change_mode.thread_to_signal) != 0)
+                                               syslog(LOG_ERR,
+                                                       "can't signal process %d with signal %d to notify the completion of the renegotiation",
+                                                       request.val.change_mode.thread_to_signal.linux_pid,
+                                                       request.val.change_mode.signal);
+#endif
+                               if (request.type == FRSH_MT_CHANGE_MODE_SYNC)
+                                       goto answer_and_close;
+                               else
+                                       goto end_cycle;
+                       }
+                       case FRSH_MT_GET_CPUTIME:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_CPUTIME");
+                               reply.error = get_cputime(request.val.get_cputime.vres_id,
+                                       &reply.val.get_cputime.cputime);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_GET_CURRENTBUDGET:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_CURRENTBUDGET");
+                               reply.error = get_current_budget(request.val.get_currentbudget.vres_id,
+                                       &reply.val.get_currentbudget.currentbudget);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_GET_BUDGET_AND_PERIOD:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_BUDGET_AND_PERIOD");
+                               reply.error = get_budget_and_period(request.val.get_budget_and_period.vres_id,
+                                       &reply.val.get_budget_and_period.budget,
+                                       &reply.val.get_budget_and_period.period);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_GET_SERVICE_THREAD_DATA:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: GET_SERVICE_THREAD_DATA");
+                               reply.error = get_budget_and_period(service_th_vres_id,
+                                       &reply.val.get_service_thread_data.budget,
+                                       &reply.val.get_service_thread_data.period);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_SET_SERVICE_THREAD_DATA:
+                       {
+                               struct timespec old_budget, old_period;
+                                       syslog(LOG_DEBUG, "operation requested: SET_SERVICE_THREAD_DATA");
+                               old_budget = service_th_contract.budget_min;
+                               old_period = service_th_contract.period_max;
+                               service_th_contract.budget_min =
+                                       request.val.set_service_thread_data.budget;
+                               service_th_contract.period_max =
+                                       request.val.set_service_thread_data.period;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_lock(&contract_mutex);
+#endif
+                               reply.error =
+                                       renegotiate_contract(&service_th_contract, service_th_vres_id);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
+                               fosa_mutex_unlock(&contract_mutex);
+#endif
+                               if (reply.error == FRSH_NO_ERROR)
+                                       reply.val.set_service_thread_data.accepted = true;
+                               else {
+                                       reply.val.set_service_thread_data.accepted = false;
+                                       service_th_contract.budget_min = old_budget;
+                                       service_th_contract.period_max = old_period;
+                               }
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_FEEDBACK_SET_SPARE:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: FEEDBACK_SET_SPARE");
+                               reply.error = reserve_feedback(&request.val.set_feedback_spare.spare_contract);
+                               goto answer_and_close;
+                       }
+                       case FRSH_MT_FEEDBACK_GET_SPARE:
+                       {
+                               syslog(LOG_DEBUG, "operation requested: FEEDBACK_GET_SPARE");
+                               if (feedback_spare_contract == NULL)
+                                       reply.error = FRSH_ERR_NOT_CONTRACTED_VRES;
+                               else {
+                                       reply.val.get_feedback_spare.spare_contract = *feedback_spare_contract;
+                                       reply.error = FRSH_NO_ERROR;
+                               }       
+                               goto answer_and_close;
+                       }
+                       default:
+                               syslog(LOG_ERR, "operation requested: unknown !!");
+                               reply.error = FRSH_SERVICE_TH_ERR_SOCKET;
+                               goto answer_and_close;
+               }
+answer_and_close:
+               if (send(conn, (void*) &reply, sizeof(frsh_out_msg_t), 0) < sizeof(frsh_out_msg_t))
+                       syslog(LOG_ERR, "can't send reply message (ERROR 0x%x)", FRSH_SERVICE_TH_ERR_SOCKET);
+               else
+                       syslog(LOG_DEBUG, "command reply sent (exit status 0x%x)", reply.error);
+               close(conn);
+               syslog(LOG_INFO, "connection closed");
+end_cycle:
+               ;
+       }
+
+       return EXIT_SUCCESS;
+}
+
+#endif         /* FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD */
diff --git a/frsh_aquosa/mngr/vres_repo.c b/frsh_aquosa/mngr/vres_repo.c
new file mode 100644 (file)
index 0000000..912f276
--- /dev/null
@@ -0,0 +1,307 @@
+
+
+/*********************************************************************/
+/* V R E S   R E P O S I T O R Y   U T I L I T Y   F U N C T I O N S */
+/*********************************************************************/
+
+/*
+ * access macros for the vres repository
+ *
+ * a few simple substitution macros which help keep vres_repo_t opaque and
+ * make the code that uses it cleaner and easier to read (exactly as is
+ * done for thread_repo_t)
+ */
+
+/* given the id of a contracted vres stored in the repository, evaluates
+ * to a pointer to its repository entry */
+#define get_vres_entry(vres_id) \
+       ( &( vres_repo.repo[vres_id_2_index(vres_id)] ) )
+/* evaluates to the number of threads currently bound to a vres
+ * NOTE(review): this reads repo[].threads_number, while the repository
+ * functions use repo[].processor.threads_number -- confirm that the field
+ * really exists at this level of the entry */
+#define get_vres_threads_number(vres_id) \
+       ( vres_repo.repo[vres_id_2_index(vres_id)].threads_number )
+/* evaluates to the current value of the repository vres counter */
+#define get_vres_number() \
+       ( vres_repo.vres_number )
+/* evaluates to the current value of the repository dummy vres counter */
+#define get_dummy_vres_number() \
+       ( vres_repo.dummy_vres_number )
+
+/*
+ * consistency and status checks
+ *
+ * commonly used checks and tests about the access to the vres repository
+ * (is the index value in the correct range?) and the status of each entry
+ * (is it free?, non-free?, etc.)
+ *
+ * NOTE: every macro evaluates its argument more than once, so do not pass
+ * expressions with side effects (e.g. i++)
+ */
+
+/* are we accessing the repository with a valid index value ?
+ * (the argument is parenthesized so that any expression can be passed) */
+#define check_vres_repo_ind(ind) \
+ ( ( (ind) >= 0 && (ind) < 2 * FRSH_MAX_N_VRES ) ? true : false )
+/* is the vres repository entry (correctly accessed and) free ?
+ * an entry is free only if both its sid and its disk weight are invalid */
+#define check_vres_repo_entry_free(ind)                                        \
+ ( check_vres_repo_ind(ind) &&                                                 \
+  ( vres_repo.repo[(ind)].processor.sid == FRSH_QRES_NOT_VALID_SID ) &&        \
+  ( vres_repo.repo[(ind)].disk.weight == FRSH_QRES_NOT_VALID_DISK ) )
+
+/* is the vres repository entry (correctly accessed and) non free ?
+ * an entry is non free as soon as its sid or its disk weight is valid
+ * (no line continuation after the last line: the macro must end here) */
+#define check_vres_repo_entry_nonfree(ind)                                     \
+ ( check_vres_repo_ind(ind) &&                                                 \
+  ( ( vres_repo.repo[(ind)].processor.sid != FRSH_QRES_NOT_VALID_SID ) ||      \
+  ( vres_repo.repo[(ind)].disk.weight != FRSH_QRES_NOT_VALID_DISK ) ) )
+
+/*
+ * initialize the repository
+ *
+ * when a multithreaded service thread has been configured the repository
+ * mutex is initialized first; then both vres counters are zeroed and every
+ * entry is marked as free (invalid sid, invalid disk weight, no bound
+ * threads, no renegotiation requested)
+ *
+ * possible return values:
+ *  true (all ok)
+ *  false (the mutex could not be initialized)
+ */
+static inline bool vres_repo_init()
+{
+       int slot = 0;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       if (fosa_mutex_init(&vres_repo.mutex, 0) != 0)
+               return false;
+#endif
+       vres_repo.vres_number = 0;
+       vres_repo.dummy_vres_number = 0;
+
+       /* both halves of the repository (regular and dummy) are reset;
+        * the contract stored in each entry is left untouched */
+       while (slot < 2 * FRSH_MAX_N_VRES) {
+               vres_repo.repo[slot].processor.sid = FRSH_QRES_NOT_VALID_SID;
+               vres_repo.repo[slot].processor.threads_number = 0;
+               vres_repo.repo[slot].processor.first_thread_index = -1;
+               vres_repo.repo[slot].disk.weight = FRSH_QRES_NOT_VALID_DISK;
+               vres_repo.repo[slot].renegotiation_status = FRSH_RS_NOT_REQUESTED;
+               slot++;
+       }
+
+       return true;
+}
+
+/*
+ * add an entry
+ *
+ * adds an entry into the repository. If successful the index of the entry is
+ * returned in vres_ind.
+ *
+ * We (again) "accept suggestions" and we implement this exaclty as previously
+ * described for the thread repository utility functions
+ *
+ * possible return values:
+ *  true (entry correctly added)
+ *  false (entry not added, maybe no more room in the repository!)
+ */
+static bool vres_repo_put(const frsh_contract_t *contract,
+       const unsigned int vres_data,
+       int *vres_ind)
+{
+       int i;
+       int start;
+       bool result;
+
+       /* depending by the contract type we must place the new entry
+        * if the "first half" (0 <= vres_ind < FRSH_MAX_N_VRES) of in the
+        * second half (FRSH_MAX_N_VRES <= vres_ind < 2*FRSH_MAX_N_VRES) of
+        * the repository */
+       switch (contract->contract_type) {
+               default:
+               case FRSH_CT_REGULAR:
+               case FRSH_CT_BACKGROUND:
+               {
+                       start = 0;
+                       break;
+               }
+               case FRSH_CT_DUMMY:
+               {
+                       start = FRSH_MAX_N_VRES;
+                       break;
+               }
+       }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* lock the repository */
+       fosa_mutex_lock(&vres_repo.mutex);
+#endif
+       /* try to allocate the vres entry in the caller provided
+        * position (if any) and, only if it's not free, scan all
+        * the repository for a different suitable place */
+       if (vres_ind != NULL &&
+                       check_vres_repo_entry_free((start + *vres_ind) % FRSH_MAX_N_VRES))
+               i = (start + *vres_ind) % FRSH_MAX_N_VRES;
+       else {
+               i = start;
+               while (i < FRSH_MAX_N_VRES && !check_vres_repo_entry_free(i))
+                       i++;
+       }
+       /* if we're sure it's valid and free we place the
+        * vres entry into the repository in position 'i' */
+       result = false;
+       if (check_vres_repo_entry_free(i)) {
+               vres_repo.repo[i].contract = *contract;
+               switch (contract->resource_type) {
+                       case FRSH_RT_PROCESSOR:
+                       {
+                               vres_repo.repo[i].processor.sid = (qres_sid_t) vres_data;
+                               vres_repo.repo[i].processor.threads_number = 0;
+                               vres_repo.repo[i].processor.first_thread_index = -1;
+                               break;
+                       }
+                       case FRSH_RT_DISK:
+                       {
+                               vres_repo.repo[i].disk.weight = (unsigned int) vres_data;
+                               break;
+                       }
+                       default:
+                               return false;
+               }
+               vres_repo.repo[i].renegotiation_status = FRSH_RS_NOT_REQUESTED;
+               if (vres_ind != NULL)
+                       *vres_ind = i;
+               if (start == 0)
+                       vres_repo.vres_number++;
+               else
+                       vres_repo.dummy_vres_number++;
+               result = true;
+       }
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* unlock the repository */
+       fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+
+       return result;
+}
+
+/*
+ * remove an entry
+ *
+ * detaches and releases all the threads bound to the vres, sets the entry
+ * as free (if it is not) and decrements the (dummy?) vres count
+ *
+ * possible return values:
+ *  true (all ok, entry was non-free and is now free)
+ *  false (something wrong, entry was already free or index is out of range)
+ */
+static bool vres_repo_free(const int vres_ind)
+{
+       thread_repo_entry_t *curr_thread;
+       int next_thread_ind;
+       bool result;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* lock the repository */
+       fosa_mutex_lock(&vres_repo.mutex);
+#endif
+       result = false;
+       if (check_vres_repo_entry_nonfree(vres_ind)) {
+               /* detach all the bound threads, walking the chain that starts
+                * at first_thread_index. An index of -1 marks the end of the
+                * chain: we stop there even if the thread counter disagrees,
+                * so that the thread repository is never accessed at
+                * position -1 */
+               next_thread_ind = vres_repo.repo[vres_ind].processor.first_thread_index;
+               while (vres_repo.repo[vres_ind].processor.threads_number > 0 &&
+                               next_thread_ind != -1) {
+                       vres_repo.repo[vres_ind].processor.threads_number--;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+                       /* we need to lock the thread repository too */
+                       fosa_mutex_lock(&thread_repo.mutex);
+#endif
+                       curr_thread = &thread_repo.repo[next_thread_ind];
+#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
+                       /* if no hierarchical scheduling only one thread per vres is correct */
+                       assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
+                               vres_repo.repo[vres_ind].processor.threads_number == 0);
+                       /* no threads in DUMMY contracts */
+                       assert(vres_repo.repo[vres_ind].contract.contract_type != FRSH_CT_DUMMY);
+#endif
+                       /* only REGULAR contract type case handled since
+                        * dummy vres _always_ has no bound thread and BACKGROUND
+                        * threads need no detaching (they're not attached to any server) */
+                       if (vres_repo.repo[vres_ind].contract.contract_type == FRSH_CT_REGULAR)
+                               /* do not care about the return value since the
+                                * thread could exist no more and the call could
+                                * fail quite easily and frequently! */
+                               qres_detach_thread(vres_repo.repo[vres_ind].processor.sid,
+                                               curr_thread->thread.linux_pid,
+                                               curr_thread->thread.linux_tid);
+                       /* free the thread repository entry too */
+                       curr_thread->thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
+                       curr_thread->thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
+                       curr_thread->thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
+                       curr_thread->vres = FRSH_NOT_VALID_VRES_ID;
+                       /* remember where the chain goes on before cutting it */
+                       next_thread_ind = curr_thread->next_in_vres;
+                       curr_thread->next_in_vres = -1;
+                       thread_repo.threads_number--;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+                       /* unlock the thread repository */
+                       fosa_mutex_unlock(&thread_repo.mutex);
+#endif
+               }
+               /* set the repository entry as free, exactly as
+                * vres_repo_init() does: both the sid and the disk weight
+                * must be invalidated, otherwise a disk vres would look
+                * non-free forever and could be "freed" again and again */
+               /* vres_repo.repo[vres_ind].contract untouched */
+               vres_repo.repo[vres_ind].processor.sid = FRSH_QRES_NOT_VALID_SID;
+               vres_repo.repo[vres_ind].processor.threads_number = 0;
+               vres_repo.repo[vres_ind].processor.first_thread_index = -1;
+               vres_repo.repo[vres_ind].disk.weight = FRSH_QRES_NOT_VALID_DISK;
+               vres_repo.repo[vres_ind].renegotiation_status = FRSH_RS_NOT_REQUESTED;
+               /* the first half of the repository holds regular/background
+                * vres, the second half the dummy ones */
+               if (vres_ind < FRSH_MAX_N_VRES)
+                       vres_repo.vres_number--;
+               else
+                       vres_repo.dummy_vres_number--;
+               result = true;
+       }
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       /* unlock the repository */
+       fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+
+       return result;
+}
+
+/*
+ * is an entry free ?
+ *
+ * tells whether a specific entry of the repository can be considered free.
+ * The whole check is delegated to the check_vres_repo_entry_free() test
+ * macro defined above; if a multithreaded service thread is configured the
+ * test is performed while holding the repository mutex
+ *
+ * possible return values:
+ *  true (entry is free)
+ *  false (entry is not free or index is out of range)
+ */
+static inline bool vres_repo_isfree(const int vres_ind)
+{
+       bool entry_is_free = false;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       fosa_mutex_lock(&vres_repo.mutex);
+#endif
+       if (check_vres_repo_entry_free(vres_ind))
+               entry_is_free = true;
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+
+       return entry_is_free;
+}
+
+/*
+ * is an entry "dummy" ?
+ *
+ * tells whether a repository index designates a dummy vres slot, that is
+ * a slot of the second half of the repository
+ * (FRSH_MAX_N_VRES <= vres_ind < 2 * FRSH_MAX_N_VRES), the half reserved
+ * to the vres created by the negotiation of a CT_DUMMY contract.
+ * Only the index is examined: whether the slot is currently free or not
+ * is not checked here.
+ * If a multithreaded service thread is configured the check is
+ * performed while holding the repository mutex
+ *
+ * possible return values:
+ *  true (index is valid and belongs to the dummy half)
+ *  false (index belongs to the first half or is out of range)
+ */
+static inline bool vres_repo_isdummy(const int vres_ind)
+{
+       bool in_dummy_half = false;
+
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       fosa_mutex_lock(&vres_repo.mutex);
+#endif
+       if (vres_ind >= FRSH_MAX_N_VRES)
+               in_dummy_half = check_vres_repo_ind(vres_ind);
+#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
+       fosa_mutex_unlock(&vres_repo.mutex);
+#endif
+
+       return in_dummy_half;
+}