]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
libfwp config fixes
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1
2 #include "fwp_confdefs.h"
3 #include "fwp_mngt.h"
4 #include "fwp_participant_table.h"
5 #include "fwp_admctrl.h"
6
7 #define FWP_MTU         2346  
8 #define BUFFSIZE        FWP_MTU 
9
10 /* buffer and socket for incomming message */
11 static unsigned char    buffer[FWP_MTU];
12
13 /* Admission control test */
14 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
15
16 /**
17  * fwp_mngt_input 
18  *
19  * Function waits for remote or local message 
20  * 
21  * @msgb  received message 
22  * \return 
23  * On success, it returns 0 and the pointer to received message in msgb parameter.
24  * On error, it returns negative error code
25  *
26  */
27 int fwp_mngr_input(struct fwp_msgb **pmsgb)
28 {
29         struct fwp_msgb *msgb;
30         ssize_t size;
31
32         FWP_DEBUG("Waiting for messages\n");
33         /* TODO: consider to replace with fwp_mngt_recv call */
34         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
35          
36         /* For future: fwp_socket could be allocated behind data in msgb*/
37         if (!(msgb = fwp_msgb_alloc(size))) {
38                 perror("No memory available.\n");
39                 return -ENOMEM;
40         }
41         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
42         msgb->data = buffer;
43         fwp_msgb_put(msgb, size);
44         
45         *pmsgb = msgb;
46         return (0);
47 }
48
49 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 {
51         fwp_participant_info_t participant_info, my_info;
52         fwp_participant_t *participant;
53
54         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
55                         participant_id.node_id, participant_id.app_id);
56
57         /* Create a new participant */
58         fwp_msg_hello_out(msgb->data, &participant_info);
59         participant = fwp_participant_create(&participant_info);
60         fwp_mngt_service_vres_create(&participant->vresd);
61         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
62                                         0, &participant->epointd);
63         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
64         fwp_contract_table_init(&participant->contract_table);
65
66         /* Insert participant into table */
67         fwp_participant_table_insert(participant);
68
69         /* Send back hello msg with mngr`s info */
70         /* prepare hello message */
71         fwp_msgb_reset_data(msgb);
72         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
73         
74         my_info.id = fwp_participant_this->id;
75         my_info.stream_id = fwp_participant_this->stream_id;
76
77         fwp_msg_hello_in(msgb->tail, &my_info);
78         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79
80         /* Send hello to manager */
81         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
82                         fwp_participant_this, participant);
83
84         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
85                         participant_id.node_id, participant_id.app_id);
86 }
87
88 int 
89 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
90 {
91         fwp_participant_t *participant;
92         fwp_contract_data_t *contdata;
93
94         /* Find participant */
95         if (!(participant = fwp_participant_table_find(&participant_id))){
96                 return -EPERM;
97         }
98
99         contdata = fwp_contract_data_new();
100         
101         /* Extract contract header */
102         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
103         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
104         /* Extract contract params */
105         fwp_msg_contract_out(msgb->data, &contdata->contract);
106         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
107
108         /*launch admission test */
109         fwp_admctrl_test(contdata);             
110         
111         free(msgb);
112         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
113                                         sizeof(struct fwp_msg_contract) +
114                                         sizeof(struct fwp_msg_vres_params));
115         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
116         
117         /*Add contract header*/
118         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
119         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
120         /* Add contract params */
121         /* No needed to send back if spare capacity is not considered
122          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
123          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
124          * */
125         
126         /*Send back contract reservation */
127         if (contdata->status == FWP_CONT_RESERVED) {
128                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
129                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
130                                 contdata->vres_params.budget,
131                                 contdata->vres_params.period_usec,
132                                 contdata->vres_params.ac_id);
133                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
134                 /* Add contract to contract table */
135                 fwp_contract_table_insert(&participant->contract_table,contdata);
136                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
137
138         } else {
139                 free(contdata);
140         }       
141         
142         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
143                         fwp_participant_this, participant);
144         return 0;
145 }
146
147 int 
148 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
149 {
150         fwp_participant_t *participant;
151         fwp_contract_data_t *contdata;
152         fwp_contract_id_t  id;
153         fwp_contract_status_t  status;
154
155         /* Find participant */
156         if (!(participant = fwp_participant_table_find(&participant_id))){
157                 return -EPERM;
158         }
159
160         fwp_msg_contracthdr_out(msgb->data, &id, &status);
161         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
162         FWP_DEBUG("Contract id=%d to commit\n", contdata->id);
163         
164         contdata = fwp_contract_table_find(&participant->contract_table, id);
165         contdata->status = FWP_CONT_NEGOTIATED; 
166         
167         return 0;       
168 }
169
170 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
171 {
172         fwp_msg_type_t msg_type;
173         fwp_participant_id_t    participant_id;
174
175         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
176         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
177         
178         switch (msg_type) {
179                 case  FWP_MSG_HELLO:
180                         FWP_DEBUG("Message HELLO received from nodeid = %d "
181                                   "appid = %d\n", participant_id.node_id, 
182                                         participant_id.app_id);
183                         fwp_mngr_hello(msgb, participant_id);
184                         break;
185
186                 case  FWP_MSG_RESERVE: 
187                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
188                                   "appid = %d\n", participant_id.node_id, 
189                                         participant_id.app_id);
190                         fwp_mngr_contract_reserve(msgb, participant_id);
191                         break;
192
193                 case  FWP_MSG_COMMIT: 
194                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
195                                   "appid = %d\n", participant_id.node_id, 
196                                         participant_id.app_id);
197                         fwp_mngr_contract_commit(msgb, participant_id);
198                         break;  
199                 default:
200                         printf("Invalid message\n.");
201                         fwp_msgb_free(msgb);
202         }
203 }
204
/**
 * fwp_mngr_main_loop
 *
 * Endless receive/dispatch loop of the manager: waits for a message with
 * fwp_mngr_input() and passes every successfully received message to
 * fwp_mngr_msg_handler(). The function does not return.
 */
void fwp_mngr_main_loop(void)
{
        struct fwp_msgb *msgb;
        int rv;

        /* TODO: start admission control thread */
        while (1 /*exit_flag*/){
                /* Reset per iteration so a failed receive can never hand a
                 * stale or indeterminate pointer to the handler. */
                msgb = NULL;
                rv = fwp_mngr_input(&msgb);
                if (rv)
                        fprintf(stderr, "Receiving message failed "
                                        "(error %d).\n", rv);
                else if (msgb)
                        fwp_mngr_msg_handler(msgb);
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
217
218 int fwp_mngr_init()
219 {
220         fwp_participant_info_t  my_info;
221         int rv;
222
223         if ((rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints)) ||
224             (rv = fwp_vres_table_init(fwp_configuration.max_vres))) {
225
226                 return rv;
227         }
228         
229         /* Create fwp_participant_this */
230         my_info.id.node_id = inet_addr("127.0.0.1");
231         my_info.id.app_id = getpid();
232         my_info.stream_id = FWP_MNGR_STREAM_ID;
233
234         fwp_participant_this = fwp_participant_create(&my_info);
235         fwp_participant_mngr = fwp_participant_this;
236         fwp_receive_endpoint_create(my_info.stream_id, 0,
237                                         &fwp_participant_this->epointd);
238         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
239                                fwp_participant_this->id.node_id,
240                                fwp_participant_this->stream_id);
241         return 0;
242
243 }
244
245 int main()
246 {
247         if (fwp_mngr_init()) {
248                 fprintf(stderr,"FWP manager initialization failed.\n");
249                 exit(1);
250         }
251
252         fwp_mngr_main_loop();
253         
254         return 0;       
255