/* NOTE(review): both CONFIGURE_FWP_* macros are defined before fwp_confdefs.h
 * is included, presumably to override defaults provided by that header --
 * confirm against fwp_confdefs.h. */
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
4 #include "fwp_confdefs.h"
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
/* NOTE(review): BUFFSIZE is not referenced in the visible part of this file;
 * the receive buffer in fwp_mngr_input() is sized with FWP_MTU directly. */
13 #define BUFFSIZE FWP_MTU
/* Admission control test: a function pointer, initialised here to
 * fwp_admctrl_utilization, that fwp_mngr_contract_reserve() calls on every
 * contract it extracts from a RESERVE message. */
16 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_utilization;
21 * Waits for a remote or local message.
23 * @pmsgb output parameter that receives the pointer to the received message
25 * On success, it returns 0 and stores the pointer to the received message in *pmsgb.
26 * On error, it returns a negative error code.
/*
 * fwp_mngr_input - receive one management message on this participant's
 * endpoint (fwp_participant_this->epointd).
 *
 * The message is read in two steps: first sizeof(struct fwp_msg_header) bytes,
 * then the remainder announced by the header's length field (network byte
 * order, converted with ntohs). A message buffer of header->length bytes is
 * then allocated with fwp_msgb_alloc().
 *
 * The receive buffer has static storage duration, so concurrent calls would
 * share it.
 */
29 int fwp_mngr_input(struct fwp_msgb **pmsgb)
31 /* buffer and socket for incoming message */
32 static unsigned char buffer[FWP_MTU];
33 struct fwp_msgb *msgb;
34 ssize_t size, expected;
/* The header is parsed in place at the start of the receive buffer. */
35 struct fwp_msg_header *header = (void*)buffer;
37 FWP_DEBUG("Waiting for messages\n");
38 /* TODO: consider to replace with fwp_mngt_recv call */
39 size = fwp_recv(fwp_participant_this->epointd, buffer, sizeof(*header), 0);
/* NOTE(review): size is ssize_t while sizeof yields size_t. On common ABIs
 * the signed operand is converted to unsigned, so a negative error return
 * from fwp_recv would compare as a huge value and NOT satisfy this test --
 * confirm that negative results are rejected separately. */
