]> rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Added functions fwp_mngt_send and fwp_mngt_recv; reworked FWP_MSG_HELLO handling
authorMartin Molnar <molnar@sum.(none)>
Thu, 29 May 2008 16:04:57 +0000 (18:04 +0200)
committerMartin Molnar <molnar@sum.(none)>
Thu, 29 May 2008 16:04:57 +0000 (18:04 +0200)
fwp/lib/core/fwp_msgb.c
fwp/lib/core/fwp_msgb.h
fwp/lib/mngt/fwp_mngt.c
fwp/lib/mngt/fwp_mngt.h
fwp/mngr/fwp_mngr.c

index 3b97f5ea7814bcaa7d07b5d36ca8fec307e45012..4d5a458b75389e31ea6177a22f90f4c71915438d 100644 (file)
@@ -77,11 +77,11 @@ inline void fwp_msgb_reset_data(struct fwp_msgb* msgb)
 
 
 /* reserve is called first then push */
-/*void fwp_msgb_reserve(struct msgb* msgb, unsigned int len)
+inline void fwp_msgb_reserve(fwp_msgb_t *msgb, unsigned int len)
 {
        msgb->data+=len;
        msgb->tail+=len;
-}*/
+}
 
 /*struct fwp_socket* fwp_socket_create(struct sockaddr *_addr, socklen_t _addrlen)
 {
index a64e0617d7538b29efa555d7bd1e799d209c2015..c588472abbfd2bed8c5f17d8a6c117ac929b5c2a 100644 (file)
@@ -36,6 +36,7 @@ inline unsigned char* fwp_msgb_push(struct fwp_msgb* msgb, unsigned int len);
 inline void fwp_msgb_reset_data_pointer(struct fwp_msgb* msgb);
 inline void fwp_msgb_reset_data(struct fwp_msgb* msgb);
 inline unsigned char* fwp_msgb_shift(struct fwp_msgb* msgb, unsigned int len);
+inline void fwp_msgb_reserve(fwp_msgb_t *msgb, unsigned int len);
 
 /*struct fwp_socketaddr* fwp_socket_create(struct sockaddr *_addr, socklen_t _addrlen);
 inline void fwp_socket_set(struct fwp_socketaddr *fwpsock, struct sockaddr *_addr, 
index f3dc14248a230c6db522cb10706dcab2b7149e9b..121b75c0eca992b36cabcc0c91bc9b84930ce4f6 100644 (file)
@@ -8,7 +8,7 @@
 fwp_participant_t      *fwp_participant_this;
 fwp_participant_t      *fwp_participant_mngr;
 
-fwp_endpoint_d_t       fwp_mngt_repointd;
+/*fwp_endpoint_d_t     fwp_mngt_repointd;*/
 
 /* temporarily*/
 
@@ -44,6 +44,36 @@ int fwp_negt_commit(tr_id_t tr_id)
 }
 #endif
 
+int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb,
+                 fwp_participant_t *source, fwp_participant_t *dest)
+{
+       fwp_msgb_push(msgb, sizeof(struct fwp_msg_header));
+       fwp_msg_header_deflate(msgb->data, type, source->id);
+
+       fwp_send(dest->epointd, msgb->data, msgb->len);
+
+       return 0;
+}
+
+int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id,
+                       fwp_msgb_t *msgb)
+{
+       int size;
+       
+       fwp_msgb_reset_data(msgb);
+       size = fwp_recv(fwp_participant_this->epointd, msgb->data, 
+                       msgb->buffer_size);
+       fwp_msgb_put(msgb, size);
+
+       fwp_msg_header_inflate(msgb->data, type, participant_id);
+       fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
+       
+       FWP_DEBUG("Received msg: type=%d  from nodeid=%d appid=%d\n", *type,
+                       participant_id->node_id, participant_id->app_id);
+
+       return 0;
+} 
+
 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
 {
        struct fwp_vres_params  fwp_service_vparams;
@@ -67,11 +97,11 @@ int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd)
 void fwp_mngt_connect()
 {
        fwp_participant_info_t  my_info, mngr_info;
-       fwp_endpoint_d_t        discovery_epointd;
-       fwp_vres_d_t            service_vresd;
-       ssize_t                 size;
+       fwp_participant_id_t    participant_id;
        fwp_msgb_t              *msgb;
+       fwp_msg_type_t          msg_type;
 
+#if 0  
        FWP_DEBUG("Service vres created\n");
        fwp_mngt_service_vres_create(&service_vresd);
        
@@ -81,24 +111,33 @@ void fwp_mngt_connect()
                                 FWP_MNGT_DISCOVERY_STREAM_ID,
                                 0, &discovery_epointd);        
        fwp_send_endpoint_bind(discovery_epointd, service_vresd);
+#endif
 
        /* Launch discovery process */
-       my_info.id = fwp_participant_this->id;
-       my_info.stream_id = fwp_participant_this->stream_id;
-
        /* introduce yourself to resource manager */
        msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + 
                              sizeof(struct fwp_msg_hello));
+       fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
+#if 0
        fwp_msg_header_deflate(msgb->tail, FWP_MSG_HELLO, 
                                fwp_participant_this->id);
        fwp_msgb_put(msgb, sizeof(struct fwp_msg_header));
+#endif
+       my_info.id = fwp_participant_this->id;
+       my_info.stream_id = fwp_participant_this->stream_id;
+
        fwp_msg_hello_deflate(msgb->tail, &my_info);
        fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
 
