]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Restored screen on manager exit
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
10 #include "fwp_mngt.h"
11 #include "gui.h"
12
13 #define FWP_MTU         2346
14 #define BUFFSIZE        FWP_MTU 
15
16 /* Admission control test */
17 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
18
19 /**
20  * fwp_mngt_input 
21  *
22  * Function waits for remote or local message 
23  * 
24  * @msgb  received message 
25  * \return 
26  * On success, it returns 0 and the pointer to received message in msgb parameter.
27  * On error, it returns negative error code
28  *
29  */
30 int fwp_mngr_input(struct fwp_msgb **pmsgb)
31 {
32         /* buffer and socket for incomming message */
33         static unsigned char    buffer[FWP_MTU];
34         struct fwp_msgb *msgb;
35         ssize_t size, expected;
36         struct fwp_msg_header *header = (void*)buffer;
37
38         FWP_DEBUG("Waiting for messages\n");
39         /* TODO: consider to replace with fwp_mngt_recv call */
40         size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
41         if (size < 0) {
42                 return size;
43         }
44         if (size < sizeof(*header)) {
45                 return -EPROTO;
46                 /* TODO: Use errno for error reporting */
47         }
48         expected = ntohs(header->length)-sizeof(*header);
49         size = fwp_recv(fwp_participant_this->epointd, buffer+size,
50                         expected, 0);
51         if (size < expected) {
52                 return -EPROTO;
53         }
54          
55         /* For future: fwp_socket could be allocated behind data in msgb*/
56         if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
57                 perror("No memory available.\n");
58                 return -ENOMEM;
59         }
60         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
61         msgb->data = buffer;
62         fwp_msgb_put(msgb, ntohs(header->length));
63         
64         *pmsgb = msgb;
65         return (0);
66 }
67
68 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
69 {
70         fwp_participant_info_t participant_info, my_info;
71         fwp_participant_t *participant;
72         fwp_endpoint_attr_t attr;
73
74         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
75                         participant_id.node_id, participant_id.app_id);
76
77         fwp_endpoint_attr_init(&attr);
78         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
79         /* Create a new participant */
80         fwp_msg_hello_out(msgb->data, &participant_info);
81         participant = fwp_participant_new(&participant_info);
82         fwp_mngt_service_vres_create(&participant->vresd);
83         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
84                                         &attr, &participant->epointd);
85         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
86         fwp_contract_table_init(&participant->contract_table);
87
88         /* Insert participant into table */
89         fwp_participant_table_insert(participant);
90
91         /* Send back hello msg with mngr`s info */
92         /* prepare hello message */
93         fwp_msgb_reset_data(msgb);
94         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
95         
96         my_info.id = fwp_participant_this->id;
97         my_info.stream_id = fwp_participant_this->stream_id;
98
99         fwp_msg_hello_in(msgb->tail, &my_info);
100         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
101
102         /* Send hello to manager */
103         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
104                         fwp_participant_this, participant);
105
106         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
107                         participant_id.node_id, participant_id.app_id);
108 }
109
110 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
111 {       
112         fwp_participant_t *participant;
113
114         /* Find participant */
115         if (!(participant = fwp_participant_table_find(&participant_id))){
116                 return -EPERM;
117         }
118         
119         fwp_participant_table_delete(participant);
120         fwp_send_endpoint_unbind(participant->epointd);
121         fwp_endpoint_destroy(participant->epointd);
122         fwp_vres_destroy(participant->vresd);
123         /* TODO: iterate through contract table and delete contracts */
124         fwp_participant_delete(participant);
125                         
126         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
127                         participant_id.app_id);
128         
129         return 0;       
130 }
131
132 int 
133 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
134 {
135         fwp_participant_t *participant;
136         fwp_contract_data_t *contdata;
137
138         /* Find participant */
139         if (!(participant = fwp_participant_table_find(&participant_id))){
140                 return -EPERM;
141         }
142
143         contdata = fwp_contract_data_new();
144         
145         /* Extract contract header */
146         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
147         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
148         /* Extract contract params */
149         fwp_msg_contract_out(msgb->data, &contdata->contract);
150         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
151
152         /*launch admission test */
153         fwp_admctrl_test(contdata);             
154         
155         free(msgb);
156         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
157                                         sizeof(struct fwp_msg_contract) +
158                                         sizeof(struct fwp_msg_vres_params));
159         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
160         
161         /*Add contract header*/
162         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
163         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
164         /* Add contract params */
165         /* No needed to send back if spare capacity is not considered
166          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
167          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
168          * */
169         
170         /*Send back contract reservation */
171         if (contdata->status == FWP_CONT_RESERVED) {
172                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
173                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
174                                 contdata->vres_params.budget,
175                                 contdata->vres_params.period_usec,
176                                 contdata->vres_params.ac_id);
177                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
178                 /* Add contract to contract table */
179                 fwp_contract_table_insert(&participant->contract_table,contdata);
180                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
181
182         } else {
183                 free(contdata);
184         }       
185         
186         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
187                         fwp_participant_this, participant);
188         return 0;
189 }
190
191 int 
192 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
193 {
194         fwp_participant_t *participant;
195         fwp_contract_data_t *contdata;
196         fwp_contract_id_t  id;
197         fwp_contract_status_t  status;
198
199         /* Find participant */
200         if (!(participant = fwp_participant_table_find(&participant_id))){
201                 return -EPERM;
202         }
203
204         fwp_msg_contracthdr_out(msgb->data, &id, &status);
205         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
206         FWP_DEBUG("Contract id=%d to commit\n", id);
207         
208         contdata = fwp_contract_table_find(&participant->contract_table, id);
209         contdata->status = FWP_CONT_NEGOTIATED;
210
211         /* TODO: Send response to confirm reception */
212         
213         return 0;       
214 }
215
216 int 
217 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
218 {
219         fwp_participant_t *participant;
220         fwp_contract_data_t *contdata;
221         fwp_contract_id_t  id;
222         fwp_contract_status_t  status;
223
224         /* Find participant */
225         if (!(participant = fwp_participant_table_find(&participant_id))){
226                 return -EPERM;
227         }
228
229         fwp_msg_contracthdr_out(msgb->data, &id, &status);
230         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
231         
232         contdata = fwp_contract_table_find(&participant->contract_table, id);
233         contdata->status = FWP_CONT_NOTNEGOTIATED;
234         /* release vres - success only for local vres */
235         fwp_vres_destroy(contdata->vresd); 
236         /* delete contract from contract table */
237         fwp_contract_table_delete(&participant->contract_table, contdata);
238         fwp_contract_destroy(contdata);
239         
240         FWP_DEBUG("Contract id=%d to canceled\n", id);
241                 
242         return 0;       
243 }
244
245 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
246 {
247         fwp_msg_type_t msg_type;
248         fwp_participant_id_t    participant_id;
249
250         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
251         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
252         
253         switch (msg_type) {
254                 case  FWP_MSG_HELLO:
255                         FWP_DEBUG("Message HELLO received from nodeid = %d "
256                                   "appid = %d\n", participant_id.node_id, 
257                                         participant_id.app_id);
258                         fwp_mngr_hello(msgb, participant_id);
259                         break;
260                 
261                 case  FWP_MSG_BYE:
262                         FWP_DEBUG("Message BYE received from nodeid = %d "
263                                   "appid = %d\n", participant_id.node_id, 
264                                         participant_id.app_id);
265                         fwp_mngr_bye(msgb, participant_id);
266                         break;
267
268
269                 case  FWP_MSG_RESERVE: 
270                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
271                                   "appid = %d\n", participant_id.node_id, 
272                                         participant_id.app_id);
273                         fwp_mngr_contract_reserve(msgb, participant_id);
274                         break;
275
276                 case  FWP_MSG_COMMIT: 
277                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
278                                   "appid = %d\n", participant_id.node_id, 
279                                         participant_id.app_id);
280                         fwp_mngr_contract_commit(msgb, participant_id);
281                         break;  
282                 
283                 case  FWP_MSG_CANCEL: 
284                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
285                                   "appid = %d\n", participant_id.node_id, 
286                                         participant_id.app_id);
287                         fwp_mngr_contract_cancel(msgb, participant_id);
288                         break;  
289                 
290                 default:
291                         printf("Invalid message\n.");
292                         fwp_msgb_free(msgb);
293         }
294 }
295
296 void fwp_mngr_main_loop()
297 {
298         struct fwp_msgb *msgb;
299         int rv;
300
301         /* start admission control thread */
302         while (1 /*exit_flag*/){
303                 gui_print_status();
304                 rv = fwp_mngr_input(&msgb);
305                 if (rv == 0 && msgb)
306                         fwp_mngr_msg_handler(msgb);
307                 FWP_DEBUG("Mngr waiting for next msg.\n");
308         }
309 }
310
311 int main()
312 {
313         if (fwp_init()) {
314                 fprintf(stderr,"FWP manager initialization failed.\n");
315                 exit(1);
316         }
317
318         gui_init();
319         fwp_mngr_main_loop();
320         gui_end();
321         
322         return 0;       
323