3 #include "fwp_endpoint.h"
6 * Global mngt variables
9 /** Pointer to participant of this application */
10 fwp_participant_t *fwp_participant_this;
11 /** Pointer to manager participant */
12 fwp_participant_t *fwp_participant_mngr;
/* Contract template for the service vres; it is handed to
 * fwp_contract_create() in fwp_mngt_service_vres_create(). */
14 static fwp_contract_t fwp_service_contract = {
/* 1000*1000 = 1 000 000; one second if the unit is microseconds, as the
 * field name indicates */
17 .deadline_usec = 1000*1000
/* Parameters applied to the service vres through fwp_vres_set_params()
 * in fwp_mngt_service_vres_create(). */
20 static fwp_vres_params_t fwp_service_vres_params = {
28 * Send management message to participant
/*
 * Write the management header in front of the payload already held in
 * msgb and send the complete message to dest.
 *  - type:   message type stored in the header
 *  - msgb:   message buffer; callers in this file reserve
 *            sizeof(struct fwp_msg_header) with fwp_msgb_reserve() first
 *  - source: participant whose id is stored in the header
 *  - dest:   participant whose epointd is used for sending
 * The result of fwp_send() is stored in ret.
 */
31 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
32 fwp_participant_t *source, fwp_participant_t *dest)
/* presumably moves msgb->data back over the reserved header room
 * - TODO confirm against fwp_msgb_push() */
36 fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
/* header is written at msgb->data from type, msgb->len and source->id */
37 fwp_msg_header_in(msgb->data, type, msgb->len, source->id);
/* header and payload leave as one datagram of msgb->len bytes */
39 ret = fwp_send(dest->epointd, msgb->data, msgb->len, 0);
45 * Receives management message from participant
/*
 * Receive one management message on the endpoint of fwp_participant_this
 * and split off its header.
 *  - type:           receives the message type read from the header
 *  - participant_id: receives the participant id read from the header
 * The message is read into msgb, up to msgb->buffer_size bytes.
 */
48 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
/* presumably rewinds msgb to an empty state before reuse - TODO confirm */
53 fwp_msgb_reset_data(msgb);
54 size = fwp_recv(fwp_participant_this->epointd, msgb->data,
55 msgb->buffer_size, 0);
/* account for the size bytes just received */
58 fwp_msgb_put(msgb, size);
/* decode the header fields, then step over the header;
 * fwp_mngt_connect() reads the payload from msgb->data afterwards */
60 fwp_msg_header_out(msgb->data, type, participant_id);
61 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
63 FWP_DEBUG("Received msg: type=%d from nodeid=%d appid=%d\n", *type,
64 participant_id->node_id, participant_id->app_id);
/*
 * Set up the service vres: a contract is created from
 * fwp_service_contract, its status is set to FWP_CONT_NEGOTIATED
 * directly (see the TODO about _fwp_contract_commit), and
 * fwp_service_vres_params are applied to the vres of that contract.
 *  - contd:  presumably receives the created contract descriptor
 *            - TODO confirm
 *  - vresdp: receives contdata->vresd
 * The direct fwp_vres_create() call on fwp_service_vres_params is
 * commented out; the vres of the created contract is used instead.
 */
69 int fwp_mngt_service_vres_create(fwp_contract_d_t* contd, fwp_vres_d_t* vresdp)
71 fwp_contract_d_t contractd;
72 fwp_contract_data_t* contdata;
75 /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
76 fprintf(stderr,"Unable to open service vres\n");
80 contractd = fwp_contract_create(&fwp_service_contract);
85 /* TODO: Consider to call _fwp_contract_commit */
86 contdata->status = FWP_CONT_NEGOTIATED;
87 /* Set parameters of vres
88 * and activate it if needed */
89 ret = fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
/* presumably reached only when fwp_vres_set_params() fails
 * - TODO confirm */
92 fwp_contract_destroy(contractd);
/* hand the vres descriptor of the contract back to the caller */
96 *vresdp = contdata->vresd;
98 FWP_DEBUG("Service vres negotiated\n");
104 * Launches discovery/connect process to
105 * introduce itself to fwp manager and get description of manager
/*
 * Handshake with the FWP manager:
 *  1. create the service vres and a send endpoint towards the manager
 *     address and stream id currently stored in fwp_participant_mngr,
 *  2. send a HELLO message describing fwp_participant_this,
 *  3. wait for the HELLO reply, guarded by alarm(3),
 *  4. store the id and stream id reported by the manager and recreate
 *     the send endpoint with them.
 * TODO(review): confirm msgb is released on every exit path.
 */
107 int fwp_mngt_connect()
109 fwp_participant_info_t my_info, mngr_info;
110 fwp_participant_id_t participant_id;
112 fwp_msg_type_t msg_type;
113 fwp_endpoint_attr_t attr;
/* attr is shared by both send endpoints created below */
116 fwp_endpoint_attr_init(&attr);
117 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
119 /* Create discovery endpoint */
/* NOTE(review): this message is logged before the call below runs, and
 * the result of fwp_mngt_service_vres_create() is not stored in ret */
120 FWP_DEBUG("Service vres created\n");
121 fwp_mngt_service_vres_create(&fwp_participant_mngr->service_contract,
122 &fwp_participant_mngr->vresd);
/* NOTE(review): also logged before the endpoint is actually created */
124 FWP_DEBUG("Discovery send endpoint created\n");
125 ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
126 fwp_participant_mngr->stream_id,
127 &attr, &fwp_participant_mngr->epointd);
/* attach the discovery endpoint to the service vres */
133 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
134 fwp_participant_mngr->vresd);
136 /* prepare hello message */
137 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
138 sizeof(struct fwp_msg_hello));
/* keep room in front of the payload; fwp_mngt_send() fills the header */
139 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
/* the HELLO payload carries the id and stream id of this participant */
141 my_info.id = fwp_participant_this->id;
142 my_info.stream_id = fwp_participant_this->stream_id;
144 fwp_msg_hello_in(msgb->tail, &my_info);
145 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
147 /* Send hello to manager */
148 ret = fwp_mngt_send(FWP_MSG_HELLO, msgb,
149 fwp_participant_this, fwp_participant_mngr);
155 /* receive hello from manager */
/* SIGALRM is scheduled 3 seconds ahead; presumably a handler installed
 * elsewhere lets the receive fail with EINTR - TODO confirm */
