3 #include "fwp_participant_table.h"
/* Length passed to fwp_recv() in fwp_mngr_input(); defined as FWP_MTU. */
6 #define BUFFSIZE FWP_MTU
8 /* File-scope receive buffer for incoming messages (FWP_MTU bytes); it is
 * handed to fwp_recv() in fwp_mngr_input(). No socket is declared here. */
9 static unsigned char buffer[FWP_MTU];
11 /* Admission control test */
12 //fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
17 * Function waits for remote or local message
19 * @pmsgb output parameter: receives the pointer to the received message
21 * On success, it returns 0 and stores the pointer to the received message in *pmsgb.
22 * On error, it returns a negative error code
/*
 * fwp_mngr_input - receive one message and wrap it in a message buffer.
 *
 * pmsgb: output parameter for the received message.
 *
 * Calls fwp_recv() on fwp_participant_this->epointd with the file-scope
 * `buffer` and BUFFSIZE as the length, allocates an fwp_msgb of the received
 * size with fwp_msgb_alloc(), and calls fwp_msgb_put(msgb, size).
 *
 * NOTE(review): parts of this function are not visible in this view (the
 * declaration of `size`, the error returns, the copy from `buffer` into the
 * msgb and the store through pmsgb) -- confirm against the complete file.
 */
25 int fwp_mngr_input(struct fwp_msgb **pmsgb)
27 struct fwp_msgb *msgb;
30 FWP_DEBUG("Waiting for messages\n");
31 /* TODO: consider replacing this with a call to fwp_mngt_recv */
32 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
/* NOTE(review): no check for a failed/negative fwp_recv() result is visible
 * before `size` is used as the allocation length below -- confirm one exists. */
34 FWP_DEBUG("Creating fwp msgb len=%d\n", size);
35 /* For the future: fwp_socket could be allocated behind the data in msgb */
36 if (!(msgb = fwp_msgb_alloc(size))) {
/* NOTE(review): perror() itself appends ": ", the errno text and a newline,
 * so the trailing "\n" in this message splits the diagnostic over two lines. */
37 perror("No memory available.\n");
40 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
/* The combined memcpy/put above is commented out; only the put is visible
 * here. Presumably the payload is copied on a line not shown -- TODO confirm. */
42 fwp_msgb_put(msgb, size);
/*
 * fwp_mngr_hello - handle a HELLO message from a participant.
 *
 * msgb:           message buffer whose data holds the HELLO payload; it is
 *                 reset and reused to build the HELLO reply.
 * participant_id: sender id taken from the message header; in the visible
 *                 code it is only used for the debug output.
 *
 * Inflates the participant info from the message, creates a participant for
 * it (service vres, send endpoint bound to that vres, contract table),
 * inserts it into the participant table, and sends a HELLO carrying this
 * participant's id and stream_id via fwp_mngt_send().
 *
 * NOTE(review): none of the return values of the create/bind/insert/send
 * calls are checked in the visible lines.
 */
48 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 fwp_participant_info_t participant_info, my_info;
51 fwp_participant_t *participant;
53 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
54 participant_id.node_id, participant_id.app_id);
56 /* Create a new participant from the info carried in the message */
57 fwp_msg_hello_inflate(msgb->data, &participant_info);
58 participant = fwp_participant_create(&participant_info);
/* NOTE(review): `participant` is dereferenced immediately below without a
 * NULL check -- confirm fwp_participant_create() cannot fail. */
59 fwp_mngt_service_vres_create(&participant->vresd);
60 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
61 0, &participant->epointd);
62 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
63 fwp_contract_table_init(&participant->contract_table);
65 /* Insert participant into table */
66 fwp_participant_table_insert(participant);
68 /* Send back a hello msg with the manager's info */
69 /* Prepare the hello message: reuse msgb, leaving room for the header */
70 fwp_msgb_reset_data(msgb);
71 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
/* Only the id and stream_id fields of my_info are filled in here. */
73 my_info.id = fwp_participant_this->id;
74 my_info.stream_id = fwp_participant_this->stream_id;
76 fwp_msg_hello_deflate(msgb->tail, &my_info);
77 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79 /* Send the hello reply; the arguments are fwp_participant_this and the
 * newly created participant (presumably source and destination -- TODO
 * confirm against fwp_mngt_send()) */
80 FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
81 fwp_mngt_send(FWP_MSG_HELLO, msgb,
82 fwp_participant_this, participant);
84 FWP_DEBUG("Sent HELLO msg \n");
/*
 * fwp_mngr_contract_reserve - record a contract requested by a participant.
 *
 * msgb:           message buffer whose data holds the contract id and contract.
 * participant_id: id of the requesting participant, looked up in the
 *                 participant table.
 *
 * Allocates a contract-data record, inflates the contract id and contract
 * from the message into it, marks it FWP_CONT_REQUESTED and inserts it into
 * the participant's contract table.
 *
 * NOTE(review): the return statements (including the value returned when the
 * participant is not found) and the admission test are not visible in this
 * view -- confirm against the complete file.
 */
