]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Clean-up header files
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #define CONFIGURE_FWP_MY_STREAM_ID 3000
2 #define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
3
4 #include "fwp_confdefs.h"
5 #include "fwp.h"
6 #include "fwp_contract_table.h"
7 #include "fwp_participant_table.h"
8 #include "fwp_admctrl.h"
9 #include "fwp_mngt.h"
10
/* Maximum size of one received message; used to size the receive buffer. */
#define FWP_MTU         2346
/* Length handed to fwp_recv; identical to FWP_MTU. */
#define BUFFSIZE        FWP_MTU

/* Static buffer that receives each incoming message. */
static unsigned char    buffer[BUFFSIZE];
16
17 /* Admission control test */
18 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
19
20 /**
21  * fwp_mngt_input 
22  *
23  * Function waits for remote or local message 
24  * 
25  * @msgb  received message 
26  * \return 
27  * On success, it returns 0 and the pointer to received message in msgb parameter.
28  * On error, it returns negative error code
29  *
30  */
31 int fwp_mngr_input(struct fwp_msgb **pmsgb)
32 {
33         struct fwp_msgb *msgb;
34         ssize_t size;
35
36         FWP_DEBUG("Waiting for messages\n");
37         /* TODO: consider to replace with fwp_mngt_recv call */
38         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
39          
40         /* For future: fwp_socket could be allocated behind data in msgb*/
41         if (!(msgb = fwp_msgb_alloc(size))) {
42                 perror("No memory available.\n");
43                 return -ENOMEM;
44         }
45         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
46         msgb->data = buffer;
47         fwp_msgb_put(msgb, size);
48         
49         *pmsgb = msgb;
50         return (0);
51 }
52
53 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
54 {
55         fwp_participant_info_t participant_info, my_info;
56         fwp_participant_t *participant;
57         fwp_endpoint_attr_t attr;
58
59         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
60                         participant_id.node_id, participant_id.app_id);
61
62         fwp_endpoint_attr_init(&attr);
63         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
64         /* Create a new participant */
65         fwp_msg_hello_out(msgb->data, &participant_info);
66         participant = fwp_participant_create(&participant_info);
67         fwp_mngt_service_vres_create(&participant->vresd);
68         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
69                                         &attr, &participant->epointd);
70         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
71         fwp_contract_table_init(&participant->contract_table);
72
73         /* Insert participant into table */
74         fwp_participant_table_insert(participant);
75
76         /* Send back hello msg with mngr`s info */
77         /* prepare hello message */
78         fwp_msgb_reset_data(msgb);
79         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
80         
81         my_info.id = fwp_participant_this->id;
82         my_info.stream_id = fwp_participant_this->stream_id;
83
84         fwp_msg_hello_in(msgb->tail, &my_info);
85         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
86
87         /* Send hello to manager */
88         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
89                         fwp_participant_this, participant);
90
91         FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
92                         participant_id.node_id, participant_id.app_id);
93 }
94
95 int 
96 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
97 {
98         fwp_participant_t *participant;
99         fwp_contract_data_t *contdata;
100
101         /* Find participant */
102         if (!(participant = fwp_participant_table_find(&participant_id))){
103                 return -EPERM;
104         }
105
106         contdata = fwp_contract_data_new();
107         
108         /* Extract contract header */
109         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
110         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
111         /* Extract contract params */
112         fwp_msg_contract_out(msgb->data, &contdata->contract);
113         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
114
115         /*launch admission test */
116         fwp_admctrl_test(contdata);             
117         
118         free(msgb);
119         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
120                                         sizeof(struct fwp_msg_contract) +
121                                         sizeof(struct fwp_msg_vres_params));
122         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
123         
124         /*Add contract header*/
125         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
126         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
127         /* Add contract params */
128         /* No needed to send back if spare capacity is not considered
129          * fwp_msg_contract_in(msgb->tail, &contdata->contract);
130          * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
131          * */
132         
133         /*Send back contract reservation */
134         if (contdata->status == FWP_CONT_RESERVED) {
135                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
136                 FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
137                                 contdata->vres_params.budget,
138                                 contdata->vres_params.period_usec,
139                                 contdata->vres_params.ac_id);
140                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
141                 /* Add contract to contract table */
142                 fwp_contract_table_insert(&participant->contract_table,contdata);
143                 FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
144
145         } else {
146                 free(contdata);
147         }       
148         
149         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
150                         fwp_participant_this, participant);
151         return 0;
152 }
153
154 int 
155 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
156 {
157         fwp_participant_t *participant;
158         fwp_contract_data_t *contdata;
159         fwp_contract_id_t  id;
160         fwp_contract_status_t  status;
161
162         /* Find participant */
163         if (!(participant = fwp_participant_table_find(&participant_id))){
164                 return -EPERM;
165         }
166
167         fwp_msg_contracthdr_out(msgb->data, &id, &status);
168         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
169         FWP_DEBUG("Contract id=%d to commit\n", id);
170         
171         contdata = fwp_contract_table_find(&participant->contract_table, id);
172         contdata->status = FWP_CONT_NEGOTIATED; 
173         
174         return 0;       
175 }
176
177 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
178 {
179         fwp_msg_type_t msg_type;
180         fwp_participant_id_t    participant_id;
181
182         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
183         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
184         
185         switch (msg_type) {
186                 case  FWP_MSG_HELLO:
187                         FWP_DEBUG("Message HELLO received from nodeid = %d "
188                                   "appid = %d\n", participant_id.node_id, 
189                                         participant_id.app_id);
190                         fwp_mngr_hello(msgb, participant_id);
191                         break;
192
193                 case  FWP_MSG_RESERVE: 
194                         FWP_DEBUG("Message RESERVE received from nodeid = %d " 
195                                   "appid = %d\n", participant_id.node_id, 
196                                         participant_id.app_id);
197                         fwp_mngr_contract_reserve(msgb, participant_id);
198                         break;
199
200                 case  FWP_MSG_COMMIT: 
201                         FWP_DEBUG("Message COMMIT received from nodeid = %d "
202                                   "appid = %d\n", participant_id.node_id, 
203                                         participant_id.app_id);
204                         fwp_mngr_contract_commit(msgb, participant_id);
205                         break;  
206                 default:
207                         printf("Invalid message\n.");
208                         fwp_msgb_free(msgb);
209         }
210 }
211
/**
 * fwp_mngr_main_loop
 *
 * Endless receive/dispatch loop of the manager: waits for a message and
 * hands it to fwp_mngr_msg_handler. A failed receive is logged and the
 * loop simply waits for the next message. The function never returns.
 */
