]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/mngt/fwp_mngt.c
06a47e36590099ffa02a3734e22c048eafdd2f24
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
1 #include "fwp_conf.h"
2 #include "fwp_mngt.h"
3
4 #include <stdlib.h>
5 /** 
6  * Global mngt variables
7  */
8
9 /**< Pointer to participant of this application*/
10 fwp_participant_t       *fwp_participant_this;
11 /**< Pointer to manager participant record*/
12 fwp_participant_t       *fwp_participant_mngr;
13
14 /*fwp_endpoint_d_t      fwp_mngt_repointd;*/
15
16 /**
17  * struct resource {
18  *      char name[10];
19  *      int id;
20  *      participant_my;
21  *      participant_mngr;
22  *      contract_ops;
23  *      fna_ops;
24  * }
25  */
26
27 /**
28  * Send management message to participant
29  *
30  */
31 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
32                   fwp_participant_t *source, fwp_participant_t *dest)
33 {
34         fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
35         fwp_msg_header_in(msgb->data, type, source->id);
36
37         fwp_send(dest->epointd, msgb->data, msgb->len, 0);
38
39         return 0;
40 }
41
42 /**
43  * Receives management message from participant
44  *
45  */
46 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
47                         fwp_msgb_t *msgb)
48 {
49         int size;
50         
51         fwp_msgb_reset_data(msgb);
52         size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
53                         msgb->buffer_size, 0);
54         fwp_msgb_put(msgb, size);
55
56         fwp_msg_header_out(msgb->data, type, participant_id);
57         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
58         
59         FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
60                         participant_id->node_id, participant_id->app_id);
61
62         return 0;
63
64
65 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
66 {
67         struct fwp_vres_params  fwp_service_vparams;
68         
69         /* TODO: Add to contract table */
70         /* create service vres */
71         fwp_service_vparams.ac_id = FWP_AC_BK; 
72         fwp_service_vparams.budget = 100;
73         fwp_service_vparams.period_usec = 30;
74         
75         if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
76                 fprintf(stderr,"Unable to open service vres\n");
77                 return -1;
78         }
79         
80         FWP_DEBUG("Service vres negotiated\n");
81         
82         return 0;
83 }
84
85 /**
86  * Launches discovery/connect process to 
87  * introduce itself to fwp manager and get description of manager
88  * */
89 int fwp_mngt_connect()
90 {
91         fwp_participant_info_t  my_info, mngr_info;
92         fwp_participant_id_t    participant_id;
93         fwp_msgb_t              *msgb;
94         fwp_msg_type_t          msg_type;
95         fwp_endpoint_attr_t     attr;
96         
97         fwp_endpoint_attr_init(&attr);
98         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
99
100         /* Create discovery endpoint */
101         FWP_DEBUG("Service vres created\n");
102         fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
103         
104         FWP_DEBUG("Discovery send endpoint created\n");
105         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
106                                  fwp_participant_mngr->stream_id,
107                                  &attr, &fwp_participant_mngr->epointd);        
108         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
109                                 fwp_participant_mngr->vresd);
110         
111         /* prepare hello message */
112         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
113                               sizeof(struct fwp_msg_hello));
114         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
115         
116         my_info.id = fwp_participant_this->id;
117         my_info.stream_id = fwp_participant_this->stream_id;
118
119         fwp_msg_hello_in(msgb->tail, &my_info);
120         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
121
122         /* Send hello to manager */
123         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
124                         fwp_participant_this, fwp_participant_mngr);
125
126         /* receive hello from manager */
127         fwp_mngt_recv(&msg_type, &participant_id, msgb);
128         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
129                         participant_id.node_id, participant_id.app_id);
130         
131         /* Process hello msg from manager */
132         fwp_msg_hello_out(msgb->data, &mngr_info);
133         fwp_participant_mngr->id  = mngr_info.id;
134         fwp_participant_mngr->stream_id  = mngr_info.stream_id;
135         FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
136                         mngr_info.id.node_id, mngr_info.id.app_id);
137         
138         /* unbind and delete discovery mngr send endoint */
139         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
140         fwp_endpoint_destroy(fwp_participant_mngr->epointd);
141
142         /* Create mngt send endpoint to manager */
143         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
144                                  fwp_participant_mngr->stream_id, &attr,
145                                  &fwp_participant_mngr->epointd);
146         FWP_DEBUG("Management send endpoint created\n");
147         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
148                                 fwp_participant_mngr->vresd);
149         return 0;
150 }
151
152 int fwp_mngt_init()
153 {
154         fwp_participant_info_t  my_info, mngr_info;
155         unsigned int node_id;
156         fwp_endpoint_attr_t attr;
157         char *value;
158         
159         fwp_endpoint_attr_init(&attr);
160         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
161
162         /* Create fwp_participant_this */       
163         my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
164         if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
165                 /* if default then check env variable */
166                 value = getenv("FWP_MY_ADDR");
167                 if (value) {
168                         my_info.id.node_id = inet_addr(value);
169                 }       
170         }
171         fwp_configuration.my_node_id = my_info.id.node_id;
172         my_info.id.app_id = getpid();
173         my_info.stream_id = fwp_configuration.my_stream_id;
174
175         fwp_participant_this = fwp_participant_create(&my_info);        
176         fwp_receive_endpoint_create(my_info.stream_id, &attr,
177                                         &fwp_participant_this->epointd);
178         /* FIXME 
179         fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
180                                 &fwp_participant_this->stream_id,
181                                 &flags,
182                                 fwp_participant_this->epointd);
183         */
184         fwp_endpoint_get_params(fwp_participant_this->epointd, 
185                                 &node_id, 
186                                 &fwp_participant_this->stream_id,
187                                 &attr);
188         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
189                         fwp_participant_this->id.node_id, 
190                         fwp_participant_this->stream_id);
191         
192         /* Create fwp_participant_mngr */
193         
194         mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
195         FWP_DEBUG("mngr node=%s node_id=%d\n", 
196                         fwp_configuration.mngr_addr,
197                         mngr_info.id.node_id);
198         if (mngr_info.id.node_id == inet_addr(FWP_MNGR_ADDR_DEFAULT)) {
199                 /* if default then check env variable */
200                 value = getenv("FWP_MNGR_ADDR");
201                 if (value) {
202                         mngr_info.id.node_id = inet_addr(value);
203                 }       
204         }
205         fwp_configuration.mngr_node_id = mngr_info.id.node_id;
206         mngr_info.id.app_id = getpid();
207         mngr_info.stream_id = fwp_configuration.mngr_stream_id;
208         
209         if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) && 
210                 (my_info.stream_id == mngr_info.stream_id)) {
211                 /* I am a manager  */
212                 FWP_DEBUG("I am FWP manager\n");
213                 fwp_participant_mngr = fwp_participant_this;
214         } else {
215                 fwp_participant_mngr = fwp_participant_create(&mngr_info);
216                 /* Connet to FWP manager */
217                 fwp_mngt_connect();
218         }       
219         
220         return 0;
221 }