]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
add the infraestructure for storing the negotiated contracts and performing the analy...
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  */
16
#include <assert.h>
#include <stdint.h> // uintptr_t

#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
#include "frescan_bwres_threads.h"
#include "frescan_bwres_messages.h"
#include "frescan_bwres_requests.h"
#include "frescan_bwres_analysis.h"
#include "frescan_config.h"
#include "frescan_debug.h"
#include "frescan_data.h"
#include "frescan_servers.h"
27
28 /**
29  * frescan_manager_thread_create()
30  *
31  * This call creates the manager thread at each node which will be waiting
32  * in a request queue for LOCAL or EXTERNAL requests.
33  */
34
35 static void *frescan_manager_thread(void *arg);
36
37 int frescan_manager_thread_create(frescan_network_t net)
38 {
39         int ret;
40         fosa_thread_attr_t attr;
41
42         ret = fosa_thread_attr_init(&attr);
43         if (ret != 0) return ret;
44
45         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
46         if (ret != 0) return ret;
47
48         ret = fosa_thread_create(&the_networks[net].manager_thread_id,
49                                  &attr,
50                                  frescan_manager_thread,
51                                  (void *)(uint32_t)net);
52         if (ret != 0) return ret;
53
54         ret = fosa_thread_attr_destroy(&attr);
55         if (ret != 0) return ret;
56
57         return 0;
58 }
59
60 /**
61  * frescan_acceptor_thread_create()
62  *
63  * This call creates the acceptor thread which will be waiting negotiation
64  * messages from the network and converting them into requests.
65  */
66
67 static void *frescan_acceptor_thread(void *arg);
68
69 int frescan_acceptor_thread_create(frescan_network_t net)
70 {
71         int ret;
72         fosa_thread_attr_t attr;
73
74         ret = fosa_thread_attr_init(&attr);
75         if (ret != 0) return ret;
76
77         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
78         if (ret != 0) return ret;
79
80         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
81                                  &attr,
82                                  frescan_acceptor_thread,
83                                  (void *)(uint32_t)net);
84         if (ret != 0) return ret;
85
86         ret = fosa_thread_attr_destroy(&attr);
87         if (ret != 0) return ret;
88
89         return 0;
90 }
91
92 /**
93  * frescan_manager_thread
94  */
95
96 static void frescan_manager_neg(frescan_request_data_t *req_data);
97 static void frescan_manager_reneg(frescan_request_data_t *req_data);
98 static void frescan_manager_cancel(frescan_request_data_t *req_data);
99 static void frescan_manager_repneg(frescan_request_data_t *req_data);
100
101 static void *frescan_manager_thread(void *arg)
102 {
103         int ret;
104         frescan_request_id_t   req;
105         frescan_request_data_t *req_data;
106         frescan_network_t net = (uint32_t)arg;
107
108         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "manager thread starts\n");
109
110         while(1) {
111                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "wait for a request\n");
112
113                 ret = frescan_requests_dequeue(&req);
114                 assert(ret == 0);
115
116                 ret = frescan_requests_get_data(req, &req_data);
117                 assert(ret == 0);
118
119                 switch(req_data->type) {
120                         case FRESCAN_REQ_NEG:
121                                 frescan_manager_neg(req_data);
122                                 break;
123                         case FRESCAN_REQ_RENEG:
124                                 frescan_manager_reneg(req_data);
125                                 break;
126                         case FRESCAN_REQ_CANCEL:
127                                 frescan_manager_cancel(req_data);
128                                 break;
129                         case FRESCAN_REP_NEG:
130                                 frescan_manager_repneg(req_data);
131                                 break;
132                         default:
133                                 ERROR("request type not supported\n");
134                                 assert(0);
135                 }
136
137                 if(req_data->request_node != the_networks[net].local_node) {
138                         ret = frescan_requests_free(req);
139                         assert(ret == 0);
140                 }
141         }
142 }
143
144 /**
145  * frescan_acceptor_thread()
146  */
147
148 static void *frescan_acceptor_thread(void *arg)
149 {
150         int ret;
151         frescan_request_id_t req;
152         frescan_network_t net = (uint32_t)arg;
153
154         DEBUG(FRESCAN_ACCEPTOR_ENABLE_DEBUG, "acceptor thread starts\n");
155
156         while(1) {
157                 ret = frescan_messages_recv_request(net, &req);
158                 assert(ret == 0);
159
160                 ret = frescan_requests_enqueue(req);
161                 assert(ret == 0);
162         }
163
164         return NULL;
165 }
166
167 /**
168  * frescan_manager_neg
169  */
170
171 static void frescan_manager_neg(frescan_request_data_t *req_data)
172 {
173         int ret;
174         bool accepted;
175
176         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "negotiation request\n");
177
178         if (the_networks[req_data->net].local_node == FRESCAN_NEG_MASTER_NODE) {
179                 // scheduling analysis
180                 ret = frescan_sa_add_contract
181                                 (&the_networks[req_data->net].scenario,
182                                  req_data->contract,
183                                  req_data->ss,
184                                  req_data->request_node);
185                 assert(ret == 0);
186
187                 ret = frescan_sa_sched_test
188                                 (&the_networks[req_data->net].scenario,
189                                  &accepted);
190                 assert(ret == 0);
191
192                 if (accepted) {
193                         ret = frescan_sa_get_final_values
194                                         (&the_networks[req_data->net].scenario,
195                                          req_data->ss,
196                                          req_data->request_node,
197                                          &req_data->final_values);
198                         assert(ret == 0);
199                         req_data->return_value = FRESCAN_REQ_ACCEPTED;
200                 } else {
201                         ret = frescan_sa_remove_contract
202                                         (&the_networks[req_data->net].scenario,
203                                          req_data->ss,
204                                          req_data->request_node);
205                         assert(ret == 0);
206                         req_data->return_value = FRESCAN_REQ_NOT_ACCEPTED;
207                 }
208
209                 // signal or reply the results
210                 if (req_data->request_node == FRESCAN_NEG_MASTER_NODE) {
211                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "master local\n");
212                         ret = frescan_bwres_robjs_signal(req_data->robj);
213                         assert(ret == 0);
214                 } else {
215                         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
216                               "master external, sending reply\n");
217
218                         req_data->type = FRESCAN_REP_NEG;
219                         ret = frescan_messages_send_request(req_data);
220                         assert(ret == 0);
221                 }
222         } else {
223                 DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG,
224                       "send negotiation request to master\n");
225                 ret = frescan_messages_send_request(req_data);
226                 assert(ret == 0);
227         }
228 }
229
230 /**
231  * frescan_manager_neg
232  */
233
234 static void frescan_manager_reneg(frescan_request_data_t *req_data)
235 {
236         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "renegotiation request\n");
237 }
238
239 /**
240  * frescan_manager_neg
241  */
242
243 static void frescan_manager_cancel(frescan_request_data_t *req_data)
244 {
245         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "cancel request\n");
246 }
247
248 /**
249  * frescan_manager_neg
250  */
251
252 static void frescan_manager_repneg(frescan_request_data_t *req_data)
253 {
254         int ret;
255         frescan_request_data_t *neg_req_data;
256
257         DEBUG(FRESCAN_MANAGER_ENABLE_DEBUG, "reply to neg request\n");
258
259         ret = frescan_requests_get_data(req_data->req, &neg_req_data);
260         assert(ret == 0);
261
262         neg_req_data->return_value = req_data->return_value;
263         neg_req_data->final_values = req_data->final_values;
264
265         ret = frescan_bwres_robjs_signal(neg_req_data->robj);
266         assert(ret == 0);
267 }