1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
4 #include "fwp_confdefs.h"
6 #include "fwp_participant_table.h"
7 #include "fwp_admctrl.h"
10 #define BUFFSIZE FWP_MTU
12 /* Buffer and socket for the incoming message. */
/*
 * Receive buffer handed to fwp_recv() in fwp_mngr_input(). It is FWP_MTU
 * bytes long, the same value BUFFSIZE expands to.
 * NOTE(review): only the buffer is declared here; no socket object is
 * visible in this excerpt.
 */
13 static unsigned char buffer[FWP_MTU];
15 /* Admission control test. */
/*
 * Callable invoked as fwp_admctrl_test(contdata) by
 * fwp_mngr_contract_reserve(); initialised here to fwp_admctrl_stupid.
 */
16 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
21 * Waits for a remote or local message.
23 * @pmsgb output parameter that receives the message
25 * On success, it returns 0 and stores the pointer to the received message in the pmsgb parameter.
26 * On error, it returns a negative error code.
/*
 * Receive one message on this participant's endpoint into the file-scope
 * buffer and allocate a message buffer sized to what was received.
 *
 * pmsgb: output parameter (struct fwp_msgb **); the statement that stores
 *        into it is not visible in this excerpt.
 */
29 int fwp_mngr_input(struct fwp_msgb **pmsgb)
31 struct fwp_msgb *msgb;
34 FWP_DEBUG("Waiting for messages\n");
35 /* TODO: consider replacing this with a fwp_mngt_recv call */
/*
 * Calls fwp_recv() on fwp_participant_this->epointd with buffer and BUFFSIZE.
 * NOTE(review): size is handed to fwp_msgb_alloc() below with no visible
 * check for a receive error -- confirm how fwp_recv() reports failure.
 */
36 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
38 /* For future: fwp_socket could be allocated behind the data in msgb */
39 if (!(msgb = fwp_msgb_alloc(size))) {
/*
 * NOTE(review): perror() appends ": <error text>" and a newline itself, so
 * the "\n" inside this string splits the diagnostic across two lines.
 */
40 perror("No memory available.\n");
43 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
/*
 * Extends the message by size bytes.
 * NOTE(review): with the memcpy above commented out, no copy from buffer
 * into msgb is visible here -- verify that the payload is transferred.
 */
45 fwp_msgb_put(msgb, size);
/*
 * Handle a HELLO message: create a participant from the message payload,
 * set up its vres, send endpoint and contract table, insert it into the
 * participant table, and send back a HELLO carrying this participant's
 * own id and stream id.
 *
 * msgb:           received message; its data is read first and the same
 *                 buffer is then reset and reused for the reply.
 * participant_id: sender identification; used in this function only for
 *                 the debug output.
 */
51 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
53 fwp_participant_info_t participant_info, my_info;
54 fwp_participant_t *participant;
55 fwp_endpoint_attr_t attr;
57 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
58 participant_id.node_id, participant_id.app_id);
/* Endpoint attributes for the send endpoint created below. */
60 fwp_endpoint_attr_init(&attr);
61 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
62 /* Create a new participant */
/* participant_info is filled from the message data; nothing else sets it. */
63 fwp_msg_hello_out(msgb->data, &participant_info);
/*
 * NOTE(review): the returned pointer is dereferenced on the following lines
 * without a check -- confirm fwp_participant_create() cannot return NULL.
 */
64 participant = fwp_participant_create(&participant_info);
65 fwp_mngt_service_vres_create(&participant->vresd);
/* Send endpoint keyed by the participant's node id and stream id. */
66 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
67 &attr, &participant->epointd);
68 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
69 fwp_contract_table_init(&participant->contract_table);
71 /* Insert participant into table */
72 fwp_participant_table_insert(participant);
74 /* Send back hello msg with the manager's info */
75 /* Prepare hello message */
/* Reuse the incoming buffer: reset it and leave room for the header. */
76 fwp_msgb_reset_data(msgb);
77 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
79 my_info.id = fwp_participant_this->id;
80 my_info.stream_id = fwp_participant_this->stream_id;
/* Write my_info at the tail, then account for the hello payload size. */
82 fwp_msg_hello_in(msgb->tail, &my_info);
83 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
85 /*
 * Send the hello reply, passing fwp_participant_this and the newly created
 * participant (presumably source and destination -- confirm against
 * fwp_mngt_send()).
 */
