]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
changes to use the FRSH FSA module to do the analysis and spare capacity. TODO: finis...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  * @license
16  *
17  * -----------------------------------------------------------------------
18  *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
19  *
20  *    Universidad de Cantabria,              SPAIN
21  *    University of York,                    UK
22  *    Scuola Superiore Sant'Anna,            ITALY
23  *    Kaiserslautern University,             GERMANY
24  *    Univ. Politécnica  Valencia,           SPAIN
25  *    Czech Technical University in Prague,  CZECH REPUBLIC
26  *    ENEA                                   SWEDEN
27  *    Thales Communication S.A.              FRANCE
28  *    Visual Tools S.A.                      SPAIN
29  *    Rapita Systems Ltd                     UK
30  *    Evidence                               ITALY
31  *
32  *    See http://www.frescor.org for a link to partners' websites
33  *
34  *           FRESCOR project (FP6/2005/IST/5-034026) is funded
35  *        in part by the European Union Sixth Framework Programme
36  *        The European Union is not liable of any use that may be
37  *        made of this code.
38  *
39  *  This file is part of FRESCAN
40  *
41  *  FRESCAN is free software; you can  redistribute it and/or  modify
42  *  it under the terms of  the GNU General Public License as published by
43  *  the Free Software Foundation;  either  version 2, or (at  your option)
44  *  any later version.
45  *
46  *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
47  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
48  *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
49  *  General Public License for more details.
50  *
51  *  You should have  received a  copy of  the  GNU  General Public License
52  *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
53  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
54  *  02111-1307, USA.
55  *
56  * As a special exception, including FRESCAN header files in a file,
57  * instantiating FRESCAN generics or templates, or linking other files
58  * with FRESCAN objects to produce an executable application, does not
59  * by itself cause the resulting executable application to be covered
60  * by the GNU General Public License. This exception does not
61  * however invalidate any other reasons why the executable file might be
62  * covered by the GNU Public License.
63  * -----------------------------------------------------------------------
64  *
65  */
66
#include <assert.h>
#include <stdint.h> // uintptr_t
#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
#include "frescan_bwres_threads.h"
#include "frescan_bwres_messages.h"
#include "frescan_bwres_requests.h"
#include "frescan_bwres_analysis.h"
#include "frescan_bwres_mode_change.h"
#include "frescan_config.h"
#include "frescan_debug.h"
#include "frescan_data.h"
#include "frescan_servers.h"
78
79 /**
80  * frescan_manager_thread_create()
81  *
82  * This call creates the manager thread at each node which will be waiting
83  * in a request queue for LOCAL or EXTERNAL requests.
84  */
85
86 static void *frescan_manager_thread(void *arg);
87
88 int frescan_manager_thread_create(frescan_network_t net)
89 {
90         int ret;
91         fosa_thread_attr_t attr;
92
93         ret = fosa_thread_attr_init(&attr);
94         if (ret != 0) return ret;
95
96         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
97         if (ret != 0) return ret;
98
99         ret = fosa_thread_create(&the_networks[net].manager_thread_id,
100                                  &attr,
101                                  frescan_manager_thread,
102                                  (void *)(uint32_t)net);
103         if (ret != 0) return ret;
104
105         ret = fosa_thread_attr_destroy(&attr);
106         if (ret != 0) return ret;
107
108         return 0;
109 }
110
111 /**
112  * frescan_acceptor_thread_create()
113  *
114  * This call creates the acceptor thread which will be waiting negotiation
115  * messages from the network and converting them into requests.
116  */
117
118 static void *frescan_acceptor_thread(void *arg);
119
120 int frescan_acceptor_thread_create(frescan_network_t net)
121 {
122         int ret;
123         fosa_thread_attr_t attr;
124
125         ret = fosa_thread_attr_init(&attr);
126         if (ret != 0) return ret;
127
128         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
129         if (ret != 0) return ret;
130
131         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
132                                  &attr,
133                                  frescan_acceptor_thread,
134                                  (void *)(uint32_t)net);
135         if (ret != 0) return ret;
136
137         ret = fosa_thread_attr_destroy(&attr);
138         if (ret != 0) return ret;
139
140         return 0;
141 }
142
143 /**
144  * frescan_manager_thread
145  */
146
147 static void frescan_manager_neg(frescan_request_data_t *req_data);
148 static void frescan_manager_reneg(frescan_request_data_t *req_data);
149 static void frescan_manager_cancel(frescan_request_data_t *req_data);
150 static void frescan_manager_repneg(frescan_request_data_t *req_data);
151
152 static void *frescan_manager_thread(void *arg)
153 {
154         int ret;
155         frescan_request_id_t   req;
156         frescan_request_data_t *req_data;
157         frescan_network_t net = (uint32_t)arg;
158
159         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
160
161         while(1) {
162                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
163
164                 ret = frescan_requests_dequeue(&req);
165                 assert(ret == 0);
166
167                 ret = frescan_requests_get_data(req, &req_data);
168                 assert(ret == 0);
169
170                 switch(req_data->type) {
171                         case FRESCAN_REQ_NEG:
172                                 frescan_manager_neg(req_data);
173                                 break;
174                         case FRESCAN_REQ_RENEG:
175                                 frescan_manager_reneg(req_data);
176                                 break;
177                         case FRESCAN_REQ_CANCEL:
178                                 frescan_manager_cancel(req_data);
179                                 break;
180                         case FRESCAN_REP_NEG:
181                                 frescan_manager_repneg(req_data);
182                                 break;
183                         default:
184                                 FRESCAN_ERROR("request type not supported\n");
185                                 assert(0);
186                 }
187
188                 if(req_data->request_node != the_networks[net].local_node) {
189                         ret = frescan_requests_free(req); // acceptor request
190                         assert(ret == 0);
191                 }
192         }
193 }
194
195 /**
196  * frescan_acceptor_thread()
197  */
198
199 static void *frescan_acceptor_thread(void *arg)
200 {
201         int ret;
202         frescan_request_id_t req;
203         frescan_network_t net = (uint32_t)arg;
204
205         DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
206
207         while(1) {
208                 ret = frescan_messages_recv_request(net, &req);
209                 assert(ret == 0);
210
211                 ret = frescan_requests_enqueue(req);
212                 assert(ret == 0);
213         }
214
215         return NULL;
216 }
217
218 /**
219  * frescan_manager_neg
220  */
221
222 static void frescan_manager_neg(frescan_request_data_t *req_data)
223 {
224         int ret;
225         bool accepted;
226
227         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
228
229         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
230                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
231                       "add contract to scenario\n");
232
233                 ret = frescan_sa_add_contract
234                                 (&the_networks[req_data->net].scenario,
235                                  req_data->ss,
236                                  req_data->request_node,
237                                  req_data->contract);
238                 assert(ret == 0);
239
240                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
241                       "assign priorities\n");
242
243                 ret = frsh_sa_assign_priorities
244                                 (&the_networks[req_data->net].scenario);
245                 assert(ret == 0);
246
247                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
248                       "perform sched analysis\n");
249
250                 ret = frescan_sa_sched_test
251                                 (&the_networks[req_data->net].scenario,
252                                  &accepted);
253                 assert(ret == 0);
254
255                 if (accepted) {
256                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
257                               "schedulable! distribute spare capacity\n");
258
259                         ret = frescan_sa_spare_capacity
260                                        (&the_networks[req_data->net].scenario);
261                         assert(ret == 0);
262
263                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
264
265                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
266                               "perform the mode change protocol!\n");
267
268                         ret = frescan_bwres_mode_change_protocol(req_data);
269                         assert(ret == 0);
270                 } else {
271                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
272                               "not schedulable!\n");
273
274                         ret = frescan_sa_remove_contract
275                                         (&the_networks[req_data->net].scenario,
276                                          req_data->ss,
277                                          req_data->request_node);
278                         assert(ret == 0);
279
280                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
281
282                         if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
283                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
284                                       "signal local request\n");
285                                 ret = frescan_bwres_robjs_signal(req_data->robj);
286                                 assert(ret == 0);
287                         } else {
288                                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
289                                       "sending reply\n");
290                                 req_data->type = FRESCAN_REP_NEG;
291                                 ret = frescan_messages_send_request(req_data);
292                                 assert(ret == 0);
293                         }
294                 }
295         } else {
296                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
297                       "send negotiation request to master\n");
298                 ret = frescan_messages_send_request(req_data);
299                 assert(ret == 0);
300         }
301 }
302
303 /**
304  * frescan_manager_reneg
305  */
306
307 static void frescan_manager_reneg(frescan_request_data_t *req_data)
308 {
309         int ret;
310         bool is_schedulable;
311         frsh_contract_t old_contract;
312
313         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
314
315         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
316                 // scheduling analysis
317                 ret = frescan_sa_update_contract
318                                 (&the_networks[req_data->net].scenario,
319                                  req_data->ss,
320                                  req_data->request_node,
321                                  req_data->contract,
322                                  &old_contract);
323                 assert(ret == 0);
324
325                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
326                       "assign priorities\n");
327
328                 ret = frsh_sa_assign_priorities
329                                 (&the_networks[req_data->net].scenario);
330                 assert(ret == 0);
331
332                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
333                       "perform sched analysis\n");
334
335                 ret = frescan_sa_sched_test
336                                 (&the_networks[req_data->net].scenario,
337                                  &is_schedulable);
338                 assert(ret == 0);
339
340                 if (accepted) {
341                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
342                               "schedulable! distribute spare capacity\n");
343
344                         ret = frescan_sa_spare_capacity
345                                         (&the_networks[req_data->net].scenario);
346                         assert(ret == 0);
347
348                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
349
350                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
351                               "perform the mode change protocol!\n");
352
353                         ret = frescan_bwres_mode_change_protocol(req_data);
354                         assert(ret == 0);
355                 } else {
356                         ret = frescan_sa_update_contract
357                                         (&the_networks[req_data->net].scenario,
358                                          req_data->ss,
359                                          req_data->request_node,
360                                          &old_contract,
361                                          NULL);
362                         assert(ret == 0);
363                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
364                 }
365
366                 // signal or reply the results
367                 if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
368                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
369                         ret = frescan_bwres_robjs_signal(req_data->robj);
370                         assert(ret == 0);
371                 }
372         } else {
373                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
374                       "send renegotiation request to master\n");
375                 ret = frescan_messages_send_request(req_data);
376                 assert(ret == 0);
377         }
378 }
379
380 /**
381  * frescan_manager_cancel
382  */
383
384 static void frescan_manager_cancel(frescan_request_data_t *req_data)
385 {
386         int ret;
387         bool is_schedulable;
388
389         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
390
391         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
392                 ret = frescan_sa_remove_contract
393                                 (&the_networks[req_data->net].scenario,
394                                  req_data->ss,
395                                  req_data->request_node);
396                 assert(ret == 0);
397
398                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
399                       "assign priorities\n");
400
401                 ret = frsh_sa_assign_priorities
402                                 (&the_networks[req_data->net].scenario);
403                 assert(ret == 0);
404
405                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
406                       "perform sched analysis\n");
407
408                 ret = frescan_sa_sched_test
409                                 (&the_networks[req_data->net].scenario,
410                                  &is_schedulable);
411                 assert(ret == 0);
412
413                 assert(is_schedulable == true);
414
415                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
416                       "redistribute spare capacity\n");
417
418                 ret = frescan_sa_spare_capacity
419                                 (&the_networks[req_data->net].scenario);
420                 assert(ret == 0);
421
422                 ret = frescan_bwres_mode_change_protocol(req_data);
423                 assert(ret == 0);
424         } else {
425                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
426                       "send cancel request to master\n");
427                 ret = frescan_messages_send_request(req_data);
428                 assert(ret == 0);
429         }
430
431         if (req_data->request_node == the_networks[req_data->net].local_node) {
432                 ret = frescan_bwres_robjs_signal(req_data->robj);
433                 assert(ret == 0);
434         }
435 }
436
437 /**
438  * frescan_manager_repneg
439  */
440
441 static void frescan_manager_repneg(frescan_request_data_t *req_data)
442 {
443         int ret;
444         frescan_request_data_t *neg_req_data;
445
446         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
447
448         ret = frescan_requests_get_data(req_data->req, &neg_req_data);
449         assert(ret == 0);
450
451         neg_req_data->return_value = req_data->return_value;
452         neg_req_data->final_values = req_data->final_values;
453
454         ret = frescan_bwres_robjs_signal(neg_req_data->robj);
455         assert(ret == 0);
456 }