+#include "fwp_conf.h"
#include "fwp_mngt.h"
+#include "fwp_endpoint.h"
-/*
+/**
* Global mngt variables
*/
+/**
+ * Pointer to participant of this application. Created in fwp_mngt_init()
+ * with fwp_participant_new(); its epointd is the receive endpoint that
+ * fwp_mngt_recv() reads from.
+ */
fwp_participant_t *fwp_participant_this;
+/**
+ * Pointer to manager participant. fwp_mngt_init() either aliases it to
+ * fwp_participant_this (when this process is the manager itself) or creates
+ * a separate participant and runs fwp_mngt_connect() on it.
+ */
fwp_participant_t *fwp_participant_mngr;
-/*fwp_endpoint_d_t fwp_mngt_repointd;*/
+static fwp_contract_t fwp_service_contract = { /* passed to fwp_contract_create() in fwp_mngt_service_vres_create() */
+ .budget = 100, /* same value as fwp_service_vres_params.budget */
+ .period_usec = 30, /* NOTE(review): the removed inline setup used period_usec = 1000 - confirm 30 is intended */
+ .deadline_usec = 1000*1000 /* 1 000 000; presumably microseconds, going by the _usec suffix */
+};
+
+static fwp_vres_params_t fwp_service_vres_params = { /* applied to the service contract's vres via fwp_vres_set_params() */
+ .id = 0,
+ .ac_id = FWP_AC_BK, /* same ac_id value the removed inline setup used */
+ .budget = 100, /* same value as fwp_service_contract.budget */
+ .period_usec = 30, /* same value as fwp_service_contract.period_usec */
+};
/**
- * struct resource {
- * char name[10];
- * int id;
- * participant_my;
- * participant_mngr;
- * contract_ops;
- * fna_ops;
- * }
+ * Send management message to participant
+ *
+ * Makes room for a struct fwp_msg_header in front of the payload already
+ * held in msgb (fwp_msgb_push), fills the header with the message type,
+ * the value of msgb->len read after the push and the sender id, then hands
+ * msgb->data / msgb->len to fwp_send() on the destination's endpoint.
+ *
+ * @param type   message type written into the header
+ * @param msgb   buffer holding the payload; modified in place. Presumably it
+ *               needs headroom for the header (fwp_mngt_disconnect()
+ *               reserves it with fwp_msgb_reserve()) - TODO confirm
+ * @param source participant whose id is written into the header
+ * @param dest   participant whose epointd the message is sent on
+ * @return the value returned by fwp_send()
*/
-
int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
fwp_participant_t *source, fwp_participant_t *dest)
{
+ int ret;
+
fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
- fwp_msg_header_deflate(msgb->data, type, source->id);
+ fwp_msg_header_in(msgb->data, type, msgb->len, source->id);
- fwp_send(dest->epointd, msgb->data, msgb->len);
+ ret = fwp_send(dest->epointd, msgb->data, msgb->len, 0); /* last argument 0: meaning not visible here, presumably flags */
- return 0;
+ return ret;
}
+/**
+ * Receives management message from participant
+ *
+ * Resets msgb, receives up to msgb->buffer_size bytes from the endpoint of
+ * fwp_participant_this into msgb->data, parses the leading
+ * struct fwp_msg_header into *type and *participant_id and then pulls the
+ * header off so that msgb->data is left at the payload.
+ *
+ * @param type            out: message type taken from the header
+ * @param participant_id  out: sender id taken from the header
+ * @param msgb            buffer the message is received into
+ * @return 0 on success, or the negative value returned by fwp_recv()
+ *
+ * NOTE(review): no declaration of `size` is visible in these lines and the
+ * FWP_DEBUG() call below is cut off after `*type,` - presumably context
+ * lines are missing from this view; confirm against the full file.
+ * NOTE(review): only size < 0 is rejected; a message shorter than
+ * struct fwp_msg_header is parsed and pulled as if complete.
+ */
int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
fwp_msgb_t *msgb)
{
fwp_msgb_reset_data(msgb);
size = fwp_recv(fwp_participant_this->epointd, msgb->data,
- msgb->buffer_size);
+ msgb->buffer_size, 0);
+ if (size < 0)
+ return size;
fwp_msgb_put(msgb, size);
- fwp_msg_header_inflate(msgb->data, type, participant_id);
+ fwp_msg_header_out(msgb->data, type, participant_id);
fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
FWP_DEBUG("Received msg: type=%d from nodeid=%d appid=%d\n", *type,
return 0;
}
/**
 * Creates the service contract and configures its vres.
 *
 * A contract is created from fwp_service_contract, its status is set to
 * FWP_CONT_NEGOTIATED directly (no negotiation call is made here), and
 * fwp_service_vres_params is applied to the contract's vres.
 *
 * @param contd   out: handle of the created contract (written on success only)
 * @param vresdp  out: vres taken from the created contract (written on
 *                success only)
 * @return 0 on success; -1 when fwp_contract_create() yields a null handle;
 *         otherwise the negative value from fwp_vres_set_params(), in which
 *         case the contract is destroyed and errno is kept as it was right
 *         after the failing call.
 */
