]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/mngt/fwp_mngt.c
Added support for contract negotiation
[frescor/fwp.git] / fwp / lib / mngt / fwp_mngt.c
index 43f9ecadc5b5023fe2fe4eadb3d40db45e06a75e..49c85a7766a0584784ab1e9f687f90f2f11c1619 100644 (file)
@@ -31,12 +31,14 @@ static fwp_vres_params_t fwp_service_vres_params = {
 int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
                  fwp_participant_t *source, fwp_participant_t *dest)
 {
+       int ret;
+       
        fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
        fwp_msg_header_in(msgb->data, type, msgb->len, source->id);
 
-       fwp_send(dest->epointd, msgb->data, msgb->len, 0);
+       ret = fwp_send(dest->epointd, msgb->data, msgb->len, 0);
 
-       return 0;
+       return ret;
 }
 
 /**
@@ -51,6 +53,8 @@ int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
        fwp_msgb_reset_data(msgb);
        size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
                        msgb->buffer_size, 0);
+       if (size < 0)
+               return size;
        fwp_msgb_put(msgb, size);
 
        fwp_msg_header_out(msgb->data, type, participant_id);
@@ -62,10 +66,11 @@ int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
        return 0;
 } 
 
-int fwp_mngt_service_vres_create(fwp_vres_d_t* vresdp)
+int fwp_mngt_service_vres_create(fwp_contract_d_t* contd, fwp_vres_d_t* vresdp)
 {
        fwp_contract_d_t        contractd;
        fwp_contract_data_t*    contdata;
+       int ret;
        
        /*if ((fwp_vres_create(&fwp_service_vres_params, vresdp) < 0)) {
                 fprintf(stderr,"Unable to open service vres\n");
@@ -74,15 +79,24 @@ int fwp_mngt_service_vres_create(fwp_vres_d_t* vresdp)
        
        contractd = fwp_contract_create(&fwp_service_contract);
        contdata = contractd;
+       if (!contdata)
+               return -1;
                
        /* TODO: Consider to call _fwp_contract_commit */
        contdata->status = FWP_CONT_NEGOTIATED;
        /* Set parameters of vres 
         * and activate it if needed */
-       fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
+       ret = fwp_vres_set_params(contdata->vresd, &fwp_service_vres_params);
+       if (ret < 0) {
+               int e = errno;
+               fwp_contract_destroy(contractd);
+               errno = e;
+               return ret;
+       }
        *vresdp = contdata->vresd;
        
        FWP_DEBUG("Service vres negotiated\n");
+       *contd = contractd;
        return 0;
 }
 
@@ -97,21 +111,24 @@ int fwp_mngt_connect()
        fwp_msgb_t              *msgb;
        fwp_msg_type_t          msg_type;
        fwp_endpoint_attr_t     attr;
-       int ret;
+       int ret, e;
        
        fwp_endpoint_attr_init(&attr);
        fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
 
        /* Create discovery endpoint */
        FWP_DEBUG("Service vres created\n");
-       fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
+       fwp_mngt_service_vres_create(&fwp_participant_mngr->service_contract,
+                                    &fwp_participant_mngr->vresd);
        
        FWP_DEBUG("Discovery send endpoint created\n");
        ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
                                       fwp_participant_mngr->stream_id,
                                       &attr, &fwp_participant_mngr->epointd);
-       if (ret != 0)
+       if (ret != 0) {
+               e = errno;
                goto err_vres;
+       }
        
        fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
                                fwp_participant_mngr->vresd);
@@ -128,11 +145,22 @@ int fwp_mngt_connect()
        fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
 
        /* Send hello to manager */
-       fwp_mngt_send(FWP_MSG_HELLO, msgb, 
-                       fwp_participant_this, fwp_participant_mngr);
+       ret = fwp_mngt_send(FWP_MSG_HELLO, msgb, 
+                           fwp_participant_this, fwp_participant_mngr);
+       if (ret < 0) {
+               e = errno;
+               goto err_ep;
+       }
 
        /* receive hello from manager */
-       fwp_mngt_recv(&msg_type, &participant_id, msgb);
+       alarm(3);               /* Timeout in secconds */
+       ret = fwp_mngt_recv(&msg_type, &participant_id, msgb);
+       alarm(0);
+       if (ret < 0) {
+               if (errno == EINTR) e = ETIMEDOUT;
+               else e = errno;
+               goto err_ep;
+       }
        FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
                        participant_id.node_id, participant_id.app_id);
        
@@ -151,16 +179,21 @@ int fwp_mngt_connect()
        ret = fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
                                       fwp_participant_mngr->stream_id, &attr,
                                       &fwp_participant_mngr->epointd);
-       if (ret != 0)
+       if (ret != 0) {
+               e = errno;
                goto err_vres;
+       }
        
        FWP_DEBUG("Management send endpoint created\n");
        fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
                                fwp_participant_mngr->vresd);
        return 0;
+err_ep:
+       fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
+       fwp_endpoint_destroy(fwp_participant_mngr->epointd);
 err_vres:
        fwp_vres_destroy(fwp_participant_mngr->vresd);
-       
+       errno = e;
        return ret;
 }
 /**
@@ -176,6 +209,7 @@ int fwp_mngt_disconnect()
        fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
        
        /* Say GoodBye to manager */
+       FWP_DEBUG("Send BYE to manager\n");
        fwp_mngt_send(FWP_MSG_BYE, msgb, 
                        fwp_participant_this, fwp_participant_mngr);
        
@@ -242,16 +276,14 @@ int fwp_mngt_init()
        /* Create fwp_participant_mngr */
        
        mngr_info.id.node_id = inet_addr(fwp_configuration.mngr_addr);
+       /* Env. variable always overrides configured settings */
+       value = getenv("FWP_MNGR_ADDR");
+       if (value) {
+               mngr_info.id.node_id = inet_addr(value);
+       }       
        FWP_DEBUG("mngr node=%s node_id=%d\n", 
                        fwp_configuration.mngr_addr,
                        mngr_info.id.node_id);
-       if (mngr_info.id.node_id == inet_addr(FWP_MNGR_ADDR_DEFAULT)) {
-               /* if default then check env variable */
-               value = getenv("FWP_MNGR_ADDR");
-               if (value) {
-                       mngr_info.id.node_id = inet_addr(value);
-               }       
-       }
        fwp_configuration.mngr_node_id = mngr_info.id.node_id;
        mngr_info.id.app_id = getpid();
        mngr_info.stream_id = fwp_configuration.mngr_stream_id;