]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
added message to notify a budget increase or decrease (due to spare capacity)
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  * @license
16  *
17  * -----------------------------------------------------------------------
18  *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
19  *
20  *    Universidad de Cantabria,              SPAIN
21  *    University of York,                    UK
22  *    Scuola Superiore Sant'Anna,            ITALY
23  *    Kaiserslautern University,             GERMANY
24  *    Univ. Politécnica  Valencia,           SPAIN
25  *    Czech Technical University in Prague,  CZECH REPUBLIC
26  *    ENEA                                   SWEDEN
27  *    Thales Communication S.A.              FRANCE
28  *    Visual Tools S.A.                      SPAIN
29  *    Rapita Systems Ltd                     UK
30  *    Evidence                               ITALY
31  *
32  *    See http://www.frescor.org for a link to partners' websites
33  *
34  *           FRESCOR project (FP6/2005/IST/5-034026) is funded
35  *        in part by the European Union Sixth Framework Programme
36  *        The European Union is not liable of any use that may be
37  *        made of this code.
38  *
39  *  This file is part of FRESCAN
40  *
41  *  FRESCAN is free software; you can  redistribute it and/or  modify
42  *  it under the terms of  the GNU General Public License as published by
43  *  the Free Software Foundation;  either  version 2, or (at  your option)
44  *  any later version.
45  *
46  *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
47  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
48  *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
49  *  General Public License for more details.
50  *
51  *  You should have  received a  copy of  the  GNU  General Public License
52  *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
53  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
54  *  02111-1307, USA.
55  *
56  * As a special exception, including FRESCAN header files in a file,
57  * instantiating FRESCAN generics or templates, or linking other files
58  * with FRESCAN objects to produce an executable application, does not
59  * by itself cause the resulting executable application to be covered
60  * by the GNU General Public License. This exception does not
61  * however invalidate any other reasons why the executable file might be
62  * covered by the GNU Public License.
63  * -----------------------------------------------------------------------
64  *
65  */
66
#include <assert.h>
#include <stdint.h> // uintptr_t
#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
#include "frescan_bwres_threads.h"
#include "frescan_bwres_messages.h"
#include "frescan_bwres_requests.h"
#include "frescan_bwres_analysis.h"
#include "frescan_bwres_mode_change.h"
#include "frescan_config.h"
#include "frescan_debug.h"
#include "frescan_data.h"
#include "frescan_servers.h"
78
79 /**
80  * frescan_manager_thread_create()
81  *
82  * This call creates the manager thread at each node which will be waiting
83  * in a request queue for LOCAL or EXTERNAL requests.
84  */
85
86 static void *frescan_manager_thread(void *arg);
87
88 int frescan_manager_thread_create(frescan_network_t net)
89 {
90         int ret;
91         fosa_thread_attr_t attr;
92
93         ret = fosa_thread_attr_init(&attr);
94         if (ret != 0) return ret;
95
96         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
97         if (ret != 0) return ret;
98
99         ret = fosa_thread_create(&the_networks[net].manager_thread_id,
100                                  &attr,
101                                  frescan_manager_thread,
102                                  (void *)(uint32_t)net);
103         if (ret != 0) return ret;
104
105         ret = fosa_thread_attr_destroy(&attr);
106         if (ret != 0) return ret;
107
108         return 0;
109 }
110
111 /**
112  * frescan_acceptor_thread_create()
113  *
114  * This call creates the acceptor thread which will be waiting negotiation
115  * messages from the network and converting them into requests.
116  */
117
118 static void *frescan_acceptor_thread(void *arg);
119
120 int frescan_acceptor_thread_create(frescan_network_t net)
121 {
122         int ret;
123         fosa_thread_attr_t attr;
124
125         ret = fosa_thread_attr_init(&attr);
126         if (ret != 0) return ret;
127
128         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
129         if (ret != 0) return ret;
130
131         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
132                                  &attr,
133                                  frescan_acceptor_thread,
134                                  (void *)(uint32_t)net);
135         if (ret != 0) return ret;
136
137         ret = fosa_thread_attr_destroy(&attr);
138         if (ret != 0) return ret;
139
140         return 0;
141 }
142
143 /**
144  * frescan_manager_thread
145  */
146
147 static void frescan_manager_neg(frescan_request_data_t *req_data);
148 static void frescan_manager_reneg(frescan_request_data_t *req_data);
149 static void frescan_manager_cancel(frescan_request_data_t *req_data);
150 static void frescan_manager_repneg(frescan_request_data_t *req_data);
151
152 static void *frescan_manager_thread(void *arg)
153 {
154         int ret;
155         frescan_request_id_t   req;
156         frescan_request_data_t *req_data;
157         frescan_network_t net = (uint32_t)arg;
158
159         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
160
161         while(1) {
162                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
163
164                 ret = frescan_requests_dequeue(&req);
165                 assert(ret == 0);
166
167                 ret = frescan_requests_get_data(req, &req_data);
168                 assert(ret == 0);
169
170                 switch(req_data->type) {
171                         case FRESCAN_REQ_NEG:
172                                 frescan_manager_neg(req_data);
173                                 break;
174                         case FRESCAN_REQ_RENEG:
175                                 frescan_manager_reneg(req_data);
176                                 break;
177                         case FRESCAN_REQ_CANCEL:
178                                 frescan_manager_cancel(req_data);
179                                 break;
180                         case FRESCAN_REP_NEG:
181                                 frescan_manager_repneg(req_data);
182                                 break;
183                         default:
184                                 FRESCAN_ERROR("request type not supported\n");
185                                 assert(0);
186                 }
187
188                 if(req_data->request_node != the_networks[net].local_node) {
189                         ret = frescan_requests_free(req); // acceptor request
190                         assert(ret == 0);
191                 }
192         }
193 }
194
195 /**
196  * frescan_acceptor_thread()
197  */
198
199 static void *frescan_acceptor_thread(void *arg)
200 {
201         int ret;
202         frescan_request_id_t req;
203         frescan_network_t net = (uint32_t)arg;
204
205         DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
206
207         while(1) {
208                 ret = frescan_messages_recv_request(net, &req);
209                 assert(ret == 0);
210
211                 ret = frescan_requests_enqueue(req);
212                 assert(ret == 0);
213         }
214
215         return NULL;
216 }
217
218 /**
219  * frescan_manager_neg
220  */
221
222 static void frescan_manager_neg(frescan_request_data_t *req_data)
223 {
224         int ret;
225         bool accepted;
226         frescan_sa_scenario_t *scenario;
227
228         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
229
230         scenario = &the_networks[req_data->net].scenario;
231
232         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
233                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
234                       "add contract to scenario\n");
235
236                 ret = frescan_sa_add_contract
237                                 (scenario,
238                                  req_data->ss,
239                                  req_data->request_node,
240                                  req_data->contract);
241                 assert(ret == 0);
242
243                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
244                       "perform sched analysis\n");
245
246                 ret = frescan_sa_sched_test(scenario, &accepted);
247                 assert(ret == 0);
248
249                 if (accepted) {
250                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
251                               "schedulable! distribute spare capacity\n");
252
253                         ret = frescan_sa_spare_capacity(scenario);
254                         assert(ret == 0);
255
256                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
257
258                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
259                               "perform the mode change protocol!\n");
260
261                         ret = frescan_bwres_mode_change_protocol(req_data);
262                         assert(ret == 0);
263                 } else {
264                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
265                               "not schedulable!\n");
266
267                         ret = frescan_sa_remove_contract
268                                         (scenario,
269                                          req_data->ss,
270                                          req_data->request_node);
271                         assert(ret == 0);
272
273                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
274
275                         if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
276                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
277                                       "signal local request\n");
278                                 ret = frescan_bwres_robjs_signal(req_data->robj);
279                                 assert(ret == 0);
280                         } else {
281                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
282                                       "sending reply\n");
283                                 req_data->type = FRESCAN_REP_NEG;
284                                 ret = frescan_messages_send_request(req_data);
285                                 assert(ret == 0);
286                         }
287                 }
288         } else {
289                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
290                       "send negotiation request to master\n");
291                 ret = frescan_messages_send_request(req_data);
292                 assert(ret == 0);
293         }
294 }
295
296 /**
297  * frescan_manager_reneg
298  */
299
300 static void frescan_manager_reneg(frescan_request_data_t *req_data)
301 {
302         int ret;
303         bool is_schedulable;
304         frsh_contract_t old_contract;
305         frescan_sa_scenario_t *scenario;
306
307         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
308
309         scenario = &the_networks[req_data->net].scenario;
310
311         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
312                 // scheduling analysis
313                 ret = frescan_sa_update_contract
314                                 (scenario,
315                                  req_data->ss,
316                                  req_data->request_node,
317                                  req_data->contract,
318                                  &old_contract);
319                 assert(ret == 0);
320
321                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
322                       "perform sched analysis\n");
323
324                 ret = frescan_sa_sched_test(scenario, &is_schedulable);
325                 assert(ret == 0);
326
327                 if (is_schedulable) {
328                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
329                               "schedulable! distribute spare capacity\n");
330
331                         ret = frescan_sa_spare_capacity(scenario);
332                         assert(ret == 0);
333
334                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
335
336                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
337                               "perform the mode change protocol!\n");
338
339                         ret = frescan_bwres_mode_change_protocol(req_data);
340                         assert(ret == 0);
341                 } else {
342                         ret = frescan_sa_update_contract
343                                         (scenario,
344                                          req_data->ss,
345                                          req_data->request_node,
346                                          &old_contract,
347                                          NULL);
348                         assert(ret == 0);
349                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
350                 }
351
352                 // signal or reply the results
353                 if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
354                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
355                         ret = frescan_bwres_robjs_signal(req_data->robj);
356                         assert(ret == 0);
357                 }
358         } else {
359                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
360                       "send renegotiation request to master\n");
361                 ret = frescan_messages_send_request(req_data);
362                 assert(ret == 0);
363         }
364 }
365
366 /**
367  * frescan_manager_cancel
368  */
369
370 static void frescan_manager_cancel(frescan_request_data_t *req_data)
371 {
372         int ret;
373         bool is_schedulable;
374         frescan_sa_scenario_t *scenario;
375
376         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
377
378         scenario = &the_networks[req_data->net].scenario;
379
380         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
381                 ret = frescan_sa_remove_contract
382                                 (scenario,
383                                  req_data->ss,
384                                  req_data->request_node);
385                 assert(ret == 0);
386
387                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
388                       "assign priorities\n");
389
390                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
391                       "perform sched analysis\n");
392
393                 ret = frescan_sa_sched_test(scenario, &is_schedulable);
394                 assert(ret == 0);
395
396                 assert(is_schedulable == true);
397
398                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
399                       "redistribute spare capacity\n");
400
401                 ret = frescan_sa_spare_capacity(scenario);
402                 assert(ret == 0);
403
404                 ret = frescan_bwres_mode_change_protocol(req_data);
405                 assert(ret == 0);
406         } else {
407                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
408                       "send cancel request to master\n");
409                 ret = frescan_messages_send_request(req_data);
410                 assert(ret == 0);
411         }
412
413         if (req_data->request_node == the_networks[req_data->net].local_node) {
414                 ret = frescan_bwres_robjs_signal(req_data->robj);
415                 assert(ret == 0);
416         }
417 }
418
419 /**
420  * frescan_manager_repneg
421  */
422
423 static void frescan_manager_repneg(frescan_request_data_t *req_data)
424 {
425         int ret;
426         frescan_request_data_t *neg_req_data;
427
428         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
429
430         ret = frescan_requests_get_data(req_data->req, &neg_req_data);
431         assert(ret == 0);
432
433         neg_req_data->return_value = req_data->return_value;
434         neg_req_data->final_values = req_data->final_values;
435
436         ret = frescan_bwres_robjs_signal(neg_req_data->robj);
437         assert(ret == 0);
438 }