]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Applied Michal's fixes
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
10 #include "fwp_mngt.h"
11
12 #define FWP_MTU         2346  
13 #define BUFFSIZE        FWP_MTU 
14
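15 /* Admission control test run on every contract reservation request (see fwp_mngr_contract_reserve); initially set to fwp_admctrl_stupid */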
16 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
17
18 /**
19  * fwp_mngt_input 
20  *
21  * Function waits for remote or local message 
22  * 
23  * @msgb  received message 
24  * \return 
25  * On success, it returns 0 and the pointer to received message in msgb parameter.
26  * On error, it returns negative error code
27  *
28  */
29 int fwp_mngr_input(struct fwp_msgb **pmsgb)
30 {
31         /* buffer and socket for incomming message */
32         static unsigned char    buffer[FWP_MTU];
33         struct fwp_msgb *msgb;
34         ssize_t size;
35
36         FWP_DEBUG("Waiting for messages\n");
37         /* TODO: consider to replace with fwp_mngt_recv call */
38         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
39          
40         /* For future: fwp_socket could be allocated behind data in msgb*/
41         if (!(msgb = fwp_msgb_alloc(size))) {
42                 perror("No memory available.\n");
43                 return -ENOMEM;
44         }
45         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
46         msgb->data = buffer;
47         fwp_msgb_put(msgb, size);
48         
49         *pmsgb = msgb;
50         return (0);
51 }
52
53 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
54 {
55         fwp_participant_info_t participant_info, my_info;
56         fwp_participant_t *participant;
57         fwp_endpoint_attr_t attr;
58
59         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
60                         participant_id.node_id, participant_id.app_id);
61
62         fwp_endpoint_attr_init(&attr);
63         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
64         /* Create a new participant */
65         fwp_msg_hello_out(msgb->data, &participant_info);
66         participant = fwp_participant_new(&participant_info);
67         fwp_mngt_service_vres_create(&participant->vresd);
68         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
69                                         &attr, &participant->epointd);
70         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
71         fwp_contract_table_init(&participant->contract_table);
72
73         /* Insert participant into table */
74         fwp_participant_table_insert(participant);
75
76         /* Send back hello msg with mngr`s info */
77         /* prepare hello message */
78         fwp_msgb_reset_data(msgb);
79         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
80         
81         my_info.id = fwp_participant_this->id;
82         my_info.stream_id = fwp_participant_this->stream_id;
83
84         fwp_msg_hello_in(msgb->tail, &my_info);
85         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
86
87         /* Send hello to manager */
88         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
89                         fwp_participant_this, participant);
90
91         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
92                         participant_id.node_id, participant_id.app_id);
93 }
94
95 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
96 {       
97         fwp_participant_t *participant;
98
99         /* Find participant */
100         if (!(participant = fwp_participant_table_find(&participant_id))){
101                 return -EPERM;
102         }
103         
104         fwp_participant_table_delete(participant);
105         fwp_send_endpoint_unbind(participant->epointd);
106         fwp_endpoint_destroy(participant->epointd);
107         fwp_vres_destroy(participant->vresd);
108         /* TODO: iterate through contract table and delete contracts */
109         fwp_participant_delete(participant);
110                         
111         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
112                         participant_id.app_id);
113         
114         return 0;       
115 }
116
117 int 
118 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
119 {
120         fwp_participant_t *participant;
121         fwp_contract_data_t *contdata;
122
123         /* Find participant */
124         if (!(participant = fwp_participant_table_find(&participant_id))){
125                 return -EPERM;
126         }
127
128         contdata = fwp_contract_data_new();
129         
130         /* Extract contract header */
131         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
132         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
133         /* Extract contract params */
134         fwp_msg_contract_out(msgb->data, &contdata->contract);
135         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
136
137         /*launch admission test */
138         fwp_admctrl_test(contdata);             
139         
140         free(msgb);
141         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
142                                         sizeof(struct fwp_msg_contract) +
143                                         sizeof(struct fwp_msg_vres_params));
144         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
145         
146         /*Add contract header*/
147         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
148         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
149         /* Add contract params */
150         /* No needed to send back if spare capacity is not considered
151          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
152          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
153          * */
154         
155         /*Send back contract reservation */
156         if (contdata->status == FWP_CONT_RESERVED) {
157                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
158                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
159                                 contdata->vres_params.budget,
160                                 contdata->vres_params.period_usec,
161                                 contdata->vres_params.ac_id);
162                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
163                 /* Add contract to contract table */
164                 fwp_contract_table_insert(&participant->contract_table,contdata);
165                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
166
167         } else {
168                 free(contdata);
169         }       
170         
171         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
172                         fwp_participant_this, participant);
173         return 0;
174 }
175
176 int 
177 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
178 {
179         fwp_participant_t *participant;
180         fwp_contract_data_t *contdata;
181         fwp_contract_id_t  id;
182         fwp_contract_status_t  status;
183
184         /* Find participant */
185         if (!(participant = fwp_participant_table_find(&participant_id))){
186                 return -EPERM;
187         }
188
189         fwp_msg_contracthdr_out(msgb->data, &id, &status);
190         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
191         FWP_DEBUG("Contract id=%d to commit\n", id);
192         
193         contdata = fwp_contract_table_find(&participant->contract_table, id);
194         contdata->status = FWP_CONT_NEGOTIATED; 
195         
196         return 0;       
197 }
198
199 int 
200 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
201 {
202         fwp_participant_t *participant;
203         fwp_contract_data_t *contdata;
204         fwp_contract_id_t  id;
205         fwp_contract_status_t  status;
206
207         /* Find participant */
208         if (!(participant = fwp_participant_table_find(&participant_id))){
209                 return -EPERM;
210         }
211
212         fwp_msg_contracthdr_out(msgb->data, &id, &status);
213         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
214         
215         contdata = fwp_contract_table_find(&participant->contract_table, id);
216         contdata->status = FWP_CONT_NOTNEGOTIATED;
217         /* release vres - success only for local vres */
218         fwp_vres_destroy(contdata->vresd); 
219         /* delete contract from contract table */
220         fwp_contract_table_delete(&participant->contract_table, contdata);
221         fwp_contract_destroy(contdata);
222         
223         FWP_DEBUG("Contract id=%d to canceled\n", id);
224                 
225         return 0;       
226 }
227
228 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
229 {
230         fwp_msg_type_t msg_type;
231         fwp_participant_id_t    participant_id;
232
233         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
234         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
235         
236         switch (msg_type) {
237                 case  FWP_MSG_HELLO:
238                         FWP_DEBUG("Message HELLO received from nodeid = %d "
239                                   "appid = %d\n", participant_id.node_id, 
240                                         participant_id.app_id);
241                         fwp_mngr_hello(msgb, participant_id);
242                         break;
243                 
244                 case  FWP_MSG_BYE:
245                         FWP_DEBUG("Message BYE received from nodeid = %d "
246                                   "appid = %d\n", participant_id.node_id, 
247                                         participant_id.app_id);
248                         fwp_mngr_bye(msgb, participant_id);
249                         break;
250
251
252                 case  FWP_MSG_RESERVE: 
253                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
254                                   "appid = %d\n", participant_id.node_id, 
255                                         participant_id.app_id);
256                         fwp_mngr_contract_reserve(msgb, participant_id);
257                         break;
258
259                 case  FWP_MSG_COMMIT: 
260                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
261                                   "appid = %d\n", participant_id.node_id, 
262                                         participant_id.app_id);
263                         fwp_mngr_contract_commit(msgb, participant_id);
264                         break;  
265                 
266                 case  FWP_MSG_CANCEL: 
267                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
268                                   "appid = %d\n", participant_id.node_id, 
269                                         participant_id.app_id);
270                         fwp_mngr_contract_cancel(msgb, participant_id);
271                         break;  
272                 
273                 default:
274                         printf("Invalid message\n.");
275                         fwp_msgb_free(msgb);
276         }
277 }
278
/**
 * fwp_mngr_main_loop
 *
 * Manager service loop: waits for a message, dispatches it when one was
 * received successfully, and repeats. The loop has no exit condition.
 */
void fwp_mngr_main_loop()
{
        struct fwp_msgb *incoming;
        int status;

        /* start admission control thread */
        for (;;) {      /* exit_flag is not implemented */
                status = fwp_mngr_input(&incoming);
                /* Failed receives are skipped; the loop just waits again */
                if (status == 0 && incoming)
                        fwp_mngr_msg_handler(incoming);
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
292
293 int main()
294 {
295         if (fwp_init()) {
296                 fprintf(stderr,"FWP manager initialization failed.\n");
297                 exit(1);
298         }
299
300         fwp_mngr_main_loop();
301         
302         return 0;       
303