6 * Global mngt variables
/* Participant record describing the local application. It is assigned from
 * fwp_participant_create() during management initialisation, and its epointd
 * is the endpoint fwp_mngt_recv() receives on. */
9 fwp_participant_t *fwp_participant_this;
/* Participant record describing the FWP manager. Initialisation either makes
 * it an alias of fwp_participant_this (when this process is the manager) or
 * creates a separate participant for the remote manager. */
10 fwp_participant_t *fwp_participant_mngr;
12 /*fwp_endpoint_d_t fwp_mngt_repointd;*/
/*
 * fwp_mngt_send - add a management header to msgb and send it to dest.
 *
 * type:   message type stored in the header
 * msgb:   message buffer holding the payload; fwp_msgb_push() is called with
 *         the header size before the header is written at msgb->data, so the
 *         caller presumably must have reserved that much headroom beforehand
 *         (fwp_mngt_connect() does so with fwp_msgb_reserve()) -- confirm
 * source: participant whose id is stored in the header
 * dest:   participant whose epointd is used for the send
 *
 * NOTE(review): the result of fwp_send() is discarded in the lines shown, so
 * a failed send would not be reported to the caller -- confirm against the
 * full file (the function's return statement is not visible in this excerpt).
 */
25 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
26 fwp_participant_t *source, fwp_participant_t *dest)
/* Make room for the header, then write type and source id at msgb->data. */
28 fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
29 fwp_msg_header_in(msgb->data, type, source->id);
/* Send msgb->len bytes starting at msgb->data on the destination endpoint. */
31 fwp_send(dest->epointd, msgb->data, msgb->len, 0);
/*
 * fwp_mngt_recv - receive one management message on the local participant's
 * endpoint and decode its header.
 *
 * type:           out, receives the message type read from the header
 * participant_id: out, receives the sender id read from the header
 * msgb:           buffer the message is received into (used in the body; the
 *                 rest of the parameter list is not visible in this excerpt)
 *
 * After the call msgb has had the header pulled off, so callers read the
 * payload from msgb->data (fwp_mngt_connect() does this for the hello reply).
 *
 * NOTE(review): the value returned by fwp_recv() is passed to fwp_msgb_put()
 * without any check in the lines shown. If fwp_recv() can report an error
 * with a negative value, that case needs handling -- confirm.
 */
36 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
41 fwp_msgb_reset_data(msgb);
/* Receive at most msgb->buffer_size bytes into the start of the buffer. */
42 size = fwp_recv(fwp_participant_this->epointd, msgb->data,
43 msgb->buffer_size, 0);
/* Account for the received bytes in the buffer. */
44 fwp_msgb_put(msgb, size);
/* Decode the header, then step past it so msgb->data is at the payload. */
46 fwp_msg_header_out(msgb->data, type, participant_id);
47 fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
49 FWP_DEBUG("Received msg: type=%d from nodeid=%d appid=%d\n", *type,
50 participant_id->node_id, participant_id->app_id);
/*
 * fwp_mngt_service_vres_create - create the vres used for management
 * (service) traffic.
 *
 * fwp_service_vresd: out, receives the descriptor from fwp_vres_create()
 *
 * The vres is requested with access category FWP_AC_BK, budget 100 and
 * period_usec 30. If fwp_vres_create() returns a negative value, a message is
 * printed to stderr. The return statements are not visible in this excerpt.
 *
 * NOTE(review): only ac_id, budget and period_usec are set in the lines
 * shown; if struct fwp_vres_params has other fields they are passed
 * uninitialised -- confirm.
 */
55 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
57 struct fwp_vres_params fwp_service_vparams;
59 /* TODO: Add to contract table */
60 /* create service vres */
61 fwp_service_vparams.ac_id = FWP_AC_BK;
62 fwp_service_vparams.budget = 100;
63 fwp_service_vparams.period_usec = 30;
65 if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
66 fprintf(stderr,"Unable to open service vres\n");
70 FWP_DEBUG("Service vres negotiated\n");
75 /* Launch discovery/connect process to
76 * introduce itself to fwp manager and get description of manager*/
/*
 * fwp_mngt_connect - HELLO handshake with the FWP manager.
 *
 * Steps visible in this excerpt:
 *   1. create the service vres and a send endpoint towards the manager
 *      address currently stored in fwp_participant_mngr, and bind them;
 *   2. send a HELLO message carrying this participant's id and stream id;
 *   3. receive the reply and copy the manager's id and stream id from it
 *      into fwp_participant_mngr;
 *   4. unbind the first endpoint, then create and bind a new send endpoint
 *      using the manager parameters learned in step 3.
 *
 * NOTE(review): none of the calls below have their results checked in the
 * lines shown (vres/endpoint creation, bind, fwp_msgb_alloc, send, recv), and
 * the received msg_type is not compared against FWP_MSG_HELLO -- confirm
 * whether that is intentional. The declaration of msgb and the tail of the
 * function (return value, any release of msgb) are not visible here.
 */
