]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - frsh_aquosa/mngr/thread_repo.c
Split frsh_service_th.c
[frescor/fwp.git] / frsh_aquosa / mngr / thread_repo.c
1 static thread_repo_t thread_repo;                       /* registered thread repository */
2 #ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
3 static worker_thread_pool_t worker_pool;                /* thread pool for multithreaded implementation */
4 #endif
5
6 /***************************************************************************/
7 /* T H R E A D S   R E P O S I T O R Y   U T I L I T Y   F U N C T I O N S */
8 /***************************************************************************/
9
/*
 * access functions to thread repository
 *
 * some simple substitution macros which help keep thread_repo_t opaque and
 * improve code cleanness and look & feel
 */

/* given the index of a registered thread in the repository returns
 * a pointer to its entry */
#define get_thread_entry(thread_ind)    \
 ( & ( thread_repo.repo[(thread_ind)] ) )
/* gives the actual number of threads in the repository */
#define get_threads_number()    \
 ( thread_repo.threads_number )

/*
 * consistency and status checks
 *
 * commonly used checks and tests about the access to the thread repository
 * (is the index value in the correct range?) and the status of each entry
 * (is it free?, non-free?, equal to an external one?, etc.)
 */

/* are we accessing the repository with a valid index value ?
 * (note: 'ind' is parenthesized so expression arguments expand safely) */
#define check_thread_repo_ind(ind)                                      \
 ( ( (ind) >= 0 && (ind) < FRSH_MAX_N_THREADS ) ? true : false )
/* is the thread repository entry (correctly accessed and) free ? */
#define check_thread_repo_entry_free(ind)                                               \
 ( ( ( check_thread_repo_ind(ind) ) &&                                                  \
  thread_repo.repo[(ind)].thread.pthread_id == FRSH_NOT_VALID_THREAD_ID &&              \
  thread_repo.repo[(ind)].thread.linux_pid == FRSH_NOT_VALID_THREAD_ID &&               \
  thread_repo.repo[(ind)].thread.linux_tid == FRSH_NOT_VALID_THREAD_ID ) ? true : false )
/* is the thread repository entry (correctly accessed and) non free ? */
#define check_thread_repo_entry_nonfree(ind)                                            \
 ( ( ( check_thread_repo_ind(ind) ) &&                                                  \
  thread_repo.repo[(ind)].thread.pthread_id != FRSH_NOT_VALID_THREAD_ID &&              \
  thread_repo.repo[(ind)].thread.linux_pid != FRSH_NOT_VALID_THREAD_ID &&               \
  thread_repo.repo[(ind)].thread.linux_tid != FRSH_NOT_VALID_THREAD_ID ) ? true : false )
/* does the thread repository entry contain a thread equal to the one provided ?
 * (the stray trailing '\' that used to continue this macro onto the
 *  following blank line has been removed) */
#define check_thread_repo_entry_equal(th, th_ind)                                       \
 ( ( ( check_thread_repo_ind(th_ind) ) &&                                               \
  ( thread_repo.repo[(th_ind)].thread.pthread_id == (th).pthread_id ) &&                \
  ( thread_repo.repo[(th_ind)].thread.linux_pid == (th).linux_pid ) &&                  \
  ( thread_repo.repo[(th_ind)].thread.linux_tid == (th).linux_tid ) ) ? true : false )
