]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Changing name of serialization funmctions; added functions for contract id, status...
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #include <fwp.h>
2 #include "fwp_mngt.h"
3 #include "fwp_participant_table.h"
4 #include "fwp_admctrl.h"
5
6 #define FWP_MTU         2346  
7 #define BUFFSIZE        FWP_MTU 
8
9 /* buffer and socket for incomming message */
10 static unsigned char    buffer[FWP_MTU];
11
12 /* Admission control test */
13 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
14
15 /**
16  * fwp_mngt_input 
17  *
18  * Function waits for remote or local message 
19  * 
20  * @msgb  received message 
21  * \return 
22  * On success, it returns 0 and the pointer to received message in msgb parameter.
23  * On error, it returns negative error code
24  *
25  */
26 int fwp_mngr_input(struct fwp_msgb **pmsgb)
27 {
28         struct fwp_msgb *msgb;
29         ssize_t size;
30
31         FWP_DEBUG("Waiting for messages\n");
32         /* TODO: consider to replace with fwp_mngt_recv call */
33         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
34          
35         FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
36         /* For future: fwp_socket could be allocated behind data in msgb*/
37         if (!(msgb = fwp_msgb_alloc(size))) {
38                 perror("No memory available.\n");
39                 return -ENOMEM;
40         }
41         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
42         msgb->data = buffer;
43         fwp_msgb_put(msgb, size);
44         
45         *pmsgb = msgb;
46         return (0);
47 }
48
49 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 {
51         fwp_participant_info_t participant_info, my_info;
52         fwp_participant_t *participant;
53
54         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
55                         participant_id.node_id, participant_id.app_id);
56
57         /* Create a new participant */
58         fwp_msg_hello_inflate(msgb->data, &participant_info);
59         participant = fwp_participant_create(&participant_info);
60         fwp_mngt_service_vres_create(&participant->vresd);
61         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
62                                         0, &participant->epointd);
63         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
64         fwp_contract_table_init(&participant->contract_table);
65
66         /* Insert participant into table */
67         fwp_participant_table_insert(participant);
68
69         /* Send back hello msg with mngr`s info */
70         /* prepare hello message */
71         fwp_msgb_reset_data(msgb);
72         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
73         
74         my_info.id = fwp_participant_this->id;
75         my_info.stream_id = fwp_participant_this->stream_id;
76
77         fwp_msg_hello_deflate(msgb->tail, &my_info);
78         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79
80         /* Send hello to manager */
81         FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
82         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
83                         fwp_participant_this, participant);
84
85         FWP_DEBUG("Sent HELLO msg \n");
86 }
87
88 int 
89 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
90 {
91         fwp_participant_t *participant;
92         fwp_contract_data_t *contdata;
93
94         /* Find participant */
95         if (!(participant = fwp_participant_table_find(&participant_id))){
96                 return -EPERM;
97         }
98
99         contdata = fwp_contract_data_new();
100
101         contdata->id = ntohl((uint32_t)*msgb->data);
102         fwp_msgb_pull(msgb, 4);
103         contdata->status = (int8_t)*msgb->data;
104         fwp_msgb_pull(msgb, 1);
105
106         fwp_msg_contract_inflate(msgb->data, &contdata->contract);
107         /*launch admission test */
108         fwp_admctrl_test(contdata);             
109         
110         free(msgb);
111         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
112                                         sizeof(struct fwp_msg_contract) +
113                                         sizeof(struct fwp_msg_vres_params));
114         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
115         /* Add contract_id and status */
116         msgb->tail = htonl(contdata->id);
117         fwp_msgb_put(msgb, 4);
118         msgb->tail = contdata->status;
119         fwp_msgb_put(msgb, 1);
120                 
121         
122         /*Send back contract reservation */
123         if (contdata->status == FWP_CONT_RESERVED) {
124                 fwp_msg_vres_params_deflate(msgb->tail,&contdata->vres_params);
125                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
126                 /* Add contract to contract table */
127                 fwp_contract_table_insert(&participant->contract_table,contdata);
128
129         } else {
130                 free(contdata);
131         }       
132         
133         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
134                         fwp_participant_this, participant);
135
136         return 0;
137 }
138
139 int fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
140 {
141         fwp_participant_t *participant;
142         fwp_contract_data_t *contdata;
143         fwp_contract_id_t  id;
144
145         /* Find participant */
146         if (!(participant = fwp_participant_table_find(&participant_id))){
147                 return -EPERM;
148         }
149
150         id = msgb->data;
151         fwp_msgb_pull(msgb, 4);
152         
153         contdata = fwp_contract_table_find(&participant->contract_table, id);
154         contdata->status = FWP_CONT_NEGOTIATED; 
155         
156         return 0;       
157 }
158
159 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
160 {
161         fwp_msg_type_t msg_type;
162         fwp_participant_id_t    participant_id;
163
164         fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
165         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
166         
167         switch (msg_type) {
168                 case  FWP_MSG_HELLO:
169                         FWP_DEBUG("Message HELLO received from nodeid = %d\
170                                         appid = %d\n", participant_id.node_id, 
171                                         participant_id.app_id);
172                         fwp_mngr_hello(msgb, participant_id);
173                         break;
174
175                 case  FWP_MSG_RESERVE: 
176                         FWP_DEBUG("Message RESERVE received from nodeid = %d\
177                                         appid = %d\n", participant_id.node_id, 
178                                         participant_id.app_id);
179                         fwp_mngr_contract_reserve(msgb, participant_id);
180                         break;
181
182                 case  FWP_MSG_COMMIT: 
183                         FWP_DEBUG("Message COMMIT received from nodeid = %d\
184                                         appid = %d\n", participant_id.node_id, 
185                                         participant_id.app_id);
186                         fwp_mngr_contract_commit(msgb, participant_id);
187                         break;  
188                 default:
189                         printf("Invalid message\n.");
190                         fwp_msgb_free(msgb);
191         }
192 }
193
194 void fwp_mngr_main_loop()
195 {
196         struct fwp_msgb *msgb;
197
198         /* start admission control thread */
199         while (1 /*exit_flag*/){
200                 fwp_mngr_input(&msgb);
201                 if (msgb)
202                         fwp_mngr_msg_handler(msgb);
203                 FWP_DEBUG("Mngr waiting for next msg.\n");
204         }
205 }
206
207 int fwp_mngr_init()
208 {
209         fwp_participant_info_t  my_info;
210         int rv;
211
212         if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
213             (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
214
215                 return rv;
216         }
217         
218         /* Create fwp_participant_this */
219         my_info.id.node_id = inet_addr("127.0.0.1");
220         my_info.id.app_id = getpid();
221         my_info.stream_id = FWP_MNGR_STREAM_ID;
222
223         fwp_participant_this = fwp_participant_create(&my_info);
224         fwp_participant_mngr = fwp_participant_this;
225         fwp_receive_endpoint_create(my_info.stream_id, 0,
226                                         &fwp_participant_this->epointd);
227         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
228                                fwp_participant_this->id.node_id,
229                                fwp_participant_this->stream_id);
230         return 0;
231
232 }
233
234 int main()
235 {
236         if (fwp_mngr_init()) {
237                 fprintf(stderr,"FWP manager initialization failed.\n");
238                 exit(1);
239         }
240
241         fwp_mngr_main_loop();
242         
243         return 0;       
244