]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/mngt/fwp_mngt.c
Added: mngr- delete participant from table after BYE message was received
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
1 #include "fwp_conf.h"
2 #include "fwp_mngt.h"
3 #include "fwp_endpoint.h"
4
5 /** 
6  * Global mngt variables
7  */
8
9 /**< Pointer to participant of this application*/
10 fwp_participant_t       *fwp_participant_this;
11 /**< Pointer to manager participant record*/
12 fwp_participant_t       *fwp_participant_mngr;
13
14 static fwp_contract_t fwp_service_contract = {
15         .budget = 100,
16         .period_usec = 30,
17 };
18
19 static fwp_vres_params_t fwp_service_vres_params = {
20         .id = 0,
21         .ac_id = FWP_AC_BK, 
22         .budget = 100,
23         .period_usec = 30,
24 };
25
26 /**
27  * Send management message to participant
28  *
29  */
30 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
31                   fwp_participant_t *source, fwp_participant_t *dest)
32 {
33         fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
34         fwp_msg_header_in(msgb->data, type, source->id);
35
36         fwp_send(dest->epointd, msgb->data, msgb->len, 0);
37
38         return 0;
39 }
40
41 /**
42  * Receives management message from participant
43  *
44  */
45 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
46                         fwp_msgb_t *msgb)
47 {
48         int size;
49         
50         fwp_msgb_reset_data(msgb);
51         size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
52                         msgb->buffer_size, 0);
53         fwp_msgb_put(msgb, size);
54
55         fwp_msg_header_out(msgb->data, type, participant_id);
56         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
57         
58         FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
59                         participant_id->node_id, participant_id->app_id);
60
61         return 0;
62
63
64 int fwp_mngt_service_vres_create(fwp_vres_d_t* vresdp)
65 {
66         fwp_contract_d_t        contractd;
67         fwp_contract_data_t*    contdata;
68         
69         /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
70                  fprintf(stderr,"Unable to open service vres\n");
71                  return -1;
72         }*/
73         
74         contractd = fwp_contract_create(&fwp_service_contract);
75         contdata = contractd;
76                 
77         /* TODO: Consider to call _fwp_contract_commit */
78         contdata->status = FWP_CONT_NEGOTIATED;
79         /* Set parameters of vres 
80          * and activate it if needed */
81         fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
82         *vresdp = contdata->vresd;
83         
84         FWP_DEBUG("Service vres negotiated\n");
85         return 0;
86 }
87
88 /**
89  * Launches discovery/connect process to 
90  * introduce itself to fwp manager and get description of manager
91  * */
92 int fwp_mngt_connect()
93 {
94         fwp_participant_info_t  my_info, mngr_info;
95         fwp_participant_id_t    participant_id;
96         fwp_msgb_t              *msgb;
97         fwp_msg_type_t          msg_type;
98         fwp_endpoint_attr_t     attr;
99         
100         fwp_endpoint_attr_init(&attr);
101         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
102
103         /* Create discovery endpoint */
104         FWP_DEBUG("Service vres created\n");
105         fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
106         
107         FWP_DEBUG("Discovery send endpoint created\n");
108         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
109                                  fwp_participant_mngr->stream_id,
110                                  &attr, &fwp_participant_mngr->epointd);        
111         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
112                                 fwp_participant_mngr->vresd);
113         
114         /* prepare hello message */
115         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
116                               sizeof(struct fwp_msg_hello));
117         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
118         
119         my_info.id = fwp_participant_this->id;
120         my_info.stream_id = fwp_participant_this->stream_id;
121
122         fwp_msg_hello_in(msgb->tail, &my_info);
123         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
124
125         /* Send hello to manager */
126         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
127                         fwp_participant_this, fwp_participant_mngr);
128
129         /* receive hello from manager */
130         fwp_mngt_recv(&msg_type, &participant_id, msgb);
131         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
132                         participant_id.node_id, participant_id.app_id);
133         
134         /* Process hello msg from manager */
135         fwp_msg_hello_out(msgb->data, &mngr_info);
136         fwp_participant_mngr->id  = mngr_info.id;
137         fwp_participant_mngr->stream_id  = mngr_info.stream_id;
138         FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
139                         mngr_info.id.node_id, mngr_info.id.app_id);
140         
141         /* unbind and delete discovery mngr send endpoint */
142         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
143         fwp_endpoint_destroy(fwp_participant_mngr->epointd);
144
145         /* Create mngt send endpoint to manager */
146         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
147                                  fwp_participant_mngr->stream_id, &attr,
148                                  &fwp_participant_mngr->epointd);
149         FWP_DEBUG("Management send endpoint created\n");
150         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
151                                 fwp_participant_mngr->vresd);
152         return 0;
153 }
154
155 int fwp_mngt_disconnect()
156 {
157         fwp_msgb_t *msgb;
158
159         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
160                                 sizeof(struct fwp_msg_contracthdr)); 
161         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
162         
163         /* Say GoodBye to manager */
164         fwp_mngt_send(FWP_MSG_BYE, msgb, 
165                         fwp_participant_this, fwp_participant_mngr);
166         
167         fwp_send_endpoint_unbind(fwp_participant_this->epointd);
168         fwp_endpoint_destroy(fwp_participant_this->epointd);
169         fwp_vres_destroy(fwp_participant_this->vresd);
170         /* TODO: iterate through contract table and delete contracts */
171
172         return 0;
173 }
174
175
176 int fwp_mngt_init()
177 {
178         fwp_participant_info_t  my_info, mngr_info;
179         unsigned int node_id;
180         fwp_endpoint_attr_t attr;
181         char *value;
182         
183         fwp_endpoint_attr_init(&attr);
184         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
185
186         /* Create fwp_participant_this */       
187         my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
188         if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
189                 /* if default then check env variable */
190                 value = getenv("FWP_MY_ADDR");
191                 if (value) {
192                         my_info.id.node_id = inet_addr(value);
193                 }       
194         }
195         fwp_configuration.my_node_id = my_info.id.node_id;
196         my_info.id.app_id = getpid();
197         my_info.stream_id = fwp_configuration.my_stream_id;
198
199         fwp_participant_this = fwp_participant_new(&my_info);   
200         fwp_receive_endpoint_create(my_info.stream_id, &attr,
201                                         &fwp_participant_this->epointd);
202         /* FIXME 
203         fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
204                                 &fwp_participant_this->stream_id,
205                                 &flags,
206                                 fwp_participant_this->epointd);
207         */
208         fwp_endpoint_get_params(fwp_participant_this->epointd, 
209                                 &node_id, 
210                                 &fwp_participant_this->stream_id,
211                                 &attr);
212         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
213                         fwp_participant_this->id.node_id, 
214                         fwp_participant_this->stream_id);
215         
216         /* Create fwp_participant_mngr */
217         
218         mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
219         FWP_DEBUG("mngr node=%s node_id=%d\n", 
220                         fwp_configuration.mngr_addr,
221                         mngr_info.id.node_id);
222         if (mngr_info.id.node_id == inet_addr(FWP_MNGR_ADDR_DEFAULT)) {
223                 /* if default then check env variable */
224                 value = getenv("FWP_MNGR_ADDR");
225                 if (value) {
226                         mngr_info.id.node_id = inet_addr(value);
227                 }       
228         }
229         fwp_configuration.mngr_node_id = mngr_info.id.node_id;
230         mngr_info.id.app_id = getpid();
231         mngr_info.stream_id = fwp_configuration.mngr_stream_id;
232         
233         if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) && 
234                 (my_info.stream_id == mngr_info.stream_id)) {
235                 /* I am a manager  */
236                 FWP_DEBUG("I am FWP manager\n");
237                 fwp_participant_mngr = fwp_participant_this;
238         } else {
239                 fwp_participant_mngr = fwp_participant_new(&mngr_info);
240                 /* Connet to FWP manager */
241                 fwp_mngt_connect();
242         }       
243         
244         return 0;
245 }