54
55 /*
56  * initialize the repository
57  *
58  * sets all entries to invalid (that is, free) and init the mutex
59  * (if a multithreaded service thread has been configured)
60  *
61  * possible return values:
62  *  true (all ok)
63  *  false (something wrong with the mutex)
64  */
65 static inline bool thread_repo_init()
66 {
67         int i;
68
69 #ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
70         if (fosa_mutex_init(&thread_repo.mutex, 0) != 0)
71                 return false;
72 #endif
73         thread_repo.threads_number = 0;
74         for (i = 0; i < FRSH_MAX_N_THREADS; i++) {
75                 thread_repo.repo[i].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
76                 thread_repo.repo[i].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
77                 thread_repo.repo[i].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
78                 thread_repo.repo[i].vres = FRSH_NOT_VALID_VRES_ID;
79                 thread_repo.repo[i].next_in_vres = -1;  /* no other thread in vres */
80         }
81
82         return true;
83 }
84
/*
 * add an entry
 *
 * Adds an entry into the repository, binding the given thread to vres_id.
 * If successful the index of the entry is returned in thread_ind (when
 * thread_ind is non-NULL).
 *
 * We "accept suggestions", i.e. we suppose the caller knows the location of
 * a free entry in the repository and ask us to insert the new entry exactly
 * there, at thread_ind position (taken modulo FRSH_MAX_N_THREADS).
 * So we check if that location is really free and, if yes, we can add the
 * entry right there. If, on the other way, the entry is not free we need to
 * search the repository for a different place.
 *
 * Note we also keep updated the repository entry (see below) of the vres the
 * thread is bound to: its thread counter is incremented and the new thread
 * is linked at the head of the vres' thread chain.
 *
 * possible return values:
 *  true (entry correctly added)
 *  false (entry not added, maybe no more room in the repository!)
 */
static bool thread_repo_put(const frsh_thread_id_t *thread,
        const frsh_vres_id_t vres_id,
        int *thread_ind)
{
        int i;
        bool result;

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* lock the repository */
        fosa_mutex_lock(&thread_repo.mutex);
#endif
        /* try to place the new entry in the caller provided position (if any)
         * and, only if it's not free, scan all the repository for
         * a different and suitable place */
        if (thread_ind != NULL &&
                        check_thread_repo_entry_free(*thread_ind % FRSH_MAX_N_THREADS))
                i = *thread_ind % FRSH_MAX_N_THREADS;
        else {
                i = 0;
                while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_free(i))
                        i++;
        }
        /* if we're sure it's valid and free we place the
         * thread entry into the repository in position 'i'
         * (this also rejects i == FRSH_MAX_N_THREADS, i.e. the
         * "repository full" outcome of the scan above, since the
         * range check inside the macro fails) */
        result = false;
        if (check_thread_repo_entry_free(i)) {
                /* update the provided vres repository entry */
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
                /* we need to lock the vres repository too
                 * (lock order: thread repo mutex first, then vres repo mutex;
                 * thread_repo_free below uses the same order) */
                fosa_mutex_lock(&vres_repo.mutex);
#endif
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
                /* without hierarchical scheduling a vres hosts at most one thread */
                assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
                        get_vres_entry(vres_id)->processor.threads_number == 0);
#endif
                get_vres_entry(vres_id)->processor.threads_number++;
                /* keep updated the chain of thread of the vres repository entry:
                 * the new thread becomes the head of the chain */
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
                assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
                        get_vres_entry(vres_id)->processor.first_thread_index == -1);
#endif
                thread_repo.repo[i].next_in_vres = get_vres_entry(vres_id)->processor.first_thread_index;
                get_vres_entry(vres_id)->processor.first_thread_index = i;
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
                /* unlock the vres repository */
                fosa_mutex_unlock(&vres_repo.mutex);
#endif
                /* fill in the entry itself and report the chosen slot back */
                thread_repo.repo[i].thread = *thread;
                thread_repo.repo[i].vres = vres_id;
                if (thread_ind != NULL)
                        *thread_ind = i;
                thread_repo.threads_number++;
                result = true;
        }

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* unlock the repository */
        fosa_mutex_unlock(&thread_repo.mutex);
#endif

        return result;
}
167
/*
 * remove an entry
 *
 * sets the entry as free (if it is not) and decrements the thread count.
 *
 * Note we also keep updated the repository entry (see below) of the vres the
 * thread is bound to: its thread counter is decremented and the thread is
 * unlinked from the vres' thread chain.
 *
 * possible return values:
 *  true (all ok, entry was non-free and is now free)
 *  false (something wrong, entry was already free or index is out of range)
 */
