/* Build-time configuration of the FWP manager: its stream id and its address.
 * NOTE(review): presumably consumed by fwp_confdefs.h, which is included right
 * after these definitions - confirm against that header. */
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
6 #include "fwp_confdefs.h"
9 #include "fwp_contract_table.h"
10 #include "fwp_participant_table.h"
11 #include "fwp_admctrl.h"
/* BUFFSIZE aliases FWP_MTU. It is not referenced in the visible part of this
 * listing; the receive buffer in fwp_mngr_input is sized with FWP_MTU directly. */
16 #define BUFFSIZE FWP_MTU
18 /* Admission control test */
/* Function pointer through which admission control is invoked. In this file
 * it is called with a contract in fwp_mngr_contract_reserve and with NULL in
 * fwp_mngr_contract_cancel. It is initialised to fwp_admctrl_utilization. */
19 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
24 * Function waits for remote or local message
26 * @param pmsgb received message
28 * On success, it returns 0 and the pointer to received message in pmsgb parameter.
29 * On error, it returns negative error code
/*
 * Receives one management message on the endpoint of fwp_participant_this.
 * A message header is read into a static buffer first; a second fwp_recv call
 * then targets the bytes after it, and its result is compared with the
 * payload size derived from the length field of the header. Finally a msgb of
 * the announced length is allocated.
 * NOTE(review): this listing omits several source lines (the embedded line
 * numbers are not contiguous), so braces, return statements and the store to
 * pmsgb are not visible here.
 */
32 int fwp_mngr_input(struct fwp_msgb **pmsgb)
34 /* buffer and socket for incoming message */
/* Static storage: the buffer is shared by every call of this function. */
35 static unsigned char buffer[FWP_MTU];
36 struct fwp_msgb *msgb;
37 ssize_t size, expected;
/* The header view aliases the start of the receive buffer. */
38 struct fwp_msg_header *header = (void*)buffer;
40 FWP_DEBUG("Waiting for messages\n");
41 /* TODO: consider to replace with fwp_mngt_recv call */
/* First read: exactly one header worth of bytes is requested. */
42 size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
/* NOTE(review): size is ssize_t and sizeof yields size_t, so the comparison
 * is done in unsigned arithmetic; a negative return value would be converted
 * to a large unsigned value and would not satisfy this test - confirm how
 * fwp_recv reports errors. */
