]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/mngt/fwp_mngt.c
Added support for contract negotiation
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
index db7b1a9fac921c7e1e6571c7234989b186e4fb74..49c85a7766a0584784ab1e9f687f90f2f11c1619 100644 (file)
@@ -1,36 +1,50 @@
+#include "fwp_conf.h"
 #include "fwp_mngt.h"
+#include "fwp_endpoint.h"
 
-/* 
+/** 
  * Global mngt variables
  */
 
+/** Pointer to participant of this application */
 fwp_participant_t      *fwp_participant_this;
+/** Pointer to manager participant */
 fwp_participant_t      *fwp_participant_mngr;
 
-/*fwp_endpoint_d_t     fwp_mngt_repointd;*/
+static fwp_contract_t fwp_service_contract = {
+       .budget = 100,
+       .period_usec = 30,
+       .deadline_usec = 1000*1000
+};
+
+static fwp_vres_params_t fwp_service_vres_params = {
+       .id = 0,
+       .ac_id = FWP_AC_BK, 
+       .budget = 100,
+       .period_usec = 30,
+};
 
 /**
- * struct resource {
- *     char name[10];
- *     int id;
- *     participant_my;
- *     participant_mngr;
- *     contract_ops;
- *     fna_ops;
- * }
+ * Send management message to participant
+ *
  */
-
 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
                  fwp_participant_t *source, fwp_participant_t *dest)
 {
+       int ret;
+       
        fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
-       fwp_msg_header_deflate(msgb->data, type, source->id);
+       fwp_msg_header_in(msgb->data, type, msgb->len, source->id);
 
-       fwp_send(dest->epointd, msgb->data, msgb->len);
+       ret = fwp_send(dest->epointd, msgb->data, msgb->len, 0);
 
-       return 0;
+       return ret;
 }
 
+/**
+ * Receives management message from participant
+ *
+ */
 int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
                        fwp_msgb_t *msgb)
 {
@@ -38,10 +52,12 @@ int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
        
        fwp_msgb_reset_data(msgb);
        size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
-                       msgb->buffer_size);
+                       msgb->buffer_size, 0);
+       if (size < 0)
+               return size;
        fwp_msgb_put(msgb, size);
 
-       fwp_msg_header_inflate(msgb->data, type, participant_id);
+       fwp_msg_header_out(msgb->data, type, participant_id);
        fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
        
        FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
@@ -50,36 +66,73 @@ int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
        return 0;
 } 
 
-int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
+int fwp_mngt_service_vres_create(fwp_contract_d_t* contd, fwp_vres_d_t* vresdp)
 {
-       struct fwp_vres_params  fwp_service_vparams;
+       fwp_contract_d_t        contractd;
+       fwp_contract_data_t*    contdata;
+       int ret;
        
-       /* TODO: Add to contract table */
-       /* create service vres */
-       fwp_service_vparams.ac_id = FWP_AC_BK; 
-       fwp_service_vparams.budget = 100;
-       fwp_service_vparams.period_usec = 1000;
+       /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
+                fprintf(stderr,"Unable to open service vres\n");
+                return -1;
+       }*/
        
-       if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
-               fprintf(stderr,"Unable to open service vres\n");
+       contractd = fwp_contract_create(&fwp_service_contract);
+       contdata = contractd;
+       if (!contdata)
                return -1;
+               
+       /* TODO: Consider calling _fwp_contract_commit */
+       contdata->status = FWP_CONT_NEGOTIATED;
+       /* Set parameters of vres 
+        * and activate it if needed */
+       ret = fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
+       if (ret < 0) {
+               int e = errno;
+               fwp_contract_destroy(contractd);
+               errno = e;
+               return ret;
        }
+       *vresdp = contdata->vresd;
        
        FWP_DEBUG("Service vres negotiated\n");
-       
+       *contd = contractd;
        return 0;
 }
 
