]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/mngt/fwp_mngt.c
adding negt functions
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
1 #include "fwp_mngt.h"
2 #include "fwp_participant.h"
3
/* 
 * Global mngt variables
 */

/* The local participant. Created in fwp_mngt_init(); its epointd is the
 * receive endpoint that fwp_mngt_recv() reads management messages from. */
fwp_participant_t       *fwp_participant_this;
/* The fwp manager. Created in fwp_mngt_init(); its epointd is the send
 * endpoint used for messages addressed to the manager, and its id and
 * stream_id are overwritten by fwp_mngt_connect() from the manager's hello. */
fwp_participant_t       *fwp_participant_mngr;
10
11 /*fwp_endpoint_d_t      fwp_mngt_repointd;*/
12
13 /* temporarily*/
14 /**
15  * struct resource {
16  *      char name[10];
17  *      int id;
18  *      general_ops;
19  *      fna_ops;
20  * }
21  */
22
23 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
24                   fwp_participant_t *source, fwp_participant_t *dest)
25 {
26         fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
27         fwp_msg_header_deflate(msgb->data, type, source->id);
28
29         fwp_send(dest->epointd, msgb->data, msgb->len);
30
31         return 0;
32 }
33
34 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
35                         fwp_msgb_t *msgb)
36 {
37         int size;
38         
39         fwp_msgb_reset_data(msgb);
40         size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
41                         msgb->buffer_size);
42         fwp_msgb_put(msgb, size);
43
44         fwp_msg_header_inflate(msgb->data, type, participant_id);
45         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
46         
47         FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
48                         participant_id->node_id, participant_id->app_id);
49
50         return 0;
51
52
53 int fwp_negt_request(fwp_contract_t *contract)
54 {
55         fwp_msgb_t *msgb;
56         fwp_contract_data_t *contdata;
57
58         contdata = fwp_contract_data_new();
59         contdata->vresd = fwp_vres_alloc();
60         contdata->id = fwp_vres_get_id(contdata->vresd);
61         memcpy(&contdata->contract, contract, sizeof(*contract));
62         contdata->status = FWP_CONT_REQUESTED; 
63         
64         /* Add to contract table */
65         fwp_contract_table_insert(&fwp_participant_this->contract_table, 
66                                         contdata);
67         /* Send contract to manager */
68         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
69                                 sizeof(struct fwp_msg_contract));
70         fwp_msg_contract_deflate(msgb->data, contdata->id, &contdata->contract);
71         fwp_mngt_send(FWP_MSG_NEGT_REQ, msgb, 
72                         fwp_participant_this, fwp_participant_mngr);
73         return 0;
74 }
75
76 int fwp_negt_reserve()
77 {
78         fwp_msgb_t *msgb;
79         
80         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header));
81         fwp_mngt_send(FWP_MSG_NEGT_RESERVE, msgb, 
82                         fwp_participant_this, fwp_participant_mngr);
83         return 0;
84 }
85
86 int fwp_negt_commit()
87 {
88         fwp_msgb_t *msgb;
89         
90         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header));
91         fwp_mngt_send(FWP_MSG_NEGT_COMMIT, msgb, 
92                         fwp_participant_this, fwp_participant_mngr);
93         return 0;
94 }
95
96 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
97 {
98         struct fwp_vres_params  fwp_service_vparams;
99         
100         /* TODO: Add to contract table */
101         /* create service vres */
102         fwp_service_vparams.ac_id = FWP_AC_BK; 
103         fwp_service_vparams.budget = 100;
104         fwp_service_vparams.period_usec = 1000;
105         
106         if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
107                 fprintf(stderr,"Unable to open service vres\n");
108                 return -1;
109         }
110         
111         FWP_DEBUG("Service vres negotiated\n");
112         
113         return 0;
114 }
115
116 /* Launch discovery/connect process to 
117  * introduce itself to fwp manager and get description of manager*/
118 void fwp_mngt_connect()
119 {
120         fwp_participant_info_t  my_info, mngr_info;
121         fwp_participant_id_t    participant_id;
122         fwp_msgb_t              *msgb;
123         fwp_msg_type_t          msg_type;
124
125
126         /* prepare hello message */
127         msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
128                               sizeof(struct fwp_msg_hello));
129         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
130         
131         my_info.id = fwp_participant_this->id;
132         my_info.stream_id = fwp_participant_this->stream_id;
133
134         fwp_msg_hello_deflate(msgb->tail, &my_info);
135         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
136
137         /* Send hello to manager */
138         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
139                         fwp_participant_this, fwp_participant_mngr);
140
141
142         /* receive hello from manager */
143         fwp_mngt_recv(&msg_type, &participant_id, msgb);
144         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
145                         participant_id.node_id, participant_id.app_id);
146         
147         
148         /* Process hello msg from manager */
149         fwp_msg_hello_inflate(msgb->data, &mngr_info);
150         fwp_participant_mngr->id  = mngr_info.id;
151         fwp_participant_mngr->stream_id  = mngr_info.stream_id;
152         FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
153                         mngr_info.id.node_id, mngr_info.id.app_id);
154         
155         /* unbind and delete discovery mngr send endoint */
156         fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
157         /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
158
159         /* Create mngt send endpoint to manager */
160         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
161                                  fwp_participant_mngr->stream_id, 0,
162                                  &fwp_participant_mngr->epointd);
163         FWP_DEBUG("Management send endpoint created\n");
164         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
165                                 fwp_participant_mngr->vresd);
166 }
167
168 int fwp_mngt_init()
169 {
170         fwp_participant_info_t  my_info, mngr_info;
171         unsigned int node_id;
172         int flags;
173         
174         /* Create fwp_participant_this */
175         my_info.id.node_id = inet_addr("127.0.0.1");
176         my_info.id.app_id = getpid();
177         my_info.stream_id = 0;
178
179         fwp_participant_this = fwp_participant_create(&my_info);        
180         fwp_receive_endpoint_create(0, 0, &fwp_participant_this->epointd);
181         /* FIXME 
182         fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
183                                 &fwp_participant_this->stream_id,
184                                 &flags,
185                                 fwp_participant_this->epointd);
186         */
187         fwp_endpoint_get_params(&node_id, 
188                                 &fwp_participant_this->stream_id,
189                                 &flags,
190                                 fwp_participant_this->epointd);
191         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
192                         fwp_participant_this->id.node_id, 
193                         fwp_participant_this->stream_id);
194         
195         /* Create fwp_participant_mngr */
196         mngr_info.id.node_id = inet_addr("127.0.0.1");
197         /*mngr_info.id.node_id = inet_addr("255.255.255.255");*/
198         mngr_info.id.app_id = getpid();
199         mngr_info.stream_id = FWP_MNGR_STREAM_ID;
200         
201         fwp_participant_mngr = fwp_participant_create(&mngr_info);
202         
203         /* Create discovery endpoint */
204         FWP_DEBUG("Service vres created\n");
205         fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
206         
207         FWP_DEBUG("Discovery send endpoint created\n");
208         fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
209                                  fwp_participant_mngr->stream_id,
210                                  0, &fwp_participant_mngr->epointd);    
211         fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
212                                 fwp_participant_mngr->vresd);
213         
214         return 0;
215 }