]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
fed6212169192b8f70a175cc093fffa477d33e65
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  * @license
16  *
17  * -----------------------------------------------------------------------
18  *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
19  *
20  *    Universidad de Cantabria,              SPAIN
21  *    University of York,                    UK
22  *    Scuola Superiore Sant'Anna,            ITALY
23  *    Kaiserslautern University,             GERMANY
24  *    Univ. Politécnica  Valencia,           SPAIN
25  *    Czech Technical University in Prague,  CZECH REPUBLIC
26  *    ENEA                                   SWEDEN
27  *    Thales Communication S.A.              FRANCE
28  *    Visual Tools S.A.                      SPAIN
29  *    Rapita Systems Ltd                     UK
30  *    Evidence                               ITALY
31  *
32  *    See http://www.frescor.org for a link to partners' websites
33  *
34  *           FRESCOR project (FP6/2005/IST/5-034026) is funded
35  *        in part by the European Union Sixth Framework Programme
36  *        The European Union is not liable of any use that may be
37  *        made of this code.
38  *
39  *  This file is part of FRESCAN
40  *
41  *  FRESCAN is free software; you can  redistribute it and/or  modify
42  *  it under the terms of  the GNU General Public License as published by
43  *  the Free Software Foundation;  either  version 2, or (at  your option)
44  *  any later version.
45  *
46  *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
47  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
48  *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
49  *  General Public License for more details.
50  *
51  *  You should have  received a  copy of  the  GNU  General Public License
52  *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
53  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
54  *  02111-1307, USA.
55  *
56  * As a special exception, including FRESCAN header files in a file,
57  * instantiating FRESCAN generics or templates, or linking other files
58  * with FRESCAN objects to produce an executable application, does not
59  * by itself cause the resulting executable application to be covered
60  * by the GNU General Public License. This exception does not
61  * however invalidate any other reasons why the executable file might be
62  * covered by the GNU Public License.
63  * -----------------------------------------------------------------------
64  *
65  */
66
#include <assert.h>
#include <stdint.h> // uintptr_t
#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
#include "frescan_bwres_threads.h"
#include "frescan_bwres_messages.h"
#include "frescan_bwres_requests.h"
#include "frescan_bwres_analysis.h"
#include "frescan_bwres_mode_change.h"
#include "frescan_config.h"
#include "frescan_debug.h"
#include "frescan_data.h"
#include "frescan_servers.h"
78
79 /**
80  * frescan_manager_thread_create()
81  *
82  * This call creates the manager thread at each node which will be waiting
83  * in a request queue for LOCAL or EXTERNAL requests.
84  */
85
86 static void *frescan_manager_thread(void *arg);
87
88 int frescan_manager_thread_create(frescan_network_t net)
89 {
90         int ret;
91         fosa_thread_attr_t attr;
92
93         ret = fosa_thread_attr_init(&attr);
94         if (ret != 0) return ret;
95
96         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
97         if (ret != 0) return ret;
98
99         ret = fosa_thread_create(&the_networks[net].manager_thread_id,
100                                  &attr,
101                                  frescan_manager_thread,
102                                  (void *)(uint32_t)net);
103         if (ret != 0) return ret;
104
105         ret = fosa_thread_attr_destroy(&attr);
106         if (ret != 0) return ret;
107
108         return 0;
109 }
110
111 /**
112  * frescan_acceptor_thread_create()
113  *
114  * This call creates the acceptor thread which will be waiting negotiation
115  * messages from the network and converting them into requests.
116  */
117
118 static void *frescan_acceptor_thread(void *arg);
119
120 int frescan_acceptor_thread_create(frescan_network_t net)
121 {
122         int ret;
123         fosa_thread_attr_t attr;
124
125         ret = fosa_thread_attr_init(&attr);
126         if (ret != 0) return ret;
127
128         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
129         if (ret != 0) return ret;
130
131         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
132                                  &attr,
133                                  frescan_acceptor_thread,
134                                  (void *)(uint32_t)net);
135         if (ret != 0) return ret;
136
137         ret = fosa_thread_attr_destroy(&attr);
138         if (ret != 0) return ret;
139
140         return 0;
141 }
142
143 /**
144  * frescan_manager_thread
145  */
146
147 static void frescan_manager_neg(frescan_request_data_t *req_data);
148 static void frescan_manager_reneg(frescan_request_data_t *req_data);
149 static void frescan_manager_cancel(frescan_request_data_t *req_data);
150 static void frescan_manager_repneg(frescan_request_data_t *req_data);
151 static void frescan_manager_budget_change(frescan_request_data_t *req_data);
152
153 static void *frescan_manager_thread(void *arg)
154 {
155         int ret;
156         frescan_request_id_t   req;
157         frescan_request_data_t *req_data;
158         frescan_network_t net = (uint32_t)arg;
159
160         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
161
162         while(1) {
163                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
164
165                 ret = frescan_requests_dequeue(&req);
166                 assert(ret == 0);
167
168                 ret = frescan_requests_get_data(req, &req_data);
169                 assert(ret == 0);
170
171                 switch(req_data->type) {
172                         case FRESCAN_REQ_NEG:
173                                 frescan_manager_neg(req_data);
174                                 break;
175                         case FRESCAN_REQ_RENEG:
176                                 frescan_manager_reneg(req_data);
177                                 break;
178                         case FRESCAN_REQ_CANCEL:
179                                 frescan_manager_cancel(req_data);
180                                 break;
181                         case FRESCAN_REP_DEC_BUDGET:
182                         case FRESCAN_REP_INC_BUDGET:
183                                 frescan_manager_budget_change(req_data);
184                                 break;
185                         case FRESCAN_REP_NEG:
186                                 frescan_manager_repneg(req_data);
187                                 break;
188                         default:
189                                 FRESCAN_ERROR("request type not supported\n");
190                                 assert(0);
191                 }
192
193                 if(req_data->request_node != the_networks[net].local_node) {
194                         ret = frescan_requests_free(req); // acceptor request
195                         assert(ret == 0);
196                 }
197         }
198 }
199
200 /**
201  * frescan_acceptor_thread()
202  */
203
204 static void *frescan_acceptor_thread(void *arg)
205 {
206         int ret;
207         frescan_request_id_t req;
208         frescan_network_t net = (uint32_t)arg;
209
210         DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
211
212         while(1) {
213                 ret = frescan_messages_recv_request(net, &req);
214                 assert(ret == 0);
215
216                 ret = frescan_requests_enqueue(req);
217                 assert(ret == 0);
218         }
219
220         return NULL;
221 }
222
223 /**
224  * frescan_manager_neg
225  */
226
227 static void frescan_manager_neg(frescan_request_data_t *req_data)
228 {
229         int ret;
230         bool accepted;
231         frescan_sa_scenario_t *scenario;
232
233         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
234
235         scenario = &the_networks[req_data->net].scenario;
236
237         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
238                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
239                       "add contract to scenario\n");
240
241                 ret = frescan_sa_add_contract
242                                 (scenario,
243                                  req_data->ss,
244                                  req_data->request_node,
245                                  req_data->contract);
246                 assert(ret == 0);
247
248                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
249                       "perform sched analysis\n");
250
251                 ret = frescan_sa_sched_test(scenario, &accepted);
252                 assert(ret == 0);
253
254                 if (accepted) {
255                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
256                               "schedulable! distribute spare capacity\n");
257
258                         ret = frescan_sa_spare_capacity(scenario);
259                         assert(ret == 0);
260
261                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
262
263                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
264                               "perform the mode change protocol!\n");
265
266                         ret = frescan_bwres_mode_change_protocol(req_data);
267                         assert(ret == 0);
268
269                         if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
270                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
271                                       "signal local request\n");
272                                 ret = frescan_bwres_robjs_signal
273                                                         (req_data->robj);
274                                 assert(ret == 0);
275                         }
276                 } else {
277                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
278                               "not schedulable!\n");
279
280                         ret = frescan_sa_remove_contract
281                                         (scenario,
282                                          req_data->ss,
283                                          req_data->request_node);
284                         assert(ret == 0);
285
286                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
287
288                         if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
289                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
290                                       "signal local request\n");
291                                 ret = frescan_bwres_robjs_signal(req_data->robj);
292                                 assert(ret == 0);
293                         } else {
294                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
295                                       "sending reply\n");
296                                 req_data->type = FRESCAN_REP_NEG;
297                                 ret = frescan_messages_send_request(req_data);
298                                 assert(ret == 0);
299                         }
300                 }
301         } else {
302                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
303                       "send negotiation request to master\n");
304                 ret = frescan_messages_send_request(req_data);
305                 assert(ret == 0);
306         }
307 }
308
309 /**
310  * frescan_manager_reneg
311  */
312
313 static void frescan_manager_reneg(frescan_request_data_t *req_data)
314 {
315         int ret;
316         bool is_schedulable;
317         frsh_contract_t old_contract;
318         frescan_sa_scenario_t *scenario;
319
320         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
321
322         scenario = &the_networks[req_data->net].scenario;
323
324         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
325                 // scheduling analysis
326                 ret = frescan_sa_update_contract
327                                 (scenario,
328                                  req_data->ss,
329                                  req_data->request_node,
330                                  req_data->contract,
331                                  &old_contract);
332                 assert(ret == 0);
333
334                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
335                       "perform sched analysis\n");
336
337                 ret = frescan_sa_sched_test(scenario, &is_schedulable);
338                 assert(ret == 0);
339
340                 if (is_schedulable) {
341                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
342                               "schedulable! distribute spare capacity\n");
343
344                         ret = frescan_sa_spare_capacity(scenario);
345                         assert(ret == 0);
346
347                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
348
349                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
350                               "perform the mode change protocol!\n");
351
352                         ret = frescan_bwres_mode_change_protocol(req_data);
353                         assert(ret == 0);
354                 } else {
355                         ret = frescan_sa_update_contract
356                                         (scenario,
357                                          req_data->ss,
358                                          req_data->request_node,
359                                          &old_contract,
360                                          NULL);
361                         assert(ret == 0);
362                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
363                 }
364
365                 // signal or reply the results
366                 if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
367                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
368                         ret = frescan_bwres_robjs_signal(req_data->robj);
369                         assert(ret == 0);
370                 }
371         } else {
372                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
373                       "send renegotiation request to master\n");
374                 ret = frescan_messages_send_request(req_data);
375                 assert(ret == 0);
376         }
377 }
378
379 /**
380  * frescan_manager_cancel
381  */
382
383 static void frescan_manager_cancel(frescan_request_data_t *req_data)
384 {
385         int ret;
386         bool is_schedulable;
387         frescan_sa_scenario_t *scenario;
388
389         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
390
391         scenario = &the_networks[req_data->net].scenario;
392
393         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
394                 ret = frescan_sa_remove_contract
395                                 (scenario,
396                                  req_data->ss,
397                                  req_data->request_node);
398                 assert(ret == 0);
399
400                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
401                       "assign priorities\n");
402
403                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
404                       "perform sched analysis\n");
405
406                 ret = frescan_sa_sched_test(scenario, &is_schedulable);
407                 assert(ret == 0);
408
409                 assert(is_schedulable == true);
410
411                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
412                       "redistribute spare capacity\n");
413
414                 ret = frescan_sa_spare_capacity(scenario);
415                 assert(ret == 0);
416
417                 ret = frescan_bwres_mode_change_protocol(req_data);
418                 assert(ret == 0);
419         } else {
420                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
421                       "send cancel request to master\n");
422                 ret = frescan_messages_send_request(req_data);
423                 assert(ret == 0);
424         }
425
426         if (req_data->request_node == the_networks[req_data->net].local_node) {
427                 ret = frescan_bwres_robjs_signal(req_data->robj);
428                 assert(ret == 0);
429         }
430 }
431
432 /**
433  * frescan_manager_budget_change
434  */
435
436 static void frescan_manager_budget_change(frescan_request_data_t *req_data)
437 {
438         int ret;
439         frescan_sa_mode_change_type_t mode_change_type;
440         frescan_node_t me = the_networks[req_data->net].local_node;
441
442         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "budget change request\n");
443
444         if (req_data->type == FRESCAN_REP_INC_BUDGET) {
445                 mode_change_type = FRESCAN_SA_BUDGET_INC;
446         } else {
447                 mode_change_type = FRESCAN_SA_BUDGET_DEC;
448         }
449
450         ret = frescan_bwres_budget_change(req_data,
451                                           me,
452                                           mode_change_type);
453         assert(ret == 0);
454 }
455
456 /**
457  * frescan_manager_repneg
458  */
459
460 static void frescan_manager_repneg(frescan_request_data_t *req_data)
461 {
462 //         int ret;
463 //         frescan_request_data_t *neg_req_data;
464 //
465 //         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
466 //
467 //         ret = frescan_requests_get_data(req_data->req, &neg_req_data);
468 //         assert(ret == 0);
469 //
470 //         neg_req_data->return_value = req_data->return_value;
471 //         neg_req_data->final_values = req_data->final_values;
472 //
473 //         ret = frescan_bwres_robjs_signal(neg_req_data->robj);
474 //         assert(ret == 0);
475
476 }