rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSAck.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSAck.c
index 5422a84f5ed64d83e92698b55fcb1b33cf40212f..868273356cabd3aad3c6f71d2ac0377d0ea6b79e 100644 (file)
 #include "orte_all.h"
 
 /**********************************************************************************/
-int32_t 
-RTPSAckCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
+int  /* appends one ACK submessage via cdrCodec; returns 28 on success, -1 when buf_len < wptr+28 */
+RTPSAckCreate(CDR_Codec *cdrCodec,
     SequenceNumber *seqNumber,
-    ObjectId roid,ObjectId woid,Boolean f_bit) {
-  SequenceNumber        sn_tmp;                    
-                    
-  if (max_msg_len<28) return -1;
-  rtps_msg[0]=(uint8_t)ACK;
-  rtps_msg[1]=ORTE_MY_MBO;
-  if (f_bit) rtps_msg[1]|=2;
-  *((ParameterLength*)(rtps_msg+2))=24;
-  conv_u32(&roid,0);
-  *((ObjectId*)(rtps_msg+4))=roid;
-  conv_u32(&woid,0);
-  *((ObjectId*)(rtps_msg+8))=woid;
+    ObjectId roid,ObjectId woid,Boolean f_bit) 
+{
+  SequenceNumber     sn_tmp;                    
+  CDR_Endianness     data_endian;
+  CORBA_octet       flags;
+
+  if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
+
+  /* submessage id: a single octet holding the ACK constant */
+  CDR_put_octet(cdrCodec,ACK);
+
+  /* flags: start from the codec's data_endian value, then set bit 0x02 when f_bit is non-zero */
+  CDR_put_octet(cdrCodec,flags);
+
+  /* length: 24 = the 28 bytes returned below minus the 4 header bytes (id, flags, length) */
+  CDR_put_ushort(cdrCodec,24);
+
+  /* the two object ids are written big-endian: save the codec endianness, force big-endian, restore it after writerObjectId */
+  data_endian=cdrCodec->data_endian;
+  cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+  /* readerObjectId: roid as one ulong */
+  CDR_put_ulong(cdrCodec,roid);
+  
+  /* writerObjectId: woid as one ulong */
+  CDR_put_ulong(cdrCodec,woid);
+
+  cdrCodec->data_endian=data_endian;
+
   SeqNumberInc(sn_tmp,*seqNumber);
-  *((SequenceNumber*)(rtps_msg+12))=sn_tmp;
-  *((uint32_t*)(rtps_msg+20))=32;
-  *((uint32_t*)(rtps_msg+24))=0;
+
+  /* SeqNumber: sn_tmp (presumably *seqNumber+1 - confirm against SeqNumberInc) as two ulongs, high then low, in the restored codec endianness */
+  CDR_put_ulong(cdrCodec,sn_tmp.high);
+  CDR_put_ulong(cdrCodec,sn_tmp.low);
+
+  /* bitmap: two ulongs, 32 then 0 (presumably bit count followed by one all-zero bitmap word - TODO confirm) */
+  CDR_put_ulong(cdrCodec,32);
+  CDR_put_ulong(cdrCodec,0);
+
   return 28;
 } 
 
 /**********************************************************************************/
 void 
-RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
+RTPSAck(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
   GUID_RTPS          readerGUID;
   CSTWriter          *cstWriter=NULL;
   CSTRemoteReader    *cstRemoteReader;
   CSChangeForReader  *csChangeForReader;
   StateMachineSend   stateMachineSendNew;
-  ObjectId              roid,woid;
+  ObjectId          roid,woid;
   SequenceNumber     sn;   
-  char               e_bit,f_bit;
   char               queue=1;
+  CDR_Endianness     data_endian;
+  CORBA_octet       flags;
+  char              f_bit;
+
+  /* step back to the flags position in the submessage header */
+  cdrCodec->rptr-=3;
+
+  /* flags */
+  CDR_get_octet(cdrCodec,&flags);
+  f_bit=flags & 2;
+
+  /* move the reading position to the beginning of the submessage */
+  cdrCodec->rptr+=2;
+
+  /* the following data are sent in big-endian byte order */
+  data_endian=cdrCodec->data_endian;
+  cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+  /* readerObjectId */
+  CDR_get_ulong(cdrCodec,&roid);
+  
+  /* writerObjectId */
+  CDR_get_ulong(cdrCodec,&woid);
+
+  cdrCodec->data_endian=data_endian;
+
+  /* SeqNumber */
+  CDR_get_ulong(cdrCodec,&sn.high);
+  CDR_get_ulong(cdrCodec,&sn.low);
 
-  e_bit=rtps_msg[1] & 0x01;
-  f_bit=(rtps_msg[1] & 0x02)>>1;
-  roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
-  conv_u32(&roid,0);
-  woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
-  conv_u32(&woid,0);
-  sn=*((SequenceNumber*)(rtps_msg+12));         /* Bitmap - SN    */
-  conv_sn(&sn,e_bit);
   readerGUID.hid=mi->sourceHostId;
   readerGUID.aid=mi->sourceAppId;
   readerGUID.oid=roid;
@@ -73,7 +118,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
                 f_bit ? 'F':'f',
                 woid,mi->sourceHostId,mi->sourceAppId);
   