-int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
+int fwp_mngt_service_vres_create(fwp_contract_d_t* contd, fwp_vres_d_t* vresdp)
{
- struct fwp_vres_params fwp_service_vparams;
+ fwp_contract_d_t contractd;
+ fwp_contract_data_t* contdata;
+ int ret;
- /* TODO: Add to contract table */
- /* create service vres */
- fwp_service_vparams.ac_id = FWP_AC_BK;
- fwp_service_vparams.budget = 100;
- fwp_service_vparams.period_usec = 1000;
+ /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
+ fprintf(stderr,"Unable to open service vres\n");
+ return -1;
+ }*/
- if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
- fprintf(stderr,"Unable to open service vres\n");
+ contractd = fwp_contract_create(&fwp_service_contract);
+ contdata = contractd; /* the contract handle is used directly as a pointer to the contract data */
+ if (!contdata)
return -1;
+
+ /* TODO: Consider to call _fwp_contract_commit */
+ contdata->status = FWP_CONT_NEGOTIATED;
+ /* Set parameters of vres
+ * and activate it if needed */
+ ret = fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
+ if (ret < 0) {
+ int e = errno; /* saved so that fwp_contract_destroy() cannot overwrite the reported error */
+ fwp_contract_destroy(contractd);
+ errno = e;
+ return ret;
}
+ *vresdp = contdata->vresd;
FWP_DEBUG("Service vres negotiated\n");
-
+ *contd = contractd;
return 0;
}
-/* Launch discovery/connect process to
- * introduce itself to fwp manager and get description of manager*/
-void fwp_mngt_connect()
+/**
+ * Launches discovery/connect process to
+ * introduce itself to fwp manager and get description of manager
+ *
+ * Steps, in order:
+ *  - create the service contract/vres and store the handles in
+ *    fwp_participant_mngr (service_contract, vresd)
+ *  - create a send endpoint to the configured manager node/stream and bind
+ *    it to that vres
+ *  - send FWP_MSG_HELLO carrying this participant's id and stream_id
+ *  - wait for the manager's reply, bounded by alarm(3)
+ *  - copy the id and stream_id found in the reply into fwp_participant_mngr
+ *  - replace the discovery endpoint with a new send endpoint created from
+ *    the updated manager id/stream_id and bind it to the same vres
+ *
+ * @return 0 on success. On failure the return value of the failing call,
+ *         with errno set to the value captured for that failure (ETIMEDOUT
+ *         when the receive failed with EINTR).
+ *
+ * NOTE(review): the return value of fwp_mngt_service_vres_create() is
+ * ignored.
+ * NOTE(review): the timeout relies on SIGALRM interrupting the receive;
+ * that needs a handler installed outside this function, because the
+ * default action for SIGALRM terminates the process - confirm.
+ * NOTE(review): msgb from fwp_msgb_alloc() is neither checked nor released
+ * on any path visible here.
+ * NOTE(review): err_vres destroys the vres obtained from the service
+ * contract, but the contract handle itself is not released - confirm this
+ * is intended.
+ * NOTE(review): msg_type of the reply is not checked.
+ * */
+int fwp_mngt_connect()
{
fwp_participant_info_t my_info, mngr_info;
fwp_participant_id_t participant_id;
fwp_msgb_t *msgb;
fwp_msg_type_t msg_type;
+ fwp_endpoint_attr_t attr;
+ int ret, e;
+
+ fwp_endpoint_attr_init(&attr);
+ fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
-
+ /* Create discovery endpoint */
+ FWP_DEBUG("Service vres created\n"); /* logged before the creation call below runs */
+ fwp_mngt_service_vres_create(&fwp_participant_mngr->service_contract,
+ &fwp_participant_mngr->vresd);
+
+ FWP_DEBUG("Discovery send endpoint created\n"); /* logged before the creation call below runs */
+ ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
+ fwp_participant_mngr->stream_id,
+ &attr, &fwp_participant_mngr->epointd);
+ if (ret != 0) {
+ e = errno;
+ goto err_vres;
+ }
+
+ fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
+ fwp_participant_mngr->vresd);
+
/* prepare hello message */
msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
sizeof(struct fwp_msg_hello));
my_info.id = fwp_participant_this->id;
my_info.stream_id = fwp_participant_this->stream_id;
- fwp_msg_hello_deflate(msgb->tail, &my_info);
+ fwp_msg_hello_in(msgb->tail, &my_info);
fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
/* Send hello to manager */
- fwp_mngt_send(FWP_MSG_HELLO, msgb,
- fwp_participant_this, fwp_participant_mngr);
-
+ ret = fwp_mngt_send(FWP_MSG_HELLO, msgb,
+ fwp_participant_this, fwp_participant_mngr);
+ if (ret < 0) {
+ e = errno;
+ goto err_ep;
+ }
/* receive hello from manager */
- fwp_mngt_recv(&msg_type, &participant_id, msgb);
+ alarm(3); /* Timeout in seconds: SIGALRM is raised unless alarm(0) below cancels it first */
+ ret = fwp_mngt_recv(&msg_type, &participant_id, msgb);
+ alarm(0);
+ if (ret < 0) {
+ if (errno == EINTR) e = ETIMEDOUT; /* an interrupted receive is reported as a timeout */
+ else e = errno;
+ goto err_ep;
+ }
FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
participant_id.node_id, participant_id.app_id);
-
/* Process hello msg from manager */
- fwp_msg_hello_inflate(msgb->data, &mngr_info);
+ fwp_msg_hello_out(msgb->data, &mngr_info);
fwp_participant_mngr->id = mngr_info.id;
fwp_participant_mngr->stream_id = mngr_info.stream_id;
FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n",
mngr_info.id.node_id, mngr_info.id.app_id);
- /* unbind and delete discovery mngr send endoint */
+ /* unbind and delete discovery mngr send endpoint */
fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
- /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
+ fwp_endpoint_destroy(fwp_participant_mngr->epointd);
/* Create mngt send endpoint to manager */
- fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
- fwp_participant_mngr->stream_id, 0,
- &fwp_participant_mngr->epointd);
+ ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
+ fwp_participant_mngr->stream_id, &attr,
+ &fwp_participant_mngr->epointd);
+ if (ret != 0) {
+ e = errno;
+ goto err_vres; /* the discovery endpoint was already destroyed above, so only the vres is left to release */
+ }
+
FWP_DEBUG("Management send endpoint created\n");
fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
fwp_participant_mngr->vresd);
+ return 0;
+err_ep:
+ fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
+ fwp_endpoint_destroy(fwp_participant_mngr->epointd);
+err_vres:
+ fwp_vres_destroy(fwp_participant_mngr->vresd);
+ errno = e; /* restore the error captured at the failure site; the cleanup calls above may have changed errno */
+ return ret;
}
+/**
+ * Disconnect from manager
+ *
+ * Allocates a message buffer sized for a header plus a
+ * struct fwp_msg_contracthdr, reserves room for the header, sends
+ * FWP_MSG_BYE to the manager and then tears down the endpoint and vres of
+ * fwp_participant_this.
+ *
+ * @return always 0; the result of fwp_mngt_send() is not propagated.
+ *
+ * NOTE(review): the teardown uses fwp_participant_this->epointd (created
+ * with fwp_receive_endpoint_create() in fwp_mngt_init()) and
+ * fwp_participant_this->vresd, whereas the send endpoint and service vres
+ * set up by fwp_mngt_connect() live on fwp_participant_mngr - confirm
+ * which participant is meant here.
+ * NOTE(review): msgb is neither checked after allocation nor released in
+ * the lines visible here.
+ */
+int fwp_mngt_disconnect()
+{
+ fwp_msgb_t *msgb;
+
+ msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
+ sizeof(struct fwp_msg_contracthdr));
+ fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header)); /* headroom that fwp_mngt_send() pushes the header into */
+
+ /* Say GoodBye to manager */
+ FWP_DEBUG("Send BYE to manager\n");
+ fwp_mngt_send(FWP_MSG_BYE, msgb,
+ fwp_participant_this, fwp_participant_mngr);
+
+ fwp_send_endpoint_unbind(fwp_participant_this->epointd);
+ fwp_endpoint_destroy(fwp_participant_this->epointd);
+ fwp_vres_destroy(fwp_participant_this->vresd);
+ /* TODO: iterate through contract table and delete contracts */
+ return 0;
+}
+
+/* TODO: Add atexit handler to remove all contracts on application
+ * exit/crash. */
+
+/**
+ * FWP Management initialization
+ * - creates and initializes fwp_participant_this
+ * - creates and initializes fwp_participant_mngr
+ * - calls fwp_mngt_connect() unless this process is the manager itself
+ *
+ * Address selection:
+ * - own address: fwp_configuration.my_addr; when that equals
+ *   FWP_MY_ADDR_DEFAULT the FWP_MY_ADDR environment variable, if set,
+ *   is used instead
+ * - manager address: fwp_configuration.mngr_addr, always overridden by the
+ *   FWP_MNGR_ADDR environment variable when it is set
+ * The resolved node ids are written back to fwp_configuration.
+ *
+ * This process is treated as the manager when the manager address resolves
+ * to 127.0.0.1 and both stream ids are equal; fwp_participant_mngr then
+ * aliases fwp_participant_this and no connect is performed.
+ *
+ * @return 0 on success, otherwise the non-zero result of
+ *         fwp_receive_endpoint_create() or fwp_mngt_connect().
+ *
+ * NOTE(review): results of inet_addr() are not checked for the invalid
+ * address value, and the result of fwp_participant_new() is dereferenced
+ * without a check - confirm whether either can fail here.
+ * NOTE(review): on the failure returns nothing created earlier in this
+ * function is released in the lines visible here.
+ */
int fwp_mngt_init()
{
fwp_participant_info_t my_info, mngr_info;
unsigned int node_id;
- int flags;
+ fwp_endpoint_attr_t attr;
+ char *value;
+ int ret;
- /* Create fwp_participant_this */
- my_info.id.node_id = inet_addr("127.0.0.1");
+ fwp_endpoint_attr_init(&attr);
+ fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
+
+ /* Create fwp_participant_this */
+ my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
+ if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
+ /* if default then check env variable */
+ value = getenv("FWP_MY_ADDR");
+ if (value) {
+ my_info.id.node_id = inet_addr(value);
+ }
+ }
+ fwp_configuration.my_node_id = my_info.id.node_id;
my_info.id.app_id = getpid();
- my_info.stream_id = 0;
+ my_info.stream_id = fwp_configuration.my_stream_id;
- fwp_participant_this = fwp_participant_create(&my_info);
- fwp_receive_endpoint_create(0, 0, &fwp_participant_this->epointd);
+ fwp_participant_this = fwp_participant_new(&my_info);
+ ret = fwp_receive_endpoint_create(my_info.stream_id, &attr,
+ &fwp_participant_this->epointd);
+ if (ret != 0)
+ return ret;
/* FIXME
fwp_endpoint_get_params(&(fwp_participant_this->id.node_id),
&fwp_participant_this->stream_id,
&flags,
fwp_participant_this->epointd);
*/
- fwp_endpoint_get_params(&node_id,
+ fwp_endpoint_get_params(fwp_participant_this->epointd,
+ &node_id,
&fwp_participant_this->stream_id,
- &flags,
- fwp_participant_this->epointd);
+ &attr); /* node_id lands in a local that is not read again in this function; stream_id is stored in the participant */
FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
fwp_participant_this->id.node_id,
fwp_participant_this->stream_id);
/* Create fwp_participant_mngr */
- mngr_info.id.node_id = inet_addr("127.0.0.1");
- /*mngr_info.id.node_id = inet_addr("255.255.255.255");*/
- mngr_info.id.app_id = getpid();
- mngr_info.stream_id = FWP_MNGR_STREAM_ID;
-
- fwp_participant_mngr = fwp_participant_create(&mngr_info);
- /* Create discovery endpoint */
- FWP_DEBUG("Service vres created\n");
- fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
+ mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
+ /* Env. variable always overrides configured settings */
+ value = getenv("FWP_MNGR_ADDR");
+ if (value) {
+ mngr_info.id.node_id = inet_addr(value);
+ }
+ FWP_DEBUG("mngr node=%s node_id=%d\n",
+ fwp_configuration.mngr_addr,
+ mngr_info.id.node_id);
+ fwp_configuration.mngr_node_id = mngr_info.id.node_id;
+ mngr_info.id.app_id = getpid(); /* placeholder: fwp_mngt_connect() overwrites the manager id from the HELLO reply */
+ mngr_info.stream_id = fwp_configuration.mngr_stream_id;
- FWP_DEBUG("Discovery send endpoint created\n");
- fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
- fwp_participant_mngr->stream_id,
- 0, &fwp_participant_mngr->epointd);
- fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
- fwp_participant_mngr->vresd);
+ if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) &&
+ (my_info.stream_id == mngr_info.stream_id)) {
+ /* I am a manager */
+ FWP_DEBUG("I am FWP manager\n");
+ fwp_participant_mngr = fwp_participant_this;
+ } else {
+ fwp_participant_mngr = fwp_participant_new(&mngr_info);
+ /* Connect to FWP manager */
+ ret = fwp_mngt_connect();
+ if (ret != 0)
+ return ret;
+ }
return 0;
}