]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Merge branch 'michal-devel' into martin-devel
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include <error.h>
5 #include <errno.h>
6 #include "fwp_confdefs.h"
7 #include "fwp.h"
8
9 #include "fwp_contract_table.h"
10 #include "fwp_participant_table.h"
11 #include "fwp_admctrl.h"
12 #include "fwp_mngt.h"
13 #include "gui.h"
14
15 #define FWP_MTU         2346
16 #define BUFFSIZE        FWP_MTU 
17
18 /* Admission control test */
19 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
20
21 /**
22  * fwp_mngt_input 
23  *
24  * Function waits for remote or local message 
25  * 
26  * @msgb  received message 
27  * \return 
28  * On success, it returns 0 and the pointer to received message in msgb parameter.
29  * On error, it returns negative error code
30  *
31  */
32 int fwp_mngr_input(struct fwp_msgb **pmsgb)
33 {
34         /* buffer and socket for incomming message */
35         static unsigned char    buffer[FWP_MTU];
36         struct fwp_msgb *msgb;
37         ssize_t size, expected;
38         struct fwp_msg_header *header = (void*)buffer;
39
40         FWP_DEBUG("Waiting for messages\n");
41         /* TODO: consider to replace with fwp_mngt_recv call */
42         size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
43         if (size < 0) {
44                 return size;
45         }
46         if (size < sizeof(*header)) {
47                 return -EPROTO;
48                 /* TODO: Use errno for error reporting */
49         }
50         expected = ntohs(header->length)-sizeof(*header);
51         size = fwp_recv(fwp_participant_this->epointd, buffer+size,
52                         expected, 0);
53         if (size < expected) {
54                 return -EPROTO;
55         }
56          
57         /* For future: fwp_socket could be allocated behind data in msgb*/
58         if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
59                 perror("No memory available.\n");
60                 return -ENOMEM;
61         }
62         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
63         msgb->data = buffer;
64         fwp_msgb_put(msgb, ntohs(header->length));
65         
66         *pmsgb = msgb;
67         return (0);
68 }
69
70 /** 
71  * Processes hello message.
72  * 
73  * @param msgb 
74  * @param participant_id 
75  * 
76  * @return Zero on success, -1 on error.
77  */
78 int fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
79 {
80         fwp_participant_info_t participant_info, my_info;
81         fwp_participant_t *participant;
82         fwp_endpoint_attr_t attr;
83         int ret;
84
85         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
86                         participant_id.node_id, participant_id.app_id);
87
88         fwp_endpoint_attr_init(&attr);
89         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
90         /* Create a new participant */
91         fwp_msg_hello_out(msgb->data, &participant_info);
92         participant = fwp_participant_new(&participant_info);
93         if (!participant)
94                 return -1;
95         ret = fwp_mngt_service_vres_create(&participant->service_contract,
96                                            &participant->vresd);
97         if (ret < 0)
98                 goto err_vres;
99         ret = fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
100                                        &attr, &participant->epointd);
101         if (ret < 0)
102                 goto err_endpoint;
103         ret = fwp_send_endpoint_bind(participant->epointd, participant->vresd);
104         if (ret < 0)
105                 goto err_bind;
106         fwp_contract_table_init(&participant->contract_table);
107
108         /* Insert participant into table */
109         fwp_participant_table_insert(participant);
110
111         /* Send back hello msg with mngr`s info */
112         /* prepare hello message */
113         fwp_msgb_reset_data(msgb);
114         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
115         
116         my_info.id = fwp_participant_this->id;
117         my_info.stream_id = fwp_participant_this->stream_id;
118
119         fwp_msg_hello_in(msgb->tail, &my_info);
120         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
121
122         /* Send hello to manager */
123         ret = fwp_mngt_send(FWP_MSG_HELLO, msgb, 
124                             fwp_participant_this, participant);
125         if (ret < 0)
126                 goto err_send;
127
128         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
129                         participant_id.node_id, participant_id.app_id);
130         return 0;
131 err_send:
132         fwp_send_endpoint_unbind(participant->epointd);
133 err_bind:
134         fwp_endpoint_destroy(participant->epointd);
135 err_endpoint:
136         /* FIXME: This function is probably not the opposite of
137          * fwp_mngt_service_vres_create(), beacuse it doesn't delete
138          * the service contract. */
139         fwp_vres_destroy(participant->vresd);
140         fwp_contract_destroy(participant->service_contract);
141 err_vres:
142         fwp_participant_delete(participant);
143         return -1;
144 }
145
146 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
147 {       
148         fwp_participant_t *participant;
149         fwp_contract_data_t* contdata;
150
151         /* Find participant */
152         if (!(participant = fwp_participant_table_find(&participant_id))){
153                 return -EPERM;
154         }
155
156         /* TODO: Check for errors */
157         fwp_participant_table_delete(participant);
158         fwp_send_endpoint_unbind(participant->epointd);
159         fwp_endpoint_destroy(participant->epointd);
160         fwp_vres_destroy(participant->vresd);
161
162         /* Delete all participant's contracts */
163         for (contdata = fwp_contract_table_foreach_begin(&participant->contract_table);
164              contdata;
165              contdata = fwp_contract_table_foreach_next(&participant->contract_table, contdata)) {
166                 fwp_contract_destroy(contdata);
167         }
168         fwp_contract_table_foreach_end(&participant->contract_table);
169
170         fwp_participant_delete(participant);
171                         
172         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
173                         participant_id.app_id);
174         
175         return 0;       
176 }
177
178 int 
179 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
180 {
181         fwp_participant_t *participant;
182         fwp_contract_data_t *contdata;
183
184         /* Find participant */
185         if (!(participant = fwp_participant_table_find(&participant_id))){
186                 return -EPERM;
187         }
188
189         contdata = fwp_contract_data_new();
190         
191         /* Extract contract header */
192         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
193         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
194         /* Extract contract params */
195         fwp_msg_contract_out(msgb->data, &contdata->contract);
196         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
197
198         /*launch admission test */
199         fwp_admctrl_test(contdata);             
200         
201         free(msgb);
202         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
203                                         sizeof(struct fwp_msg_contract) +
204                                         sizeof(struct fwp_msg_vres_params));
205         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
206         
207         /*Add contract header*/
208         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
209         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
210         /* Add contract params */
211         /* No needed to send back if spare capacity is not considered
212          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
213          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
214          * */
215         
216         /*Send back contract reservation */
217         if (contdata->status == FWP_CONT_RESERVED) {
218                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
219                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
220                                 contdata->vres_params.budget,
221                                 contdata->vres_params.period_usec,
222                                 contdata->vres_params.ac_id);
223                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
224                 /* Add contract to contract table */
225                 fwp_contract_table_insert(&participant->contract_table,contdata);
226                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
227
228         } else {
229                 free(contdata);
230         }       
231         
232         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
233                         fwp_participant_this, participant);
234         return 0;
235 }
236
237 int 
238 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
239 {
240         fwp_participant_t *participant;
241         fwp_contract_data_t *contdata;
242         fwp_contract_id_t  id;
243         fwp_contract_status_t  status;
244
245         /* Find participant */
246         if (!(participant = fwp_participant_table_find(&participant_id))){
247                 return -EPERM;
248         }
249
250         fwp_msg_contracthdr_out(msgb->data, &id, &status);
251         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
252         FWP_DEBUG("Contract id=%d to commit\n", id);
253         
254         contdata = fwp_contract_table_find(&participant->contract_table, id);
255         contdata->status = FWP_CONT_NEGOTIATED;
256
257         /* TODO: Send response to confirm reception */
258         
259         return 0;       
260 }
261
262 int 
263 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
264 {
265         fwp_participant_t *participant;
266         fwp_contract_data_t *contdata;
267         fwp_contract_id_t  id;
268         fwp_contract_status_t  status;
269
270         /* Find participant */
271         if (!(participant = fwp_participant_table_find(&participant_id))){
272                 return -EPERM;
273         }
274
275         fwp_msg_contracthdr_out(msgb->data, &id, &status);
276         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
277         
278         contdata = fwp_contract_table_find(&participant->contract_table, id);
279         contdata->status = FWP_CONT_NOTNEGOTIATED;
280         /* release vres - success only for local vres */
281         fwp_vres_destroy(contdata->vresd); 
282         /* delete contract from contract table */
283         fwp_contract_table_delete(&participant->contract_table, contdata);
284         fwp_contract_destroy(contdata);
285
286         /* Update admission data (only necessary for demo and GUI) */
287         fwp_admctrl_test(NULL);         
288         
289         FWP_DEBUG("Contract id=%d to canceled\n", id);
290                 
291         return 0;       
292 }
293
294 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
295 {
296         fwp_msg_type_t msg_type;
297         fwp_participant_id_t    participant_id;
298         int ret = 0;
299
300         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
301         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
302         
303         switch (msg_type) {
304                 case  FWP_MSG_HELLO:
305                         FWP_DEBUG("Message HELLO received from nodeid = %d "
306                                   "appid = %d\n", participant_id.node_id, 
307                                         participant_id.app_id);
308                         ret = fwp_mngr_hello(msgb, participant_id);
309                         if (ret < 0) {
310                                 ret = fwp_mngr_bye(msgb, participant_id);
311                                 if (ret < 0) {
312                                         error(0, errno, "Cannot send bye");
313                                 }
314                         }
315                         break;
316                 
317                 case  FWP_MSG_BYE:
318                         FWP_DEBUG("Message BYE received from nodeid = %d "
319                                   "appid = %d\n", participant_id.node_id, 
320                                         participant_id.app_id);
321                         fwp_mngr_bye(msgb, participant_id);
322                         break;
323
324
325                 case  FWP_MSG_RESERVE: 
326                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
327                                   "appid = %d\n", participant_id.node_id, 
328                                         participant_id.app_id);
329                         fwp_mngr_contract_reserve(msgb, participant_id);
330                         break;
331
332                 case  FWP_MSG_COMMIT: 
333                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
334                                   "appid = %d\n", participant_id.node_id, 
335                                         participant_id.app_id);
336                         fwp_mngr_contract_commit(msgb, participant_id);
337                         break;  
338                 
339                 case  FWP_MSG_CANCEL: 
340                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
341                                   "appid = %d\n", participant_id.node_id, 
342                                         participant_id.app_id);
343                         fwp_mngr_contract_cancel(msgb, participant_id);
344                         break;  
345                 
346                 default:
347                         printf("Invalid message\n.");
348                         fwp_msgb_free(msgb);
349         }
350 }
351
352 void fwp_mngr_main_loop()
353 {
354         struct fwp_msgb *msgb;
355         int rv;
356
357         /* start admission control thread */
358         while (1 /*exit_flag*/){
359                 gui_print_status();
360                 rv = fwp_mngr_input(&msgb);
361                 if (rv == 0 && msgb)
362                         fwp_mngr_msg_handler(msgb);
363                 FWP_DEBUG("Mngr waiting for next msg.\n");
364         }
365 }
366
367 int main()
368 {
369         if (fwp_init()) {
370                 fprintf(stderr,"FWP manager initialization failed.\n");
371                 exit(1);
372         }
373
374         gui_init();
375         fwp_mngr_main_loop();
376         gui_end();
377         
378         return 0;       
379