-  //Manager
+  /* Manager */
   if ((d->guid.aid & 0x03)==MANAGER) {
     switch (woid) {
       case OID_WRITE_APPSELF:
@@ -82,6 +127,10 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
         readerGUID.hid=senderIPAddress;
         readerGUID.aid=AID_UNKNOWN;
         readerGUID.oid=roid;
+        eventDetach(d,
+           cstWriter->objectEntryOID->objectEntryAID,
+           &cstWriter->registrationTimer,
+           0);   //common timer
         break;
       case OID_WRITE_MGR:
         pthread_rwlock_wrlock(&d->writerManagers.lock);
@@ -93,13 +142,18 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
         break;
     }
   }
-  //Application
+
+  /* Application */
   if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
     switch (roid) {
       case OID_READ_APP:
       case OID_READ_APPSELF:
         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
         cstWriter=&d->writerApplicationSelf;
+        eventDetach(d,
+           cstWriter->objectEntryOID->objectEntryAID,
+           &cstWriter->registrationTimer,
+           0);   //common timer
         break;
       case OID_READ_PUBL:
         pthread_rwlock_wrlock(&d->writerPublications.lock);
@@ -121,6 +175,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
         break;
     }
   }
+
   if (!cstWriter) {
     if ((woid & 0x07) == OID_PUBLICATION) 
       pthread_rwlock_unlock(&d->publications.lock);
@@ -133,6 +188,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
       pthread_rwlock_unlock(&d->publications.lock);
     return;
   }
+
   stateMachineSendNew=NOTHNIGTOSEND;
   csChangeForReader=CSChangeForReader_first(cstRemoteReader);
   while(csChangeForReader) {
@@ -144,7 +200,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
         csChangeForReader=
           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
         if ((woid & 0x07) == OID_PUBLICATION) {
-          CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+          CSTWriterDestroyCSChangeForReader(
             csChangeForReaderDestroyed,ORTE_TRUE);
         }
       } else {
@@ -152,22 +208,26 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
       }
     } else {                                                      //NACK
-      csChangeForReader->commStateChFReader=TOSEND;
+      if (csChangeForReader->commStateChFReader!=TOSEND) {
+        csChangeForReader->commStateChFReader=TOSEND;
+        cstRemoteReader->commStateToSentCounter++;
+      }
       stateMachineSendNew=MUSTSENDDATA;
       csChangeForReader=
         CSChangeForReader_next(cstRemoteReader,csChangeForReader);
     }
   }
+
   if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) && 
       (stateMachineSendNew==MUSTSENDDATA)) {
     cstRemoteReader->commStateSend=stateMachineSendNew;
     eventDetach(d,
-        cstRemoteReader->objectEntryOID->objectEntryAID,
+        cstRemoteReader->sobject->objectEntryAID,
         &cstRemoteReader->delayResponceTimer,
         queue);
     if (queue==1) {
       eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           queue,   //metatraffic timer
           "CSTWriterSendTimer",
@@ -177,7 +237,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
           &cstRemoteReader->cstWriter->params.delayResponceTime);               
     } else {
       eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           queue,   //userdata timer
           "CSTWriterSendStrictTimer",
@@ -187,29 +247,31 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
           &cstRemoteReader->cstWriter->params.delayResponceTime);               
     }
   } 
+
   if (stateMachineSendNew==NOTHNIGTOSEND) {
     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
     if (queue==1) {
       eventDetach(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           queue);
     } else {
       eventDetach(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->repeatAnnounceTimer,
           queue);
     }
   }  
+
   if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
     cstRemoteReader->commStateHB=MUSTSENDHB;
     eventDetach(d,
-        cstRemoteReader->objectEntryOID->objectEntryAID,
+        cstRemoteReader->sobject->objectEntryAID,
         &cstRemoteReader->delayResponceTimer,
         queue);
     if (queue==1) {
       eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           queue,   //metatraffic timer
           "CSTWriterSendTimer",
@@ -219,7 +281,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
           &cstRemoteReader->cstWriter->params.delayResponceTime);               
     } else {
       eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           queue,   //userdata timer
           "CSTWriterSendStrictTimer",
@@ -229,6 +291,7 @@ RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPA
           &cstRemoteReader->cstWriter->params.delayResponceTime);               
     }
   } 
+
   pthread_rwlock_unlock(&cstWriter->lock);
   if ((woid & 0x07) == OID_PUBLICATION) 
     pthread_rwlock_unlock(&d->publications.lock);