-/* Launch discovery/connect process to 
- * introduce itself to fwp manager and get description of manager*/
-void fwp_mngt_connect()
+/**
+ * Launches discovery/connect process to 
+ * introduce itself to fwp manager and get description of manager
+ * */
+int fwp_mngt_connect()
 {
        fwp_participant_info_t  my_info, mngr_info;
        fwp_participant_id_t    participant_id;
        fwp_msgb_t              *msgb;
        fwp_msg_type_t          msg_type;
+       fwp_endpoint_attr_t     attr;
+       int ret, e;
+       
+       fwp_endpoint_attr_init(&attr);
+       fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
 
-
+       /* Create discovery endpoint */
+       FWP_DEBUG("Service vres created\n");
+       fwp_mngt_service_vres_create(&fwp_participant_mngr->service_contract,
+                                    &fwp_participant_mngr->vresd);
+       
+       FWP_DEBUG("Discovery send endpoint created\n");
+       ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
+                                      fwp_participant_mngr->stream_id,
+                                      &attr, &fwp_participant_mngr->epointd);
+       if (ret != 0) {
+               e = errno;
+               goto err_vres;
+       }
+       
+       fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
+                               fwp_participant_mngr->vresd);
+       
        /* prepare hello message */
        msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
                              sizeof(struct fwp_msg_hello));
@@ -88,85 +141,165 @@ void fwp_mngt_connect()
        my_info.id = fwp_participant_this->id;
        my_info.stream_id = fwp_participant_this->stream_id;
 
-       fwp_msg_hello_deflate(msgb->tail, &my_info);
+       fwp_msg_hello_in(msgb->tail, &my_info);
        fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
 
        /* Send hello to manager */
-       fwp_mngt_send(FWP_MSG_HELLO, msgb, 
-                       fwp_participant_this, fwp_participant_mngr);
-
+       ret = fwp_mngt_send(FWP_MSG_HELLO, msgb, 
+                           fwp_participant_this, fwp_participant_mngr);
+       if (ret < 0) {
+               e = errno;
+               goto err_ep;
+       }
 
        /* receive hello from manager */
-       fwp_mngt_recv(&msg_type, &participant_id, msgb);
+       alarm(3);               /* Timeout in seconds */
+       ret = fwp_mngt_recv(&msg_type, &participant_id, msgb);
+       alarm(0);
+       if (ret < 0) {
+               if (errno == EINTR) e = ETIMEDOUT;
+               else e = errno;
+               goto err_ep;
+       }
        FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
                        participant_id.node_id, participant_id.app_id);
        
-       
        /* Process hello msg from manager */
-       fwp_msg_hello_inflate(msgb->data, &mngr_info);
+       fwp_msg_hello_out(msgb->data, &mngr_info);
        fwp_participant_mngr->id  = mngr_info.id;
        fwp_participant_mngr->stream_id  = mngr_info.stream_id;
        FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", 
                        mngr_info.id.node_id, mngr_info.id.app_id);
        
-       /* unbind and delete discovery mngr send endoint */
+       /* unbind and delete discovery mngr send endpoint */
        fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
-       /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
+       fwp_endpoint_destroy(fwp_participant_mngr->epointd);
 
        /* Create mngt send endpoint to manager */
-       fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
-                                fwp_participant_mngr->stream_id, 0,
-                                &fwp_participant_mngr->epointd);
+       ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
+                                      fwp_participant_mngr->stream_id, &attr,
+                                      &fwp_participant_mngr->epointd);
+       if (ret != 0) {
+               e = errno;
+               goto err_vres;
+       }
+       
        FWP_DEBUG("Management send endpoint created\n");
        fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
                                fwp_participant_mngr->vresd);
+       return 0;
+err_ep:
+       fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
+       fwp_endpoint_destroy(fwp_participant_mngr->epointd);
+err_vres:
+       fwp_vres_destroy(fwp_participant_mngr->vresd);
+       errno = e;
+       return ret;
 }
+/**
+ * Disconnect from manager
+ *
+ */
+int fwp_mngt_disconnect()
+{
+       fwp_msgb_t *msgb;
+
+       msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
+                               sizeof(struct fwp_msg_contracthdr)); 
+       fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
+       
+       /* Say GoodBye to manager */
+       FWP_DEBUG("Send BYE to manager\n");
+       fwp_mngt_send(FWP_MSG_BYE, msgb, 
+                       fwp_participant_this, fwp_participant_mngr);
+       
+       fwp_send_endpoint_unbind(fwp_participant_this->epointd);
+       fwp_endpoint_destroy(fwp_participant_this->epointd);
+       fwp_vres_destroy(fwp_participant_this->vresd);
+       /* TODO: iterate through contract table and delete contracts */
 
