]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Changed fwp_endpoint_send/recv API , added fwp endpoint attributes field and functions
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6 #include "fwp_participant_table.h"
7 #include "fwp_admctrl.h"
8
9 #define FWP_MTU         2346  
10 #define BUFFSIZE        FWP_MTU 
11
12 /* buffer and socket for incomming message */
13 static unsigned char    buffer[FWP_MTU];
14
15 /* Admission control test */
16 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
17
18 /**
19  * fwp_mngt_input 
20  *
21  * Function waits for remote or local message 
22  * 
23  * @msgb  received message 
24  * \return 
25  * On success, it returns 0 and the pointer to received message in msgb parameter.
26  * On error, it returns negative error code
27  *
28  */
29 int fwp_mngr_input(struct fwp_msgb **pmsgb)
30 {
31         struct fwp_msgb *msgb;
32         ssize_t size;
33
34         FWP_DEBUG("Waiting for messages\n");
35         /* TODO: consider to replace with fwp_mngt_recv call */
36         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
37          
38         /* For future: fwp_socket could be allocated behind data in msgb*/
39         if (!(msgb = fwp_msgb_alloc(size))) {
40                 perror("No memory available.\n");
41                 return -ENOMEM;
42         }
43         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
44         msgb->data = buffer;
45         fwp_msgb_put(msgb, size);
46         
47         *pmsgb = msgb;
48         return (0);
49 }
50
51 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
52 {
53         fwp_participant_info_t participant_info, my_info;
54         fwp_participant_t *participant;
55
56         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
57                         participant_id.node_id, participant_id.app_id);
58
59         /* Create a new participant */
60         fwp_msg_hello_out(msgb->data, &participant_info);
61         participant = fwp_participant_create(&participant_info);
62         fwp_mngt_service_vres_create(&participant->vresd);
63         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
64                                         0, &participant->epointd);
65         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
66         fwp_contract_table_init(&participant->contract_table);
67
68         /* Insert participant into table */
69         fwp_participant_table_insert(participant);
70
71         /* Send back hello msg with mngr`s info */
72         /* prepare hello message */
73         fwp_msgb_reset_data(msgb);
74         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
75         
76         my_info.id = fwp_participant_this->id;
77         my_info.stream_id = fwp_participant_this->stream_id;
78
79         fwp_msg_hello_in(msgb->tail, &my_info);
80         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
81
82         /* Send hello to manager */
83         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
84                         fwp_participant_this, participant);
85
86         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
87                         participant_id.node_id, participant_id.app_id);
88 }
89
90 int 
91 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
92 {
93         fwp_participant_t *participant;
94         fwp_contract_data_t *contdata;
95
96         /* Find participant */
97         if (!(participant = fwp_participant_table_find(&participant_id))){
98                 return -EPERM;
99         }
100
101         contdata = fwp_contract_data_new();
102         
103         /* Extract contract header */
104         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
105         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
106         /* Extract contract params */
107         fwp_msg_contract_out(msgb->data, &contdata->contract);
108         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
109
110         /*launch admission test */
111         fwp_admctrl_test(contdata);             
112         
113         free(msgb);
114         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
115                                         sizeof(struct fwp_msg_contract) +
116                                         sizeof(struct fwp_msg_vres_params));
117         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
118         
119         /*Add contract header*/
120         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
121         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
122         /* Add contract params */
123         /* No needed to send back if spare capacity is not considered
124          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
125          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
126          * */
127         
128         /*Send back contract reservation */
129         if (contdata->status == FWP_CONT_RESERVED) {
130                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
131                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
132                                 contdata->vres_params.budget,
133                                 contdata->vres_params.period_usec,
134                                 contdata->vres_params.ac_id);
135                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
136                 /* Add contract to contract table */
137                 fwp_contract_table_insert(&participant->contract_table,contdata);
138                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
139
140         } else {
141                 free(contdata);
142         }       
143         
144         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
145                         fwp_participant_this, participant);
146         return 0;
147 }
148
149 int 
150 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
151 {
152         fwp_participant_t *participant;
153         fwp_contract_data_t *contdata;
154         fwp_contract_id_t  id;
155         fwp_contract_status_t  status;
156
157         /* Find participant */
158         if (!(participant = fwp_participant_table_find(&participant_id))){
159                 return -EPERM;
160         }
161
162         fwp_msg_contracthdr_out(msgb->data, &id, &status);
163         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
164         FWP_DEBUG("Contract id=%d to commit\n", contdata->id);
165         
166         contdata = fwp_contract_table_find(&participant->contract_table, id);
167         contdata->status = FWP_CONT_NEGOTIATED; 
168         
169         return 0;       
170 }
171
172 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
173 {
174         fwp_msg_type_t msg_type;
175         fwp_participant_id_t    participant_id;
176
177         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
178         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
179         
180         switch (msg_type) {
181                 case  FWP_MSG_HELLO:
182                         FWP_DEBUG("Message HELLO received from nodeid = %d "
183                                   "appid = %d\n", participant_id.node_id, 
184                                         participant_id.app_id);
185                         fwp_mngr_hello(msgb, participant_id);
186                         break;
187
188                 case  FWP_MSG_RESERVE: 
189                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
190                                   "appid = %d\n", participant_id.node_id, 
191                                         participant_id.app_id);
192                         fwp_mngr_contract_reserve(msgb, participant_id);
193                         break;
194
195                 case  FWP_MSG_COMMIT: 
196                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
197                                   "appid = %d\n", participant_id.node_id, 
198                                         participant_id.app_id);
199                         fwp_mngr_contract_commit(msgb, participant_id);
200                         break;  
201                 default:
202                         printf("Invalid message\n.");
203                         fwp_msgb_free(msgb);
204         }
205 }
206
/**
 * fwp_mngr_main_loop
 *
 * Endless loop of the manager: receives one message at a time and passes
 * it to the message handler. Failed receives are reported and skipped.
 */