43 if (size < sizeof(*header)) {
45 /* TODO: Use errno for error reporting */
/* NOTE(review): header->length comes from the wire. Nothing visible here
 * checks that it is at least sizeof(*header) (otherwise `expected` is
 * negative/wraps) or that it fits into buffer[FWP_MTU] -- confirm and
 * validate before the second receive. */
47 expected = ntohs(header->length)-sizeof(*header);
/* Second read appends the payload right behind the header bytes. */
48 size = fwp_recv(fwp_participant_this->epointd, buffer+size,
50 if (size < expected) {
54 /* For future: fwp_socket could be allocated behind data in msgb*/
55 if (!(msgb = fwp_msgb_alloc(ntohs(header->length)))) {
/* NOTE(review): perror() appends ": <errno text>" itself, so the trailing
 * ".\n" inside this string ends up in the middle of the printed line. */
56 perror("No memory available.\n");
59 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
/* Marks header->length bytes of msgb as used.
 * NOTE(review): confirm that the received bytes are copied from `buffer`
 * into msgb and that *pmsgb is set before returning. */
61 fwp_msgb_put(msgb, ntohs(header->length));
67 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
69 fwp_participant_info_t participant_info, my_info;
70 fwp_participant_t *participant;
71 fwp_endpoint_attr_t attr;
73 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
74 participant_id.node_id, participant_id.app_id);
76 fwp_endpoint_attr_init(&attr);
77 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
78 /* Create a new participant */
79 fwp_msg_hello_out(msgb->data, &participant_info);
80 participant = fwp_participant_new(&participant_info);
81 fwp_mngt_service_vres_create(&participant->vresd);
82 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
83 &attr, &participant->epointd);
84 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
85 fwp_contract_table_init(&participant->contract_table);
87 /* Insert participant into table */
88 fwp_participant_table_insert(participant);
90 /* Send back hello msg with mngr`s info */
91 /* prepare hello message */
92 fwp_msgb_reset_data(msgb);
93 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
95 my_info.id = fwp_participant_this->id;
96 my_info.stream_id = fwp_participant_this->stream_id;
98 fwp_msg_hello_in(msgb->tail, &my_info);
99 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
101 /* Send hello to manager */
102 fwp_mngt_send(FWP_MSG_HELLO, msgb,
103 fwp_participant_this, participant);
105 FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
106 participant_id.node_id, participant_id.app_id);
/*
 * fwp_mngr_bye - handle a BYE message: unregister a participant and release
 * the resources the manager holds for it.
 *
 * msgb:           received message; not used by the visible code
 * participant_id: id of the participant that is leaving
 *
 * The participant is looked up in the participant table, removed from it, its
 * send endpoint is unbound and destroyed, its vres is destroyed and finally
 * the participant object itself is deleted.
 */
109 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
111 fwp_participant_t *participant;
113 /* Find participant */
/* Unknown participant ids take this branch. */
114 if (!(participant = fwp_participant_table_find(&participant_id))){
/* The table entry is removed first; the object stays valid until
 * fwp_participant_delete() below, so the calls in between may still use it. */
118 fwp_participant_table_delete(participant);
119 fwp_send_endpoint_unbind(participant->epointd);
120 fwp_endpoint_destroy(participant->epointd);
121 fwp_vres_destroy(participant->vresd);
/* NOTE(review): contracts still stored in participant->contract_table are
 * not released here (see TODO) -- confirm fwp_participant_delete() does not
 * leave them behind. */
122 /* TODO: iterate through contract table and delete contracts */
123 fwp_participant_delete(participant);
125 FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id,
126 participant_id.app_id);
/*
 * fwp_mngr_contract_reserve - handle a RESERVE message.
 *
 * Extracts the contract header and contract parameters from the request, runs
 * the admission test (fwp_admctrl_test) on them and sends a RESERVE reply
 * containing the contract header and, when the contract was reserved, the
 * vres parameters.
 *
 * msgb:           received message positioned at the contract header
 * participant_id: id of the requesting participant
 */
132 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
134 fwp_participant_t *participant;
135 fwp_contract_data_t *contdata;
137 /* Find participant */
138 if (!(participant = fwp_participant_table_find(&participant_id))){
/* NOTE(review): the result of fwp_contract_data_new() is dereferenced below
 * without a visible NULL check -- confirm it cannot fail. */
142 contdata = fwp_contract_data_new();
144 /* Extract contract header */
145 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
146 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
147 /* Extract contract params */
148 fwp_msg_contract_out(msgb->data, &contdata->contract);
149 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
151 /*launch admission test */
/* The test is expected to update contdata->status (checked below) and, for
 * an accepted contract, contdata->vres_params -- presumably; verify against
 * the admission-control implementation. */
152 fwp_admctrl_test(contdata);
/* From here on msgb refers to a freshly allocated reply buffer; the request
 * buffer passed in by the caller is no longer reachable through it.
 * NOTE(review): the allocation result is not checked. The size reserves
 * room for fwp_msg_contract, while the code below puts fwp_msg_contracthdr
 * (+ fwp_msg_vres_params) -- confirm the buffer is large enough. Also
 * confirm who frees the request and the reply buffers. */
155 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
156 sizeof(struct fwp_msg_contract) +
157 sizeof(struct fwp_msg_vres_params));
158 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
160 /*Add contract header*/
161 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
162 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
163 /* Add contract params */
164 /* No needed to send back if spare capacity is not considered
165 * fwp_msg_contract_in(msgb->tail, &contdata->contract);
166 * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
169 /*Send back contract reservation */
/* Only a reserved contract gets vres parameters appended to the reply and is
 * stored in the participant's contract table.
 * NOTE(review): confirm that contdata is released when the contract is not
 * reserved, otherwise it leaks. */
170 if (contdata->status == FWP_CONT_RESERVED) {
171 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
172 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
173 contdata->vres_params.budget,
174 contdata->vres_params.period_usec,
175 contdata->vres_params.ac_id);
176 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
177 /* Add contract to contract table */
178 fwp_contract_table_insert(&participant->contract_table,contdata);
179 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
/* Reply is sent from this manager to the requesting participant. */
185 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
186 fwp_participant_this, participant);
/*
 * fwp_mngr_contract_commit - handle a COMMIT message: mark a previously stored
 * contract of the participant as negotiated.
 *
 * msgb:           received message positioned at the contract header
 * participant_id: id of the participant committing the contract
 */