77 int fwp_mngt_connect()
79 fwp_participant_info_t my_info, mngr_info;
80 fwp_participant_id_t participant_id;
82 fwp_msg_type_t msg_type;
83 fwp_endpoint_attr_t attr;
/* Endpoint attributes shared by both send endpoints created below. */
85 fwp_endpoint_attr_init(&attr);
86 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
88 /* Create discovery endpoint */
/* NOTE(review): this and the next debug message are emitted before the
 * action they describe is attempted. */
89 FWP_DEBUG("Service vres created\n");
90 fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
92 FWP_DEBUG("Discovery send endpoint created\n");
93 fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
94 fwp_participant_mngr->stream_id,
95 &attr, &fwp_participant_mngr->epointd);
96 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
97 fwp_participant_mngr->vresd);
99 /* prepare hello message */
/* Buffer is sized for header + hello payload; the header space is reserved
 * first so fwp_mngt_send() can push the header in front of the payload. */
100 msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
101 sizeof(struct fwp_msg_hello));
102 fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
104 my_info.id = fwp_participant_this->id;
105 my_info.stream_id = fwp_participant_this->stream_id;
/* Encode the payload at the buffer tail, then account for its length. */
107 fwp_msg_hello_in(msgb->tail, &my_info);
108 fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
110 /* Send hello to manager */
111 fwp_mngt_send(FWP_MSG_HELLO, msgb,
112 fwp_participant_this, fwp_participant_mngr);
114 /* receive hello from manager */
/* The same msgb is reused for the reply; fwp_mngt_recv() resets it first. */
115 fwp_mngt_recv(&msg_type, &participant_id, msgb);
116 FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
117 participant_id.node_id, participant_id.app_id);
119 /* Process hello msg from manager */
/* fwp_mngt_recv() has already pulled the header, so msgb->data is the
 * hello payload. */
120 fwp_msg_hello_out(msgb->data, &mngr_info);
121 fwp_participant_mngr->id = mngr_info.id;
122 fwp_participant_mngr->stream_id = mngr_info.stream_id;
123 FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n",
124 mngr_info.id.node_id, mngr_info.id.app_id);
126 /* unbind and delete discovery mngr send endpoint */
/* NOTE(review): the endpoint is only unbound; the free call below is
 * commented out and epointd is overwritten by the next create -- possible
 * leak, confirm. */
127 fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
128 /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
130 /* Create mngt send endpoint to manager */
131 fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
132 fwp_participant_mngr->stream_id, &attr,
133 &fwp_participant_mngr->epointd);
134 FWP_DEBUG("Management send endpoint created\n");
135 fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
136 fwp_participant_mngr->vresd);
142 fwp_participant_info_t my_info, mngr_info;
143 unsigned int node_id;
144 fwp_endpoint_attr_t attr;
147 fwp_endpoint_attr_init(&attr);
148 fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
150 /* Create fwp_participant_this */
151 my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
152 if (my_info.id.node_id == inet_addr("127.0.0.1")) {
153 /* if default then check env variable */
154 value = getenv("FWP_MY_ADDR");
156 my_info.id.node_id = inet_addr(value);
159 fwp_configuration.my_node_id = my_info.id.node_id;
160 my_info.id.app_id = getpid();
161 my_info.stream_id = fwp_configuration.my_stream_id;
163 fwp_participant_this = fwp_participant_create(&my_info);
164 fwp_receive_endpoint_create(my_info.stream_id, &attr,
165 &fwp_participant_this->epointd);
167 fwp_endpoint_get_params(&(fwp_participant_this->id.node_id),
168 &fwp_participant_this->stream_id,
170 fwp_participant_this->epointd);
172 fwp_endpoint_get_params(fwp_participant_this->epointd,
174 &fwp_participant_this->stream_id,
176 FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
177 fwp_participant_this->id.node_id,
178 fwp_participant_this->stream_id);
180 /* Create fwp_participant_mngr */
182 mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
183 FWP_DEBUG("mngr node=%s node_id=%d\n",
184 fwp_configuration.mngr_addr,
185 mngr_info.id.node_id);
186 if (mngr_info.id.node_id == inet_addr("255.255.255.255")) {
187 /* if default then check env variable */
188 value = getenv("FWP_MNGR_ADDR");
190 mngr_info.id.node_id = inet_addr(value);
193 fwp_configuration.mngr_node_id = mngr_info.id.node_id;
194 mngr_info.id.app_id = getpid();
195 mngr_info.stream_id = fwp_configuration.mngr_stream_id;
197 if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) &&
198 (my_info.stream_id == mngr_info.stream_id)) {
200 FWP_DEBUG("I am FWP manager\n");
201 fwp_participant_mngr = fwp_participant_this;
203 fwp_participant_mngr = fwp_participant_create(&mngr_info);
204 /* Connect to FWP manager */