3 #include "fwp_participant_table.h"
6 #define BUFFSIZE FWP_MTU
8 /* buffer and socket for incomming message */
9 static unsigned char buffer[FWP_MTU];
11 /* Admission control test */
12 //fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
17 * Function waits for remote or local message
19 * @msgb received message
21 * On success, it returns 0 and the pointer to received message in msgb parameter.
22 * On error, it returns negative error code
25 int fwp_mngr_input(struct fwp_msgb **pmsgb)
27 struct fwp_msgb *msgb;
30 FWP_DEBUG("Waiting for messages\n");
31 /* TODO: consider to replace with fwp_mngt_recv call */
32 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
34 FWP_DEBUG("Creating fwp msgb len=%d\n", size);
35 /* For future: fwp_socket could be allocated behind data in msgb*/
36 if (!(msgb = fwp_msgb_alloc(size))) {
37 perror("No memory available.\n");
40 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
42 fwp_msgb_put(msgb, size);
48 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 fwp_participant_info_t participant_info, my_info;
51 fwp_participant_t *participant;
53 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
54 participant_id.node_id, participant_id.app_id);
56 /* Create a new participant */
57 fwp_msg_hello_inflate(msgb->data, &participant_info);
58 participant = fwp_participant_create(&participant_info);
59 fwp_mngt_service_vres_create(&participant->vresd);
60 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
61 0, &participant->epointd);
62 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
63 fwp_contract_table_init(&participant->contract_table);
65 /* Insert participant into table */
66 fwp_participant_table_insert(participant);
68 /* Send back hello msg with mngr`s info */
69 /* prepare hello message */
70 fwp_msgb_reset_data(msgb);
71 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
73 my_info.id = fwp_participant_this->id;
74 my_info.stream_id = fwp_participant_this->stream_id;
76 fwp_msg_hello_deflate(msgb->tail, &my_info);
77 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79 /* Send hello to manager */
80 FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
81 fwp_mngt_send(FWP_MSG_HELLO, msgb,
82 fwp_participant_this, participant);
84 FWP_DEBUG("Sent HELLO msg \n");
87 /*void fwp_mngr_negt_request(msgb, participant_id)
93 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
95 fwp_msg_type_t msg_type;
96 fwp_participant_id_t participant_id;
98 fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
99 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
100 FWP_DEBUG("type = %d , nodeid = %d appid = %d\n", msg_type,
101 participant_id.node_id, participant_id.app_id);
105 fwp_mngr_hello(msgb, participant_id);
108 case FWP_CONTNEGT_REQ:
109 FWP_DEBUG("Negotiation Request received\n");
110 fwp_mngr_negt_request(msgb, participant_id);
114 printf("Invalid message\n.");
121 void fwp_mngr_main_loop()
123 struct fwp_msgb *msgb;
125 /* start admission control thread */
126 while (1 /*exit_flag*/){
127 fwp_mngr_input(&msgb);
129 fwp_mngr_msg_handler(msgb);
130 FWP_DEBUG("Mngr waiting for next msg.\n");
136 fwp_participant_info_t my_info;
139 if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
140 (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
145 /* Create fwp_participant_this */
146 my_info.id.node_id = inet_addr("127.0.0.1");
147 my_info.id.app_id = getpid();
148 my_info.stream_id = FWP_MNGR_STREAM_ID;
150 fwp_participant_this = fwp_participant_create(&my_info);
151 fwp_participant_mngr = fwp_participant_this;
152 fwp_receive_endpoint_create(my_info.stream_id, 0,
153 &fwp_participant_this->epointd);
154 FWP_DEBUG("Management receive endpoint created, stream id= %d\n",
155 fwp_participant_this->stream_id);
156 FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
164 if (fwp_mngr_init()) {
165 fprintf(stderr,"FWP manager initialization failed.\n");
169 fwp_mngr_main_loop();