191 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
193 fwp_participant_t *participant;
194 fwp_contract_data_t *contdata;
195 fwp_contract_id_t id;
196 fwp_contract_status_t status;
198 /* Find participant */
199 if (!(participant = fwp_participant_table_find(&participant_id))){
/* Only the contract id is used; the status extracted from the header is
 * ignored by the visible code. */
203 fwp_msg_contracthdr_out(msgb->data, &id, &status);
204 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
205 FWP_DEBUG("Contract id=%d to commit\n", id);
/* NOTE(review): the id comes from the received message. If
 * fwp_contract_table_find() returns NULL for an unknown id (as
 * fwp_participant_table_find() does for participants), the assignment
 * below dereferences NULL -- confirm and add a check. */
207 contdata = fwp_contract_table_find(&participant->contract_table, id);
208 contdata->status = FWP_CONT_NEGOTIATED;
210 /* TODO: Send response to confirm reception */
/*
 * fwp_mngr_contract_cancel - handle a CANCEL message: drop a stored contract
 * of the participant.
 *
 * The contract is looked up by the id from the message, marked as not
 * negotiated, its vres is destroyed, and the contract is removed from the
 * participant's contract table and destroyed.
 *
 * msgb:           received message positioned at the contract header
 * participant_id: id of the participant cancelling the contract
 */
216 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
218 fwp_participant_t *participant;
219 fwp_contract_data_t *contdata;
220 fwp_contract_id_t id;
221 fwp_contract_status_t status;
223 /* Find participant */
224 if (!(participant = fwp_participant_table_find(&participant_id))){
/* Only the contract id is used; the extracted status is ignored. */
228 fwp_msg_contracthdr_out(msgb->data, &id, &status);
229 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
/* NOTE(review): the id comes from the received message. If
 * fwp_contract_table_find() returns NULL for an unknown id, the lines
 * below dereference NULL -- confirm and add a check. */
231 contdata = fwp_contract_table_find(&participant->contract_table, id);
232 contdata->status = FWP_CONT_NOTNEGOTIATED;
233 /* release vres - success only for local vres */
234 fwp_vres_destroy(contdata->vresd);
235 /* delete contract from contract table */
236 fwp_contract_table_delete(&participant->contract_table, contdata);
237 fwp_contract_destroy(contdata);
/* Logs the local copy of the id; contdata must not be touched after the
 * destroy call above. */
239 FWP_DEBUG("Contract id=%d to canceled\n", id);
/*
 * fwp_mngr_msg_handler - dispatch one received management message.
 *
 * Reads the message type and the sender's participant id from the message
 * header, pulls the header off the buffer and passes the remaining payload to
 * the handler for that message type (HELLO, BYE, RESERVE, COMMIT, CANCEL).
 * Any other type is reported as invalid on stdout.
 *
 * msgb: received message positioned at the message header
 */
244 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
246 fwp_msg_type_t msg_type;
247 fwp_participant_id_t participant_id;
249 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
/* After this pull msgb->data points at the type-specific payload. */
250 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
254 FWP_DEBUG("Message HELLO received from nodeid = %d "
255 "appid = %d\n", participant_id.node_id,
256 participant_id.app_id);
257 fwp_mngr_hello(msgb, participant_id);
261 FWP_DEBUG("Message BYE received from nodeid = %d "
262 "appid = %d\n", participant_id.node_id,
263 participant_id.app_id);
/* NOTE(review): the int result of fwp_mngr_bye() is ignored here. */
264 fwp_mngr_bye(msgb, participant_id);
268 case FWP_MSG_RESERVE:
269 FWP_DEBUG("Message RESERVE received from nodeid = %d "
270 "appid = %d\n", participant_id.node_id,
271 participant_id.app_id);
272 fwp_mngr_contract_reserve(msgb, participant_id);
276 FWP_DEBUG("Message COMMIT received from nodeid = %d "
277 "appid = %d\n", participant_id.node_id,
278 participant_id.app_id);
279 fwp_mngr_contract_commit(msgb, participant_id);
283 FWP_DEBUG("Message CANCEL received from nodeid = %d "
284 "appid = %d\n", participant_id.node_id,
285 participant_id.app_id);
286 fwp_mngr_contract_cancel(msgb, participant_id);
/* NOTE(review): the '.' follows the newline in this literal, so it is
 * printed at the start of the next output line; "Invalid message.\n" was
 * probably intended. */
290 printf("Invalid message\n.");
/*
 * fwp_mngr_main_loop - manager service loop.
 *
 * Repeatedly waits for a message with fwp_mngr_input() and passes it to
 * fwp_mngr_msg_handler(). The loop condition is the constant 1, so the
 * function does not return through the loop condition.
 */
295 void fwp_mngr_main_loop()
297 struct fwp_msgb *msgb;
300 /* start admission control thread */
301 while (1 /*exit_flag*/){
/* NOTE(review): confirm that msgb is only passed to the handler when rv
 * reports success, and that the message buffer is released after it has
 * been handled. */
302 rv = fwp_mngr_input(&msgb);
304 fwp_mngr_msg_handler(msgb);
305 FWP_DEBUG("Mngr waiting for next msg.\n");
312 fprintf(stderr,"FWP manager initialization failed.\n");
316 fwp_mngr_main_loop();