86 fwp_mngt_send(FWP_MSG_HELLO, msgb,
87 fwp_participant_this, participant);
/* NOTE(review): the ids printed here are the remote sender's, not this node's. */
89 FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
90 participant_id.node_id, participant_id.app_id);
/*
 * Handle a RESERVE message: look up the sender, decode the contract header
 * and parameters into a new contract record, run the admission control
 * test on it, and send back a RESERVE reply. When the resulting status is
 * FWP_CONT_RESERVED the reply also carries the vres parameters and the
 * contract is stored in the participant's contract table.
 *
 * msgb:           received message positioned at the contract header.
 * participant_id: identification of the sending participant.
 */
94 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
96 fwp_participant_t *participant;
97 fwp_contract_data_t *contdata;
99 /* Find participant */
100 if (!(participant = fwp_participant_table_find(&participant_id))){
/*
 * NOTE(review): contdata is dereferenced below without a visible check --
 * confirm fwp_contract_data_new() cannot fail.
 */
104 contdata = fwp_contract_data_new();
106 /* Extract contract header */
107 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
108 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
109 /* Extract contract params */
110 fwp_msg_contract_out(msgb->data, &contdata->contract);
111 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
113 /* Launch admission test */
/* The code below reads contdata->status and contdata->vres_params after this call. */
114 fwp_admctrl_test(contdata);
/*
 * Build the reply in a newly allocated buffer; the msgb parameter is
 * overwritten with it.
 * NOTE(review): no release of the incoming buffer and no check of the
 * allocation result are visible here. The size is computed with
 * struct fwp_msg_contract, while the code below writes a
 * struct fwp_msg_contracthdr -- confirm the buffer is always large enough.
 */
117 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
118 sizeof(struct fwp_msg_contract) +
119 sizeof(struct fwp_msg_vres_params));
120 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
122 /* Add contract header */
123 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
124 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
125 /* Add contract params */
126 /* No needed to send back if spare capacity is not considered
127 * fwp_msg_contract_in(msgb->tail, &contdata->contract);
128 * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
131 /*Send back contract reservation */
132 if (contdata->status == FWP_CONT_RESERVED) {
/* Reserved: append the vres parameters to the reply. */
133 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
134 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
135 contdata->vres_params.budget,
136 contdata->vres_params.period_usec,
137 contdata->vres_params.ac_id);
138 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
139 /* Add contract to contract table */
140 fwp_contract_table_insert(&participant->contract_table,contdata);
141 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
/* Send the reply, passing fwp_participant_this and the requesting participant. */
147 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
148 fwp_participant_this, participant);
/*
 * Handle a COMMIT message: look up the sender, read the contract id from
 * the contract header, find that contract in the sender's contract table
 * and set its status to FWP_CONT_NEGOTIATED.
 *
 * msgb:           received message positioned at the contract header.
 * participant_id: identification of the sending participant.
 */