void fwp_mngr_main_loop(void)
{
        struct fwp_msgb *msgb;
        int rv;

        /* start admission control thread */
        while (1 /*exit_flag*/){
                /* Reset on every iteration so that a failed receive can
                 * never hand a stale or uninitialized pointer to the
                 * handler */
                msgb = NULL;
                rv = fwp_mngr_input(&msgb);
                if (rv < 0)
                        fprintf(stderr, "Receiving of message failed (%d).\n",
                                        rv);
                else if (msgb)
                        fwp_mngr_msg_handler(msgb);
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
219
#if 0
/*
 * fwp_mngr_init (currently compiled out)
 *
 * Initializes the endpoint and vres tables, creates the participant that
 * represents the manager itself and creates its receive endpoint.
 *
 * Returns 0 on success or the non-zero result of the table initialization
 * that failed.
 */
int fwp_mngr_init()
{
        fwp_participant_info_t  self_info;
        int status;

        /* Table initialization; stop at the first failure */
        status = fwp_endpoint_table_init(fwp_configuration.max_endpoints);
        if (status)
                return status;

        status = fwp_vres_table_init(fwp_configuration.max_vres);
        if (status)
                return status;

        /* Describe the manager itself */
        self_info.stream_id = FWP_MNGR_STREAM_ID;
        self_info.id.app_id = getpid();
        self_info.id.node_id = inet_addr("127.0.0.1");

        /* The manager is both "this" participant and the manager participant */
        fwp_participant_this = fwp_participant_create(&self_info);
        fwp_participant_mngr = fwp_participant_this;

        fwp_receive_endpoint_create(self_info.stream_id, 0,
                                        &fwp_participant_this->epointd);
        FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
                               fwp_participant_this->id.node_id,
                               fwp_participant_this->stream_id);

        return 0;
}
#endif
248
249 int main()
250 {
251         if (fwp_init()) {
252                 fprintf(stderr,"FWP manager initialization failed.\n");
253                 exit(1);
254         }
255
256         fwp_mngr_main_loop();
257         
258         return 0;       
259