-       fwp_send(discovery_epointd, msgb->data, msgb->len);
-       FWP_DEBUG("Sent HELLO msg \n");
+       /*fwp_send(discovery_epointd, msgb->data, msgb->len);
+       FWP_DEBUG("Sent HELLO msg \n");*/
+       fwp_mngt_send(FWP_MSG_HELLO, msgb, 
+                       fwp_participant_this, fwp_participant_mngr);
+
 
        /* receive info from manager */
+       
+#if 0  
        fwp_msgb_reset_data(msgb);
        size = fwp_recv(fwp_mngt_repointd, msgb->data, msgb->buffer_size);
        FWP_DEBUG("Received HELLO msg from mngr\n");
@@ -108,45 +147,68 @@ void fwp_mngt_connect()
         *fwp_msg_header_inflate(msgb->data, &msg_header);
         */
        fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
+#endif 
+       fwp_mngt_recv(&msg_type, &participant_id, msgb);
        fwp_msg_hello_inflate(msgb->data, &mngr_info);
        
        /* create fwp manager participant record */
-       fwp_participant_mngr = fwp_participant_create(&mngr_info);
+       //fwp_participant_mngr = fwp_participant_create(&mngr_info);
        FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
                        mngr_info.id.node_id, mngr_info.id.app_id);
        
-       fwp_send_endpoint_unbind(discovery_epointd);
-       fwp_participant_mngr->vresd  = service_vresd;
+       fwp_send_endpoint_unbind(fwp_participant_mngr->epointd);
+       /*fwp_endpoint_free(fwp_participant_mngr->epointd)*/
+       //fwp_participant_mngr->vresd  = service_vresd;
+       fwp_participant_mngr->id  = mngr_info.id;
+       fwp_participant_mngr->stream_id  = mngr_info.stream_id;
 
        FWP_DEBUG("Management send endpoint created\n");
        fwp_send_endpoint_create(fwp_participant_mngr->id.node_id, 
                                 fwp_participant_mngr->stream_id, 0,
                                 &fwp_participant_mngr->epointd);
-       fwp_send_endpoint_bind(fwp_participant_mngr->epointd, service_vresd);
+       fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
+                               fwp_participant_mngr->vresd);
 }
 
 int fwp_mngt_init()
 {
-       fwp_participant_info_t  my_info;
+       fwp_participant_info_t  my_info, mngr_info;
        int flags;
        
+       /* Create fwp_participant_this */
        my_info.id.node_id = inet_addr("127.0.0.1");
        my_info.id.app_id = getpid();
-       /* temporarily*/
        my_info.stream_id = 0;
 
        fwp_participant_this = fwp_participant_create(&my_info);        
-       
-       FWP_DEBUG("Management receive endpoint created\n");
-       fwp_receive_endpoint_create(0, 0, &fwp_mngt_repointd);
+       fwp_receive_endpoint_create(0, 0, &fwp_participant_this->epointd);
        fwp_endpoint_get_params(&(fwp_participant_this->id.node_id), 
                                &fwp_participant_this->stream_id,
                                &flags,
-                               fwp_mngt_repointd);
-
-       FWP_DEBUG("mngt_receive conf: nodeid=%d streamid=%d\n", 
-                       fwp_participant_this->id.node_id, 
+                               fwp_participant_this->epointd);
+       FWP_DEBUG("Management receive endpoint created, stream id= %d\n",
                        fwp_participant_this->stream_id);
        
+       
+       /* Create fwp_participant_mngr */
+
+       mngr_info.id.node_id = inet_addr("127.0.0.1");
+       /*mngr_info.id.node_id = inet_addr("255.255.255.255");*/
+       mngr_info.id.app_id = getpid();
+       mngr_info.stream_id = FWP_MNGR_STREAM_ID;
+       
+       fwp_participant_mngr = fwp_participant_create(&mngr_info);
+       
+       /* Create discovery endpoint */
+       FWP_DEBUG("Service vres created\n");
+       fwp_mngt_service_vres_create(&fwp_participant_mngr->vresd);
+       
+       FWP_DEBUG("Discovery send endpoint created\n");
+       fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
+                                fwp_participant_mngr->stream_id,
+                                0, &fwp_participant_mngr->epointd);    
+       fwp_send_endpoint_bind(fwp_participant_mngr->epointd, 
+                               fwp_participant_mngr->vresd);
+       
        return 0;
 }
index 03ab3488a03d923331fb9f34dbed1b2ba1359f35..60905344299d00057e006b260913f0a6374f654b 100644 (file)
@@ -12,7 +12,6 @@
 
 extern fwp_participant_t       *fwp_participant_this;
 extern fwp_participant_t       *fwp_participant_mngr;
-extern fwp_endpoint_d_t        fwp_mngt_repointd;
 
 int fwp_mngt_init();
 int fwp_mngt_service_vres_create(fwp_vres_d_t* fwp_service_vresd);
index bf0bd6bc2c7bbed08d6a69d84f789cd1cd58da91..8640f5ab29ac187a1629d08bcd4e96651c0bd121 100644 (file)
@@ -27,7 +27,7 @@ int fwp_mngr_input(struct fwp_msgb **pmsgb)
        ssize_t size;
 
        FWP_DEBUG("Waiting for messages\n");
-       size = fwp_recv(fwp_mngt_repointd, buffer, BUFFSIZE);
+       size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
         
        FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
        /* For future: fwp_socket could be allocated behind data in msgb*/
@@ -135,7 +135,8 @@ int main()
         * with specified parameters */
        
        FWP_DEBUG("Management receive endpoint created\n");
-       fwp_receive_endpoint_create(FWP_MNGR_STREAM_ID, 0, &fwp_mngt_repointd);
+       fwp_receive_endpoint_create(FWP_MNGR_STREAM_ID, 0, 
+                                       &fwp_participant_this->epointd);
                        
        fwp_mngr_main_loop();