]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Added contract negotiation to fwp_mngrtest
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #include <fwp.h>
2 #include "fwp_mngt.h"
3 #include "fwp_participant_table.h"
4 #include "fwp_admctrl.h"
5
6 #define FWP_MTU         2346  
7 #define BUFFSIZE        FWP_MTU 
8
9 /* buffer and socket for incomming message */
10 static unsigned char    buffer[FWP_MTU];
11
12 /* Admission control test */
13 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
14
15 /**
16  * fwp_mngt_input 
17  *
18  * Function waits for remote or local message 
19  * 
20  * @msgb  received message 
21  * \return 
22  * On success, it returns 0 and the pointer to received message in msgb parameter.
23  * On error, it returns negative error code
24  *
25  */
26 int fwp_mngr_input(struct fwp_msgb **pmsgb)
27 {
28         struct fwp_msgb *msgb;
29         ssize_t size;
30
31         FWP_DEBUG("Waiting for messages\n");
32         /* TODO: consider to replace with fwp_mngt_recv call */
33         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
34          
35         FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
36         /* For future: fwp_socket could be allocated behind data in msgb*/
37         if (!(msgb = fwp_msgb_alloc(size))) {
38                 perror("No memory available.\n");
39                 return -ENOMEM;
40         }
41         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
42         msgb->data = buffer;
43         fwp_msgb_put(msgb, size);
44         
45         *pmsgb = msgb;
46         return (0);
47 }
48
49 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 {
51         fwp_participant_info_t participant_info, my_info;
52         fwp_participant_t *participant;
53
54         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
55                         participant_id.node_id, participant_id.app_id);
56
57         /* Create a new participant */
58         fwp_msg_hello_out(msgb->data, &participant_info);
59         participant = fwp_participant_create(&participant_info);
60         fwp_mngt_service_vres_create(&participant->vresd);
61         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
62                                         0, &participant->epointd);
63         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
64         fwp_contract_table_init(&participant->contract_table);
65
66         /* Insert participant into table */
67         fwp_participant_table_insert(participant);
68
69         /* Send back hello msg with mngr`s info */
70         /* prepare hello message */
71         fwp_msgb_reset_data(msgb);
72         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
73         
74         my_info.id = fwp_participant_this->id;
75         my_info.stream_id = fwp_participant_this->stream_id;
76
77         fwp_msg_hello_in(msgb->tail, &my_info);
78         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79
80         /* Send hello to manager */
81         FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
82         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
83                         fwp_participant_this, participant);
84
85         FWP_DEBUG("Sent HELLO msg \n");
86 }
87
88 int 
89 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
90 {
91         fwp_participant_t *participant;
92         fwp_contract_data_t *contdata;
93
94         /* Find participant */
95         if (!(participant = fwp_participant_table_find(&participant_id))){
96                 return -EPERM;
97         }
98
99         contdata = fwp_contract_data_new();
100         
101         /* Extract contract header */
102         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
103         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
104         /* Extract contract params */
105         fwp_msg_contract_out(msgb->data, &contdata->contract);
106         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
107
108         /*launch admission test */
109         fwp_admctrl_test(contdata);             
110         
111         free(msgb);
112         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
113                                         sizeof(struct fwp_msg_contract) +
114                                         sizeof(struct fwp_msg_vres_params));
115         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
116         
117         /*Add contract header*/
118         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
119         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
120         /* Add contract params */
121         fwp_msg_contract_in(msgb->tail, &contdata->contract);
122         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
123         
124         /*Send back contract reservation */
125         if (contdata->status == FWP_CONT_RESERVED) {
126                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
127                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
128                 /* Add contract to contract table */
129                 fwp_contract_table_insert(&participant->contract_table,contdata);
130
131         } else {
132                 free(contdata);
133         }       
134         
135         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
136                         fwp_participant_this, participant);
137
138         return 0;
139 }
140
141 int 
142 fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
143 {
144         fwp_participant_t *participant;
145         fwp_contract_data_t *contdata;
146         fwp_contract_id_t  id;
147         fwp_contract_status_t  status;
148
149         /* Find participant */
150         if (!(participant = fwp_participant_table_find(&participant_id))){
151                 return -EPERM;
152         }
153
154         fwp_msg_contracthdr_out(msgb->tail, &id, &status);
155         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
156         
157         contdata = fwp_contract_table_find(&participant->contract_table, id);
158         contdata->status = FWP_CONT_NEGOTIATED; 
159         
160         return 0;       
161 }
162
163 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
164 {
165         fwp_msg_type_t msg_type;
166         fwp_participant_id_t    participant_id;
167
168         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
169         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
170         
171         switch (msg_type) {
172                 case  FWP_MSG_HELLO:
173                         FWP_DEBUG("Message HELLO received from nodeid = %d\
174                                         appid = %d\n", participant_id.node_id, 
175                                         participant_id.app_id);
176                         fwp_mngr_hello(msgb, participant_id);
177                         break;
178
179                 case  FWP_MSG_RESERVE: 
180                         FWP_DEBUG("Message RESERVE received from nodeid = %d\
181                                         appid = %d\n", participant_id.node_id, 
182                                         participant_id.app_id);
183                         fwp_mngr_contract_reserve(msgb, participant_id);
184                         break;
185
186                 case  FWP_MSG_COMMIT: 
187                         FWP_DEBUG("Message COMMIT received from nodeid = %d\
188                                         appid = %d\n", participant_id.node_id, 
189                                         participant_id.app_id);
190                         fwp_mngr_contract_commit(msgb, participant_id);
191                         break;  
192                 default:
193                         printf("Invalid message\n.");
194                         fwp_msgb_free(msgb);
195         }
196 }
197
/**
 * fwp_mngr_main_loop
 *
 * Manager main loop: waits for a message with fwp_mngr_input() and passes
 * every successfully received message to fwp_mngr_msg_handler(). The loop
 * never terminates.
 */
void fwp_mngr_main_loop()
{
        struct fwp_msgb *msgb;
        int rv;

        /* start admission control thread */
        while (1 /*exit_flag*/){
                /*
                 * Reset on every pass: a failed receive must never leave the
                 * previous (possibly already freed) message in msgb.
                 */
                msgb = NULL;
                rv = fwp_mngr_input(&msgb);
                if (rv == 0 && msgb) {
                        fwp_mngr_msg_handler(msgb);
                } else {
                        FWP_DEBUG("Receiving message failed, rv = %d\n", rv);
                }
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
210
211 int fwp_mngr_init()
212 {
213         fwp_participant_info_t  my_info;
214         int rv;
215
216         if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
217             (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
218
219                 return rv;
220         }
221         
222         /* Create fwp_participant_this */
223         my_info.id.node_id = inet_addr("127.0.0.1");
224         my_info.id.app_id = getpid();
225         my_info.stream_id = FWP_MNGR_STREAM_ID;
226
227         fwp_participant_this = fwp_participant_create(&my_info);
228         fwp_participant_mngr = fwp_participant_this;
229         fwp_receive_endpoint_create(my_info.stream_id, 0,
230                                         &fwp_participant_this->epointd);
231         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
232                                fwp_participant_this->id.node_id,
233                                fwp_participant_this->stream_id);
234         return 0;
235
236 }
237
238 int main()
239 {
240         if (fwp_mngr_init()) {
241                 fprintf(stderr,"FWP manager initialization failed.\n");
242                 exit(1);
243         }
244
245         fwp_mngr_main_loop();
246         
247         return 0;       
248