3 #include "fwp_endpoint.h"
6 * Global mngt variables
9 /**< Pointer to participant of this application*/
10 fwp_participant_t *fwp_participant_this;
11 /**< Pointer to manager participant record*/
12 fwp_participant_t *fwp_participant_mngr;
14 static fwp_contract_t fwp_service_contract = {
19 static fwp_vres_params_t fwp_service_vres_params = {
27 * Send management message to participant
30 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
31 fwp_participant_t *source, fwp_participant_t *dest)
33 fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
34 fwp_msg_header_in(msgb->data, type, source->id);
36 fwp_send(dest->epointd, msgb->data, msgb->len, 0);
42 * Receives management message from participant
45 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
50 fwp_msgb_reset_data(msgb);
51 size = fwp_recv(fwp_participant_this->epointd, msgb->data,
52 msgb->buffer_size, 0);
53 fwp_msgb_put(msgb, size);
55 fwp_msg_header_out(msgb->data, type, participant_id);
56 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
58 FWP_DEBUG("Received msg: type=%d from nodeid=%d appid=%d\n", *type,
59 participant_id->node_id, participant_id->app_id);
64 int fwp_mngt_service_vres_create(fwp_vres_d_t* vresdp)
66 fwp_contract_d_t contractd;
67 fwp_contract_data_t* contdata;
69 contractd = fwp_contract_create(&fwp_service_contract);
70 /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
71 fprintf(stderr,"Unable to open service vres\n");
76 /* TODO: Consider to call _fwp_contract_commit */
77 contdata->status = FWP_CONT_NEGOTIATED;
78 /* Set parameters of vres
79 * and activate it if needed */
80 fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
81 *vresdp = contdata->vresd;
83 FWP_DEBUG("Service vres negotiated\n");
88 * Launches discovery/connect process to
89 * introduce itself to fwp manager and get description of manager
91 int fwp_mngt_connect()
93 fwp_participant_info_t my_info, mngr_info;
94 fwp_participant_id_t participant_id;
96 fwp_msg_type_t msg_type;
97 fwp_endpoint_attr_t attr;
99 fwp_endpoint_attr_init(&attr);
100 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
102 /* Create discovery endpoint */
103 FWP_DEBUG("Service vres created\n");
104 fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
106 FWP_DEBUG("Discovery send endpoint created\n");
107 fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
108 fwp_participant_mngr->stream_id,
109 &attr, &fwp_participant_mngr->epointd);
110 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
111 fwp_participant_mngr->vresd);
113 /* prepare hello message */
114 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
115 sizeof(struct fwp_msg_hello));
116 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
118 my_info.id = fwp_participant_this->id;
119 my_info.stream_id = fwp_participant_this->stream_id;
121 fwp_msg_hello_in(msgb->tail, &my_info);
122 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
124 /* Send hello to manager */
125 fwp_mngt_send(FWP_MSG_HELLO, msgb,
126 fwp_participant_this, fwp_participant_mngr);
128 /* receive hello from manager */
129 fwp_mngt_recv(&msg_type, &participant_id, msgb);
130 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
131 participant_id.node_id, participant_id.app_id);
133 /* Process hello msg from manager */
134 fwp_msg_hello_out(msgb->data, &mngr_info);
135 fwp_participant_mngr->id = mngr_info.id;
136 fwp_participant_mngr->stream_id = mngr_info.stream_id;
137 FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n",
138 mngr_info.id.node_id, mngr_info.id.app_id);
140 /* unbind and delete discovery mngr send endpoint */
141 fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
142 fwp_endpoint_destroy(fwp_participant_mngr->epointd);
144 /* Create mngt send endpoint to manager */
145 fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
146 fwp_participant_mngr->stream_id, &attr,
147 &fwp_participant_mngr->epointd);
148 FWP_DEBUG("Management send endpoint created\n");
149 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
150 fwp_participant_mngr->vresd);
154 int fwp_mngt_disconnect()
158 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
159 sizeof(struct fwp_msg_contracthdr));
160 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
162 /* Say GoodBye to manager */
163 fwp_mngt_send(FWP_MSG_BYE, msgb,
164 fwp_participant_this, fwp_participant_mngr);
166 /* TODO: iterate through contract table and delete contracts */
173 fwp_participant_info_t my_info, mngr_info;
174 unsigned int node_id;
175 fwp_endpoint_attr_t attr;
178 fwp_endpoint_attr_init(&attr);
179 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
181 /* Create fwp_participant_this */
182 my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
183 if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
184 /* if default then check env variable */
185 value = getenv("FWP_MY_ADDR");
187 my_info.id.node_id = inet_addr(value);
190 fwp_configuration.my_node_id = my_info.id.node_id;
191 my_info.id.app_id = getpid();
192 my_info.stream_id = fwp_configuration.my_stream_id;
194 fwp_participant_this = fwp_participant_create(&my_info);
195 fwp_receive_endpoint_create(my_info.stream_id, &attr,
196 &fwp_participant_this->epointd);
198 fwp_endpoint_get_params(&(fwp_participant_this->id.node_id),
199 &fwp_participant_this->stream_id,
201 fwp_participant_this->epointd);
203 fwp_endpoint_get_params(fwp_participant_this->epointd,
205 &fwp_participant_this->stream_id,
207 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
208 fwp_participant_this->id.node_id,
209 fwp_participant_this->stream_id);
211 /* Create fwp_participant_mngr */
213 mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
214 FWP_DEBUG("mngr node=%s node_id=%d\n",
215 fwp_configuration.mngr_addr,
216 mngr_info.id.node_id);
217 if (mngr_info.id.node_id == inet_addr(FWP_MNGR_ADDR_DEFAULT)) {
218 /* if default then check env variable */
219 value = getenv("FWP_MNGR_ADDR");
221 mngr_info.id.node_id = inet_addr(value);
224 fwp_configuration.mngr_node_id = mngr_info.id.node_id;
225 mngr_info.id.app_id = getpid();
226 mngr_info.stream_id = fwp_configuration.mngr_stream_id;
228 if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) &&
229 (my_info.stream_id == mngr_info.stream_id)) {
231 FWP_DEBUG("I am FWP manager\n");
232 fwp_participant_mngr = fwp_participant_this;
234 fwp_participant_mngr = fwp_participant_create(&mngr_info);
235 /* Connet to FWP manager */