+       return 0;
+}
+
+/* TODO: Add atexit handler to remove all contracts on application
+ * exit/crash. */
+
+/**
+ * FWP Management initialization 
+ * - creates and initializes fwp_participant_this 
+ * - creates and initializes fwp_participant_mngr
+ * - calls fwp_mngt_connect() unless this participant is the manager
+ */
 int fwp_mngt_init()
 {
        fwp_participant_info_t  my_info, mngr_info;
        unsigned int node_id;
-       int flags;
+       fwp_endpoint_attr_t attr;
+       char *value;
+       int ret;
        
-       /* Create fwp_participant_this */
-       my_info.id.node_id = inet_addr("127.0.0.1");
+       fwp_endpoint_attr_init(&attr);
+       fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
+
+       /* Create fwp_participant_this */       
+       my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
+       if (my_info.id.node_id == inet_addr(FWP_MY_ADDR_DEFAULT)) {
+               /* if default then check env variable */
+               value = getenv("FWP_MY_ADDR");
+               if (value) {
+                       my_info.id.node_id = inet_addr(value);
+               }       
+       }
+       fwp_configuration.my_node_id = my_info.id.node_id;
        my_info.id.app_id = getpid();
-       my_info.stream_id = 0;
+       my_info.stream_id = fwp_configuration.my_stream_id;
 
-       fwp_participant_this = fwp_participant_create(&my_info);        
-       fwp_receive_endpoint_create(0, 0, &fwp_participant_this->epointd);
+       fwp_participant_this = fwp_participant_new(&my_info);   
+       ret = fwp_receive_endpoint_create(my_info.stream_id, &attr,
+                                         &fwp_participant_this->epointd);
+       if (ret != 0)
+               return ret;
        /* FIXME 
        fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
                                &fwp_participant_this->stream_id,
                                &flags,
                                fwp_participant_this->epointd);
        */
-       fwp_endpoint_get_params(&node_id, 
+       fwp_endpoint_get_params(fwp_participant_this->epointd, 
+                               &node_id, 
                                &fwp_participant_this->stream_id,
-                               &flags,
-                               fwp_participant_this->epointd);
+                               &attr);
        FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
                        fwp_participant_this->id.node_id, 
                        fwp_participant_this->stream_id);
        
        /* Create fwp_participant_mngr */
-       mngr_info.id.node_id = inet_addr("127.0.0.1");
-       /*mngr_info.id.node_id = inet_addr("255.255.255.255");*/
-       mngr_info.id.app_id = getpid();
-       mngr_info.stream_id = FWP_MNGR_STREAM_ID;
-       
-       fwp_participant_mngr = fwp_participant_create(&mngr_info);
        
-       /* Create discovery endpoint */
-       FWP_DEBUG("Service vres created\n");
-       fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
+       mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
+       /* Env. variable always overrides configured settings */
+       value = getenv("FWP_MNGR_ADDR");
+       if (value) {
+               mngr_info.id.node_id = inet_addr(value);
+       }       
+       FWP_DEBUG("mngr node=%s node_id=%d\n", 
+                       fwp_configuration.mngr_addr,
+                       mngr_info.id.node_id);
+       fwp_configuration.mngr_node_id = mngr_info.id.node_id;
+       mngr_info.id.app_id = getpid();
+       mngr_info.stream_id = fwp_configuration.mngr_stream_id;
        
-       FWP_DEBUG("Discovery send endpoint created\n");
-       fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
-                                fwp_participant_mngr->stream_id,
-                                0, &fwp_participant_mngr->epointd);    
-       fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
-                               fwp_participant_mngr->vresd);
+       if ((mngr_info.id.node_id == inet_addr("127.0.0.1")) && 
+               (my_info.stream_id == mngr_info.stream_id)) {
+               /* I am a manager  */
+               FWP_DEBUG("I am FWP manager\n");
+               fwp_participant_mngr = fwp_participant_this;
+       } else {
+               fwp_participant_mngr = fwp_participant_new(&mngr_info);
+               /* Connect to FWP manager */
+               ret = fwp_mngt_connect();
+               if (ret != 0)
+                       return ret;
+       }       
        
        return 0;
 }