46 if (size < sizeof(*header)) {
48 /* TODO: Use errno for error reporting */
/* Payload size = announced total length minus the header size.
 * NOTE(review): no check that the announced length is at least the header
 * size or at most FWP_MTU is visible in this listing; verify that the second
 * read cannot run past the end of buffer. */
50 expected = ntohs(header->length)-sizeof(*header);
51 size = fwp_recv(fwp_participant_this->epointd, buffer+size,
53 if (size < expected) {
57 /* For future: fwp_socket could be allocated behind data in msgb*/
58 if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
/* NOTE(review): perror appends a colon, the errno text and a newline itself,
 * so the newline embedded in this message splits the output in two lines. */
59 perror("No memory available.\n");
62 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
/* Marks the announced number of bytes as used in the msgb.
 * NOTE(review): the memcpy above is commented out and no other copy from
 * buffer into the msgb is visible in this listing - confirm where the data
 * is transferred. */
64 fwp_msgb_put(msgb, ntohs(header->length));
71 * Processes hello message.
74 * @param participant_id
76 * @return Zero on success, -1 on error.
/*
 * Handles a HELLO message: decodes the sender info from the message, creates
 * a participant for it, sets up its service vres and send endpoint, stores the
 * participant in the participant table and answers with a HELLO that carries
 * the id and stream id of fwp_participant_this.
 * NOTE(review): this listing omits several source lines (the declaration of
 * ret, the checks of ret, labels and return statements are not visible).
 */
78 int fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
80 fwp_participant_info_t participant_info, my_info;
81 fwp_participant_t *participant;
82 fwp_endpoint_attr_t attr;
85 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
86 participant_id.node_id, participant_id.app_id);
/* Endpoint attributes for the reply channel use the management reliability. */
88 fwp_endpoint_attr_init(&attr);
89 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
90 /* Create a new participant */
91 fwp_msg_hello_out(msgb->data, &participant_info);
92 participant = fwp_participant_new(&participant_info);
/* NOTE(review): participant is dereferenced below; no NULL check of the
 * fwp_participant_new result is visible in this listing. */
95 ret = fwp_mngt_service_vres_create(&participant->service_contract,
99 ret = fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
100 &attr, &participant->epointd);
103 ret = fwp_send_endpoint_bind(participant->epointd, participant->vresd);
106 fwp_contract_table_init(&participant->contract_table);
108 /* Insert participant into table */
109 fwp_participant_table_insert(participant);
111 /* Send back hello msg with mngr`s info */
112 /* prepare hello message */
/* The received msgb is reused for the reply: its data is reset and room for
 * the message header is reserved before the hello payload is appended. */
113 fwp_msgb_reset_data(msgb);
114 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
/* Only id and stream_id of my_info are assigned in the visible lines. */
116 my_info.id = fwp_participant_this->id;
117 my_info.stream_id = fwp_participant_this->stream_id;
119 fwp_msg_hello_in(msgb->tail, &my_info);
120 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
122 /* Send hello to manager */
/* NOTE(review): the call passes fwp_participant_this and then the new
 * participant, so the reply presumably goes from the manager to the
 * participant rather than to the manager - confirm and adjust the comment
 * above. */
123 ret = fwp_mngt_send(FWP_MSG_HELLO, msgb,
124 fwp_participant_this, participant);
128 FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
129 participant_id.node_id, participant_id.app_id);
/* NOTE(review): the calls below undo the setup above (unbind, destroy
 * endpoint, destroy service contract, delete participant); presumably they
 * form the error path reached through labels that are missing from this
 * listing. */
132 fwp_send_endpoint_unbind(participant->epointd);
134 fwp_endpoint_destroy(participant->epointd);
136 /* FIXME: This function is probably not the opposite of
137 * fwp_mngt_service_vres_create(), because it doesn't delete
138 * the service contract. */
139 /*fwp_vres_destroy(participant->vresd);*/
140 fwp_contract_destroy(participant->service_contract);
142 fwp_participant_delete(participant);
/*
 * Handles a BYE message: looks the participant up by id, removes it from the
 * participant table, tears down its send endpoint and service contract,
 * destroys every contract stored in its contract table and finally deletes
 * the participant.
 * The msgb parameter is not used in the visible lines.
 * NOTE(review): this listing omits several source lines (the body of the
 * not-found branch, the loop condition and the return statements).
 */
146 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
148 fwp_participant_t *participant;
149 fwp_contract_data_t* contdata;
151 /* Find participant */
152 if (!(participant = fwp_participant_table_find(&participant_id))){
156 /* TODO: Check for errors */
157 fwp_participant_table_delete(participant);
158 fwp_send_endpoint_unbind(participant->epointd);
159 fwp_endpoint_destroy(participant->epointd);
160 fwp_contract_destroy(participant->service_contract);
161 //fwp_vres_destroy(participant->vresd);
163 /* Delete all participant's contracts */
/* NOTE(review): each element is destroyed inside the loop while the iterator
 * is advanced from that same element; verify that fwp_contract_destroy does
 * not invalidate what fwp_contract_table_foreach_next reads. */
164 for (contdata = fwp_contract_table_foreach_begin(&participant->contract_table);
166 contdata = fwp_contract_table_foreach_next(&participant->contract_table, contdata)) {
167 fwp_contract_destroy(contdata);
169 fwp_contract_table_foreach_end(&participant->contract_table);
171 fwp_participant_delete(participant);
173 FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id,
174 participant_id.app_id);
/*
 * Handles a RESERVE message: decodes the contract header and the contract
 * parameters from the message into a new contract data object, runs the
 * admission test on it and sends a RESERVE reply that carries the contract id
 * and status. When the status is FWP_CONT_RESERVED the vres parameters are
 * appended to the reply and the contract is stored in the contract table of
 * the participant.
 * NOTE(review): this listing omits several source lines (the return type, the
 * body of the not-found branch, closing braces and return statements).
 */
180 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
182 fwp_participant_t *participant;
183 fwp_contract_data_t *contdata;
185 /* Find participant */
186 if (!(participant = fwp_participant_table_find(&participant_id))){
/* NOTE(review): the result is dereferenced below without a visible check. */
190 contdata = fwp_contract_data_new();
192 /* Extract contract header */
193 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
194 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
195 /* Extract contract params */
196 fwp_msg_contract_out(msgb->data, &contdata->contract);
197 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
199 /*launch admission test */
200 fwp_admctrl_test(contdata);
/* The msgb parameter is overwritten with a freshly allocated reply buffer;
 * the received msgb is no longer reachable through this variable.
 * NOTE(review): the buffer is sized with struct fwp_msg_contract, while the
 * code below appends a struct fwp_msg_contracthdr and, when reserved, a
 * struct fwp_msg_vres_params - confirm that the capacity is sufficient. The
 * allocation result is used without a visible check. */
203 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
204 sizeof(struct fwp_msg_contract) +
205 sizeof(struct fwp_msg_vres_params));
206 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
208 /*Add contract header*/
209 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
210 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
211 /* Add contract params */
212 /* No needed to send back if spare capacity is not considered
213 * fwp_msg_contract_in(msgb->tail, &contdata->contract);
214 * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
217 /*Send back contract reservation */
218 if (contdata->status == FWP_CONT_RESERVED) {
219 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
220 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
221 contdata->vres_params.budget,
222 contdata->vres_params.period_usec,
223 contdata->vres_params.ac_id);
224 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
225 /* Add contract to contract table */
226 fwp_contract_table_insert(&participant->contract_table,contdata);
227 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
/* NOTE(review): what happens to contdata when the status is not
 * FWP_CONT_RESERVED is not visible in this listing - confirm that it is
 * released. The result of the send below is not stored in the visible lines. */
233 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
234 fwp_participant_this, participant);
/*
 * Handles a COMMIT message: decodes the contract header from the message,
 * looks the contract up by id in the contract table of the participant and
 * sets its status to FWP_CONT_NEGOTIATED. No reply is sent.
 * The decoded status is not used in the visible lines.
 * NOTE(review): this listing omits several source lines (the return type, the
 * body of the not-found branch and the return statements).
 */
239 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
241 fwp_participant_t *participant;
242 fwp_contract_data_t *contdata;
243 fwp_contract_id_t id;
244 fwp_contract_status_t status;
246 /* Find participant */
247 if (!(participant = fwp_participant_table_find(&participant_id))){
251 fwp_msg_contracthdr_out(msgb->data, &id, &status);
252 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
253 FWP_DEBUG("Contract id=%d to commit\n", id);
/* NOTE(review): the lookup result is dereferenced on the very next source
 * line without a check; presumably the lookup yields NULL for an unknown id,
 * in which case a COMMIT for a contract that was never reserved would crash
 * the manager - confirm and add a guard. */
255 contdata = fwp_contract_table_find(&participant->contract_table, id);
256 contdata->status = FWP_CONT_NEGOTIATED;
258 /* TODO: Send response to confirm reception */
/*
 * Handles a CANCEL message: decodes the contract header from the message,
 * looks the contract up by id in the contract table of the participant, marks
 * it FWP_CONT_NOTNEGOTIATED, destroys its vres, removes it from the table and
 * destroys it. Afterwards the admission test is invoked with NULL.
 * The decoded status is not used in the visible lines.
 * NOTE(review): this listing omits several source lines (the return type, the
 * body of the not-found branch and the return statements).
 */
264 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
266 fwp_participant_t *participant;
267 fwp_contract_data_t *contdata;
268 fwp_contract_id_t id;
269 fwp_contract_status_t status;
271 /* Find participant */
272 if (!(participant = fwp_participant_table_find(&participant_id))){
276 fwp_msg_contracthdr_out(msgb->data, &id, &status);
277 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
/* NOTE(review): the lookup result is dereferenced on the very next source
 * line without a check; presumably the lookup yields NULL for an unknown id,
 * so a CANCEL for an unknown or already cancelled contract would crash the
 * manager - confirm and add a guard. */
279 contdata = fwp_contract_table_find(&participant->contract_table, id);
280 contdata->status = FWP_CONT_NOTNEGOTIATED;
281 /* release vres - success only for local vres */
282 fwp_vres_destroy(contdata->vresd);
283 /* delete contract from contract table */
284 fwp_contract_table_delete(&participant->contract_table, contdata);
285 fwp_contract_destroy(contdata);
287 /* Update admission data (only necessary for demo and GUI) */
288 fwp_admctrl_test(NULL);
290 FWP_DEBUG("Contract id=%d to canceled\n", id);
/*
 * Dispatches one received management message: decodes the message type and
 * the participant id from the header, strips the header from the msgb and
 * calls the handler for HELLO, BYE, RESERVE, COMMIT or CANCEL. Any other type
 * is reported on standard output.
 * NOTE(review): this listing omits several source lines (the declaration of
 * ret, the switch statement, every case label except FWP_MSG_RESERVE, the
 * break statements and the closing braces).
 */
295 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
297 fwp_msg_type_t msg_type;
298 fwp_participant_id_t participant_id;
301 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
302 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
306 FWP_DEBUG("Message HELLO received from nodeid = %d "
307 "appid = %d\n", participant_id.node_id,
308 participant_id.app_id);
309 ret = fwp_mngr_hello(msgb, participant_id);
/* NOTE(review): the condition guarding the next call is not visible;
 * presumably the participant is removed again when the hello handling
 * failed - confirm. */
311 ret = fwp_mngr_bye(msgb, participant_id);
313 error(0, errno, "Cannot send bye");
319 FWP_DEBUG("Message BYE received from nodeid = %d "
320 "appid = %d\n", participant_id.node_id,
321 participant_id.app_id);
/* The results of the handlers below are not stored in the visible lines. */
322 fwp_mngr_bye(msgb, participant_id);
326 case FWP_MSG_RESERVE:
327 FWP_DEBUG("Message RESERVE received from nodeid = %d "
328 "appid = %d\n", participant_id.node_id,
329 participant_id.app_id);
330 fwp_mngr_contract_reserve(msgb, participant_id);
334 FWP_DEBUG("Message COMMIT received from nodeid = %d "
335 "appid = %d\n", participant_id.node_id,
336 participant_id.app_id);
337 fwp_mngr_contract_commit(msgb, participant_id);
341 FWP_DEBUG("Message CANCEL received from nodeid = %d "
342 "appid = %d\n", participant_id.node_id,
343 participant_id.app_id);
344 fwp_mngr_contract_cancel(msgb, participant_id);
348 printf("Invalid message\n.");
/*
 * Main loop of the manager: repeatedly receives one message with
 * fwp_mngr_input and passes it to fwp_mngr_msg_handler. The loop condition is
 * the constant 1; the exit flag is commented out.
 * NOTE(review): this listing omits several source lines (the declaration of
 * rv, any check of rv and the closing braces).
 */
353 void fwp_mngr_main_loop()
355 struct fwp_msgb *msgb;
358 /* start admission control thread */
/* NOTE(review): no thread creation is visible in this listing despite the
 * comment above. */
359 while (1 /*exit_flag*/){
360 //gui_print_status();
361 rv = fwp_mngr_input(&msgb);
/* NOTE(review): confirm that msgb is only handled when rv signals success;
 * the check is not visible here. */
363 fwp_mngr_msg_handler(msgb);
364 FWP_DEBUG("Mngr waiting for next msg.\n");
/* NOTE(review): the header of the enclosing function is not present in this
 * listing; presumably these lines are the tail of main(). The first visible
 * line reports a failed manager initialization on stderr. */
371 fprintf(stderr,"FWP manager initialization failed.\n");
/* Enters the receive and dispatch loop, which has no exit condition in the
 * visible code. */
376 fwp_mngr_main_loop();