]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
contract negotiation rework using rserve and commit contract_ops
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #include <fwp.h>
2 #include "fwp_mngt.h"
3 #include "fwp_participant_table.h"
4 #include "fwp_admctrl.h"
5
6 #define FWP_MTU         2346  
7 #define BUFFSIZE        FWP_MTU 
8
9 /* buffer and socket for incomming message */
10 static unsigned char    buffer[FWP_MTU];
11
12 /* Admission control test */
13 fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
14
15 /**
16  * fwp_mngt_input 
17  *
18  * Function waits for remote or local message 
19  * 
20  * @msgb  received message 
21  * \return 
22  * On success, it returns 0 and the pointer to received message in msgb parameter.
23  * On error, it returns negative error code
24  *
25  */
26 int fwp_mngr_input(struct fwp_msgb **pmsgb)
27 {
28         struct fwp_msgb *msgb;
29         ssize_t size;
30
31         FWP_DEBUG("Waiting for messages\n");
32         /* TODO: consider to replace with fwp_mngt_recv call */
33         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
34          
35         FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
36         /* For future: fwp_socket could be allocated behind data in msgb*/
37         if (!(msgb = fwp_msgb_alloc(size))) {
38                 perror("No memory available.\n");
39                 return -ENOMEM;
40         }
41         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
42         msgb->data = buffer;
43         fwp_msgb_put(msgb, size);
44         
45         *pmsgb = msgb;
46         return (0);
47 }
48
49 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
50 {
51         fwp_participant_info_t participant_info, my_info;
52         fwp_participant_t *participant;
53
54         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
55                         participant_id.node_id, participant_id.app_id);
56
57         /* Create a new participant */
58         fwp_msg_hello_out(msgb->data, &participant_info);
59         participant = fwp_participant_create(&participant_info);
60         fwp_mngt_service_vres_create(&participant->vresd);
61         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
62                                         0, &participant->epointd);
63         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
64         fwp_contract_table_init(&participant->contract_table);
65
66         /* Insert participant into table */
67         fwp_participant_table_insert(participant);
68
69         /* Send back hello msg with mngr`s info */
70         /* prepare hello message */
71         fwp_msgb_reset_data(msgb);
72         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
73         
74         my_info.id = fwp_participant_this->id;
75         my_info.stream_id = fwp_participant_this->stream_id;
76
77         fwp_msg_hello_in(msgb->tail, &my_info);
78         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
79
80         /* Send hello to manager */
81         FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
82         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
83                         fwp_participant_this, participant);
84
85         FWP_DEBUG("Sent HELLO msg \n");
86 }
87
88 int 
89 fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
90 {
91         fwp_participant_t *participant;
92         fwp_contract_data_t *contdata;
93
94         /* Find participant */
95         if (!(participant = fwp_participant_table_find(&participant_id))){
96                 return -EPERM;
97         }
98
99         contdata = fwp_contract_data_new();
100         
101         /* Extract contract header */
102         fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
103         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
104         /* Extract contract params */
105         fwp_msg_contract_out(msgb->data, &contdata->contract);
106         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
107
108         /*launch admission test */
109         fwp_admctrl_test(contdata);             
110         
111         free(msgb);
112         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
113                                         sizeof(struct fwp_msg_contract) +
114                                         sizeof(struct fwp_msg_vres_params));
115         fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
116         
117         /*Add contract header*/
118         fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
119         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
120         /* Add contract params */
121         fwp_msg_contract_in(msgb->tail, &contdata->contract);
122         fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
123         
124         /*Send back contract reservation */
125         if (contdata->status == FWP_CONT_RESERVED) {
126                 fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
127                 fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
128                 /* Add contract to contract table */
129                 fwp_contract_table_insert(&participant->contract_table,contdata);
130
131         } else {
132                 free(contdata);
133         }       
134         
135         fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
136                         fwp_participant_this, participant);
137
138         return 0;
139 }
140
141 int fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
142 {
143         fwp_participant_t *participant;
144         fwp_contract_data_t *contdata;
145         fwp_contract_id_t  id;
146         fwp_contract_status_t  status;
147
148         /* Find participant */
149         if (!(participant = fwp_participant_table_find(&participant_id))){
150                 return -EPERM;
151         }
152
153         fwp_msg_contracthdr_out(msgb->tail, &id, &status);
154         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
155         
156         contdata = fwp_contract_table_find(&participant->contract_table, id);
157         contdata->status = FWP_CONT_NEGOTIATED; 
158         
159         return 0;       
160 }
161
162 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
163 {
164         fwp_msg_type_t msg_type;
165         fwp_participant_id_t    participant_id;
166
167         fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
168         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
169         
170         switch (msg_type) {
171                 case  FWP_MSG_HELLO:
172                         FWP_DEBUG("Message HELLO received from nodeid = %d\
173                                         appid = %d\n", participant_id.node_id, 
174                                         participant_id.app_id);
175                         fwp_mngr_hello(msgb, participant_id);
176                         break;
177
178                 case  FWP_MSG_RESERVE: 
179                         FWP_DEBUG("Message RESERVE received from nodeid = %d\
180                                         appid = %d\n", participant_id.node_id, 
181                                         participant_id.app_id);
182                         fwp_mngr_contract_reserve(msgb, participant_id);
183                         break;
184
185                 case  FWP_MSG_COMMIT: 
186                         FWP_DEBUG("Message COMMIT received from nodeid = %d\
187                                         appid = %d\n", participant_id.node_id, 
188                                         participant_id.app_id);
189                         fwp_mngr_contract_commit(msgb, participant_id);
190                         break;  
191                 default:
192                         printf("Invalid message\n.");
193                         fwp_msgb_free(msgb);
194         }
195 }
196
197 void fwp_mngr_main_loop()
198 {
199         struct fwp_msgb *msgb;
200
201         /* start admission control thread */
202         while (1 /*exit_flag*/){
203                 fwp_mngr_input(&msgb);
204                 if (msgb)
205                         fwp_mngr_msg_handler(msgb);
206                 FWP_DEBUG("Mngr waiting for next msg.\n");
207         }
208 }
209
210 int fwp_mngr_init()
211 {
212         fwp_participant_info_t  my_info;
213         int rv;
214
215         if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
216             (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
217
218                 return rv;
219         }
220         
221         /* Create fwp_participant_this */
222         my_info.id.node_id = inet_addr("127.0.0.1");
223         my_info.id.app_id = getpid();
224         my_info.stream_id = FWP_MNGR_STREAM_ID;
225
226         fwp_participant_this = fwp_participant_create(&my_info);
227         fwp_participant_mngr = fwp_participant_this;
228         fwp_receive_endpoint_create(my_info.stream_id, 0,
229                                         &fwp_participant_this->epointd);
230         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
231                                fwp_participant_this->id.node_id,
232                                fwp_participant_this->stream_id);
233         return 0;
234
235 }
236
237 int main()
238 {
239         if (fwp_mngr_init()) {
240                 fprintf(stderr,"FWP manager initialization failed.\n");
241                 exit(1);
242         }
243
244         fwp_mngr_main_loop();
245         
246         return 0;       
247