static bool thread_repo_free(const int thread_ind)
{
        vres_repo_entry_t *thread_vres;
        thread_repo_entry_t *thread_p;
        bool result;

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* lock the repository */
        fosa_mutex_lock(&thread_repo.mutex);
#endif
        result = false;
        if (check_thread_repo_entry_nonfree(thread_ind)) {
                thread_p = &thread_repo.repo[thread_ind];
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
                /* a registered thread must be bound to a live vres */
                assert(!vres_repo_isfree(vres_id_2_index(thread_p->vres)));
#endif
                thread_vres = get_vres_entry(thread_p->vres);
                /* update the bound vres repository entry */
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
                /* we need to lock the vres repository too
                 * (same lock order as thread_repo_put: thread repo, then vres repo) */
                fosa_mutex_lock(&vres_repo.mutex);
#endif
#if defined(FRSH_CONFIG_ENABLE_DEBUG) && defined(FRSH_CONFIG_ENABLE_DEBUG_ASSERTS)
                /* without hierarchical scheduling the vres hosts exactly this thread */
                assert(FRSH_HIERARCHICAL_MODULE_SUPPORTED ||
                        get_vres_entry(thread_p->vres)->processor.threads_number == 1);
#endif
                thread_vres->processor.threads_number--;
                /* keep the chain of thread in the vres updated:
                 * if the removed thread is the chain head, advance the head;
                 * otherwise walk the chain until thread_p is the predecessor
                 * of the removed entry, then bypass it.
                 * NOTE: in the head case thread_p still points at the removed
                 * entry, so the assignment below is a harmless self-assignment
                 * (the field is reset to -1 further down anyway) */
                if (thread_vres->processor.first_thread_index == thread_ind)
                        thread_vres->processor.first_thread_index = thread_p->next_in_vres;
                else
                        while (thread_p->next_in_vres != thread_ind)
                                thread_p = &thread_repo.repo[thread_p->next_in_vres];
                thread_p->next_in_vres = thread_repo.repo[thread_ind].next_in_vres;
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
                /* unlock the vres repository */
                fosa_mutex_unlock(&vres_repo.mutex);
#endif
                /* set the repository entry as free */
                thread_repo.repo[thread_ind].thread.pthread_id = FRSH_NOT_VALID_THREAD_ID;
                thread_repo.repo[thread_ind].thread.linux_pid = FRSH_NOT_VALID_THREAD_ID;
                thread_repo.repo[thread_ind].thread.linux_tid = FRSH_NOT_VALID_THREAD_ID;
                thread_repo.repo[thread_ind].vres = FRSH_NOT_VALID_VRES_ID;
                thread_repo.repo[thread_ind].next_in_vres = -1;
                thread_repo.threads_number--;
                result = true;
        }
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* unlock the repository */
        fosa_mutex_unlock(&thread_repo.mutex);
#endif

        return result;
}
234
/*
 * is an entry free ?
 *
 * Checks whether a specific entry of the repository can be considered
 * free; the test itself is just the check_thread_repo_entry_free()
 * macro defined above. When a multithreaded service thread is
 * configured the check is performed under the repository mutex.
 *
 * possible return values:
 *  true (entry is free)
 *  false (entry is not free or index is out of range)
 */
