]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
ef750b8b7beb1dfe0cbe3dbc7e22eba9b2d32212
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  */
16
#include <assert.h>
#include <stdint.h> // uintptr_t
#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
#include "frescan_bwres_threads.h"
#include "frescan_bwres_messages.h"
#include "frescan_bwres_requests.h"
#include "frescan_bwres_analysis.h"
#include "frescan_config.h"
#include "frescan_debug.h"
#include "frescan_data.h"
#include "frescan_servers.h"
27
28 /**
29  * frescan_manager_thread_create()
30  *
31  * This call creates the manager thread at each node which will be waiting
32  * in a request queue for LOCAL or EXTERNAL requests.
33  */
34
35 static void *frescan_manager_thread(void *arg);
36
37 int frescan_manager_thread_create(frescan_network_t net)
38 {
39         int ret;
40         fosa_thread_attr_t attr;
41
42         ret = fosa_thread_attr_init(&attr);
43         if (ret != 0) return ret;
44
45         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
46         if (ret != 0) return ret;
47
48         ret = fosa_thread_create(&the_networks[net].manager_thread_id,
49                                  &attr,
50                                  frescan_manager_thread,
51                                  (void *)(uint32_t)net);
52         if (ret != 0) return ret;
53
54         ret = fosa_thread_attr_destroy(&attr);
55         if (ret != 0) return ret;
56
57         return 0;
58 }
59
60 /**
61  * frescan_acceptor_thread_create()
62  *
63  * This call creates the acceptor thread which will be waiting negotiation
64  * messages from the network and converting them into requests.
65  */
66
67 static void *frescan_acceptor_thread(void *arg);
68
69 int frescan_acceptor_thread_create(frescan_network_t net)
70 {
71         int ret;
72         fosa_thread_attr_t attr;
73
74         ret = fosa_thread_attr_init(&attr);
75         if (ret != 0) return ret;
76
77         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
78         if (ret != 0) return ret;
79
80         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
81                                  &attr,
82                                  frescan_acceptor_thread,
83                                  (void *)(uint32_t)net);
84         if (ret != 0) return ret;
85
86         ret = fosa_thread_attr_destroy(&attr);
87         if (ret != 0) return ret;
88
89         return 0;
90 }
91
92 /**
93  * frescan_manager_thread
94  */
95
96 static void frescan_manager_neg(frescan_request_data_t *req_data);
97 static void frescan_manager_reneg(frescan_request_data_t *req_data);
98 static void frescan_manager_cancel(frescan_request_data_t *req_data);
99 static void frescan_manager_repneg(frescan_request_data_t *req_data);
100
101 static void *frescan_manager_thread(void *arg)
102 {
103         int ret;
104         frescan_request_id_t   req;
105         frescan_request_data_t *req_data;
106         frescan_network_t net = (uint32_t)arg;
107
108         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
109
110         while(1) {
111                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
112
113                 ret = frescan_requests_dequeue(&req);
114                 assert(ret == 0);
115
116                 ret = frescan_requests_get_data(req, &req_data);
117                 assert(ret == 0);
118
119                 switch(req_data->type) {
120                         case FRESCAN_REQ_NEG:
121                                 frescan_manager_neg(req_data);
122                                 break;
123                         case FRESCAN_REQ_RENEG:
124                                 frescan_manager_reneg(req_data);
125                                 break;
126                         case FRESCAN_REQ_CANCEL:
127                                 frescan_manager_cancel(req_data);
128                                 break;
129                         case FRESCAN_REP_NEG:
130                                 frescan_manager_repneg(req_data);
131                                 break;
132                         default:
133                                 ERROR("request type not supported\n");
134                                 assert(0);
135                 }
136
137                 if(req_data->request_node != the_networks[net].local_node) {
138                         ret = frescan_requests_free(req); // acceptor request
139                         assert(ret == 0);
140                 }
141         }
142 }
143
144 /**
145  * frescan_acceptor_thread()
146  */
147
148 static void *frescan_acceptor_thread(void *arg)
149 {
150         int ret;
151         frescan_request_id_t req;
152         frescan_network_t net = (uint32_t)arg;
153
154         DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
155
156         while(1) {
157                 ret = frescan_messages_recv_request(net, &req);
158                 assert(ret == 0);
159
160                 ret = frescan_requests_enqueue(req);
161                 assert(ret == 0);
162         }
163
164         return NULL;
165 }
166
167 /**
168  * frescan_manager_neg
169  */
170
171 static void frescan_manager_neg(frescan_request_data_t *req_data)
172 {
173         int ret;
174         bool accepted;
175
176         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
177
178         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
179                 // scheduling analysis
180                 ret = frescan_sa_add_contract
181                                 (&the_networks[req_data->net].scenario,
182                                  req_data->contract,
183                                  req_data->ss,
184                                  req_data->request_node);
185                 assert(ret == 0);
186
187                 ret = frescan_sa_sched_test
188                                 (&the_networks[req_data->net].scenario,
189                                  &accepted);
190                 assert(ret == 0);
191
192                 if (accepted) {
193                         ret = frescan_sa_get_final_values
194                                         (&the_networks[req_data->net].scenario,
195                                          req_data->ss,
196                                          req_data->request_node,
197                                          &req_data->final_values);
198                         assert(ret == 0);
199                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
200                 } else {
201                         ret = frescan_sa_remove_contract
202                                         (&the_networks[req_data->net].scenario,
203                                          req_data->ss,
204                                          req_data->request_node);
205                         assert(ret == 0);
206                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
207                 }
208
209                 // signal or reply the results
210                 if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
211                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
212                         ret = frescan_bwres_robjs_signal(req_data->robj);
213                         assert(ret == 0);
214                 } else {
215                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
216                               "master external, sending reply\n");
217
218                         req_data->type = FRESCAN_REP_NEG;
219                         ret = frescan_messages_send_request(req_data);
220                         assert(ret == 0);
221                 }
222         } else {
223                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
224                       "send negotiation request to master\n");
225                 ret = frescan_messages_send_request(req_data);
226                 assert(ret == 0);
227         }
228 }
229
230 /**
231  * frescan_manager_neg
232  */
233
234 static void frescan_manager_reneg(frescan_request_data_t *req_data)
235 {
236         int ret;
237         bool accepted;
238         frescan_contract_t old_contract;
239
240         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
241
242         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
243                 // scheduling analysis
244                 ret = frescan_sa_update_contract
245                                 (&the_networks[req_data->net].scenario,
246                                  req_data->ss,
247                                  req_data->request_node,
248                                  req_data->contract,
249                                  &old_contract);
250                 assert(ret == 0);
251
252                 ret = frescan_sa_sched_test
253                                 (&the_networks[req_data->net].scenario,
254                                  &accepted);
255                 assert(ret == 0);
256
257                 if (accepted) {
258                         ret = frescan_sa_get_final_values
259                                         (&the_networks[req_data->net].scenario,
260                                           req_data->ss,
261                                           req_data->request_node,
262                                           &req_data->final_values);
263                         assert(ret == 0);
264                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
265                 } else {
266                         ret = frescan_sa_update_contract
267                                         (&the_networks[req_data->net].scenario,
268                                          req_data->ss,
269                                          req_data->request_node,
270                                          &old_contract,
271                                          NULL);
272                         assert(ret == 0);
273                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
274                 }
275
276                 // signal or reply the results
277                 if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
278                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
279                         ret = frescan_bwres_robjs_signal(req_data->robj);
280                         assert(ret == 0);
281                 } else {
282                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
283                               "master external, sending reply\n");
284
285                         req_data->type = FRESCAN_REP_NEG;
286                         ret = frescan_messages_send_request(req_data);
287                         assert(ret == 0);
288                 }
289         } else {
290                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
291                       "send renegotiation request to master\n");
292                 ret = frescan_messages_send_request(req_data);
293                 assert(ret == 0);
294         }
295 }
296
297 /**
298  * frescan_manager_cancel
299  */
300
301 static void frescan_manager_cancel(frescan_request_data_t *req_data)
302 {
303         int ret;
304
305         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
306
307         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
308                 ret = frescan_sa_remove_contract
309                                 (&the_networks[req_data->net].scenario,
310                                  req_data->ss,
311                                  req_data->request_node);
312                 assert(ret == 0);
313         } else {
314                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
315                       "send cancel request to master\n");
316                 ret = frescan_messages_send_request(req_data);
317                 assert(ret == 0);
318         }
319
320         if (req_data->request_node == the_networks[req_data->net].local_node) {
321                 ret = frescan_bwres_robjs_signal(req_data->robj);
322                 assert(ret == 0);
323         }
324 }
325
326 /**
327  * frescan_manager_repneg
328  */
329
330 static void frescan_manager_repneg(frescan_request_data_t *req_data)
331 {
332         int ret;
333         frescan_request_data_t *neg_req_data;
334
335         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
336
337         ret = frescan_requests_get_data(req_data->req, &neg_req_data);
338         assert(ret == 0);
339
340         neg_req_data->return_value = req_data->return_value;
341         neg_req_data->final_values = req_data->final_values;
342
343         ret = frescan_bwres_robjs_signal(neg_req_data->robj);
344         assert(ret == 0);
345 }