]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/mngt/fwp_mngt.c
e8fef7cbfae4133644ae64d0ebb111c034bb4783
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
1 #include "fwp_conf.h"
2 #include "fwp_mngt.h"
3 #include "fwp_endpoint.h"
4
5 /** 
6  * Global mngt variables
7  */
8
9 /**< Pointer to participant of this application*/
10 fwp_participant_t       *fwp_participant_this;
11 /**< Pointer to manager participant record*/
12 fwp_participant_t       *fwp_participant_mngr;
13
14 static fwp_contract_t fwp_service_contract = {
15         .budget = 100,
16         .period_usec = 30,
17 };
18
19 static fwp_vres_params_t fwp_service_vres_params = {
20         .id = 0,
21         .ac_id = FWP_AC_BK, 
22         .budget = 100,
23         .period_usec = 30,
24 };
25
26 /**
27  * Send management message to participant
28  *
29  */
30 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
31                   fwp_participant_t *source, fwp_participant_t *dest)
32 {
33         fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
34         fwp_msg_header_in(msgb->data, type, source->id);
35
36         fwp_send(dest->epointd, msgb->data, msgb->len, 0);
37
38         return 0;
39 }
40
41 /**
42  * Receives management message from participant
43  *
44  */
45 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
46                         fwp_msgb_t *msgb)
47 {
48         int size;
49         
50         fwp_msgb_reset_data(msgb);
51         size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
52                         msgb->buffer_size, 0);
53         fwp_msgb_put(msgb, size);
54
55         fwp_msg_header_out(msgb->data, type, participant_id);
56         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
57         
58         FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
59                         participant_id->node_id, participant_id->app_id);
60
61         return 0;
62
63
64 int fwp_mngt_service_vres_create(fwp_vres_d_t* vresdp)
65 {
66         fwp_contract_d_t        contractd;
67         fwp_contract_data_t*    contdata;
68         
69         //contractd = fwp_contract_create(&fwp_service_contract);
70         if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
71                 fprintf(stderr,"Unable to open service vres\n");
72                 return -1;
73         }
74         //contdata = contractd;
75                 
76         /* TODO: Consider to call _fwp_contract_commit */
77         //contdata->status = FWP_CONT_NEGOTIATED;
78         /* Set parameters of vres 
79          * and activate it if needed */
80         //fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
81         //*vresdp = contdata->vresd;
82         
83         FWP_DEBUG("Service vres negotiated\n");
84         return 0;
85 }
86
87 /**
88  * Launches discovery/connect process to 
89  * introduce itself to fwp manager and get description of manager
90  * */
91 int fwp_mngt_connect()
92 {
93         fwp_participant_info_t  my_info, mngr_info;
94         fwp_participant_id_t    participant_id;
95         fwp_msgb_t              *msgb;
96         fwp_msg_type_t          msg_type;
97         fwp_endpoint_attr_t     attr;
98         
99         fwp_endpoint_attr_init(&attr);
100         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
101
102         /* Create discovery endpoint */
103         FWP_DEBUG("Service vres created\n");
104         fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
105         
106         FWP_DEBUG("Discovery send endpoint created\n");
107         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
108                                  fwp_participant_mngr->stream_id,
109                                  &attr, &fwp_participant_mngr->epointd);        
110         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
111                                 fwp_participant_mngr->vresd);
112         
113         /* prepare hello message */
114         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
115                               sizeof(struct fwp_msg_hello));
116         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
117         
118         my_info.id = fwp_participant_this->id;
119         my_info.stream_id = fwp_participant_this->stream_id;
120
121         fwp_msg_hello_in(msgb->tail, &my_info);
122         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
123
124         /* Send hello to manager */
125         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
126                         fwp_participant_this, fwp_participant_mngr);
127
128         /* receive hello from manager */
129         fwp_mngt_recv(&msg_type, &participant_id, msgb);
130         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
131                         participant_id.node_id, participant_id.app_id);
132         
133         /* Process hello msg from manager */
134         fwp_msg_hello_out(msgb->data, &mngr_info);
135         fwp_participant_mngr->id  = mngr_info.id;
136         fwp_participant_mngr->stream_id  = mngr_info.stream_id;
137         FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
138                         mngr_info.id.node_id, mngr_info.id.app_id);
139         
140         /* unbind and delete discovery mngr send endpoint */
141         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
142         fwp_endpoint_destroy(fwp_participant_mngr->epointd);
143
144         /* Create mngt send endpoint to manager */
145         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
146                                  fwp_participant_mngr->stream_id, &attr,
147                                  &fwp_participant_mngr->epointd);
148         FWP_DEBUG("Management send endpoint created\n");
149         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
150                                 fwp_participant_mngr->vresd);
151         return 0;
152 }
153
154 int fwp_mngt_disconnect()
155 {
156         fwp_msgb_t *msgb;
157
158         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
159                                 sizeof(struct fwp_msg_contracthdr)); 
160         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
161         
162         /* Say GoodBye to manager */
163         fwp_mngt_send(FWP_MSG_BYE, msgb, 
164                         fwp_participant_this, fwp_participant_mngr);
165         
166         /* TODO: iterate through contract table and delete contracts */
167         return 0;
168 }
169
170
171 int fwp_mngt_init()
172 {
173         fwp_participant_info_t  my_info, mngr_info;
174         unsigned int node_id;
175         fwp_endpoint_attr_t attr;
176         char *value;
177         
178         fwp_endpoint_attr_init(&attr);
179         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
180
181         /* Create fwp_participant_this */       
182         my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
183         if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
184                 /* if default then check env variable */
185                 value = getenv("FWP_MY_ADDR");
186                 if (value) {
187                         my_info.id.node_id = inet_addr(value);
188                 }       
189         }
190         fwp_configuration.my_node_id = my_info.id.node_id;
191         my_info.id.app_id = getpid();
192         my_info.stream_id = fwp_configuration.my_stream_id;
193
194         fwp_participant_this = fwp_participant_create(&my_info);        
195         fwp_receive_endpoint_create(my_info.stream_id, &attr,
196                                         &fwp_participant_this->epointd);
197         /* FIXME 
198         fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
199                                 &fwp_participant_this->stream_id,
200                                 &flags,
201                                 fwp_participant_this->epointd);
202         */
203         fwp_endpoint_get_params(fwp_participant_this->epointd, 
204                                 &node_id, 
205                                 &fwp_participant_this->stream_id,
206                                 &attr);
207         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
208                         fwp_participant_this->id.node_id, 
209                         fwp_participant_this->stream_id);
210         
211         /* Create fwp_participant_mngr */
212         
213         mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
214         FWP_DEBUG("mngr node=%s node_id=%d\n", 
215                         fwp_configuration.mngr_addr,
216                         mngr_info.id.node_id);
217         if (mngr_info.id.node_id == inet_addr(FWP_MNGR_ADDR_DEFAULT)) {
218                 /* if default then check env variable */
219                 value = getenv("FWP_MNGR_ADDR");
220                 if (value) {
221                         mngr_info.id.node_id = inet_addr(value);
222                 }       
223         }
224         fwp_configuration.mngr_node_id = mngr_info.id.node_id;
225         mngr_info.id.app_id = getpid();
226         mngr_info.stream_id = fwp_configuration.mngr_stream_id;
227         
228         if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) && 
229                 (my_info.stream_id == mngr_info.stream_id)) {
230                 /* I am a manager  */
231                 FWP_DEBUG("I am FWP manager\n");
232                 fwp_participant_mngr = fwp_participant_this;
233         } else {
234                 fwp_participant_mngr = fwp_participant_create(&mngr_info);
235                 /* Connet to FWP manager */
236                 fwp_mngt_connect();
237         }       
238         
239         return 0;
240 }