3 #include "fwp_participant_table.h"
6 #define BUFFSIZE FWP_MTU
8 /* buffer and socket for incoming message */
9 static unsigned char buffer[FWP_MTU];
11 /* Admission control test */
12 //fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
17 * Function waits for remote or local message
19 * @pmsgb received message
21 * On success, it returns 0 and the pointer to the received message in the pmsgb parameter.
22 * On error, it returns a negative error code.
// Calls fwp_recv on this participant's endpoint, filling the file-level static
// buffer, then allocates a message buffer sized to the received length.
// NOTE(review): this listing has gaps (braces, the declaration of size and the
// return statements are not visible), so how the result is stored through
// pmsgb and which values are returned cannot be confirmed from here.
25 int fwp_mngr_input(struct fwp_msgb **pmsgb)
27 struct fwp_msgb *msgb;
30 FWP_DEBUG("Waiting for messages\n");
31 /* TODO: consider to replace with fwp_mngt_recv call */
// Request at most BUFFSIZE bytes from fwp_participant_this->epointd into buffer.
32 size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
// NOTE(review): no check of size is visible before it is logged and passed to
// fwp_msgb_alloc - confirm against the full source.
34 FWP_DEBUG("Creating fwp msgb len=%d\n", size);
35 /* For future: fwp_socket could be allocated behind data in msgb*/
36 if (!(msgb = fwp_msgb_alloc(size))) {
// perror appends ": <errno text>" and a newline after its argument, so the
// "\n" in the literal ends up in the middle of the printed line.
37 perror("No memory available.\n");
40 /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
// presumably extends the used part of msgb by size bytes; no visible line
// copies buffer into msgb in this excerpt - TODO confirm.
42 fwp_msgb_put(msgb, size);
// Handles a HELLO message: creates a participant from the info carried in
// msgb, sets up its vres, send endpoint and contract table, inserts it into
// the participant table, then reuses msgb to send a HELLO carrying this
// participant's id and stream_id.
//
// msgb           - buffer holding the received HELLO payload; it is reset and
//                  reused for the outgoing message.
// participant_id - sender id; only used for the debug print in the visible code.
48 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 fwp_participant_info_t participant_info, my_info;
51 fwp_participant_t *participant;
53 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
54 participant_id.node_id, participant_id.app_id);
56 /* Create a new participant */
// Decode the HELLO payload at msgb->data into participant_info.
57 fwp_msg_hello_inflate(msgb->data, &participant_info);
58 participant = fwp_participant_create(&participant_info);
// NOTE(review): participant is dereferenced right away and none of the setup
// calls below have their results checked - confirm whether they can fail.
59 fwp_mngt_service_vres_create(&participant->vresd);
60 fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
61 0, &participant->epointd);
62 fwp_send_endpoint_bind(participant->epointd, participant->vresd);
63 fwp_contract_table_init(&participant->contract_table);
65 /* Insert participant into table */
66 fwp_participant_table_insert(participant);
68 /* Send back hello msg with mngr`s info */
69 /* prepare hello message */
// Reuse the incoming buffer: reset it, then reserve room for the message header.
70 fwp_msgb_reset_data(msgb);
71 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
// Only id and stream_id of my_info are assigned in the visible code.
73 my_info.id = fwp_participant_this->id;
74 my_info.stream_id = fwp_participant_this->stream_id;
// Encode my_info at msgb->tail, then account for sizeof(struct fwp_msg_hello) bytes.
76 fwp_msg_hello_deflate(msgb->tail, &my_info);
77 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79 /* Send hello to manager */
// NOTE(review): the call passes fwp_participant_this and then the newly
// created participant, so "to manager" above looks stale - confirm direction.
80 fwp_mngt_send(FWP_MSG_HELLO, msgb,
81 fwp_participant_this, participant);
83 FWP_DEBUG("Sent HELLO msg \n");
86 /*void fwp_mngr_negt_request(msgb, participant_id)
// Decodes the header of a received message and hands the message to the
// handler for its type.
// NOTE(review): the switch statement, the HELLO case label, the break
// statements and the default label are not visible in this listing.
//
// msgb - received message; its data is expected to start with a
//        struct fwp_msg_header.
92 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
94 fwp_msg_type_t msg_type;
95 fwp_participant_id_t participant_id;
// Extract message type and sender id from the header at msgb->data, then pull
// sizeof(struct fwp_msg_header) bytes off the front of msgb.
97 fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
98 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
99 FWP_DEBUG("type = %d , nodeid = %d appid = %d\n", msg_type,
100 participant_id.node_id, participant_id.app_id);
// presumably reached under a HELLO case label that is missing here - TODO confirm.
104 fwp_mngr_hello(msgb, participant_id);
107 case FWP_CONTNEGT_REQ:
108 FWP_DEBUG("Negotiation Request received\n");
109 fwp_mngr_negt_request(msgb, participant_id);
// presumably the default branch for unrecognised message types - TODO confirm.
// NOTE(review): the literal prints the '.' after the newline; looks unintended.
113 printf("Invalid message\n.");
// Manager main loop: repeatedly obtains a message with fwp_mngr_input and
// passes it to fwp_mngr_msg_handler. The loop condition is the constant 1, so
// no exit path is visible in this listing.
120 void fwp_mngr_main_loop()
122 struct fwp_msgb *msgb;
124 /* start admission control thread */
125 while (1 /*exit_flag*/){
// NOTE(review): no check of the return value is visible here, so msgb may be
// used by the handler after a failed input - confirm against the full source.
126 fwp_mngr_input(&msgb);
128 fwp_mngr_msg_handler(msgb);
129 FWP_DEBUG("Mngr waiting for next msg.\n");
// Start-up sequence: initialise FWP, create the management receive endpoint,
// then enter the manager main loop.
// NOTE(review): the enclosing function's signature is not visible in this
// listing; presumably this is the program entry point - confirm.
135 if (fwp_init() < 0) {
136 fprintf(stderr,"FWP initialization failed.\n");
140 /* After initialization, destroy fwp_mngt_repointd and create the new one
141 * with specified parameters */
// NOTE(review): the "created" message is logged before the create call below,
// and the result of that call is not checked in the visible code.
143 FWP_DEBUG("Management receive endpoint created\n");
144 fwp_receive_endpoint_create(FWP_MNGR_STREAM_ID, 0,
145 &fwp_participant_this->epointd);
147 fwp_mngr_main_loop();