156 alarm(3); /* Timeout in seconds */
/* the same msgb is reused for the reply */
157 ret = fwp_mngt_recv(&msg_type, &participant_id, msgb);
/* an interrupted receive is reported as ETIMEDOUT through e */
160 if (errno == EINTR) e = ETIMEDOUT;
164 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
165 participant_id.node_id, participant_id.app_id);
167 /* Process hello msg from manager */
/* msgb->data is read here as the HELLO payload, after fwp_mngt_recv()
 * has pulled the header */
168 fwp_msg_hello_out(msgb->data, &mngr_info);
/* replace the configured manager identity with the reported one */
169 fwp_participant_mngr->id = mngr_info.id;
170 fwp_participant_mngr->stream_id = mngr_info.stream_id;
171 FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n",
172 mngr_info.id.node_id, mngr_info.id.app_id);
174 /* unbind and delete discovery mngr send endpoint */
175 fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
176 fwp_endpoint_destroy(fwp_participant_mngr->epointd);
178 /* Create mngt send endpoint to manager */
179 ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
180 fwp_participant_mngr->stream_id, &attr,
181 &fwp_participant_mngr->epointd);
187 FWP_DEBUG("Management send endpoint created\n");
/* the service vres is reused for the final management endpoint */
188 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
189 fwp_participant_mngr->vresd);
/* cleanup calls: unbind and destroy the manager send endpoint, then
 * destroy the manager vres; presumably the error exit path
 * - TODO confirm */
