]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/mngt/fwp_mngt.c
Added support for contract negotiation
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
1 #include "fwp_conf.h"
2 #include "fwp_mngt.h"
3 #include "fwp_endpoint.h"
4
5 /** 
6  * Global mngt variables
7  */
8
9 /**< Pointer to participant of this application */
10 fwp_participant_t       *fwp_participant_this;
11 /**< Pointer to manager participant */
12 fwp_participant_t       *fwp_participant_mngr;
13
14 static fwp_contract_t fwp_service_contract = {
15         .budget = 100,
16         .period_usec = 30,
17         .deadline_usec = 1000*1000
18 };
19
20 static fwp_vres_params_t fwp_service_vres_params = {
21         .id = 0,
22         .ac_id = FWP_AC_BK, 
23         .budget = 100,
24         .period_usec = 30,
25 };
26
27 /**
28  * Send management message to participant
29  *
30  */
31 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
32                   fwp_participant_t *source, fwp_participant_t *dest)
33 {
34         int ret;
35         
36         fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
37         fwp_msg_header_in(msgb->data, type, msgb->len, source->id);
38
39         ret = fwp_send(dest->epointd, msgb->data, msgb->len, 0);
40
41         return ret;
42 }
43
44 /**
45  * Receives management message from participant
46  *
47  */
48 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
49                         fwp_msgb_t *msgb)
50 {
51         int size;
52         
53         fwp_msgb_reset_data(msgb);
54         size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
55                         msgb->buffer_size, 0);
56         if (size < 0)
57                 return size;
58         fwp_msgb_put(msgb, size);
59
60         fwp_msg_header_out(msgb->data, type, participant_id);
61         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
62         
63         FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
64                         participant_id->node_id, participant_id->app_id);
65
66         return 0;
67
68
69 int fwp_mngt_service_vres_create(fwp_contract_d_t* contd, fwp_vres_d_t* vresdp)
70 {
71         fwp_contract_d_t        contractd;
72         fwp_contract_data_t*    contdata;
73         int ret;
74         
75         /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
76                  fprintf(stderr,"Unable to open service vres\n");
77                  return -1;
78         }*/
79         
80         contractd = fwp_contract_create(&fwp_service_contract);
81         contdata = contractd;
82         if (!contdata)
83                 return -1;
84                 
85         /* TODO: Consider to call _fwp_contract_commit */
86         contdata->status = FWP_CONT_NEGOTIATED;
87         /* Set parameters of vres 
88          * and activate it if needed */
89         ret = fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
90         if (ret < 0) {
91                 int e = errno;
92                 fwp_contract_destroy(contractd);
93                 errno = e;
94                 return ret;
95         }
96         *vresdp = contdata->vresd;
97         
98         FWP_DEBUG("Service vres negotiated\n");
99         *contd = contractd;
100         return 0;
101 }
102
103 /**
104  * Launches discovery/connect process to 
105  * introduce itself to fwp manager and get description of manager
106  * */
107 int fwp_mngt_connect()
108 {
109         fwp_participant_info_t  my_info, mngr_info;
110         fwp_participant_id_t    participant_id;
111         fwp_msgb_t              *msgb;
112         fwp_msg_type_t          msg_type;
113         fwp_endpoint_attr_t     attr;
114         int ret, e;
115         
116         fwp_endpoint_attr_init(&attr);
117         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
118
119         /* Create discovery endpoint */
120         FWP_DEBUG("Service vres created\n");
121         fwp_mngt_service_vres_create(&fwp_participant_mngr->service_contract,
122                                      &fwp_participant_mngr->vresd);
123         
124         FWP_DEBUG("Discovery send endpoint created\n");
125         ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
126                                        fwp_participant_mngr->stream_id,
127                                        &attr, &fwp_participant_mngr->epointd);
128         if (ret != 0) {
129                 e = errno;
130                 goto err_vres;
131         }
132         
133         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
134                                 fwp_participant_mngr->vresd);
135         
136         /* prepare hello message */
137         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
138                               sizeof(struct fwp_msg_hello));
139         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
140         
141         my_info.id = fwp_participant_this->id;
142         my_info.stream_id = fwp_participant_this->stream_id;
143
144         fwp_msg_hello_in(msgb->tail, &my_info);
145         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
146
147         /* Send hello to manager */
148         ret = fwp_mngt_send(FWP_MSG_HELLO, msgb, 
149                             fwp_participant_this, fwp_participant_mngr);
150         if (ret < 0) {
151                 e = errno;
152                 goto err_ep;
153         }
154
155         /* receive hello from manager */
156         alarm(3);               /* Timeout in secconds */
157         ret = fwp_mngt_recv(&msg_type, &participant_id, msgb);
158         alarm(0);
159         if (ret < 0) {
160                 if (errno == EINTR) e = ETIMEDOUT;
161                 else e = errno;
162                 goto err_ep;
163         }
164         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
165                         participant_id.node_id, participant_id.app_id);
166         
167         /* Process hello msg from manager */
168         fwp_msg_hello_out(msgb->data, &mngr_info);
169         fwp_participant_mngr->id  = mngr_info.id;
170         fwp_participant_mngr->stream_id  = mngr_info.stream_id;
171         FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
172                         mngr_info.id.node_id, mngr_info.id.app_id);
173         
174         /* unbind and delete discovery mngr send endpoint */
175         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
176         fwp_endpoint_destroy(fwp_participant_mngr->epointd);
177
178         /* Create mngt send endpoint to manager */
179         ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
180                                        fwp_participant_mngr->stream_id, &attr,
181                                        &fwp_participant_mngr->epointd);
182         if (ret != 0) {
183                 e = errno;
184                 goto err_vres;
185         }
186         
187         FWP_DEBUG("Management send endpoint created\n");
188         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
189                                 fwp_participant_mngr->vresd);
190         return 0;
191 err_ep:
192         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
193         fwp_endpoint_destroy(fwp_participant_mngr->epointd);
194 err_vres:
195         fwp_vres_destroy(fwp_participant_mngr->vresd);
196         errno = e;
197         return ret;
198 }
199 /**
200  * Disconnect from manager
201  *
202  */
203 int fwp_mngt_disconnect()
204 {
205         fwp_msgb_t *msgb;
206
207         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
208                                 sizeof(struct fwp_msg_contracthdr)); 
209         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
210         
211         /* Say GoodBye to manager */
212         FWP_DEBUG("Send BYE to manager\n");
213         fwp_mngt_send(FWP_MSG_BYE, msgb, 
214                         fwp_participant_this, fwp_participant_mngr);
215         
216         fwp_send_endpoint_unbind(fwp_participant_this->epointd);
217         fwp_endpoint_destroy(fwp_participant_this->epointd);
218         fwp_vres_destroy(fwp_participant_this->vresd);
219         /* TODO: iterate through contract table and delete contracts */
220
221         return 0;
222 }
223
224 /* TODO: Add atexti handler to remove all contracts on application
225  * exit/crash. */
226
227 /**
228  * FWP Management initialization 
229  * - creates and initializes fwp_participant_this 
230  * - creates and initializes fwp_participant_mngr
231  * - calls fwp_mngt
232  */
233 int fwp_mngt_init()
234 {
235         fwp_participant_info_t  my_info, mngr_info;
236         unsigned int node_id;
237         fwp_endpoint_attr_t attr;
238         char *value;
239         int ret;
240         
241         fwp_endpoint_attr_init(&attr);
242         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
243
244         /* Create fwp_participant_this */       
245         my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
246         if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
247                 /* if default then check env variable */
248                 value = getenv("FWP_MY_ADDR");
249                 if (value) {
250                         my_info.id.node_id = inet_addr(value);
251                 }       
252         }
253         fwp_configuration.my_node_id = my_info.id.node_id;
254         my_info.id.app_id = getpid();
255         my_info.stream_id = fwp_configuration.my_stream_id;
256
257         fwp_participant_this = fwp_participant_new(&my_info);   
258         ret = fwp_receive_endpoint_create(my_info.stream_id, &attr,
259                                           &fwp_participant_this->epointd);
260         if (ret != 0)
261                 return ret;
262         /* FIXME 
263         fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
264                                 &fwp_participant_this->stream_id,
265                                 &flags,
266                                 fwp_participant_this->epointd);
267         */
268         fwp_endpoint_get_params(fwp_participant_this->epointd, 
269                                 &node_id, 
270                                 &fwp_participant_this->stream_id,
271                                 &attr);
272         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
273                         fwp_participant_this->id.node_id, 
274                         fwp_participant_this->stream_id);
275         
276         /* Create fwp_participant_mngr */
277         
278         mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
279         /* Env. variable always overrides configured settings */
280         value = getenv("FWP_MNGR_ADDR");
281         if (value) {
282                 mngr_info.id.node_id = inet_addr(value);
283         }       
284         FWP_DEBUG("mngr node=%s node_id=%d\n", 
285                         fwp_configuration.mngr_addr,
286                         mngr_info.id.node_id);
287         fwp_configuration.mngr_node_id = mngr_info.id.node_id;
288         mngr_info.id.app_id = getpid();
289         mngr_info.stream_id = fwp_configuration.mngr_stream_id;
290         
291         if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) && 
292                 (my_info.stream_id == mngr_info.stream_id)) {
293                 /* I am a manager  */
294                 FWP_DEBUG("I am FWP manager\n");
295                 fwp_participant_mngr = fwp_participant_this;
296         } else {
297                 fwp_participant_mngr = fwp_participant_new(&mngr_info);
298                 /* Connet to FWP manager */
299                 ret = fwp_mngt_connect();
300                 if (ret != 0)
301                         return ret;
302         }       
303         
304         return 0;
305 }