]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
fwp_mngr_hello simplification by use of fwp_mngt_send
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #include <fwp.h>
2 #include "fwp_mngt.h"
3 #include "fwp_participant_table.h"
4
5 #define FWP_MTU         2346  
6 #define BUFFSIZE        FWP_MTU 
7
/* Receive buffer for incoming messages */
9 static unsigned char    buffer[FWP_MTU];
10
11 /* Admission control test */
12 //fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
13
14 /**
15  * fwp_mngt_input 
16  *
17  * Function waits for remote or local message 
18  * 
19  * @msgb  received message 
20  * \return 
21  * On success, it returns 0 and the pointer to received message in msgb parameter.
22  * On error, it returns negative error code
23  *
24  */
25 int fwp_mngr_input(struct fwp_msgb **pmsgb)
26 {
27         struct fwp_msgb *msgb;
28         ssize_t size;
29
30         FWP_DEBUG("Waiting for messages\n");
31         /* TODO: consider to replace with fwp_mngt_recv call */
32         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
33          
34         FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
35         /* For future: fwp_socket could be allocated behind data in msgb*/
36         if (!(msgb = fwp_msgb_alloc(size))) {
37                 perror("No memory available.\n");
38                 return -ENOMEM;
39         }
40         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
41         msgb->data = buffer;
42         fwp_msgb_put(msgb, size);
43         
44         *pmsgb = msgb;
45         return (0);
46 }
47
48 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
49 {
50         fwp_participant_info_t participant_info, my_info;
51         fwp_participant_t *participant;
52
53         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
54                         participant_id.node_id, participant_id.app_id);
55
56         /* Create a new participant */
57         fwp_msg_hello_inflate(msgb->data, &participant_info);
58         participant = fwp_participant_create(&participant_info);
59         fwp_mngt_service_vres_create(&participant->vresd);
60         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
61                                         0, &participant->epointd);
62         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
63         fwp_contract_table_init(&participant->contract_table);
64
65         /* Insert participant into table */
66         fwp_participant_table_insert(participant);
67
68         /* Send back hello msg with mngr`s info */
69         /* prepare hello message */
70         fwp_msgb_reset_data(msgb);
71         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
72         
73         my_info.id = fwp_participant_this->id;
74         my_info.stream_id = fwp_participant_this->stream_id;
75
76         fwp_msg_hello_deflate(msgb->tail, &my_info);
77         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
78
79         /* Send hello to manager */
80         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
81                         fwp_participant_this, participant);
82
83         FWP_DEBUG("Sent HELLO msg \n");
84 }
85
86 /*void fwp_mngr_negt_request(msgb, participant_id)
87 {
88         
89
90 }*/
91
92 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
93 {
94         fwp_msg_type_t msg_type;
95         fwp_participant_id_t    participant_id;
96
97         fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
98         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
99         FWP_DEBUG("type = %d , nodeid = %d appid = %d\n", msg_type, 
100                         participant_id.node_id, participant_id.app_id);
101         
102         switch (msg_type) {
103                 case  FWP_MSG_HELLO:
104                         fwp_mngr_hello(msgb, participant_id);
105                         break;
106 #if 0
107                 case  FWP_CONTNEGT_REQ: 
108                         FWP_DEBUG("Negotiation Request received\n");
109                         fwp_mngr_negt_request(msgb, participant_id);
110                         break;  
111 #endif
112                 default:
113                         printf("Invalid message\n.");
114                         fwp_msgb_free(msgb);
115         }
116
117
118 }
119
120 void fwp_mngr_main_loop()
121 {
122         struct fwp_msgb *msgb;
123
124         /* start admission control thread */
125         while (1 /*exit_flag*/){
126                 fwp_mngr_input(&msgb);
127                 if (msgb)
128                         fwp_mngr_msg_handler(msgb);
129                 FWP_DEBUG("Mngr waiting for next msg.\n");
130         }
131 }
132
133 int main()
134 {
135         if (fwp_init() < 0) {
136                 fprintf(stderr,"FWP initialization failed.\n");
137                 exit(1);
138         }
139
140         /* After initialization, destroy fwp_mngt_repointd and create the new one
141          * with specified parameters */
142         
143         FWP_DEBUG("Management receive endpoint created\n");
144         fwp_receive_endpoint_create(FWP_MNGR_STREAM_ID, 0, 
145                                         &fwp_participant_this->epointd);
146                         
147         fwp_mngr_main_loop();
148         
149         return 0;       
150