]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Added msg BYE
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
10 #include "fwp_mngt.h"
11
12 #define FWP_MTU         2346  
13 #define BUFFSIZE        FWP_MTU 
14
15 /* buffer and socket for incomming message */
16 static unsigned char    buffer[FWP_MTU];
17
18 /* Admission control test */
19 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
20
21 /**
22  * fwp_mngt_input 
23  *
24  * Function waits for remote or local message 
25  * 
26  * @msgb  received message 
27  * \return 
28  * On success, it returns 0 and the pointer to received message in msgb parameter.
29  * On error, it returns negative error code
30  *
31  */
32 int fwp_mngr_input(struct fwp_msgb **pmsgb)
33 {
34         struct fwp_msgb *msgb;
35         ssize_t size;
36
37         FWP_DEBUG("Waiting for messages\n");
38         /* TODO: consider to replace with fwp_mngt_recv call */
39         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
40          
41         /* For future: fwp_socket could be allocated behind data in msgb*/
42         if (!(msgb = fwp_msgb_alloc(size))) {
43                 perror("No memory available.\n");
44                 return -ENOMEM;
45         }
46         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
47         msgb->data = buffer;
48         fwp_msgb_put(msgb, size);
49         
50         *pmsgb = msgb;
51         return (0);
52 }
53
54 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
55 {
56         fwp_participant_info_t participant_info, my_info;
57         fwp_participant_t *participant;
58         fwp_endpoint_attr_t attr;
59
60         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
61                         participant_id.node_id, participant_id.app_id);
62
63         fwp_endpoint_attr_init(&attr);
64         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
65         /* Create a new participant */
66         fwp_msg_hello_out(msgb->data, &participant_info);
67         participant = fwp_participant_create(&participant_info);
68         fwp_mngt_service_vres_create(&participant->vresd);
69         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
70                                         &attr, &participant->epointd);
71         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
72         fwp_contract_table_init(&participant->contract_table);
73
74         /* Insert participant into table */
75         fwp_participant_table_insert(participant);
76
77         /* Send back hello msg with mngr`s info */
78         /* prepare hello message */
79         fwp_msgb_reset_data(msgb);
80         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
81         
82         my_info.id = fwp_participant_this->id;
83         my_info.stream_id = fwp_participant_this->stream_id;
84
85         fwp_msg_hello_in(msgb->tail, &my_info);
86         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
87
88         /* Send hello to manager */
89         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
90                         fwp_participant_this, participant);
91
92         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
93                         participant_id.node_id, participant_id.app_id);
94 }
95
96 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
97 {       
98         fwp_participant_t *participant;
99
100         /* Find participant */
101         if (!(participant = fwp_participant_table_find(&participant_id))){
102                 return -EPERM;
103         }
104         
105         /* TODO: iterate through contract table and delete contracts */
106         fwp_participant_table_delete(participant);
107                         
108         FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id, 
109                         participant_id.app_id);
110         
111         return 0;       
112 }
113
114 int 
115 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
116 {
117         fwp_participant_t *participant;
118         fwp_contract_data_t *contdata;
119
120         /* Find participant */
121         if (!(participant = fwp_participant_table_find(&participant_id))){
122                 return -EPERM;
123         }
124
125         contdata = fwp_contract_data_new();
126         
127         /* Extract contract header */
128         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
129         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
130         /* Extract contract params */
131         fwp_msg_contract_out(msgb->data, &contdata->contract);
132         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
133
134         /*launch admission test */
135         fwp_admctrl_test(contdata);             
136         
137         free(msgb);
138         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
139                                         sizeof(struct fwp_msg_contract) +
140                                         sizeof(struct fwp_msg_vres_params));
141         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
142         
143         /*Add contract header*/
144         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
145         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
146         /* Add contract params */
147         /* No needed to send back if spare capacity is not considered
148          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
149          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
150          * */
151         
152         /*Send back contract reservation */
153         if (contdata->status == FWP_CONT_RESERVED) {
154                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
155                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
156                                 contdata->vres_params.budget,
157                                 contdata->vres_params.period_usec,
158                                 contdata->vres_params.ac_id);
159                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
160                 /* Add contract to contract table */
161                 fwp_contract_table_insert(&participant->contract_table,contdata);
162                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
163
164         } else {
165                 free(contdata);
166         }       
167         
168         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
169                         fwp_participant_this, participant);
170         return 0;
171 }
172
173 int 
174 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
175 {
176         fwp_participant_t *participant;
177         fwp_contract_data_t *contdata;
178         fwp_contract_id_t  id;
179         fwp_contract_status_t  status;
180
181         /* Find participant */
182         if (!(participant = fwp_participant_table_find(&participant_id))){
183                 return -EPERM;
184         }
185
186         fwp_msg_contracthdr_out(msgb->data, &id, &status);
187         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
188         FWP_DEBUG("Contract id=%d to commit\n", id);
189         
190         contdata = fwp_contract_table_find(&participant->contract_table, id);
191         contdata->status = FWP_CONT_NEGOTIATED; 
192         
193         return 0;       
194 }
195
196 int 
197 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
198 {
199         fwp_participant_t *participant;
200         fwp_contract_data_t *contdata;
201         fwp_contract_id_t  id;
202         fwp_contract_status_t  status;
203
204         /* Find participant */
205         if (!(participant = fwp_participant_table_find(&participant_id))){
206                 return -EPERM;
207         }
208
209         fwp_msg_contracthdr_out(msgb->data, &id, &status);
210         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
211         
212         contdata = fwp_contract_table_find(&participant->contract_table, id);
213         contdata->status = FWP_CONT_NOTNEGOTIATED;
214         /* release vres - success only for local vres */
215         fwp_vres_destroy(contdata->vresd); 
216         /* delete contract from contract table */
217         fwp_contract_table_delete(&participant->contract_table, contdata);
218         fwp_contract_destroy(contdata);
219         
220         FWP_DEBUG("Contract id=%d to canceled\n", id);
221                 
222         return 0;       
223 }
224
225 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
226 {
227         fwp_msg_type_t msg_type;
228         fwp_participant_id_t    participant_id;
229
230         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
231         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
232         
233         switch (msg_type) {
234                 case  FWP_MSG_HELLO:
235                         FWP_DEBUG("Message HELLO received from nodeid = %d "
236                                   "appid = %d\n", participant_id.node_id, 
237                                         participant_id.app_id);
238                         fwp_mngr_hello(msgb, participant_id);
239                         break;
240                 
241                 case  FWP_MSG_BYE:
242                         FWP_DEBUG("Message BYE received from nodeid = %d "
243                                   "appid = %d\n", participant_id.node_id, 
244                                         participant_id.app_id);
245                         fwp_mngr_bye(msgb, participant_id);
246                         break;
247
248
249                 case  FWP_MSG_RESERVE: 
250                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
251                                   "appid = %d\n", participant_id.node_id, 
252                                         participant_id.app_id);
253                         fwp_mngr_contract_reserve(msgb, participant_id);
254                         break;
255
256                 case  FWP_MSG_COMMIT: 
257                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
258                                   "appid = %d\n", participant_id.node_id, 
259                                         participant_id.app_id);
260                         fwp_mngr_contract_commit(msgb, participant_id);
261                         break;  
262                 
263                 case  FWP_MSG_CANCEL: 
264                         FWP_DEBUG("Message CANCEL received from nodeid = %d "
265                                   "appid = %d\n", participant_id.node_id, 
266                                         participant_id.app_id);
267                         fwp_mngr_contract_cancel(msgb, participant_id);
268                         break;  
269                 
270                 default:
271                         printf("Invalid message\n.");
272                         fwp_msgb_free(msgb);
273         }
274 }
275
276 void fwp_mngr_main_loop()
277 {
278         struct fwp_msgb *msgb;
279
280         /* start admission control thread */
281         while (1 /*exit_flag*/){
282                 fwp_mngr_input(&msgb);
283                 if (msgb)
284                         fwp_mngr_msg_handler(msgb);
285                 FWP_DEBUG("Mngr waiting for next msg.\n");
286         }
287 }
288
289 #if 0
290 int fwp_mngr_init()
291 {
292         fwp_participant_info_t  my_info;
293         int rv;
294
295         if ((rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints)) ||
296             (rv = fwp_vres_table_init(fwp_configuration.max_vres))) {
297
298                 return rv;
299         }
300         
301         /* Create fwp_participant_this */
302         my_info.id.node_id = inet_addr("127.0.0.1");
303         my_info.id.app_id = getpid();
304         my_info.stream_id = FWP_MNGR_STREAM_ID;
305
306         fwp_participant_this = fwp_participant_create(&my_info);
307         fwp_participant_mngr = fwp_participant_this;
308         fwp_receive_endpoint_create(my_info.stream_id, 0,
309                                         &fwp_participant_this->epointd);
310         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
311                                fwp_participant_this->id.node_id,
312                                fwp_participant_this->stream_id);
313         return 0;
314
315 }
316 #endif
317
318 int main()
319 {
320         if (fwp_init()) {
321                 fprintf(stderr,"FWP manager initialization failed.\n");
322                 exit(1);
323         }
324
325         fwp_mngr_main_loop();
326         
327         return 0;       
328