]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Merge branch 'martin-devel'
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
10 #include "fwp_mngt.h"
11
12 #define FWP_MTU         2346  
13 #define BUFFSIZE        FWP_MTU 
14
15 /* buffer and socket for incomming message */
16 static unsigned char    buffer[FWP_MTU];
17 /* FIXME: This could be moved to local static variable in
18  * fwp_mngt_input() */
19
20 /* Admission control test */
21 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
22
23 /**
24  * fwp_mngt_input 
25  *
26  * Function waits for remote or local message 
27  * 
28  * @msgb  received message 
29  * \return 
30  * On success, it returns 0 and the pointer to received message in msgb parameter.
31  * On error, it returns negative error code
32  *
33  */
34 int fwp_mngr_input(struct fwp_msgb **pmsgb)
35 {
36         struct fwp_msgb *msgb;
37         ssize_t size;
38
39         FWP_DEBUG("Waiting for messages\n");
40         /* TODO: consider to replace with fwp_mngt_recv call */
41         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
42          
43         /* For future: fwp_socket could be allocated behind data in msgb*/
44         if (!(msgb = fwp_msgb_alloc(size))) {
45                 perror("No memory available.\n");
46                 return -ENOMEM;
47         }
48         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
49         msgb->data = buffer;
50         fwp_msgb_put(msgb, size);
51         
52         *pmsgb = msgb;
53         return (0);
54 }
55
56 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
57 {
58         fwp_participant_info_t participant_info, my_info;
59         fwp_participant_t *participant;
60         fwp_endpoint_attr_t attr;
61
62         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
63                         participant_id.node_id, participant_id.app_id);
64
65         fwp_endpoint_attr_init(&attr);
66         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
67         /* Create a new participant */
68         fwp_msg_hello_out(msgb->data, &participant_info);
69         participant = fwp_participant_new(&participant_info);
70         fwp_mngt_service_vres_create(&participant->vresd);
71         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
72                                         &attr, &participant->epointd);
73         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
74         fwp_contract_table_init(&participant->contract_table);
75
76         /* Insert participant into table */
77         fwp_participant_table_insert(participant);
78
79         /* Send back hello msg with mngr`s info */
80         /* prepare hello message */
81         fwp_msgb_reset_data(msgb);
82         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
83         
84         my_info.id = fwp_participant_this->id;
85         my_info.stream_id = fwp_participant_this->stream_id;
86
87         fwp_msg_hello_in(msgb->tail, &my_info);
88         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
89
90         /* Send hello to manager */
91         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
92                         fwp_participant_this, participant);
93
94         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
95                         participant_id.node_id, participant_id.app_id);
96 }
97
98 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
99 {       
100         fwp_participant_t *participant;
101
102         /* Find participant */
103         if (!(participant = fwp_participant_table_find(&participant_id))){
104                 return -EPERM;
105         }
106         
107         fwp_participant_table_delete(participant);
108         fwp_send_endpoint_unbind(participant->epointd);
109         fwp_endpoint_destroy(participant->epointd);
110         fwp_vres_destroy(participant->vresd);
111         /* TODO: iterate through contract table and delete contracts */
112         fwp_participant_delete(participant);
113                         
114         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
115                         participant_id.app_id);
116         
117         return 0;       
118 }
119
120 int 
121 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
122 {
123         fwp_participant_t *participant;
124         fwp_contract_data_t *contdata;
125
126         /* Find participant */
127         if (!(participant = fwp_participant_table_find(&participant_id))){
128                 return -EPERM;
129         }
130
131         contdata = fwp_contract_data_new();
132         
133         /* Extract contract header */
134         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
135         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
136         /* Extract contract params */
137         fwp_msg_contract_out(msgb->data, &contdata->contract);
138         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
139
140         /*launch admission test */
141         fwp_admctrl_test(contdata);             
142         
143         free(msgb);
144         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
145                                         sizeof(struct fwp_msg_contract) +
146                                         sizeof(struct fwp_msg_vres_params));
147         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
148         
149         /*Add contract header*/
150         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
151         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
152         /* Add contract params */
153         /* No needed to send back if spare capacity is not considered
154          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
155          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
156          * */
157         
158         /*Send back contract reservation */
159         if (contdata->status == FWP_CONT_RESERVED) {
160                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
161                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
162                                 contdata->vres_params.budget,
163                                 contdata->vres_params.period_usec,
164                                 contdata->vres_params.ac_id);
165                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
166                 /* Add contract to contract table */
167                 fwp_contract_table_insert(&participant->contract_table,contdata);
168                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
169
170         } else {
171                 free(contdata);
172         }       
173         
174         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
175                         fwp_participant_this, participant);
176         return 0;
177 }
178
179 int 
180 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
181 {
182         fwp_participant_t *participant;
183         fwp_contract_data_t *contdata;
184         fwp_contract_id_t  id;
185         fwp_contract_status_t  status;
186
187         /* Find participant */
188         if (!(participant = fwp_participant_table_find(&participant_id))){
189                 return -EPERM;
190         }
191
192         fwp_msg_contracthdr_out(msgb->data, &id, &status);
193         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
194         FWP_DEBUG("Contract id=%d to commit\n", id);
195         
196         contdata = fwp_contract_table_find(&participant->contract_table, id);
197         contdata->status = FWP_CONT_NEGOTIATED; 
198         
199         return 0;       
200 }
201
202 int 
203 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
204 {
205         fwp_participant_t *participant;
206         fwp_contract_data_t *contdata;
207         fwp_contract_id_t  id;
208         fwp_contract_status_t  status;
209
210         /* Find participant */
211         if (!(participant = fwp_participant_table_find(&participant_id))){
212                 return -EPERM;
213         }
214
215         fwp_msg_contracthdr_out(msgb->data, &id, &status);
216         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
217         
218         contdata = fwp_contract_table_find(&participant->contract_table, id);
219         contdata->status = FWP_CONT_NOTNEGOTIATED;
220         /* release vres - success only for local vres */
221         fwp_vres_destroy(contdata->vresd); 
222         /* delete contract from contract table */
223         fwp_contract_table_delete(&participant->contract_table, contdata);
224         fwp_contract_destroy(contdata);
225         
226         FWP_DEBUG("Contract id=%d to canceled\n", id);
227                 
228         return 0;       
229 }
230
231 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
232 {
233         fwp_msg_type_t msg_type;
234         fwp_participant_id_t    participant_id;
235
236         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
237         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
238         
239         switch (msg_type) {
240                 case  FWP_MSG_HELLO:
241                         FWP_DEBUG("Message HELLO received from nodeid = %d "
242                                   "appid = %d\n", participant_id.node_id, 
243                                         participant_id.app_id);
244                         fwp_mngr_hello(msgb, participant_id);
245                         break;
246                 
247                 case  FWP_MSG_BYE:
248                         FWP_DEBUG("Message BYE received from nodeid = %d "
249                                   "appid = %d\n", participant_id.node_id, 
250                                         participant_id.app_id);
251                         fwp_mngr_bye(msgb, participant_id);
252                         break;
253
254
255                 case  FWP_MSG_RESERVE: 
256                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
257                                   "appid = %d\n", participant_id.node_id, 
258                                         participant_id.app_id);
259                         fwp_mngr_contract_reserve(msgb, participant_id);
260                         break;
261
262                 case  FWP_MSG_COMMIT: 
263                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
264                                   "appid = %d\n", participant_id.node_id, 
265                                         participant_id.app_id);
266                         fwp_mngr_contract_commit(msgb, participant_id);
267                         break;  
268                 
269                 case  FWP_MSG_CANCEL: 
270                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
271                                   "appid = %d\n", participant_id.node_id, 
272                                         participant_id.app_id);
273                         fwp_mngr_contract_cancel(msgb, participant_id);
274                         break;  
275                 
276                 default:
277                         printf("Invalid message\n.");
278                         fwp_msgb_free(msgb);
279         }
280 }
281
/**
 * fwp_mngr_main_loop
 *
 * Endless receive/dispatch loop of the manager: waits for a message and,
 * when one was received successfully, passes it to the message handler.
 * The function never returns.
 */
void fwp_mngr_main_loop()
{
        /* start admission control thread */
        for (;;) {
                struct fwp_msgb *incoming;
                int status;

                status = fwp_mngr_input(&incoming);
                /* incoming is only inspected after a successful receive */
                if (status == 0 && incoming != NULL) {
                        fwp_mngr_msg_handler(incoming);
                }
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
295
296 int main()
297 {
298         if (fwp_init()) {
299                 fprintf(stderr,"FWP manager initialization failed.\n");
300                 exit(1);
301         }
302
303         fwp_mngr_main_loop();
304         
305         return 0;       
306