87 int fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
89 fwp_participant_t *participant;
90 fwp_contract_data_t *contdata;
92 /* Find participant */
93 if (!(participant = fwp_participant_table_find(&participant_id))){
97 contdata = fwp_contract_data_new();
/* NOTE(review): `contdata` is dereferenced on the next line without a NULL
 * check -- confirm fwp_contract_data_new() cannot fail. */
98 fwp_msg_contract_inflate(msgb->data, &contdata->id, &contdata->contract);
99 contdata->status = FWP_CONT_REQUESTED;
101 /* Add contract to contract table */
102 fwp_contract_table_insert(&participant->contract_table, contdata);
103 /* Launch admission test (not visible in this view) */
/*
 * fwp_mngr_contract_commit - mark a previously stored contract as accepted.
 *
 * msgb:           message buffer whose data holds the contract id and contract.
 * participant_id: id of the participant owning the contract, looked up in
 *                 the participant table.
 *
 * Inflates the contract id from the message, looks the contract up in the
 * participant's contract table and sets its status to FWP_CONT_ACCEPTED.
 * The inflated `contract` itself is not used in the visible lines.
 *
 * NOTE(review): the return statements and the "accept list" step are not
 * visible in this view -- confirm against the complete file.
 */
108 int fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
110 fwp_participant_t *participant;
111 fwp_contract_data_t *contdata;
112 fwp_contract_t contract;
113 fwp_contract_id_t id;
115 /* Find participant */
116 if (!(participant = fwp_participant_table_find(&participant_id))){
120 fwp_msg_contract_inflate(msgb->data, &id, &contract);
121 contdata = fwp_contract_table_find(&participant->contract_table, id);
/* NOTE(review): the lookup result is dereferenced on the very next line with
 * no NULL check; if fwp_contract_table_find() can report "not found" with a
 * NULL pointer, a commit for an unknown contract id would crash -- confirm. */
122 contdata->status = FWP_CONT_ACCEPTED;
124 /* Add to contract accept list */
/*
 * fwp_mngr_msg_handler - decode the header of a received message and
 * dispatch it by message type.
 *
 * msgb: received message; its header is inflated into msg_type and
 *       participant_id and then pulled off the buffer before dispatch.
 *
 * Visible dispatch targets: fwp_mngr_hello() and, for FWP_MSG_NEGT_REQ,
 * fwp_mngr_contract_reserve(); other input prints "Invalid message".
 *
 * NOTE(review): the switch statement, the remaining case labels and the
 * break statements are not visible in this view; no dispatch to
 * fwp_mngr_contract_commit() is visible either -- confirm against the
 * complete file.
 */
128 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
130 fwp_msg_type_t msg_type;
131 fwp_participant_id_t participant_id;
133 fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
134 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
135 FWP_DEBUG("type = %d , nodeid = %d appid = %d\n", msg_type,
136 participant_id.node_id, participant_id.app_id);
140 fwp_mngr_hello(msgb, participant_id);
143 case FWP_MSG_NEGT_REQ:
144 FWP_DEBUG("Negotiation Request received\n");
/* The int result of fwp_mngr_contract_reserve() is discarded here. */
145 fwp_mngr_contract_reserve(msgb, participant_id);
/* NOTE(review): the period comes after the newline in this string, so it is
 * printed at the start of the following output line -- looks like a typo
 * for "Invalid message.\n". */
148 printf("Invalid message\n.");
/*
 * fwp_mngr_main_loop - manager main loop: repeatedly obtain a message with
 * fwp_mngr_input() and pass it to fwp_mngr_msg_handler().
 *
 * The loop condition is the constant 1 (the exit_flag is commented out), so
 * no exit path is visible in this view.
 *
 * NOTE(review): the parameter list is `()` rather than `(void)`; before C23
 * that declares a function with unspecified parameters.
 */
155 void fwp_mngr_main_loop()
157 struct fwp_msgb *msgb;
159 /* start admission control thread */
160 while (1 /*exit_flag*/){
/* NOTE(review): the return value of fwp_mngr_input() is not used in the
 * visible lines; if it fails, `msgb` may be passed to the handler without
 * having been set -- confirm whether a check exists on a line not shown. */
161 fwp_mngr_input(&msgb);
163 fwp_mngr_msg_handler(msgb);
164 FWP_DEBUG("Mngr waiting for next msg.\n");
/*
 * Manager initialisation body. The function header is not visible in this
 * view; presumably this is fwp_mngr_init(), which is called further down the
 * file -- TODO confirm.
 *
 * Initialises the endpoint and vres tables, creates the participant
 * describing this manager (fwp_participant_this, also stored in
 * fwp_participant_mngr) and creates its receive endpoint.
 *
 * NOTE(review): the declaration of `rv` and the return statements are not
 * visible in this view.
 */
170 fwp_participant_info_t my_info;
/* Stop at the first table initialisation that returns non-zero; `rv` then
 * holds that result. */
173 if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
174 (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
179 /* Create fwp_participant_this */
/* NOTE(review): the node id is hard-coded to the loopback address. */
180 my_info.id.node_id = inet_addr("127.0.0.1");
181 my_info.id.app_id = getpid();
182 my_info.stream_id = FWP_MNGR_STREAM_ID;
/* NOTE(review): the results of fwp_participant_create() and
 * fwp_receive_endpoint_create() are not checked in the visible lines. */
184 fwp_participant_this = fwp_participant_create(&my_info);
185 fwp_participant_mngr = fwp_participant_this;
186 fwp_receive_endpoint_create(my_info.stream_id, 0,
187 &fwp_participant_this->epointd);
188 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
189 fwp_participant_this->id.node_id,
190 fwp_participant_this->stream_id);
/*
 * Program entry sequence (the enclosing function header is not visible in
 * this view; presumably main() -- TODO confirm): initialise the manager,
 * report a failure on stderr, then enter the message loop.
 */
/* A non-zero result from fwp_mngr_init() is treated as failure. */
197 if (fwp_mngr_init()) {
198 fprintf(stderr,"FWP manager initialization failed.\n");
/* Hand control to the manager's message loop. */
202 fwp_mngr_main_loop();