static bool thread_repo_isfree(const int thread_ind)
{
        bool is_free;

#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* take the repository lock for the duration of the check */
        fosa_mutex_lock(&thread_repo.mutex);
#endif
        is_free = check_thread_repo_entry_free(thread_ind);
#ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
        /* done, release the repository lock */
        fosa_mutex_unlock(&thread_repo.mutex);
#endif
        return is_free;
}
262
263 /* 
264  * search for an entry
265  *
266  * tries to find an entry in the repository containing a the thread descriptor
267  * provided as (first) argument. If successful the index of the entry is
268  * returned in thread_ind.
269  *
270  * We "accept suggestions" (as in the entry add function), i.e. we suppose the
271  * caller knows the entry he's searching for is in the repository and its
272  * index is exactly thread_ind, so we check if this is true and, if yes,
273  * we're done! If, on the other way, we can't find the searched entry at
274  * thread_ind position we start a complete scan of the repository
275  *
276  * possible return values:
277  *  true (entry found)
278  *  false (entry not found)
279  */
280 static bool thread_repo_find(const frsh_thread_id_t *thread, int *thread_ind)
281 {
282         int i;
283         bool result;
284
285 #ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
286         /* lock the repository */
287         fosa_mutex_lock(&thread_repo.mutex);
288 #endif
289         /* check if the caller provides us (we hope _intentionally_!) the
290          * correct position in the repository of the entry he's searching */
291         if (thread_ind != NULL &&
292                         check_thread_repo_entry_equal(*thread, (*thread_ind) % FRSH_MAX_N_THREADS))
293                 /* if yes we've finished our search before starting it! */
294                 i = (*thread_ind) % FRSH_MAX_N_THREADS;
295         else {
296                 /* if no we scan the repository till we find the entry (or till its end) */
297                 i = 0;
298                 while (i < FRSH_MAX_N_THREADS && !check_thread_repo_entry_equal(*thread, i))
299                         i++;
300         }
301         /* check again and definitively decide the entry is in the repository or not */
302         result = false;
303         if (check_thread_repo_entry_equal(*thread, i)) {
304                 if (thread_ind != NULL)
305                         *thread_ind = i;
306                 result = true;
307         }
308 #ifdef FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD
309         /* unlock the repository */
310         fosa_mutex_unlock(&thread_repo.mutex);
311 #endif
312
313         return result;
314 }
315
316 /*
317  * thread pool initialization
318  *
319  * initialize the worker thread pool (a couple of counters, a mutex and a
320  * condition variable) and create the minimum number of worker thread
321  * configured
322  *
323  * possible return values:
324  *  true (all ok, pool initialized and all thread created)
325  *  false (something wrong in mutex or condition initialization or in thread creation)
326  */
327
328 static void* worker_thread_code();
329
330 static inline bool worker_thread_pool_init()
331 {
332         int i;
333         frsh_thread_id_t new_thread;
334
335         worker_pool.active_thread_number = 0;
336         worker_pool.busy_thread_number = 0;
337         if (fosa_mutex_init(&worker_pool.mutex, 0) != 0)
338                 return false;
339         if (fosa_cond_init(&worker_pool.sleep) != 0)
340                 return false;
341         for (i = 0; i < MIN_N_WORKER_THREAD; i++)
342                 if (fosa_thread_create(&new_thread, NULL, worker_thread_code, NULL) != 0)
343                         return false;
344
345         return true;
346 }
347
348 /*
349  * worker thread code
350  *
351  * code executed by all the worker thread activated by the service thread to
352  * effectively serve the client(s) requests.
353  * Basically we check if there are queued requests and if the service thread
354  * is useful or, if not, it has to die.
355  * If "in service" a worker thread processes a request from a client and sends
356  * back the results (if requested) then check again if it's time to sleep or
357  * to die!
358  *
359  * possible exit status:
360  *  EXIT_SUCCESS (normal exit, the thread is no longer usefull)
361  *  EXIT_FAILURE (abnormal exit, something has gone severely wrong!)
362  */
363 static void* worker_thread_code(void *p)
364 {
365         frsh_in_msg_t request;
366         frsh_out_msg_t reply;
367         int conn;
368         frsh_thread_id_t thread_self;
369
370         /* lock the pool and say everyone we're a new
371          * living (active) and working (busy) thread */
372         fosa_mutex_lock(&worker_pool.mutex);
373         worker_pool.active_thread_number++;
374         worker_pool.busy_thread_number++;
375         /* done, unlock the pool */
376         fosa_mutex_unlock(&worker_pool.mutex);
377         thread_self = fosa_thread_self();
378         /* directly attach ourself to the service contract vres
379          * (no need fo negotiation or any other FRSH operation!)*/
380         if (qres_attach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
381                         thread_self.linux_pid,
382                         thread_self.linux_tid) != QOS_OK) {
383                 syslog(LOG_ERR, "can't attach the worker thread to the vres");
384
385                 return (void*)EXIT_FAILURE;
386         }
387         /* main worker thread cycle */
388         while (1) {
389                 /* check if there are new requests to serve */
390                 while (!dequeue_service_th_request(&conn, &request)) {
391                         fosa_mutex_lock(&worker_pool.mutex);
392                         /* sleeping or exiting depends on how many worker
393                          * thread the system has been configured to keep alive */
394                         if (worker_pool.active_thread_number <= MIN_N_WORKER_THREAD) {
395                                 /* we're still alive, we only go to sleep */
396                                 worker_pool.busy_thread_number--;
397                                 fosa_cond_wait(&worker_pool.sleep, &worker_pool.mutex);
398                                 /* the service thread wake up us,
399                                  * go and check again for queued requests */
400                                 worker_pool.busy_thread_number++;
401                                 fosa_mutex_unlock(&worker_pool.mutex);
402                         } else {
403                                 /* our work is no longer needed, we're dying! */
404                                 worker_pool.busy_thread_number--;
405                                 worker_pool.active_thread_number--;
406                                 fosa_mutex_unlock(&worker_pool.mutex);
407                                 if (qres_detach_thread(get_vres_entry(service_th_vres_id)->processor.sid,
408                                                 thread_self.linux_pid,
409                                                 thread_self.linux_tid) != QOS_OK) {
410                                         syslog(LOG_ERR, "can't detach the worker thread from the vres");
411
412                                         return (void*)EXIT_FAILURE;
413                                 }
414
415                                 return (void*)EXIT_SUCCESS;
416                         }
417                 }
418                 switch (request.type) {
419                         /* process client(s) requests according to the operation and
420                          * (simply?) colling one (or a little more) of the FRSH utility
421                          * function defined down in the code */
422                         case FRSH_MT_NEGOTIATE_CONTRACT:
423                         {
424                                 syslog(LOG_DEBUG, "operation requested: NEGOTIATE_CONTRACT");
425 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
426                                 fosa_mutex_lock(&contract_mutex);
427 #endif
428                                 reply.error = negotiate_contract(&request.val.negotiate_contract.contract,
429                                         &reply.val.negotiate_contract.vres_id);
430 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
431                                 fosa_mutex_unlock(&contract_mutex);
432 #endif
433                                 goto answer_and_close;
434                         }
435                         case FRSH_MT_BIND_THREAD:
436                         {
437                                 syslog(LOG_DEBUG, "operation requested: BIND_THREAD");
438                                 reply.error = bind_thread(&request.val.bind_thread.thread_id,
439                                         request.val.bind_thread.vres_id);
440                                 goto answer_and_close;
441                         }
442                         case FRSH_MT_UNBIND_THREAD:
443                         {
444                                 syslog(LOG_DEBUG, "operation requested: UNBIND_THREAD");
445                                 reply.error = unbind_thread(&request.val.unbind_thread.thread_id);
446                                 goto answer_and_close;
447                         }
448                         case FRSH_MT_GET_THREAD_VRES_ID:
449                         {
450                                 syslog(LOG_DEBUG, "operation requested: GET_THREAD_VRES_ID");
451                                 reply.error = get_thread_vres_id(&request.val.get_thread_vres_id.thread_id,
452                                         &reply.val.get_thread_vres_id.vres_id);
453                                 goto answer_and_close;
454                         }
455                         case FRSH_MT_GET_CONTRACT:
456                         {
457                                 syslog(LOG_DEBUG, "operation requested: GET_CONTRACT");
458                                 reply.error = get_contract(request.val.get_contract.vres_id,
459                                         &reply.val.get_contract.contract);
460                                 goto answer_and_close;
461                         }
462                         case FRSH_MT_GET_LABEL_VRES_ID:
463                         {
464                                 syslog(LOG_DEBUG, "operation requested: GET_LABEL_VRES_ID");
465                                 reply.error = get_label_vres_id(request.val.get_label_vres_id.contract_label,
466                                         &reply.val.get_label_vres_id.vres_id);
467                                 goto answer_and_close;
468                         }
469                         case FRSH_MT_CANCEL_CONTRACT:
470                         {
471                                 syslog(LOG_DEBUG, "operation requested: CANCEL_CONTRACT");
472 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
473                                 fosa_mutex_lock(&contract_mutex);
474 #endif
475                                 reply.error = cancel_contract(request.val.cancel_contract.vres_id);
476 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
477                                 fosa_mutex_unlock(&contract_mutex);
478 #endif
479                                 goto answer_and_close;
480                         }
481                         case FRSH_MT_RENEGOTIATE_CONTRACT:
482                         {
483                                 syslog(LOG_DEBUG, "operation requested: RENEGOTIATE_CONTRACT");
484 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
485                                 fosa_mutex_lock(&contract_mutex);
486 #endif
487                                 reply.error =
488                                         renegotiate_contract(&request.val.renegotiate_contract.new_contract,
489                                                 request.val.renegotiate_contract.vres_id);
490 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
491                                 fosa_mutex_unlock(&contract_mutex);
492 #endif
493                                 goto answer_and_close;
494                         }
495                         case FRSH_MT_REQUEST_CONTRACT_RENEGOTIATION:
496                         {
497                                 syslog(LOG_DEBUG, "operation requested: REQUEST_CONTRACT_RENEGOTIATION");
498                                 close(conn);
499                                 syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
500                                 syslog(LOG_INFO, "connection closed");
501 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
502                                 fosa_mutex_lock(&contract_mutex);
503 #endif
504                                 reply.error =
505                                         renegotiate_contract(&request.val.request_contract_renegotiation.new_contract,
506                                                 request.val.request_contract_renegotiation.vres_id);
507 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
508                                 fosa_mutex_unlock(&contract_mutex);
509 #endif
510 #ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
511                                 /* signals the completion of the operation with a signal */
512                                 if (request.val.request_contract_renegotiation.signal != FRSH_NULL_SIGNAL)
513                                         if (fosa_signal_queue(request.val.request_contract_renegotiation.signal,
514                                                         request.val.request_contract_renegotiation.siginfo,
515                                                         request.val.request_contract_renegotiation.thread_to_signal) != 0)
516                                                 syslog(LOG_ERR,
517                                                         "can't signal process %d with signal %d to notify the completion of the renegotiation",
518                                                         request.val.request_contract_renegotiation.thread_to_signal.linux_pid,
519                                                         request.val.request_contract_renegotiation.signal);
520 #endif
521                                 goto end_cycle;
522                         }
523                         case FRSH_MT_GET_RENEGOTIATION_STATUS:
524                         {
525                                 syslog(LOG_DEBUG, "operation requested: GET_RENEGOTIATION_STATUS");
526                                 reply.error = get_renegotiation_status(request.val.get_renegotiation_status.vres_id,
527                                         &reply.val.get_renegotiation_status.renegotiation_status);
528                                 goto answer_and_close;
529                         }
530                         case FRSH_MT_NEGOTIATE_GROUP:
531                         {
532                                 syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
533 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
534                                 fosa_mutex_lock(&contract_mutex);
535 #endif
536                                 reply.error =
537                                         negotiate_group(request.val.negotiate_group.vres_down_number,
538                                                         request.val.negotiate_group.vres_down,
539                                                         reply.val.negotiate_group.vres_down_status,
540                                                         request.val.negotiate_group.contracts_up_number,
541                                                         request.val.negotiate_group.contracts_up,
542                                                         reply.val.negotiate_group.vres_up,
543                                                         reply.val.negotiate_group.contracts_up_status);
544 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
545                                 fosa_mutex_unlock(&contract_mutex);
546 #endif
547
548                                 //reply.error = FRSH_NO_ERROR;
549                                 //for (i = 0; i < request.val.negotiate_group.vres_down_number; i++)
550                                 //      reply.error = reply.val.negotiate_group.vres_down_status[i] =
551                                 //                      cancel_contract(request.val.negotiate_group.vres_down[i]);
552                                 //for (i = 0; i < request.val.negotiate_group.contracts_up_number; i++)
553                                 //      reply.error = reply.val.negotiate_group.contracts_up_status[i] =
554                                 //                      negotiate_contract(&request.val.negotiate_group.contracts_up[i],
555                                 //                                      &reply.val.negotiate_group.vres_up[i]);
556
557                                 goto answer_and_close;
558                         }
559                         case FRSH_MT_CHANGE_MODE_SYNC:
560                         case FRSH_MT_CHANGE_MODE_ASYNC:
561                         {
562                                 syslog(LOG_DEBUG, "operation requested: NEGOTIATE_GROUP");
563                                 reply.error = FRSH_NO_ERROR;
564                                 if (request.type == FRSH_MT_CHANGE_MODE_ASYNC) {
565                                         close(conn);
566                                         syslog(LOG_DEBUG, "aync. operation, do not wait for completion");
567                                         syslog(LOG_INFO, "connection closed");
568
569                                 }
570 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
571                                 fosa_mutex_lock(&contract_mutex);
572 #endif
573                                 reply.error =
574                                         change_mode(request.val.change_mode.vres_down_number,
575                                                         request.val.change_mode.vres_down,
576                                                         reply.val.change_mode.vres_down_status,
577                                                         request.val.change_mode.contracts_touch_number,
578                                                         request.val.change_mode.contracts_touch,
579                                                         request.val.change_mode.vres_touch,
580                                                         reply.val.change_mode.vres_touch_status,
581                                                         request.val.change_mode.contracts_up_number,
582                                                         request.val.change_mode.contracts_up,
583                                                         reply.val.change_mode.vres_up,
584                                                         reply.val.change_mode.contracts_up_status);
585
586                                 //for (i = 0; i < request.val.change_mode.vres_down_number; i++)
587                                 //      reply.error = reply.val.change_mode.vres_down_status[i] =
588                                 //                      cancel_contract(request.val.change_mode.vres_down[i]);
589                                 //for (i = 0; i < request.val.change_mode.contracts_touch_number; i++)
590                                 //      reply.error = reply.val.change_mode.vres_touch_status[i] =
591                                 //                      renegotiate_contract(&request.val.change_mode.contracts_touch[i],
592                                 //                                      request.val.change_mode.vres_touch[i]);
593                                 //for (i = 0; i < request.val.change_mode.contracts_up_number; i++)
594                                 //      reply.error = reply.val.change_mode.contracts_up_status[i] =
595                                 //                      negotiate_contract(&request.val.change_mode.contracts_up[i],
596                                 //                                      &reply.val.change_mode.vres_up[i]);
597
598 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
599                                 fosa_mutex_unlock(&contract_mutex);
600 #endif
601 #ifdef FRSH_CONFIG_SERVICE_TH_LOCAL_MACHINE
602                                 /* signals the completion of the operation with a signal */
603                                 if (request.val.change_mode.signal != FRSH_NULL_SIGNAL)
604                                         if (fosa_signal_queue(request.val.change_mode.signal,
605                                                         request.val.change_mode.siginfo,
606                                                         request.val.change_mode.thread_to_signal) != 0)
607                                                 syslog(LOG_ERR,
608                                                         "can't signal process %d with signal %d to notify the completion of the renegotiation",
609                                                         request.val.change_mode.thread_to_signal.linux_pid,
610                                                         request.val.change_mode.signal);
611 #endif
612                                 if (request.type == FRSH_MT_CHANGE_MODE_SYNC)
613                                         goto answer_and_close;
614                                 else
615                                         goto end_cycle;
616                         }
617                         case FRSH_MT_GET_CPUTIME:
618                         {
619                                 syslog(LOG_DEBUG, "operation requested: GET_CPUTIME");
620                                 reply.error = get_cputime(request.val.get_cputime.vres_id,
621                                         &reply.val.get_cputime.cputime);
622                                 goto answer_and_close;
623                         }
624                         case FRSH_MT_GET_CURRENTBUDGET:
625                         {
626                                 syslog(LOG_DEBUG, "operation requested: GET_CURRENTBUDGET");
627                                 reply.error = get_current_budget(request.val.get_currentbudget.vres_id,
628                                         &reply.val.get_currentbudget.currentbudget);
629                                 goto answer_and_close;
630                         }
631                         case FRSH_MT_GET_BUDGET_AND_PERIOD:
632                         {
633                                 syslog(LOG_DEBUG, "operation requested: GET_BUDGET_AND_PERIOD");
634                                 reply.error = get_budget_and_period(request.val.get_budget_and_period.vres_id,
635                                         &reply.val.get_budget_and_period.budget,
636                                         &reply.val.get_budget_and_period.period);
637                                 goto answer_and_close;
638                         }
639                         case FRSH_MT_GET_SERVICE_THREAD_DATA:
640                         {
641                                 syslog(LOG_DEBUG, "operation requested: GET_SERVICE_THREAD_DATA");
642                                 reply.error = get_budget_and_period(service_th_vres_id,
643                                         &reply.val.get_service_thread_data.budget,
644                                         &reply.val.get_service_thread_data.period);
645                                 goto answer_and_close;
646                         }
647                         case FRSH_MT_SET_SERVICE_THREAD_DATA:
648                         {
649                                 struct timespec old_budget, old_period;
650                                         syslog(LOG_DEBUG, "operation requested: SET_SERVICE_THREAD_DATA");
651                                 old_budget = service_th_contract.budget_min;
652                                 old_period = service_th_contract.period_max;
653                                 service_th_contract.budget_min =
654                                         request.val.set_service_thread_data.budget;
655                                 service_th_contract.period_max =
656                                         request.val.set_service_thread_data.period;
657 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
658                                 fosa_mutex_lock(&contract_mutex);
659 #endif
660                                 reply.error =
661                                         renegotiate_contract(&service_th_contract, service_th_vres_id);
662 #ifdef FRSH_CONFIG_ENABLE_SERVICE_MULTITHREAD
663                                 fosa_mutex_unlock(&contract_mutex);
664 #endif
665                                 if (reply.error == FRSH_NO_ERROR)
666                                         reply.val.set_service_thread_data.accepted = true;
667                                 else {
668                                         reply.val.set_service_thread_data.accepted = false;
669                                         service_th_contract.budget_min = old_budget;
670                                         service_th_contract.period_max = old_period;
671                                 }
672                                 goto answer_and_close;
673                         }
674                         case FRSH_MT_FEEDBACK_SET_SPARE:
675                         {
676                                 syslog(LOG_DEBUG, "operation requested: FEEDBACK_SET_SPARE");
677                                 reply.error = reserve_feedback(&request.val.set_feedback_spare.spare_contract);
678                                 goto answer_and_close;
679                         }
680                         case FRSH_MT_FEEDBACK_GET_SPARE:
681                         {
682                                 syslog(LOG_DEBUG, "operation requested: FEEDBACK_GET_SPARE");
683                                 if (feedback_spare_contract == NULL)
684                                         reply.error = FRSH_ERR_NOT_CONTRACTED_VRES;
685                                 else {
686                                         reply.val.get_feedback_spare.spare_contract = *feedback_spare_contract;
687                                         reply.error = FRSH_NO_ERROR;
688                                 }       
689                                 goto answer_and_close;
690                         }
691                         default:
692                                 syslog(LOG_ERR, "operation requested: unknown !!");
693                                 reply.error = FRSH_SERVICE_TH_ERR_SOCKET;
694                                 goto answer_and_close;
695                 }
696 answer_and_close:
697                 if (send(conn, (void*) &reply, sizeof(frsh_out_msg_t), 0) < sizeof(frsh_out_msg_t))
698                         syslog(LOG_ERR, "can't send reply message (ERROR 0x%x)", FRSH_SERVICE_TH_ERR_SOCKET);
699                 else
700                         syslog(LOG_DEBUG, "command reply sent (exit status 0x%x)", reply.error);
701                 close(conn);
702                 syslog(LOG_INFO, "connection closed");
703 end_cycle:
704                 ;
705         }
706
707         return EXIT_SUCCESS;
708 }
709
710 #endif          /* FRSH_CONFIG_ENABLE_SERVICE_TH_MULTITHREAD */