1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
4 #include "fwp_confdefs.h"
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
14 #define BUFFSIZE FWP_MTU
16 /* Admission control test */
17 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
22 * Function waits for remote or local message
24 * @msgb received message
26 * On success, it returns 0 and the pointer to received message in msgb parameter.
27 * On error, it returns negative error code
30 int fwp_mngr_input(struct fwp_msgb **pmsgb)
32 /* buffer and socket for incomming message */
33 static unsigned char buffer[FWP_MTU];
34 struct fwp_msgb *msgb;
35 ssize_t size, expected;
36 struct fwp_msg_header *header = (void*)buffer;
38 FWP_DEBUG("Waiting for messages\n");
39 /* TODO: consider to replace with fwp_mngt_recv call */
40 size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
44 if (size < sizeof(*header)) {
46 /* TODO: Use errno for error reporting */
48 expected = ntohs(header->length)-sizeof(*header);
49 size = fwp_recv(fwp_participant_this->epointd, buffer+size,
51 if (size < expected) {
55 /* For future: fwp_socket could be allocated behind data in msgb*/
56 if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
57 perror("No memory available.\n");
60 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
62 fwp_msgb_put(msgb, ntohs(header->length));
68 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
70 fwp_participant_info_t participant_info, my_info;
71 fwp_participant_t *participant;
72 fwp_endpoint_attr_t attr;
74 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
75 participant_id.node_id, participant_id.app_id);
77 fwp_endpoint_attr_init(&attr);
78 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
79 /* Create a new participant */
80 fwp_msg_hello_out(msgb->data, &participant_info);
81 participant = fwp_participant_new(&participant_info);
82 fwp_mngt_service_vres_create(&participant->vresd);
83 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
84 &attr, &participant->epointd);
85 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
86 fwp_contract_table_init(&participant->contract_table);
88 /* Insert participant into table */
89 fwp_participant_table_insert(participant);
91 /* Send back hello msg with mngr`s info */
92 /* prepare hello message */
93 fwp_msgb_reset_data(msgb);
94 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
96 my_info.id = fwp_participant_this->id;
97 my_info.stream_id = fwp_participant_this->stream_id;
99 fwp_msg_hello_in(msgb->tail, &my_info);
100 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
102 /* Send hello to manager */
103 fwp_mngt_send(FWP_MSG_HELLO, msgb,
104 fwp_participant_this, participant);
106 FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
107 participant_id.node_id, participant_id.app_id);
110 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
112 fwp_participant_t *participant;
114 /* Find participant */
115 if (!(participant = fwp_participant_table_find(&participant_id))){
119 fwp_participant_table_delete(participant);
120 fwp_send_endpoint_unbind(participant->epointd);
121 fwp_endpoint_destroy(participant->epointd);
122 fwp_vres_destroy(participant->vresd);
123 /* TODO: iterate through contract table and delete contracts */
124 fwp_participant_delete(participant);
126 FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id,
127 participant_id.app_id);
133 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
135 fwp_participant_t *participant;
136 fwp_contract_data_t *contdata;
138 /* Find participant */
139 if (!(participant = fwp_participant_table_find(&participant_id))){
143 contdata = fwp_contract_data_new();
145 /* Extract contract header */
146 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
147 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
148 /* Extract contract params */
149 fwp_msg_contract_out(msgb->data, &contdata->contract);
150 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
152 /*launch admission test */
153 fwp_admctrl_test(contdata);
156 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
157 sizeof(struct fwp_msg_contract) +
158 sizeof(struct fwp_msg_vres_params));
159 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
161 /*Add contract header*/
162 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
163 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
164 /* Add contract params */
165 /* No needed to send back if spare capacity is not considered
166 * fwp_msg_contract_in(msgb->tail, &contdata->contract);
167 * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
170 /*Send back contract reservation */
171 if (contdata->status == FWP_CONT_RESERVED) {
172 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
173 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
174 contdata->vres_params.budget,
175 contdata->vres_params.period_usec,
176 contdata->vres_params.ac_id);
177 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
178 /* Add contract to contract table */
179 fwp_contract_table_insert(&participant->contract_table,contdata);
180 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
186 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
187 fwp_participant_this, participant);
192 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
194 fwp_participant_t *participant;
195 fwp_contract_data_t *contdata;
196 fwp_contract_id_t id;
197 fwp_contract_status_t status;
199 /* Find participant */
200 if (!(participant = fwp_participant_table_find(&participant_id))){
204 fwp_msg_contracthdr_out(msgb->data, &id, &status);
205 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
206 FWP_DEBUG("Contract id=%d to commit\n", id);
208 contdata = fwp_contract_table_find(&participant->contract_table, id);
209 contdata->status = FWP_CONT_NEGOTIATED;
211 /* TODO: Send response to confirm reception */
217 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
219 fwp_participant_t *participant;
220 fwp_contract_data_t *contdata;
221 fwp_contract_id_t id;
222 fwp_contract_status_t status;
224 /* Find participant */
225 if (!(participant = fwp_participant_table_find(&participant_id))){
229 fwp_msg_contracthdr_out(msgb->data, &id, &status);
230 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
232 contdata = fwp_contract_table_find(&participant->contract_table, id);
233 contdata->status = FWP_CONT_NOTNEGOTIATED;
234 /* release vres - success only for local vres */
235 fwp_vres_destroy(contdata->vresd);
236 /* delete contract from contract table */
237 fwp_contract_table_delete(&participant->contract_table, contdata);
238 fwp_contract_destroy(contdata);
240 /* Update admission data (only necessary for demo and GUI) */
241 fwp_admctrl_test(NULL);
243 FWP_DEBUG("Contract id=%d to canceled\n", id);
248 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
250 fwp_msg_type_t msg_type;
251 fwp_participant_id_t participant_id;
253 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
254 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
258 FWP_DEBUG("Message HELLO received from nodeid = %d "
259 "appid = %d\n", participant_id.node_id,
260 participant_id.app_id);
261 fwp_mngr_hello(msgb, participant_id);
265 FWP_DEBUG("Message BYE received from nodeid = %d "
266 "appid = %d\n", participant_id.node_id,
267 participant_id.app_id);
268 fwp_mngr_bye(msgb, participant_id);
272 case FWP_MSG_RESERVE:
273 FWP_DEBUG("Message RESERVE received from nodeid = %d "
274 "appid = %d\n", participant_id.node_id,
275 participant_id.app_id);
276 fwp_mngr_contract_reserve(msgb, participant_id);
280 FWP_DEBUG("Message COMMIT received from nodeid = %d "
281 "appid = %d\n", participant_id.node_id,
282 participant_id.app_id);
283 fwp_mngr_contract_commit(msgb, participant_id);
287 FWP_DEBUG("Message CANCEL received from nodeid = %d "
288 "appid = %d\n", participant_id.node_id,
289 participant_id.app_id);
290 fwp_mngr_contract_cancel(msgb, participant_id);
294 printf("Invalid message\n.");
299 void fwp_mngr_main_loop()
301 struct fwp_msgb *msgb;
304 /* start admission control thread */
305 while (1 /*exit_flag*/){
307 rv = fwp_mngr_input(&msgb);
309 fwp_mngr_msg_handler(msgb);
310 FWP_DEBUG("Mngr waiting for next msg.\n");
317 fprintf(stderr,"FWP manager initialization failed.\n");
322 fwp_mngr_main_loop();