1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
4 #include "fwp_confdefs.h"
7 #include "fwp_contract_table.h"
8 #include "fwp_participant_table.h"
9 #include "fwp_admctrl.h"
13 #define BUFFSIZE FWP_MTU
15 /* buffer and socket for incoming message */
/* Receive buffer of FWP_MTU bytes; fwp_mngr_input() hands it to fwp_recv().
 * NOTE(review): the FIXME comment below is cut off in this excerpt and is
 * only terminated by the end of the comment on the following line. */
16 static unsigned char buffer[FWP_MTU];
17 /* FIXME: This could be moved to local static variable in
20 /* Admission control test */
/* Admission-control hook called by fwp_mngr_contract_reserve() with the
 * requested contract data; it is initialised to fwp_admctrl_stupid. */
21 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
26 * Function waits for remote or local message
28 * @pmsgb location where the pointer to the received message is stored
30 * On success, it returns 0 and the pointer to the received message in the pmsgb parameter.
31 * On error, it returns negative error code
/*
 * fwp_mngr_input - fetch one incoming management message.
 *
 * Visible behaviour: calls fwp_recv() on fwp_participant_this->epointd with
 * the file-scope `buffer` (at most BUFFSIZE bytes), allocates a message
 * buffer of the received size with fwp_msgb_alloc(), and calls
 * fwp_msgb_put() for `size` bytes.
 *
 * NOTE(review): this excerpt does not show the whole function (the
 * declaration of `size`, the braces and the return statements are missing),
 * so the following points need to be confirmed against the full source:
 *  - no check of the fwp_recv() result is visible before it is used as an
 *    allocation size;
 *  - the only visible copy from `buffer` into the new message is commented
 *    out and refers to `len`, which is not declared in the visible lines;
 *  - the store of the new message through pmsgb is not visible.
 */
34 int fwp_mngr_input(struct fwp_msgb **pmsgb)
36 struct fwp_msgb *msgb;
39 FWP_DEBUG("Waiting for messages\n");
40 /* TODO: consider to replace with fwp_mngt_recv call */
41 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
43 /* For future: fwp_socket could be allocated behind data in msgb*/
44 if (!(msgb = fwp_msgb_alloc(size))) {
/* NOTE(review): perror() appends a colon, the strerror text and a newline
 * to its argument, so the trailing newline in this message splits the
 * diagnostic over two lines. */
45 perror("No memory available.\n");
48 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
/* Presumably advances the tail of msgb by `size` bytes (skb_put-like
 * semantics) - confirm against fwp_msgb_put(). */
50 fwp_msgb_put(msgb, size);
/*
 * fwp_mngr_hello - handle a HELLO message received by the manager.
 *
 * @msgb:           received message; the participant info is decoded from
 *                  msgb->data and the same buffer is then reset and reused
 *                  for the HELLO reply.
 * @participant_id: id taken from the message header; in the visible lines
 *                  it is used only for debug output.
 *
 * Visible sequence: decode the sender's info, create a participant object,
 * create a management vres and a send endpoint for it, bind the two,
 * initialise its contract table, insert it into the participant table, and
 * send back a HELLO carrying this manager's id and stream_id.
 *
 * NOTE(review): none of the calls below has its result checked in the
 * visible lines; in particular `participant` is dereferenced right after
 * fwp_participant_new(). No lookup for an already registered participant
 * is visible before the insert. Confirm against the full source.
 */
56 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
58 fwp_participant_info_t participant_info, my_info;
59 fwp_participant_t *participant;
60 fwp_endpoint_attr_t attr;
62 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
63 participant_id.node_id, participant_id.app_id);
/* Endpoint attributes used for the send endpoint created below. */
65 fwp_endpoint_attr_init(&attr);
66 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
67 /* Create a new participant */
68 fwp_msg_hello_out(msgb->data, &participant_info);
69 participant = fwp_participant_new(&participant_info);
/* Give the participant its own vres and a send endpoint addressed to its
 * node_id/stream_id, then bind the endpoint to that vres. */
