]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/mngt/fwp_mngt.c
FWP tests got to the functional state
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
1 #include "fwp_conf.h"
2 #include "fwp_mngt.h"
3
4 #include <stdlib.h>
5 /* 
6  * Global mngt variables
7  */
8
9 fwp_participant_t       *fwp_participant_this;
10 fwp_participant_t       *fwp_participant_mngr;
11
12 /*fwp_endpoint_d_t      fwp_mngt_repointd;*/
13
14 /**
15  * struct resource {
16  *      char name[10];
17  *      int id;
18  *      participant_my;
19  *      participant_mngr;
20  *      contract_ops;
21  *      fna_ops;
22  * }
23  */
24
25 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
26                   fwp_participant_t *source, fwp_participant_t *dest)
27 {
28         fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
29         fwp_msg_header_in(msgb->data, type, source->id);
30
31         fwp_send(dest->epointd, msgb->data, msgb->len, 0);
32
33         return 0;
34 }
35
36 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
37                         fwp_msgb_t *msgb)
38 {
39         int size;
40         
41         fwp_msgb_reset_data(msgb);
42         size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
43                         msgb->buffer_size, 0);
44         fwp_msgb_put(msgb, size);
45
46         fwp_msg_header_out(msgb->data, type, participant_id);
47         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
48         
49         FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
50                         participant_id->node_id, participant_id->app_id);
51
52         return 0;
53
54
55 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
56 {
57         struct fwp_vres_params  fwp_service_vparams;
58         
59         /* TODO: Add to contract table */
60         /* create service vres */
61         fwp_service_vparams.ac_id = FWP_AC_BK; 
62         fwp_service_vparams.budget = 100;
63         fwp_service_vparams.period_usec = 30;
64         
65         if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
66                 fprintf(stderr,"Unable to open service vres\n");
67                 return -1;
68         }
69         
70         FWP_DEBUG("Service vres negotiated\n");
71         
72         return 0;
73 }
74
75 /* Launch discovery/connect process to 
76  * introduce itself to fwp manager and get description of manager*/
77 int fwp_mngt_connect()
78 {
79         fwp_participant_info_t  my_info, mngr_info;
80         fwp_participant_id_t    participant_id;
81         fwp_msgb_t              *msgb;
82         fwp_msg_type_t          msg_type;
83         fwp_endpoint_attr_t     attr;
84         
85         fwp_endpoint_attr_init(&attr);
86         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
87
88         /* Create discovery endpoint */
89         FWP_DEBUG("Service vres created\n");
90         fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
91         
92         FWP_DEBUG("Discovery send endpoint created\n");
93         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
94                                  fwp_participant_mngr->stream_id,
95                                  &attr, &fwp_participant_mngr->epointd);        
96         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
97                                 fwp_participant_mngr->vresd);
98         
99         /* prepare hello message */
100         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
101                               sizeof(struct fwp_msg_hello));
102         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
103         
104         my_info.id = fwp_participant_this->id;
105         my_info.stream_id = fwp_participant_this->stream_id;
106
107         fwp_msg_hello_in(msgb->tail, &my_info);
108         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
109
110         /* Send hello to manager */
111         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
112                         fwp_participant_this, fwp_participant_mngr);
113
114         /* receive hello from manager */
115         fwp_mngt_recv(&msg_type, &participant_id, msgb);
116         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
117                         participant_id.node_id, participant_id.app_id);
118         
119         /* Process hello msg from manager */
120         fwp_msg_hello_out(msgb->data, &mngr_info);
121         fwp_participant_mngr->id  = mngr_info.id;
122         fwp_participant_mngr->stream_id  = mngr_info.stream_id;
123         FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
124                         mngr_info.id.node_id, mngr_info.id.app_id);
125         
126         /* unbind and delete discovery mngr send endoint */
127         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
128         /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
129
130         /* Create mngt send endpoint to manager */
131         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
132                                  fwp_participant_mngr->stream_id, &attr,
133                                  &fwp_participant_mngr->epointd);
134         FWP_DEBUG("Management send endpoint created\n");
135         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
136                                 fwp_participant_mngr->vresd);
137         return 0;
138 }
139
140 int fwp_mngt_init()
141 {
142         fwp_participant_info_t  my_info, mngr_info;
143         unsigned int node_id;
144         fwp_endpoint_attr_t attr;
145         char *value;
146         
147         fwp_endpoint_attr_init(&attr);
148         fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
149
150         /* Create fwp_participant_this */       
151         my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
152         if (my_info.id.node_id == inet_addr("127.0.0.1")) {
153                 /* if default then check env variable */
154                 value = getenv("FWP_MY_ADDR");
155                 if (value) {
156                         my_info.id.node_id = inet_addr(value);
157                 }       
158         }
159         fwp_configuration.my_node_id = my_info.id.node_id;
160         my_info.id.app_id = getpid();
161         my_info.stream_id = fwp_configuration.my_stream_id;
162
163         fwp_participant_this = fwp_participant_create(&my_info);        
164         fwp_receive_endpoint_create(my_info.stream_id, &attr,
165                                         &fwp_participant_this->epointd);
166         /* FIXME 
167         fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
168                                 &fwp_participant_this->stream_id,
169                                 &flags,
170                                 fwp_participant_this->epointd);
171         */
172         fwp_endpoint_get_params(fwp_participant_this->epointd, 
173                                 &node_id, 
174                                 &fwp_participant_this->stream_id,
175                                 &attr);
176         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
177                         fwp_participant_this->id.node_id, 
178                         fwp_participant_this->stream_id);
179         
180         /* Create fwp_participant_mngr */
181         
182         mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
183         FWP_DEBUG("mngr node=%s node_id=%d\n", 
184                         fwp_configuration.mngr_addr,
185                         mngr_info.id.node_id);
186         if (mngr_info.id.node_id == inet_addr("255.255.255.255")) {
187                 /* if default then check env variable */
188                 value = getenv("FWP_MNGR_ADDR");
189                 if (value) {
190                         mngr_info.id.node_id = inet_addr(value);
191                 }       
192         }
193         fwp_configuration.mngr_node_id = mngr_info.id.node_id;
194         mngr_info.id.app_id = getpid();
195         mngr_info.stream_id = fwp_configuration.mngr_stream_id;
196         
197         if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) && 
198                 (my_info.stream_id == mngr_info.stream_id)) {
199                 /* I am a manager  */
200                 FWP_DEBUG("I am FWP manager\n");
201                 fwp_participant_mngr = fwp_participant_this;
202         } else {
203                 fwp_participant_mngr = fwp_participant_create(&mngr_info);
204                 /* Connet to FWP manager */
205                 fwp_mngt_connect();
206         }       
207         
208         return 0;
209 }