]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Utilization based test incorporated to FWP manager
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
10 #include "fwp_mngt.h"
11
12 #define FWP_MTU         2346
13 #define BUFFSIZE        FWP_MTU 
14
15 /* Admission control test */
16 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
17
18 /**
19  * fwp_mngt_input 
20  *
21  * Function waits for remote or local message 
22  * 
23  * @msgb  received message 
24  * \return 
25  * On success, it returns 0 and the pointer to received message in msgb parameter.
26  * On error, it returns negative error code
27  *
28  */
29 int fwp_mngr_input(struct fwp_msgb **pmsgb)
30 {
31         /* buffer and socket for incomming message */
32         static unsigned char    buffer[FWP_MTU];
33         struct fwp_msgb *msgb;
34         ssize_t size, expected;
35         struct fwp_msg_header *header = (void*)buffer;
36
37         FWP_DEBUG("Waiting for messages\n");
38         /* TODO: consider to replace with fwp_mngt_recv call */
39         size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
40         if (size < 0) {
41                 return size;
42         }
43         if (size < sizeof(*header)) {
44                 return -EPROTO;
45                 /* TODO: Use errno for error reporting */
46         }
47         expected = ntohs(header->length)-sizeof(*header);
48         size = fwp_recv(fwp_participant_this->epointd, buffer+size,
49                         expected, 0);
50         if (size < expected) {
51                 return -EPROTO;
52         }
53          
54         /* For future: fwp_socket could be allocated behind data in msgb*/
55         if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
56                 perror("No memory available.\n");
57                 return -ENOMEM;
58         }
59         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
60         msgb->data = buffer;
61         fwp_msgb_put(msgb, ntohs(header->length));
62         
63         *pmsgb = msgb;
64         return (0);
65 }
66
67 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
68 {
69         fwp_participant_info_t participant_info, my_info;
70         fwp_participant_t *participant;
71         fwp_endpoint_attr_t attr;
72
73         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
74                         participant_id.node_id, participant_id.app_id);
75
76         fwp_endpoint_attr_init(&attr);
77         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
78         /* Create a new participant */
79         fwp_msg_hello_out(msgb->data, &participant_info);
80         participant = fwp_participant_new(&participant_info);
81         fwp_mngt_service_vres_create(&participant->vresd);
82         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
83                                         &attr, &participant->epointd);
84         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
85         fwp_contract_table_init(&participant->contract_table);
86
87         /* Insert participant into table */
88         fwp_participant_table_insert(participant);
89
90         /* Send back hello msg with mngr`s info */
91         /* prepare hello message */
92         fwp_msgb_reset_data(msgb);
93         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
94         
95         my_info.id = fwp_participant_this->id;
96         my_info.stream_id = fwp_participant_this->stream_id;
97
98         fwp_msg_hello_in(msgb->tail, &my_info);
99         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
100
101         /* Send hello to manager */
102         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
103                         fwp_participant_this, participant);
104
105         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
106                         participant_id.node_id, participant_id.app_id);
107 }
108
109 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
110 {       
111         fwp_participant_t *participant;
112
113         /* Find participant */
114         if (!(participant = fwp_participant_table_find(&participant_id))){
115                 return -EPERM;
116         }
117         
118         fwp_participant_table_delete(participant);
119         fwp_send_endpoint_unbind(participant->epointd);
120         fwp_endpoint_destroy(participant->epointd);
121         fwp_vres_destroy(participant->vresd);
122         /* TODO: iterate through contract table and delete contracts */
123         fwp_participant_delete(participant);
124                         
125         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
126                         participant_id.app_id);
127         
128         return 0;       
129 }
130
131 int 
132 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
133 {
134         fwp_participant_t *participant;
135         fwp_contract_data_t *contdata;
136
137         /* Find participant */
138         if (!(participant = fwp_participant_table_find(&participant_id))){
139                 return -EPERM;
140         }
141
142         contdata = fwp_contract_data_new();
143         
144         /* Extract contract header */
145         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
146         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
147         /* Extract contract params */
148         fwp_msg_contract_out(msgb->data, &contdata->contract);
149         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
150
151         /*launch admission test */
152         fwp_admctrl_test(contdata);             
153         
154         free(msgb);
155         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
156                                         sizeof(struct fwp_msg_contract) +
157                                         sizeof(struct fwp_msg_vres_params));
158         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
159         
160         /*Add contract header*/
161         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
162         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
163         /* Add contract params */
164         /* No needed to send back if spare capacity is not considered
165          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
166          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
167          * */
168         
169         /*Send back contract reservation */
170         if (contdata->status == FWP_CONT_RESERVED) {
171                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
172                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
173                                 contdata->vres_params.budget,
174                                 contdata->vres_params.period_usec,
175                                 contdata->vres_params.ac_id);
176                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
177                 /* Add contract to contract table */
178                 fwp_contract_table_insert(&participant->contract_table,contdata);
179                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
180
181         } else {
182                 free(contdata);
183         }       
184         
185         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
186                         fwp_participant_this, participant);
187         return 0;
188 }
189
190 int 
191 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
192 {
193         fwp_participant_t *participant;
194         fwp_contract_data_t *contdata;
195         fwp_contract_id_t  id;
196         fwp_contract_status_t  status;
197
198         /* Find participant */
199         if (!(participant = fwp_participant_table_find(&participant_id))){
200                 return -EPERM;
201         }
202
203         fwp_msg_contracthdr_out(msgb->data, &id, &status);
204         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
205         FWP_DEBUG("Contract id=%d to commit\n", id);
206         
207         contdata = fwp_contract_table_find(&participant->contract_table, id);
208         contdata->status = FWP_CONT_NEGOTIATED;
209
210         /* TODO: Send response to confirm reception */
211         
212         return 0;       
213 }
214
215 int 
216 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
217 {
218         fwp_participant_t *participant;
219         fwp_contract_data_t *contdata;
220         fwp_contract_id_t  id;
221         fwp_contract_status_t  status;
222
223         /* Find participant */
224         if (!(participant = fwp_participant_table_find(&participant_id))){
225                 return -EPERM;
226         }
227
228         fwp_msg_contracthdr_out(msgb->data, &id, &status);
229         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
230         
231         contdata = fwp_contract_table_find(&participant->contract_table, id);
232         contdata->status = FWP_CONT_NOTNEGOTIATED;
233         /* release vres - success only for local vres */
234         fwp_vres_destroy(contdata->vresd); 
235         /* delete contract from contract table */
236         fwp_contract_table_delete(&participant->contract_table, contdata);
237         fwp_contract_destroy(contdata);
238         
239         FWP_DEBUG("Contract id=%d to canceled\n", id);
240                 
241         return 0;       
242 }
243
244 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
245 {
246         fwp_msg_type_t msg_type;
247         fwp_participant_id_t    participant_id;
248
249         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
250         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
251         
252         switch (msg_type) {
253                 case  FWP_MSG_HELLO:
254                         FWP_DEBUG("Message HELLO received from nodeid = %d "
255                                   "appid = %d\n", participant_id.node_id, 
256                                         participant_id.app_id);
257                         fwp_mngr_hello(msgb, participant_id);
258                         break;
259                 
260                 case  FWP_MSG_BYE:
261                         FWP_DEBUG("Message BYE received from nodeid = %d "
262                                   "appid = %d\n", participant_id.node_id, 
263                                         participant_id.app_id);
264                         fwp_mngr_bye(msgb, participant_id);
265                         break;
266
267
268                 case  FWP_MSG_RESERVE: 
269                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
270                                   "appid = %d\n", participant_id.node_id, 
271                                         participant_id.app_id);
272                         fwp_mngr_contract_reserve(msgb, participant_id);
273                         break;
274
275                 case  FWP_MSG_COMMIT: 
276                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
277                                   "appid = %d\n", participant_id.node_id, 
278                                         participant_id.app_id);
279                         fwp_mngr_contract_commit(msgb, participant_id);
280                         break;  
281                 
282                 case  FWP_MSG_CANCEL: 
283                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
284                                   "appid = %d\n", participant_id.node_id, 
285                                         participant_id.app_id);
286                         fwp_mngr_contract_cancel(msgb, participant_id);
287                         break;  
288                 
289                 default:
290                         printf("Invalid message\n.");
291                         fwp_msgb_free(msgb);
292         }
293 }
294
/**
 * fwp_mngr_main_loop
 *
 * Endless receive/dispatch loop of the manager: waits for a message and,
 * when one was received successfully, hands it to the message handler.
 * Failed receptions are skipped. The loop never terminates.
 */
void fwp_mngr_main_loop()
{
	/* start admission control thread */
	for (;;) {
		struct fwp_msgb *incoming;
		int status;

		status = fwp_mngr_input(&incoming);
		/* incoming is only examined after a successful reception */
		if (!status && incoming) {
			fwp_mngr_msg_handler(incoming);
		}
		FWP_DEBUG("Mngr waiting for next msg.\n");
	}
}
308
309 int main()
310 {
311         if (fwp_init()) {
312                 fprintf(stderr,"FWP manager initialization failed.\n");
313                 exit(1);
314         }
315
316         fwp_mngr_main_loop();
317         
318         return 0;       
319