70 fwp_mngt_service_vres_create(&participant->vresd);
71 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
72 &attr, &participant->epointd);
73 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
74 fwp_contract_table_init(&participant->contract_table);
76 /* Insert participant into table */
77 fwp_participant_table_insert(participant);
79 /* Send back hello msg with mngr`s info */
80 /* prepare hello message */
/* Reuse the incoming buffer: reset it and leave room for the message
 * header in front of the payload. */
81 fwp_msgb_reset_data(msgb);
82 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
/* NOTE(review): only `id` and `stream_id` of my_info are assigned here; if
 * fwp_participant_info_t has further fields they stay uninitialised. */
84 my_info.id = fwp_participant_this->id;
85 my_info.stream_id = fwp_participant_this->stream_id;
87 fwp_msg_hello_in(msgb->tail, &my_info);
88 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
90 /* Send the hello reply; the last argument is the newly created participant */
91 fwp_mngt_send(FWP_MSG_HELLO, msgb,
92 fwp_participant_this, participant);
94 FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
95 participant_id.node_id, participant_id.app_id);
/*
 * fwp_mngr_bye - handle a BYE message: unregister a participant and release
 * the resources created for it in fwp_mngr_hello().
 *
 * @msgb:           received message; not used in the visible lines.
 * @participant_id: id of the participant that is leaving.
 *
 * NOTE(review): the body of the "participant not found" branch and the
 * return statements are not visible in this excerpt, so the return values
 * cannot be documented from here.
 */
