3 #include "fwp_participant_table.h"
4 #include "fwp_admctrl.h"
7 #define BUFFSIZE FWP_MTU
9 /* buffer and socket for incomming message */
10 static unsigned char buffer[FWP_MTU];
12 /* Admission control test */
13 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
18 * Function waits for remote or local message
20 * @msgb received message
22 * On success, it returns 0 and the pointer to received message in msgb parameter.
23 * On error, it returns negative error code
26 int fwp_mngr_input(struct fwp_msgb **pmsgb)
28 struct fwp_msgb *msgb;
31 FWP_DEBUG("Waiting for messages\n");
32 /* TODO: consider to replace with fwp_mngt_recv call */
33 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
35 FWP_DEBUG("Creating fwp msgb len=%d\n", size);
36 /* For future: fwp_socket could be allocated behind data in msgb*/
37 if (!(msgb = fwp_msgb_alloc(size))) {
38 perror("No memory available.\n");
41 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
43 fwp_msgb_put(msgb, size);
49 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
51 fwp_participant_info_t participant_info, my_info;
52 fwp_participant_t *participant;
54 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
55 participant_id.node_id, participant_id.app_id);
57 /* Create a new participant */
58 fwp_msg_hello_out(msgb->data, &participant_info);
59 participant = fwp_participant_create(&participant_info);
60 fwp_mngt_service_vres_create(&participant->vresd);
61 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
62 0, &participant->epointd);
63 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
64 fwp_contract_table_init(&participant->contract_table);
66 /* Insert participant into table */
67 fwp_participant_table_insert(participant);
69 /* Send back hello msg with mngr`s info */
70 /* prepare hello message */
71 fwp_msgb_reset_data(msgb);
72 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
74 my_info.id = fwp_participant_this->id;
75 my_info.stream_id = fwp_participant_this->stream_id;
77 fwp_msg_hello_in(msgb->tail, &my_info);
78 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
80 /* Send hello to manager */
81 FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
82 fwp_mngt_send(FWP_MSG_HELLO, msgb,
83 fwp_participant_this, participant);
85 FWP_DEBUG("Sent HELLO msg \n");
89 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
91 fwp_participant_t *participant;
92 fwp_contract_data_t *contdata;
94 /* Find participant */
95 if (!(participant = fwp_participant_table_find(&participant_id))){
99 contdata = fwp_contract_data_new();
101 /* Extract contract header */
102 fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
103 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
104 /* Extract contract params */
105 fwp_msg_contract_out(msgb->data, &contdata->contract);
106 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
108 /*launch admission test */
109 fwp_admctrl_test(contdata);
112 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
113 sizeof(struct fwp_msg_contract) +
114 sizeof(struct fwp_msg_vres_params));
115 fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
117 /*Add contract header*/
118 fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
119 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
120 /* Add contract params */
121 fwp_msg_contract_in(msgb->tail, &contdata->contract);
122 fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
124 /*Send back contract reservation */
125 if (contdata->status == FWP_CONT_RESERVED) {
126 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
127 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
128 /* Add contract to contract table */
129 fwp_contract_table_insert(&participant->contract_table,contdata);
135 fwp_mngt_send(FWP_MSG_RESERVE, msgb,
136 fwp_participant_this, participant);
141 int fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
143 fwp_participant_t *participant;
144 fwp_contract_data_t *contdata;
145 fwp_contract_id_t id;
146 fwp_contract_status_t status;
148 /* Find participant */
149 if (!(participant = fwp_participant_table_find(&participant_id))){
153 fwp_msg_contracthdr_out(msgb->tail, &id, &status);
154 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
156 contdata = fwp_contract_table_find(&participant->contract_table, id);
157 contdata->status = FWP_CONT_NEGOTIATED;
162 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
164 fwp_msg_type_t msg_type;
165 fwp_participant_id_t participant_id;
167 fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
168 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
172 FWP_DEBUG("Message HELLO received from nodeid = %d\
173 appid = %d\n", participant_id.node_id,
174 participant_id.app_id);
175 fwp_mngr_hello(msgb, participant_id);
178 case FWP_MSG_RESERVE:
179 FWP_DEBUG("Message RESERVE received from nodeid = %d\
180 appid = %d\n", participant_id.node_id,
181 participant_id.app_id);
182 fwp_mngr_contract_reserve(msgb, participant_id);
186 FWP_DEBUG("Message COMMIT received from nodeid = %d\
187 appid = %d\n", participant_id.node_id,
188 participant_id.app_id);
189 fwp_mngr_contract_commit(msgb, participant_id);
192 printf("Invalid message\n.");
197 void fwp_mngr_main_loop()
199 struct fwp_msgb *msgb;
201 /* start admission control thread */
202 while (1 /*exit_flag*/){
203 fwp_mngr_input(&msgb);
205 fwp_mngr_msg_handler(msgb);
206 FWP_DEBUG("Mngr waiting for next msg.\n");
212 fwp_participant_info_t my_info;
215 if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
216 (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
221 /* Create fwp_participant_this */
222 my_info.id.node_id = inet_addr("127.0.0.1");
223 my_info.id.app_id = getpid();
224 my_info.stream_id = FWP_MNGR_STREAM_ID;
226 fwp_participant_this = fwp_participant_create(&my_info);
227 fwp_participant_mngr = fwp_participant_this;
228 fwp_receive_endpoint_create(my_info.stream_id, 0,
229 &fwp_participant_this->epointd);
230 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
231 fwp_participant_this->id.node_id,
232 fwp_participant_this->stream_id);
239 if (fwp_mngr_init()) {
240 fprintf(stderr,"FWP manager initialization failed.\n");
244 fwp_mngr_main_loop();