1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
4 #include "fwp_confdefs.h"
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
13 #define BUFFSIZE FWP_MTU
15 /* buffer and socket for incomming message */
16 static unsigned char buffer[FWP_MTU];
18 /* Admission control test */
19 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
24 * Function waits for remote or local message
26 * @msgb received message
28 * On success, it returns 0 and the pointer to received message in msgb parameter.
29 * On error, it returns negative error code
32 int fwp_mngr_input(struct fwp_msgb **pmsgb)
34 struct fwp_msgb *msgb;
37 FWP_DEBUG("Waiting for messages\n");
38 /* TODO: consider to replace with fwp_mngt_recv call */
39 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
41 /* For future: fwp_socket could be allocated behind data in msgb*/
42 if (!(msgb = fwp_msgb_alloc(size))) {
43 perror("No memory available.\n");
46 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
48 fwp_msgb_put(msgb, size);
54 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
56 fwp_participant_info_t participant_info, my_info;
57 fwp_participant_t *participant;
58 fwp_endpoint_attr_t attr;
60 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
61 participant_id.node_id, participant_id.app_id);
63 fwp_endpoint_attr_init(&attr);
64 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
65 /* Create a new participant */
66 fwp_msg_hello_out(msgb->data, &participant_info);
67 participant = fwp_participant_create(&participant_info);
68 fwp_mngt_service_vres_create(&participant->vresd);
69 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
70 &attr, &participant->epointd);
71 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
72 fwp_contract_table_init(&participant->contract_table);
74 /* Insert participant into table */
75 fwp_participant_table_insert(participant);
77 /* Send back hello msg with mngr`s info */
78 /* prepare hello message */
79 fwp_msgb_reset_data(msgb);
80 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
82 my_info.id = fwp_participant_this->id;
83 my_info.stream_id = fwp_participant_this->stream_id;
85 fwp_msg_hello_in(msgb->tail, &my_info);
86 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
88 /* Send hello to manager */
89 fwp_mngt_send(FWP_MSG_HELLO, msgb,
90 fwp_participant_this, participant);
92 FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
93 participant_id.node_id, participant_id.app_id);
96 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
98 fwp_participant_t *participant;
100 /* Find participant */
101 if (!(participant = fwp_participant_table_find(&participant_id))){
105 /* TODO: iterate through contract table and delete contracts */
106 fwp_participant_table_delete(participant);
108 FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id,
109 participant_id.app_id);
115 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
117 fwp_participant_t *participant;
118 fwp_contract_data_t *contdata;
120 /* Find participant */
121 if (!(participant = fwp_participant_table_find(&participant_id))){
125 contdata = fwp_contract_data_new();
127 /* Extract contract header */
128 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
129 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
130 /* Extract contract params */
131 fwp_msg_contract_out(msgb->data, &contdata->contract);
132 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
134 /*launch admission test */
135 fwp_admctrl_test(contdata);
138 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
139 sizeof(struct fwp_msg_contract) +
140 sizeof(struct fwp_msg_vres_params));
141 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
143 /*Add contract header*/
144 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
145 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
146 /* Add contract params */
147 /* No needed to send back if spare capacity is not considered
148 * fwp_msg_contract_in(msgb->tail, &contdata->contract);
149 * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
152 /*Send back contract reservation */
153 if (contdata->status == FWP_CONT_RESERVED) {
154 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
155 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
156 contdata->vres_params.budget,
157 contdata->vres_params.period_usec,
158 contdata->vres_params.ac_id);
159 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
160 /* Add contract to contract table */
161 fwp_contract_table_insert(&participant->contract_table,contdata);
162 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
168 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
169 fwp_participant_this, participant);
174 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
176 fwp_participant_t *participant;
177 fwp_contract_data_t *contdata;
178 fwp_contract_id_t id;
179 fwp_contract_status_t status;
181 /* Find participant */
182 if (!(participant = fwp_participant_table_find(&participant_id))){
186 fwp_msg_contracthdr_out(msgb->data, &id, &status);
187 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
188 FWP_DEBUG("Contract id=%d to commit\n", id);
190 contdata = fwp_contract_table_find(&participant->contract_table, id);
191 contdata->status = FWP_CONT_NEGOTIATED;
197 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
199 fwp_participant_t *participant;
200 fwp_contract_data_t *contdata;
201 fwp_contract_id_t id;
202 fwp_contract_status_t status;
204 /* Find participant */
205 if (!(participant = fwp_participant_table_find(&participant_id))){
209 fwp_msg_contracthdr_out(msgb->data, &id, &status);
210 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
212 contdata = fwp_contract_table_find(&participant->contract_table, id);
213 contdata->status = FWP_CONT_NOTNEGOTIATED;
214 /* release vres - success only for local vres */
215 fwp_vres_destroy(contdata->vresd);
216 /* delete contract from contract table */
217 fwp_contract_table_delete(&participant->contract_table, contdata);
218 fwp_contract_destroy(contdata);
220 FWP_DEBUG("Contract id=%d to canceled\n", id);
225 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
227 fwp_msg_type_t msg_type;
228 fwp_participant_id_t participant_id;
230 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
231 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
235 FWP_DEBUG("Message HELLO received from nodeid = %d "
236 "appid = %d\n", participant_id.node_id,
237 participant_id.app_id);
238 fwp_mngr_hello(msgb, participant_id);
242 FWP_DEBUG("Message BYE received from nodeid = %d "
243 "appid = %d\n", participant_id.node_id,
244 participant_id.app_id);
245 fwp_mngr_bye(msgb, participant_id);
249 case FWP_MSG_RESERVE:
250 FWP_DEBUG("Message RESERVE received from nodeid = %d "
251 "appid = %d\n", participant_id.node_id,
252 participant_id.app_id);
253 fwp_mngr_contract_reserve(msgb, participant_id);
257 FWP_DEBUG("Message COMMIT received from nodeid = %d "
258 "appid = %d\n", participant_id.node_id,
259 participant_id.app_id);
260 fwp_mngr_contract_commit(msgb, participant_id);
264 FWP_DEBUG("Message CANCEL received from nodeid = %d "
265 "appid = %d\n", participant_id.node_id,
266 participant_id.app_id);
267 fwp_mngr_contract_cancel(msgb, participant_id);
271 printf("Invalid message\n.");
276 void fwp_mngr_main_loop()
278 struct fwp_msgb *msgb;
280 /* start admission control thread */
281 while (1 /*exit_flag*/){
282 fwp_mngr_input(&msgb);
284 fwp_mngr_msg_handler(msgb);
285 FWP_DEBUG("Mngr waiting for next msg.\n");
292 fwp_participant_info_t my_info;
295 if ((rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints)) ||
296 (rv = fwp_vres_table_init(fwp_configuration.max_vres))) {
301 /* Create fwp_participant_this */
302 my_info.id.node_id = inet_addr("127.0.0.1");
303 my_info.id.app_id = getpid();
304 my_info.stream_id = FWP_MNGR_STREAM_ID;
306 fwp_participant_this = fwp_participant_create(&my_info);
307 fwp_participant_mngr = fwp_participant_this;
308 fwp_receive_endpoint_create(my_info.stream_id, 0,
309 &fwp_participant_this->epointd);
310 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
311 fwp_participant_this->id.node_id,
312 fwp_participant_this->stream_id);
321 fprintf(stderr,"FWP manager initialization failed.\n");
325 fwp_mngr_main_loop();