192 fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
193 fwp_endpoint_destroy(fwp_participant_mngr->epointd);
195 fwp_vres_destroy(fwp_participant_mngr->vresd);
200 * Disconnect from manager
/*
 * Send a BYE message to the manager, then release the endpoint and the
 * vres of fwp_participant_this.
 * TODO(review): confirm msgb is released after the send.
 */
203 int fwp_mngt_disconnect()
/* buffer is sized for the header plus struct fwp_msg_contracthdr; only
 * the header room is reserved here */
207 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
208 sizeof(struct fwp_msg_contracthdr));
209 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
211 /* Say GoodBye to manager */
212 FWP_DEBUG("Send BYE to manager\n");
/* NOTE(review): the result of fwp_mngt_send() is neither stored nor
 * checked */
213 fwp_mngt_send(FWP_MSG_BYE, msgb,
214 fwp_participant_this, fwp_participant_mngr);
/* NOTE(review): fwp_participant_this->epointd is created with
 * fwp_receive_endpoint_create() in the management initialization code,
 * yet it is unbound here with the send-endpoint call. The send endpoint
 * and vres set up by fwp_mngt_connect() live in fwp_participant_mngr and
 * are not touched by these calls - confirm which participant was meant */
216 fwp_send_endpoint_unbind(fwp_participant_this->epointd);
217 fwp_endpoint_destroy(fwp_participant_this->epointd);
218 fwp_vres_destroy(fwp_participant_this->vresd);
219 /* TODO: iterate through contract table and delete contracts */
224 /* TODO: Add atexit handler to remove all contracts on application
228 * FWP Management initialization
229 * - creates and initializes fwp_participant_this
230 * - creates and initializes fwp_participant_mngr
235 fwp_participant_info_t my_info, mngr_info;
236 unsigned int node_id;
237 fwp_endpoint_attr_t attr;
241 fwp_endpoint_attr_init(&attr);
242 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
244 /* Create fwp_participant_this */
245 my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
246 if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
247 /* if default then check env variable */
248 value = getenv("FWP_MY_ADDR");
250 my_info.id.node_id = inet_addr(value);
253 fwp_configuration.my_node_id = my_info.id.node_id;
254 my_info.id.app_id = getpid();
255 my_info.stream_id = fwp_configuration.my_stream_id;
257 fwp_participant_this = fwp_participant_new(&my_info);
258 ret = fwp_receive_endpoint_create(my_info.stream_id, &attr,
259 &fwp_participant_this->epointd);
263 fwp_endpoint_get_params(&(fwp_participant_this->id.node_id),
264 &fwp_participant_this->stream_id,
266 fwp_participant_this->epointd);
268 fwp_endpoint_get_params(fwp_participant_this->epointd,
270 &fwp_participant_this->stream_id,
272 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
273 fwp_participant_this->id.node_id,
274 fwp_participant_this->stream_id);
276 /* Create fwp_participant_mngr */
278 mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
279 /* Env. variable always overrides configured settings */
280 value = getenv("FWP_MNGR_ADDR");
282 mngr_info.id.node_id = inet_addr(value);
284 FWP_DEBUG("mngr node=%s node_id=%d\n",
285 fwp_configuration.mngr_addr,
286 mngr_info.id.node_id);
287 fwp_configuration.mngr_node_id = mngr_info.id.node_id;
288 mngr_info.id.app_id = getpid();
289 mngr_info.stream_id = fwp_configuration.mngr_stream_id;
291 if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) &&
292 (my_info.stream_id == mngr_info.stream_id)) {
294 FWP_DEBUG("I am FWP manager\n");
295 fwp_participant_mngr = fwp_participant_this;
297 fwp_participant_mngr = fwp_participant_new(&mngr_info);
298 /* Connect to FWP manager */
299 ret = fwp_mngt_connect();