153 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
155 fwp_participant_t *participant;
156 fwp_contract_data_t *contdata;
157 fwp_contract_id_t id;
158 fwp_contract_status_t status;
160 /* Find participant */
161 if (!(participant = fwp_participant_table_find(&participant_id))){
/* status is decoded together with id but is not used in the visible code. */
165 fwp_msg_contracthdr_out(msgb->data, &id, &status);
166 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
167 FWP_DEBUG("Contract id=%d to commit\n", id);
/*
 * NOTE(review): the lookup result is dereferenced on the next line without
 * a check -- confirm fwp_contract_table_find() cannot return NULL for an
 * unknown id.
 */
169 contdata = fwp_contract_table_find(&participant->contract_table, id);
170 contdata->status = FWP_CONT_NEGOTIATED;
/*
 * Decode the message header, strip it from the buffer, and pass the
 * message to fwp_mngr_hello(), fwp_mngr_contract_reserve() or
 * fwp_mngr_contract_commit(). The dispatch statement and most case labels
 * are not visible in this excerpt; presumably the selection is on msg_type.
 *
 * msgb: received message starting at struct fwp_msg_header.
 */
175 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
177 fwp_msg_type_t msg_type;
178 fwp_participant_id_t participant_id;
/* Read type and sender id, then advance past the header. */
180 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
181 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
185 FWP_DEBUG("Message HELLO received from nodeid = %d "
186 "appid = %d\n", participant_id.node_id,
187 participant_id.app_id);
188 fwp_mngr_hello(msgb, participant_id);
191 case FWP_MSG_RESERVE:
192 FWP_DEBUG("Message RESERVE received from nodeid = %d "
193 "appid = %d\n", participant_id.node_id,
194 participant_id.app_id);
195 fwp_mngr_contract_reserve(msgb, participant_id);
199 FWP_DEBUG("Message COMMIT received from nodeid = %d "
200 "appid = %d\n", participant_id.node_id,
201 participant_id.app_id);
202 fwp_mngr_contract_commit(msgb, participant_id);
/*
 * Presumably the fallback for unrecognised message types.
 * NOTE(review): the literal ends in "\n." -- the period after the newline
 * looks like a typo for ".\n".
 */
205 printf("Invalid message\n.");
/*
 * Manager main loop: repeatedly obtain a message with fwp_mngr_input() and
 * pass it to fwp_mngr_msg_handler(). The loop condition is the constant 1.
 */
210 void fwp_mngr_main_loop()
212 struct fwp_msgb *msgb;
214 /* start admission control thread */
/* NOTE(review): no thread creation is visible under the comment above. */
215 while (1 /*exit_flag*/){
/*
 * NOTE(review): the return value of fwp_mngr_input() is not used on this
 * line, so msgb may be passed to the handler after a failed receive --
 * confirm.
 */
216 fwp_mngr_input(&msgb);
218 fwp_mngr_msg_handler(msgb);
219 FWP_DEBUG("Mngr waiting for next msg.\n");
/*
 * Initialisation and start-up statements.
 * NOTE(review): the header(s) of the enclosing function(s) are not visible
 * in this excerpt, so the function boundaries here are unknown.
 */
226 fwp_participant_info_t my_info;
/*
 * Initialise the endpoint and vres tables with the limits taken from
 * fwp_configuration; rv holds the first non-zero result.
 */
229 if ((rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints)) ||
230 (rv = fwp_vres_table_init(fwp_configuration.max_vres))) {
235 /* Create fwp_participant_this */
/*
 * NOTE(review): this address literal repeats the value of
 * CONFIGURE_FWP_MNGR_ADDR defined at the top of the file.
 */
236 my_info.id.node_id = inet_addr("127.0.0.1");
/* The application id is this process's pid. */
237 my_info.id.app_id = getpid();
238 my_info.stream_id = FWP_MNGR_STREAM_ID;
/*
 * NOTE(review): the result is dereferenced below without a visible check --
 * confirm fwp_participant_create() cannot return NULL.
 */
240 fwp_participant_this = fwp_participant_create(&my_info);
/* Both globals refer to the same participant object. */
241 fwp_participant_mngr = fwp_participant_this;
/*
 * Create the receive endpoint for the manager stream; its descriptor is
 * stored in epointd, which fwp_mngr_input() passes to fwp_recv().
 */
242 fwp_receive_endpoint_create(my_info.stream_id, 0,
243 &fwp_participant_this->epointd);
244 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
245 fwp_participant_this->id.node_id,
246 fwp_participant_this->stream_id);
255 fprintf(stderr,"FWP manager initialization failed.\n");
/* Enter the receive/dispatch loop. */
259 fwp_mngr_main_loop();