]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/old.mngr/fwp_mngr.c
Updated forb initialization in manager
[frescor/fwp.git] / fwp / old.mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include <error.h>
5 #include <errno.h>
6 #include "fwp_confdefs.h"
7 #include "fwp.h"
8
9 #include "fwp_contract_table.h"
10 #include "fwp_participant_table.h"
11 #include "fwp_admctrl.h"
12 #include "fwp_mngt.h"
13 #include "gui.h"
14
15 #define FWP_MTU         2346
16 #define BUFFSIZE        FWP_MTU 
17
18 /* Admission control test */
19 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
20
21 /**
22  * fwp_mngt_input 
23  *
24  * Function waits for remote or local message 
25  * 
26  * @msgb  received message 
27  * \return 
28  * On success, it returns 0 and the pointer to received message in msgb parameter.
29  * On error, it returns negative error code
30  *
31  */
32 int fwp_mngr_input(struct fwp_msgb **pmsgb)
33 {
34         /* buffer and socket for incomming message */
35         static unsigned char    buffer[FWP_MTU];
36         struct fwp_msgb *msgb;
37         ssize_t size, expected;
38         struct fwp_msg_header *header = (void*)buffer;
39
40         FWP_DEBUG("Waiting for messages\n");
41         /* TODO: consider to replace with fwp_mngt_recv call */
42         size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
43         if (size < 0) {
44                 return size;
45         }
46         if (size < sizeof(*header)) {
47                 return -EPROTO;
48                 /* TODO: Use errno for error reporting */
49         }
50         expected = ntohs(header->length)-sizeof(*header);
51         size = fwp_recv(fwp_participant_this->epointd, buffer+size,
52                         expected, 0);
53         if (size < expected) {
54                 return -EPROTO;
55         }
56          
57         /* For future: fwp_socket could be allocated behind data in msgb*/
58         if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
59                 perror("No memory available.\n");
60                 return -ENOMEM;
61         }
62         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
63         msgb->data = buffer;
64         fwp_msgb_put(msgb, ntohs(header->length));
65         
66         *pmsgb = msgb;
67         return (0);
68 }
69
70 /** 
71  * Processes hello message.
72  * 
73  * @param msgb 
74  * @param participant_id 
75  * 
76  * @return Zero on success, -1 on error.
77  */
78 int fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
79 {
80         fwp_participant_info_t participant_info, my_info;
81         fwp_participant_t *participant;
82         fwp_endpoint_attr_t attr;
83         int ret;
84
85         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
86                         participant_id.node_id, participant_id.app_id);
87
88         fwp_endpoint_attr_init(&attr);
89         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
90         /* Create a new participant */
91         fwp_msg_hello_out(msgb->data, &participant_info);
92         participant = fwp_participant_new(&participant_info);
93         if (!participant)
94                 return -1;
95         ret = fwp_mngt_service_vres_create(&participant->service_contract,
96                                            &participant->vresd);
97         if (ret < 0)
98                 goto err_vres;
99         ret = fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
100                                        &attr, &participant->epointd);
101         if (ret < 0)
102                 goto err_endpoint;
103         ret = fwp_send_endpoint_bind(participant->epointd, participant->vresd);
104         if (ret < 0)
105                 goto err_bind;
106         fwp_contract_table_init(&participant->contract_table);
107
108         /* Insert participant into table */
109         fwp_participant_table_insert(participant);
110
111         /* Send back hello msg with mngr`s info */
112         /* prepare hello message */
113         fwp_msgb_reset_data(msgb);
114         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
115         
116         my_info.id = fwp_participant_this->id;
117         my_info.stream_id = fwp_participant_this->stream_id;
118
119         fwp_msg_hello_in(msgb->tail, &my_info);
120         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
121
122         /* Send hello to manager */
123         ret = fwp_mngt_send(FWP_MSG_HELLO, msgb, 
124                             fwp_participant_this, participant);
125         if (ret < 0)
126                 goto err_send;
127
128         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
129                         participant_id.node_id, participant_id.app_id);
130         return 0;
131 err_send:
132         fwp_send_endpoint_unbind(participant->epointd);
133 err_bind:
134         fwp_endpoint_destroy(participant->epointd);
135 err_endpoint:
136         /* FIXME: This function is probably not the opposite of
137          * fwp_mngt_service_vres_create(), beacuse it doesn't delete
138          * the service contract. */
139         /*fwp_vres_destroy(participant->vresd);*/
140         fwp_contract_destroy(participant->service_contract);
141 err_vres:
142         fwp_participant_delete(participant);
143         return -1;
144 }
145
146 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
147 {       
148         fwp_participant_t *participant;
149         fwp_contract_data_t* contdata;
150
151         /* Find participant */
152         if (!(participant = fwp_participant_table_find(&participant_id))){
153                 return -EPERM;
154         }
155
156         /* TODO: Check for errors */
157         fwp_participant_table_delete(participant);
158         fwp_send_endpoint_unbind(participant->epointd);
159         fwp_endpoint_destroy(participant->epointd);
160         fwp_contract_destroy(participant->service_contract);
161         //fwp_vres_destroy(participant->vresd);
162
163         /* Delete all participant's contracts */
164         for (contdata = fwp_contract_table_foreach_begin(&participant->contract_table);
165              contdata;
166              contdata = fwp_contract_table_foreach_next(&participant->contract_table, contdata)) {
167                 fwp_contract_destroy(contdata);
168         }
169         fwp_contract_table_foreach_end(&participant->contract_table);
170
171         fwp_participant_delete(participant);
172                         
173         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
174                         participant_id.app_id);
175         
176         return 0;       
177 }
178
179 int 
180 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
181 {
182         fwp_participant_t *participant;
183         fwp_contract_data_t *contdata;
184
185         /* Find participant */
186         if (!(participant = fwp_participant_table_find(&participant_id))){
187                 return -EPERM;
188         }
189
190         contdata = fwp_contract_data_new();
191         
192         /* Extract contract header */
193         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
194         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
195         /* Extract contract params */
196         fwp_msg_contract_out(msgb->data, &contdata->contract);
197         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
198
199         /*launch admission test */
200         fwp_admctrl_test(contdata);             
201         
202         free(msgb);
203         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
204                                         sizeof(struct fwp_msg_contract) +
205                                         sizeof(struct fwp_msg_vres_params));
206         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
207         
208         /*Add contract header*/
209         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
210         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
211         /* Add contract params */
212         /* No needed to send back if spare capacity is not considered
213          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
214          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
215          * */
216         
217         /*Send back contract reservation */
218         if (contdata->status == FWP_CONT_RESERVED) {
219                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
220                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
221                                 contdata->vres_params.budget,
222                                 contdata->vres_params.period_usec,
223                                 contdata->vres_params.ac_id);
224                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
225                 /* Add contract to contract table */
226                 fwp_contract_table_insert(&participant->contract_table,contdata);
227                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
228
229         } else {
230                 free(contdata);
231         }       
232         
233         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
234                         fwp_participant_this, participant);
235         return 0;
236 }
237
238 int 
239 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
240 {
241         fwp_participant_t *participant;
242         fwp_contract_data_t *contdata;
243         fwp_contract_id_t  id;
244         fwp_contract_status_t  status;
245
246         /* Find participant */
247         if (!(participant = fwp_participant_table_find(&participant_id))){
248                 return -EPERM;
249         }
250
251         fwp_msg_contracthdr_out(msgb->data, &id, &status);
252         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
253         FWP_DEBUG("Contract id=%d to commit\n", id);
254         
255         contdata = fwp_contract_table_find(&participant->contract_table, id);
256         contdata->status = FWP_CONT_NEGOTIATED;
257
258         /* TODO: Send response to confirm reception */
259         
260         return 0;       
261 }
262
263 int 
264 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
265 {
266         fwp_participant_t *participant;
267         fwp_contract_data_t *contdata;
268         fwp_contract_id_t  id;
269         fwp_contract_status_t  status;
270
271         /* Find participant */
272         if (!(participant = fwp_participant_table_find(&participant_id))){
273                 return -EPERM;
274         }
275
276         fwp_msg_contracthdr_out(msgb->data, &id, &status);
277         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
278         
279         contdata = fwp_contract_table_find(&participant->contract_table, id);
280         contdata->status = FWP_CONT_NOTNEGOTIATED;
281         /* release vres - success only for local vres */
282         fwp_vres_destroy(contdata->vresd); 
283         /* delete contract from contract table */
284         fwp_contract_table_delete(&participant->contract_table, contdata);
285         fwp_contract_destroy(contdata);
286
287         /* Update admission data (only necessary for demo and GUI) */
288         fwp_admctrl_test(NULL);         
289         
290         FWP_DEBUG("Contract id=%d to canceled\n", id);
291                 
292         return 0;       
293 }
294
295 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
296 {
297         fwp_msg_type_t msg_type;
298         fwp_participant_id_t    participant_id;
299         int ret = 0;
300
301         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
302         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
303         
304         switch (msg_type) {
305                 case  FWP_MSG_HELLO:
306                         FWP_DEBUG("Message HELLO received from nodeid = %d "
307                                   "appid = %d\n", participant_id.node_id, 
308                                         participant_id.app_id);
309                         ret = fwp_mngr_hello(msgb, participant_id);
310                         if (ret < 0) {
311                                 ret = fwp_mngr_bye(msgb, participant_id);
312                                 if (ret < 0) {
313                                         error(0, errno, "Cannot send bye");
314                                 }
315                         }
316                         break;
317                 
318                 case  FWP_MSG_BYE:
319                         FWP_DEBUG("Message BYE received from nodeid = %d "
320                                   "appid = %d\n", participant_id.node_id, 
321                                         participant_id.app_id);
322                         fwp_mngr_bye(msgb, participant_id);
323                         break;
324
325
326                 case  FWP_MSG_RESERVE: 
327                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
328                                   "appid = %d\n", participant_id.node_id, 
329                                         participant_id.app_id);
330                         fwp_mngr_contract_reserve(msgb, participant_id);
331                         break;
332
333                 case  FWP_MSG_COMMIT: 
334                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
335                                   "appid = %d\n", participant_id.node_id, 
336                                         participant_id.app_id);
337                         fwp_mngr_contract_commit(msgb, participant_id);
338                         break;  
339                 
340                 case  FWP_MSG_CANCEL: 
341                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
342                                   "appid = %d\n", participant_id.node_id, 
343                                         participant_id.app_id);
344                         fwp_mngr_contract_cancel(msgb, participant_id);
345                         break;  
346                 
347                 default:
348                         printf("Invalid message\n.");
349                         fwp_msgb_free(msgb);
350         }
351 }
352
353 void fwp_mngr_main_loop()
354 {
355         struct fwp_msgb *msgb;
356         int rv;
357
358         /* start admission control thread */
359         while (1 /*exit_flag*/){
360                 //gui_print_status();
361                 rv = fwp_mngr_input(&msgb);
362                 if (rv == 0 && msgb)
363                         fwp_mngr_msg_handler(msgb);
364                 FWP_DEBUG("Mngr waiting for next msg.\n");
365         }
366 }
367
368 int main()
369 {
370         if (fwp_init()) {
371                 fprintf(stderr,"FWP manager initialization failed.\n");
372                 exit(1);
373         }
374
375         //gui_init();
376         fwp_mngr_main_loop();
377         //gui_end();
378         
379         return 0;       
380