98 int fwp_mngr_bye(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
100 fwp_participant_t *participant;
102 /* Find participant */
103 if (!(participant = fwp_participant_table_find(&participant_id))){
/* Remove from the table first, then tear down endpoint and vres.
 * NOTE(review): `participant` is still dereferenced after
 * fwp_participant_table_delete(); this relies on that call only unlinking
 * the entry (the object is released by fwp_participant_delete() below) -
 * confirm. */
107 fwp_participant_table_delete(participant);
108 fwp_send_endpoint_unbind(participant->epointd);
109 fwp_endpoint_destroy(participant->epointd);
110 fwp_vres_destroy(participant->vresd);
111 /* TODO: iterate through contract table and delete contracts */
112 fwp_participant_delete(participant);
114 FWP_DEBUG("BYE nodeid = %d appid = %d\n", participant_id.node_id,
115 participant_id.app_id);
/*
 * fwp_mngr_contract_reserve - handle a RESERVE request.
 *
 * @msgb:           received message positioned at the contract header; the
 *                  local pointer is later overwritten with a newly
 *                  allocated reply buffer.
 * @participant_id: id of the requesting participant.
 *
 * Visible sequence: look up the participant, allocate contract data, decode
 * the contract header and contract parameters from the message, run the
 * admission test (fwp_admctrl_test), build a reply containing the contract
 * header and - when the status is FWP_CONT_RESERVED - the vres parameters,
 * store the reserved contract in the participant's contract table, and send
 * the reply as FWP_MSG_RESERVE.
 *
 * NOTE(review): the return type, the "participant not found" branch and the
 * lines after the FWP_CONT_RESERVED branch are not visible in this excerpt.
 * Points to confirm against the full source:
 *  - the results of fwp_contract_data_new() and fwp_msgb_alloc() are used
 *    without a visible NULL check (fwp_mngr_input() does check
 *    fwp_msgb_alloc());
 *  - the reply is sized with sizeof(struct fwp_msg_contract) although the
 *    data written is a struct fwp_msg_contracthdr plus the vres params;
 *  - who releases `contdata` when the contract is not reserved, and who
 *    releases the incoming and the reply message buffers.
 */
121 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
123 fwp_participant_t *participant;
124 fwp_contract_data_t *contdata;
126 /* Find participant */
127 if (!(participant = fwp_participant_table_find(&participant_id))){
131 contdata = fwp_contract_data_new();
133 /* Extract contract header */
134 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
135 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
136 /* Extract contract params */
137 fwp_msg_contract_out(msgb->data, &contdata->contract);
138 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
140 /*launch admission test */
/* Presumably fills in contdata->status and contdata->vres_params, which are
 * read below - confirm against the admission test implementation. */
141 fwp_admctrl_test(contdata);
/* From here on `msgb` refers to the reply buffer, not the received one. */
144 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
145 sizeof(struct fwp_msg_contract) +
146 sizeof(struct fwp_msg_vres_params));
147 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
149 /*Add contract header*/
150 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
151 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
152 /* Add contract params */
153 /* No needed to send back if spare capacity is not considered
154 * fwp_msg_contract_in(msgb->tail, &contdata->contract);
155 * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
158 /*Send back contract reservation */
159 if (contdata->status == FWP_CONT_RESERVED) {
160 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
161 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
162 contdata->vres_params.budget,
163 contdata->vres_params.period_usec,
164 contdata->vres_params.ac_id);
165 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
166 /* Add contract to contract table */
167 fwp_contract_table_insert(&participant->contract_table,contdata);
168 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
/* The reply is sent with type FWP_MSG_RESERVE; the contract status is
 * carried in the contract header written above. */
174 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
175 fwp_participant_this, participant);
/*
 * fwp_mngr_contract_commit - handle a COMMIT message: mark a previously
 * stored contract of the participant as FWP_CONT_NEGOTIATED.
 *
 * @msgb:           received message positioned at the contract header.
 * @participant_id: id of the participant that owns the contract.
 *
 * NOTE(review): the return type, the "participant not found" branch and the
 * end of the function are not visible in this excerpt. The `status` value
 * decoded from the header is not used in the visible lines.
 */
180 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
182 fwp_participant_t *participant;
183 fwp_contract_data_t *contdata;
184 fwp_contract_id_t id;
185 fwp_contract_status_t status;
187 /* Find participant */
188 if (!(participant = fwp_participant_table_find(&participant_id))){
192 fwp_msg_contracthdr_out(msgb->data, &id, &status);
193 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
194 FWP_DEBUG("Contract id=%d to commit\n", id);
/* NOTE(review): the lookup result is dereferenced on the very next
 * statement; if fwp_contract_table_find() can return NULL for an unknown id
 * (as fwp_participant_table_find() does), a COMMIT for a contract that was
 * never reserved would crash the manager - confirm and add a check. */
196 contdata = fwp_contract_table_find(&participant->contract_table, id);
197 contdata->status = FWP_CONT_NEGOTIATED;
/*
 * fwp_mngr_contract_cancel - handle a CANCEL message: release the vres of a
 * stored contract, remove the contract from the participant's contract
 * table and destroy it.
 *
 * @msgb:           received message positioned at the contract header.
 * @participant_id: id of the participant that owns the contract.
 *
 * NOTE(review): the return type, the "participant not found" branch and the
 * end of the function are not visible in this excerpt. The `status` value
 * decoded from the header is not used in the visible lines.
 */
203 fwp_mngr_contract_cancel(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
205 fwp_participant_t *participant;
206 fwp_contract_data_t *contdata;
207 fwp_contract_id_t id;
208 fwp_contract_status_t status;
210 /* Find participant */
211 if (!(participant = fwp_participant_table_find(&participant_id))){
215 fwp_msg_contracthdr_out(msgb->data, &id, &status);
216 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
/* NOTE(review): the lookup result is dereferenced on the very next
 * statement; if fwp_contract_table_find() can return NULL for an unknown
 * id, a CANCEL for a missing (or already cancelled) contract would crash
 * the manager - confirm and add a check. */
218 contdata = fwp_contract_table_find(&participant->contract_table, id);
/* The status is updated immediately before the contract is destroyed
 * below; it matters only if one of the following calls reads it. */
219 contdata->status = FWP_CONT_NOTNEGOTIATED;
220 /* release vres - success only for local vres */
221 fwp_vres_destroy(contdata->vresd);
222 /* delete contract from contract table */
223 fwp_contract_table_delete(&participant->contract_table, contdata);
224 fwp_contract_destroy(contdata);
226 FWP_DEBUG("Contract id=%d to canceled\n", id);
/*
 * fwp_mngr_msg_handler - decode the header of a received message and pass
 * the message to the handler for its type.
 *
 * @msgb: received message; the message type and sender id are read from
 *        msgb->data and the header is then pulled off, so each handler
 *        sees the payload at msgb->data.
 *
 * Handlers called: fwp_mngr_hello(), fwp_mngr_bye(),
 * fwp_mngr_contract_reserve(), fwp_mngr_contract_commit() and
 * fwp_mngr_contract_cancel(). Their return values are not used here.
 *
 * NOTE(review): the switch statement, most case labels and the break
 * statements are not visible in this excerpt; only the FWP_MSG_RESERVE
 * label is shown.
 */
231 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
233 fwp_msg_type_t msg_type;
234 fwp_participant_id_t participant_id;
236 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
237 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
241 FWP_DEBUG("Message HELLO received from nodeid = %d "
242 "appid = %d\n", participant_id.node_id,
243 participant_id.app_id);
244 fwp_mngr_hello(msgb, participant_id);
248 FWP_DEBUG("Message BYE received from nodeid = %d "
249 "appid = %d\n", participant_id.node_id,
250 participant_id.app_id);
251 fwp_mngr_bye(msgb, participant_id);
255 case FWP_MSG_RESERVE:
256 FWP_DEBUG("Message RESERVE received from nodeid = %d "
257 "appid = %d\n", participant_id.node_id,
258 participant_id.app_id);
259 fwp_mngr_contract_reserve(msgb, participant_id);
263 FWP_DEBUG("Message COMMIT received from nodeid = %d "
264 "appid = %d\n", participant_id.node_id,
265 participant_id.app_id);
266 fwp_mngr_contract_commit(msgb, participant_id);
270 FWP_DEBUG("Message CANCEL received from nodeid = %d "
271 "appid = %d\n", participant_id.node_id,
272 participant_id.app_id);
273 fwp_mngr_contract_cancel(msgb, participant_id);
/* Fallback for unrecognised message types.
 * NOTE(review): the literal ends with a newline followed by a period, so
 * the period is printed at the start of the next output line; the message
 * also goes to stdout while the initialisation failure message in this
 * file is written to stderr. */
277 printf("Invalid message\n.");
/*
 * fwp_mngr_main_loop - manager service loop: repeatedly obtain a message
 * with fwp_mngr_input() and hand it to fwp_mngr_msg_handler().
 *
 * The loop condition is the constant 1 (the exit flag is commented out), so
 * no exit path is visible in this excerpt.
 *
 * NOTE(review): the declaration of `rv` and any test of it between the two
 * calls are not visible in this excerpt; confirm that the handler is not
 * run on an unset `msgb` when fwp_mngr_input() fails. No release of the
 * message buffer after handling is visible either. The "start admission
 * control thread" comment has no corresponding visible code.
 */
282 void fwp_mngr_main_loop()
284 struct fwp_msgb *msgb;
287 /* start admission control thread */
288 while (1 /*exit_flag*/){
289 rv = fwp_mngr_input(&msgb);
291 fwp_mngr_msg_handler(msgb);
292 FWP_DEBUG("Mngr waiting for next msg.\n");
/* NOTE(review): the header of the enclosing function (presumably the
 * program entry point) and the initialisation call whose failure is
 * reported here are not visible in this excerpt. */
299 fprintf(stderr,"FWP manager initialization failed.\n");
/* Enter the service loop; its loop condition is the constant 1. */
303 fwp_mngr_main_loop();