void fwp_mngr_main_loop(void)
{
        struct fwp_msgb *msgb;
        int rv;

        /* start admission control thread */
        while (1 /*exit_flag*/){
                /*
                 * Reset on each pass so a failed receive can never leave a
                 * stale or indeterminate pointer to be dispatched.
                 */
                msgb = NULL;
                rv = fwp_mngr_input(&msgb);
                if (rv == 0 && msgb)
                        fwp_mngr_msg_handler(msgb);
                else
                        FWP_DEBUG("Receiving message failed (error %d)\n", rv);
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
224
#if 0
/*
 * Compiled out. Manager-specific initialisation: sets up the endpoint and
 * vres tables, creates the local participant (which is also the manager
 * participant) and its receive endpoint.
 *
 * Returns 0 on success or the non-zero result of the table initialisation
 * that failed.
 */
int fwp_mngr_init()
{
        fwp_participant_info_t  self_info;
        int rv;

        /* Stop at the first table that fails to initialise. */
        rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints);
        if (rv)
                return rv;

        rv = fwp_vres_table_init(fwp_configuration.max_vres);
        if (rv)
                return rv;

        /* Create fwp_participant_this */
        self_info.id.node_id = inet_addr("127.0.0.1");
        self_info.id.app_id = getpid();
        self_info.stream_id = FWP_MNGR_STREAM_ID;

        fwp_participant_this = fwp_participant_create(&self_info);
        /* The manager is its own manager participant. */
        fwp_participant_mngr = fwp_participant_this;

        fwp_receive_endpoint_create(self_info.stream_id, 0,
                                        &fwp_participant_this->epointd);
        FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
                               fwp_participant_this->id.node_id,
                               fwp_participant_this->stream_id);

        return 0;
}
#endif
253
254 int main()
255 {
256         if (fwp_init()) {
257                 fprintf(stderr,"FWP manager initialization failed.\n");
258                 exit(1);
259         }
260
261         fwp_mngr_main_loop();
262         
263         return 0;       
264