]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
Unified header for FNA
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  * @license
16  *
17 //----------------------------------------------------------------------
18 //  Copyright (C) 2006 - 2009 by the FRESCOR consortium:
19 //
20 //    Universidad de Cantabria,              SPAIN
21 //    University of York,                    UK
22 //    Scuola Superiore Sant'Anna,            ITALY
23 //    Kaiserslautern University,             GERMANY
24 //    Univ. Politecnica  Valencia,           SPAIN
25 //    Czech Technical University in Prague,  CZECH REPUBLIC
26 //    ENEA                                   SWEDEN
27 //    Thales Communication S.A.              FRANCE
28 //    Visual Tools S.A.                      SPAIN
29 //    Rapita Systems Ltd                     UK
30 //    Evidence                               ITALY
31 //
32 //    See http://www.frescor.org
33 //
34 //        The FRESCOR project (FP6/2005/IST/5-034026) is funded
35 //        in part by the European Union Sixth Framework Programme
36 //        The European Union is not liable of any use that may be
37 //        made of this code.
38 //
39 //
40 //  based on previous work (FSF) done in the FIRST project
41 //
42 //   Copyright (C) 2005  Mälardalen University, SWEDEN
43 //                       Scuola Superiore S.Anna, ITALY
44 //                       Universidad de Cantabria, SPAIN
45 //                       University of York, UK
46 //
47 // This file is part of FNA (Frescor Network Adaptation)
48 //
49 // FNA is free software; you can redistribute it and/or modify it
50 // under terms of the GNU General Public License as published by the
51 // Free Software Foundation; either version 2, or (at your option) any
52 // later version.  FNA is distributed in the hope that it will be
53 // useful, but WITHOUT ANY WARRANTY; without even the implied warranty
54 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
55 // General Public License for more details. You should have received a
56 // copy of the GNU General Public License along with FNA; see file
57 // COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,
58 // Cambridge, MA 02139, USA.
59 //
60 // As a special exception, including FNA header files in a file,
61 // instantiating FNA generics or templates, or linking other files
62 // with FNA objects to produce an executable application, does not
63 // by itself cause the resulting executable application to be covered
64 // by the GNU General Public License. This exception does not
65 // however invalidate any other reasons why the executable file might be
66 // covered by the GNU Public License.
67 // -----------------------------------------------------------------------
68  *
69  */
70
71 #include <assert.h>
72 #include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
73 #include "frescan_bwres_threads.h"
74 #include "frescan_bwres_messages.h"
75 #include "frescan_bwres_requests.h"
76 #include "frescan_bwres_robjs.h"
77 #include "frescan_bwres_analysis.h"
78 #include "frescan_bwres_mode_change.h"
79 #include "frescan_config.h"
80 #include "frescan_debug.h"
81 #include "frescan_data.h"
82 #include "frescan_servers.h"
83
84 static void *frescan_manager_thread(void *arg);
85 static void *frescan_acceptor_thread(void *arg);
86
87 static void frescan_manager_gn_prepare_scenario
88                                 (frescan_bwres_sa_scenario_t  *scenario,
89                                  frescan_bwres_request_data_t *req_data);
90
91 static void frescan_manager_gn_restore_scenario
92                                 (frescan_bwres_sa_scenario_t  *scenario,
93                                  frescan_bwres_request_data_t *req_data);
94
95 static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data);
96 static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data);
97 static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data);
98
99 /**
100  * frescan_manager_thread_create()
101  *
102  * This call creates the manager thread at each node which will be waiting
103  * in a request queue for LOCAL or EXTERNAL requests.
104  */
105
106 int frescan_manager_thread_create(frescan_network_t net)
107 {
108         int ret;
109         fosa_thread_attr_t attr;
110
111         ret = fosa_thread_attr_init(&attr);
112         if (ret != 0) return ret;
113
114         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_NEG_THREAD_PRIO);
115         if (ret != 0) return ret;
116
117         ret = fosa_thread_create(&frescan_data[net].manager_thread_id,
118                                  &attr,
119                                  frescan_manager_thread,
120                                  (void *)(uint32_t)net);
121         if (ret != 0) return ret;
122
123         ret = fosa_thread_attr_destroy(&attr);
124         if (ret != 0) return ret;
125
126         return 0;
127 }
128
129 /**
130  * frescan_acceptor_thread_create()
131  *
132  * This call creates the acceptor thread which will be waiting negotiation
133  * messages from the network and converting them into requests.
134  */
135
136 int frescan_acceptor_thread_create(frescan_network_t net)
137 {
138         int ret;
139         fosa_thread_attr_t attr;
140
141         ret = fosa_thread_attr_init(&attr);
142         if (ret != 0) return ret;
143
144         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_BWRES_ACCEPTOR_PRIO);
145         if (ret != 0) return ret;
146
147         ret = fosa_thread_create(&frescan_data[net].acceptor_thread_id,
148                                  &attr,
149                                  frescan_acceptor_thread,
150                                  (void *)(uint32_t)net);
151         if (ret != 0) return ret;
152
153         ret = fosa_thread_attr_destroy(&attr);
154         if (ret != 0) return ret;
155
156         return 0;
157 }
158
159 /**
160  * frescan_manager_thread
161  */
162
163 static void *frescan_manager_thread(void *arg)
164 {
165         int ret;
166         frescan_bwres_request_id_t   req;
167         frescan_bwres_request_data_t *req_data;
168         frescan_network_t net = (uint32_t)arg;
169
170         DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
171
172         while(1) {
173                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "wait for a request\n");
174
175                 ret = frescan_bwres_requests_dequeue(&req);
176                 assert(ret == 0);
177
178                 ret = frescan_bwres_requests_get_data(req, &req_data);
179                 assert(ret == 0);
180
181                 switch(req_data->type) {
182                         case FRESCAN_BWRES_REQ_GN:
183                                 frescan_manager_req_gn(req_data);
184                                 break;
185                         case FRESCAN_BWRES_REP_GN:
186                                 frescan_manager_rep_gn(req_data);
187                                 break;
188                         case FRESCAN_BWRES_REQ_MC:
189                                 frescan_manager_req_mc(req_data);
190                                 break;
191                         case FRESCAN_BWRES_REQ_RES:
192                         case FRESCAN_BWRES_REQ_RES_GET:
193                         case FRESCAN_BWRES_REP_RES_GET:
194                         case FRESCAN_BWRES_REQ_RES_SET:
195                         case FRESCAN_BWRES_REQ_RES_COMMIT:
196                         case FRESCAN_BWRES_REQ_RES_CANCEL:
197                         default:
198                                 FRESCAN_ERROR("request type not supported\n");
199                                 assert(0);
200                 }
201
202                 if(req_data->request_node != frescan_data[net].local_node) {
203                         ret = frescan_bwres_requests_free(req);
204                         assert(ret == 0);
205                 }
206         }
207 }
208
209 /**
210  * frescan_acceptor_thread()
211  */
212
213 static void *frescan_acceptor_thread(void *arg)
214 {
215         int ret;
216         frescan_bwres_request_id_t req;
217         frescan_network_t net = (uint32_t)arg;
218
219         DEBUG(FRESCAN_BWRES_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
220
221         while(1) {
222                 ret = frescan_messages_recv_request(net, &req);
223                 assert(ret == 0);
224
225                 ret = frescan_bwres_requests_enqueue(req);
226                 assert(ret == 0);
227         }
228
229         return NULL;
230 }
231
232 /**
233  * frescan_manager_req_gn
234  */
235
236 static void frescan_manager_req_gn(frescan_bwres_request_data_t *req_data)
237 {
238         int ret, i;
239         frescan_node_t me;
240         bool accepted;
241         frescan_bwres_sa_scenario_t *scenario;
242         frescan_ss_t ss;
243         frescan_server_params_t server_params;
244         frescan_bwres_vres_t *vres;
245
246         me = frescan_data[req_data->net].local_node;
247
248         if (me != FRESCAN_BWRES_MASTER_NODE) {
249                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
250                       "send gn req to master\n");
251                 ret = frescan_messages_send_request(req_data);
252                 assert(ret == 0);
253                 return;
254         }
255
256         scenario = &frescan_data[req_data->net].scenario;
257
258         DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "prepare new scenario\n");
259         frescan_manager_gn_prepare_scenario(scenario, req_data);
260
261         DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "perform sched analysis\n");
262         ret = frescan_bwres_sa_sched_test(scenario, &accepted);
263         assert(ret == 0);
264
265         if (accepted) {
266                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "ACCEPTED!\n");
267                 req_data->return_value = FRESCAN_BWRES_REQ_ACCEPTED;
268
269                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
270                       "spare capacity and mode change\n");
271
272                 ret = frescan_bwres_sa_spare_capacity(scenario);
273                 assert(ret == 0);
274
275                 ret = frescan_bwres_mode_change_protocol(req_data);
276                 assert(ret == 0);
277         } else {
278                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "FAILED!\n");
279                 req_data->return_value = FRESCAN_BWRES_REQ_NOT_ACCEPTED;
280                 frescan_manager_gn_restore_scenario(scenario, req_data);
281         }
282
283         if (req_data->request_node != me) {
284                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "send reply\n");
285                 req_data->type = FRESCAN_BWRES_REP_GN;
286
287                 ret = frescan_messages_send_request(req_data);
288                 assert(ret == 0);
289                 return;
290         }
291
292         if (req_data->return_value == FRESCAN_BWRES_REQ_ACCEPTED) {
293                 // create servers for new contracts
294                 req_data->ss_new->size = req_data->contracts_to_neg->size;
295                 for(i=0; i<req_data->ss_new->size; i++) {
296                         vres = &frescan_data[req_data->net].scenario.
297                                         vres_pool[me]
298                                                 [req_data->ss_new->ss[i]];
299
300                         server_params.budget = frsh_rel_time_to_usec(
301                                         frsh_sa_time_to_rel_time(vres->old_c)) /
302                                         FRESCAN_FRAME_TX_TIME_US;
303
304                         server_params.period = frsh_sa_time_to_rel_time
305                                                                 (vres->old_t);
306                         server_params.prio   = vres->old_p;
307
308                         // Create server
309                         ret = frescan_servers_create(req_data->net,
310                                                 &server_params,
311                                                 &ss);
312                         assert(ret == 0);
313                         assert (req_data->ss_new->ss[i] == ss);
314                 }
315         }
316
317         DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG, "signal robj\n");
318         ret = frescan_bwres_robjs_signal(req_data->robj);
319         assert(ret == 0);
320 }
321
322 /**
323  * frescan_manager_gn_prepare_scenario
324  */
325
326 static void frescan_manager_gn_prepare_scenario
327                                 (frescan_bwres_sa_scenario_t  *scenario,
328                                  frescan_bwres_request_data_t *req_data)
329 {
330         int ret, i;
331
332         // NEG-GROUP
333         for(i=0; i<req_data->contracts_to_neg->size; i++) {
334                 ret = freelist_alloc(&frescan_data[req_data->net].scenario.
335                                       ss_id_freelist[req_data->request_node]);
336                 assert(ret >= 0);
337
338                 req_data->ss_new->ss[i] = (frescan_ss_t)ret;
339
340                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
341                       "prealloc net:%d node:%d ss_new->ss[%d]:%d\n",
342                       req_data->net, req_data->request_node,
343                       i, req_data->ss_new->ss[i]);
344
345                 ret = frescan_bwres_sa_add_contract
346                                 (scenario,
347                                  req_data->ss_new->ss[i],
348                                  req_data->request_node,
349                                  &req_data->contracts_to_neg->contracts[i]);
350                 assert(ret == 0);
351         }
352
353         req_data->ss_new->size = req_data->contracts_to_neg->size;
354
355         // RENEG-GROUP
356         scenario->backup_contracts_to_reneg.size =
357                                         req_data->contracts_to_reneg->size;
358
359         for(i=0; i<req_data->contracts_to_reneg->size; i++) {
360                 ret = frescan_bwres_sa_update_contract
361                         (scenario,
362                          req_data->ss_to_reneg->ss[i],
363                          req_data->request_node,
364                          &req_data->contracts_to_reneg->contracts[i],
365                          &scenario->backup_contracts_to_reneg.contracts[i]);
366                 assert(ret == 0);
367         }
368
369         // CANCEL-GROUP
370         scenario->backup_contracts_to_cancel.size =
371                                                 req_data->ss_to_cancel->size;
372
373         for(i=0; i<req_data->ss_to_cancel->size; i++) {
374                 ret = frescan_bwres_sa_remove_contract
375                         (scenario,
376                          req_data->ss_to_cancel->ss[i],
377                          req_data->request_node,
378                          &scenario->backup_contracts_to_cancel.contracts[i]);
379                 assert(ret == 0);
380         }
381 }
382
383 /**
384  * frescan_manager_gn_restore_scenario
385  */
386
387 static void frescan_manager_gn_restore_scenario
388                 (frescan_bwres_sa_scenario_t  *scenario,
389                  frescan_bwres_request_data_t *req_data)
390 {
391         int ret, i;
392
393         // NEG-GROUP
394         for(i=0; i<req_data->contracts_to_neg->size; i++) {
395                 ret = frescan_bwres_sa_remove_contract
396                                 (scenario,
397                                  req_data->ss_new->ss[i],
398                                  req_data->request_node,
399                                  NULL);
400                 assert(ret == 0);
401
402                 ret = freelist_free(&frescan_data[req_data->net].scenario.
403                                         ss_id_freelist[req_data->request_node],
404                                     req_data->ss_new->ss[i]);
405                 assert(ret == 0);
406         }
407
408         // RENEG-GROUP
409         for(i=0; i<req_data->contracts_to_reneg->size; i++) {
410                 ret = frescan_bwres_sa_update_contract
411                         (scenario,
412                          req_data->ss_to_reneg->ss[i],
413                          req_data->request_node,
414                          &scenario->backup_contracts_to_reneg.contracts[i],
415                          NULL);
416                 assert(ret == 0);
417         }
418
419         // CANCEL-GROUP
420         for(i=0; i<req_data->ss_to_cancel->size; i++) {
421                 ret = frescan_bwres_sa_add_contract
422                         (scenario,
423                          req_data->ss_to_cancel->ss[i],
424                          req_data->request_node,
425                          &scenario->backup_contracts_to_cancel.contracts[i]);
426                 assert(ret == 0);
427         }
428 }
429
430 /**
431  * frescan_manager_rep_gn
432  */
433
434 static void frescan_manager_rep_gn(frescan_bwres_request_data_t *req_data)
435 {
436         int ret, i;
437         frescan_bwres_request_data_t *caller_req;
438
439         ret = frescan_bwres_requests_get_data(req_data->req, &caller_req);
440         assert(ret == 0);
441
442         caller_req->return_value = req_data->return_value;
443
444         DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
445               "ret:%d -> %s\n", req_data->return_value,
446               (req_data->return_value ==
447                FRESCAN_BWRES_REQ_ACCEPTED) ? "OK" : "FAIL");
448
449         if (req_data->return_value == FRESCAN_BWRES_REQ_ACCEPTED) {
450                 assert (req_data->ss_new->size ==
451                         caller_req->contracts_to_neg->size);
452                 caller_req->ss_new->size = req_data->ss_new->size;
453
454                 DEBUG(FRESCAN_BWRES_MANAGER_ENABLE_DEBUG,
455                       "ss_new->size:%u\n", caller_req->ss_new->size);
456
457                 for (i=0; i<caller_req->ss_new->size; i++) {
458                         caller_req->ss_new->ss[i] = req_data->ss_new->ss[i];
459                 }
460         }
461
462         ret = frescan_bwres_robjs_signal(caller_req->robj);
463         assert(ret == 0);
464 }
465
466 /**
467  * frescan_manager_req_mc
468  */
469
470 static void frescan_manager_req_mc(frescan_bwres_request_data_t *req_data)
471 {
472         int ret;
473
474         ret = frescan_bwres_mode_change_local(req_data->net,
475                                               req_data->mode_change_type,
476                                               req_data->ss_to_cancel);
477         assert(ret == 0);
478 }