2 #include "fwp_participant.h"
5 * Global mngt variables
8 fwp_participant_t *fwp_participant_this;
9 fwp_participant_t *fwp_participant_mngr;
11 /*fwp_endpoint_d_t fwp_mngt_repointd;*/
23 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
24 fwp_participant_t *source, fwp_participant_t *dest)
26 fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
27 fwp_msg_header_deflate(msgb->data, type, source->id);
29 fwp_send(dest->epointd, msgb->data, msgb->len);
34 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
39 fwp_msgb_reset_data(msgb);
40 size = fwp_recv(fwp_participant_this->epointd, msgb->data,
42 fwp_msgb_put(msgb, size);
44 fwp_msg_header_inflate(msgb->data, type, participant_id);
45 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
47 FWP_DEBUG("Received msg: type=%d from nodeid=%d appid=%d\n", *type,
48 participant_id->node_id, participant_id->app_id);
53 int fwp_negt_request(fwp_contract_t *contract)
56 fwp_contract_data_t *contdata;
58 contdata = fwp_contract_data_new();
59 contdata->vresd = fwp_vres_alloc();
60 contdata->id = fwp_vres_get_id(contdata->vresd);
61 memcpy(&contdata->contract, contract, sizeof(*contract));
62 contdata->status = FWP_CONT_REQUESTED;
64 /* Add to contract table */
65 fwp_contract_table_insert(&fwp_participant_this->contract_table,
67 /* Send contract to manager */
68 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
69 sizeof(struct fwp_msg_contract));
70 fwp_msg_contract_deflate(msgb->data, contdata->id, &contdata->contract);
71 fwp_mngt_send(FWP_MSG_NEGT_REQ, msgb,
72 fwp_participant_this, fwp_participant_mngr);
76 int fwp_negt_reserve()
80 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header));
81 fwp_mngt_send(FWP_MSG_NEGT_RESERVE, msgb,
82 fwp_participant_this, fwp_participant_mngr);
90 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header));
91 fwp_mngt_send(FWP_MSG_NEGT_COMMIT, msgb,
92 fwp_participant_this, fwp_participant_mngr);
96 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
98 struct fwp_vres_params fwp_service_vparams;
100 /* TODO: Add to contract table */
101 /* create service vres */
102 fwp_service_vparams.ac_id = FWP_AC_BK;
103 fwp_service_vparams.budget = 100;
104 fwp_service_vparams.period_usec = 1000;
106 if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
107 fprintf(stderr,"Unable to open service vres\n");
111 FWP_DEBUG("Service vres negotiated\n");
116 /* Launch discovery/connect process to
117 * introduce itself to fwp manager and get description of manager*/
118 void fwp_mngt_connect()
120 fwp_participant_info_t my_info, mngr_info;
121 fwp_participant_id_t participant_id;
123 fwp_msg_type_t msg_type;
126 /* prepare hello message */
127 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
128 sizeof(struct fwp_msg_hello));
129 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
131 my_info.id = fwp_participant_this->id;
132 my_info.stream_id = fwp_participant_this->stream_id;
134 fwp_msg_hello_deflate(msgb->tail, &my_info);
135 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
137 /* Send hello to manager */
138 fwp_mngt_send(FWP_MSG_HELLO, msgb,
139 fwp_participant_this, fwp_participant_mngr);
142 /* receive hello from manager */
143 fwp_mngt_recv(&msg_type, &participant_id, msgb);
144 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
145 participant_id.node_id, participant_id.app_id);
148 /* Process hello msg from manager */
149 fwp_msg_hello_inflate(msgb->data, &mngr_info);
150 fwp_participant_mngr->id = mngr_info.id;
151 fwp_participant_mngr->stream_id = mngr_info.stream_id;
152 FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n",
153 mngr_info.id.node_id, mngr_info.id.app_id);
155 /* unbind and delete discovery mngr send endoint */
156 fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
157 /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
159 /* Create mngt send endpoint to manager */
160 fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
161 fwp_participant_mngr->stream_id, 0,
162 &fwp_participant_mngr->epointd);
163 FWP_DEBUG("Management send endpoint created\n");
164 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
165 fwp_participant_mngr->vresd);
170 fwp_participant_info_t my_info, mngr_info;
171 unsigned int node_id;
174 /* Create fwp_participant_this */
175 my_info.id.node_id = inet_addr("127.0.0.1");
176 my_info.id.app_id = getpid();
177 my_info.stream_id = 0;
179 fwp_participant_this = fwp_participant_create(&my_info);
180 fwp_receive_endpoint_create(0, 0, &fwp_participant_this->epointd);
182 fwp_endpoint_get_params(&(fwp_participant_this->id.node_id),
183 &fwp_participant_this->stream_id,
185 fwp_participant_this->epointd);
187 fwp_endpoint_get_params(&node_id,
188 &fwp_participant_this->stream_id,
190 fwp_participant_this->epointd);
191 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
192 fwp_participant_this->id.node_id,
193 fwp_participant_this->stream_id);
195 /* Create fwp_participant_mngr */
196 mngr_info.id.node_id = inet_addr("127.0.0.1");
197 /*mngr_info.id.node_id = inet_addr("255.255.255.255");*/
198 mngr_info.id.app_id = getpid();
199 mngr_info.stream_id = FWP_MNGR_STREAM_ID;
201 fwp_participant_mngr = fwp_participant_create(&mngr_info);
203 /* Create discovery endpoint */
204 FWP_DEBUG("Service vres created\n");
205 fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
207 FWP_DEBUG("Discovery send endpoint created\n");
208 fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
209 fwp_participant_mngr->stream_id,
210 0, &fwp_participant_mngr->epointd);
211 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
212 fwp_participant_mngr->vresd);