]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSHeardBeat.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSHeardBeat.c
index 6311783ed1334a46d7574cd34589d25065df6e53..0e4c0523550936a747dc004458f8eeb867bc7520 100644 (file)
 
 /**********************************************************************************/
 int 
-RTPSHeardBeatCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
-    SequenceNumber *firstSeqNumber,SequenceNumber *lastSeqNumber,
-    ObjectId woid,ObjectId roid,Boolean f_bit) {
-  if (max_msg_len<28) return -1;
-  rtps_msg[0]=(uint8_t)HEARTBEAT;
-  rtps_msg[1]=ORTE_MY_MBO;
-  if (f_bit) rtps_msg[1]|=2;
-  *((ParameterLength*)(rtps_msg+2))=24;
-  conv_u32(&roid,0);
-  *((ObjectId*)(rtps_msg+4))=roid;
-  conv_u32(&woid,0);
-  *((ObjectId*)(rtps_msg+8))=woid;
-  *((SequenceNumber*)(rtps_msg+12))=*firstSeqNumber;
-  *((SequenceNumber*)(rtps_msg+20))=*lastSeqNumber;
+RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
+    SequenceNumber *fsn,SequenceNumber *lsn,
+    ObjectId roid,ObjectId woid,Boolean f_bit) 
+{
+  CDR_Endianness     data_endian;
+  CORBA_octet       flags;
+
+  if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
+
+  /* submessage id */
+  CDR_put_octet(cdrCodec,HEARTBEAT);
+
+  /* flags */
+  flags=cdrCodec->data_endian;
+  if (f_bit) flags|=2;
+  CDR_put_octet(cdrCodec,flags);
+
+  /* length */
+  CDR_put_ushort(cdrCodec,24);
+
+  /* next data are sent in big endianing */
+  data_endian=cdrCodec->data_endian;
+  cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+  /* readerObjectId */
+  CDR_put_ulong(cdrCodec,roid);
+  
+  /* writerObjectId */
+  CDR_put_ulong(cdrCodec,woid);
+
+  cdrCodec->data_endian=data_endian;
+
+  /* firstSeqNumber */
+  CDR_put_ulong(cdrCodec,fsn->high);
+  CDR_put_ulong(cdrCodec,fsn->low);
+
+  /* lastSeqNumber */
+  CDR_put_ulong(cdrCodec,lsn->high);
+  CDR_put_ulong(cdrCodec,lsn->low);
+
   return 28;
 }
 
 /**********************************************************************************/
 void 
-HeardBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
+HeartBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
     SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
   CSTRemoteWriter    *cstRemoteWriter;
   
   if (!cstReader) return;
   cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
   if (!cstRemoteWriter) return;
+
   cstRemoteWriter->firstSN=*fsn;
   cstRemoteWriter->lastSN=*lsn;
   cstRemoteWriter->ACKRetriesCounter=0;
+
   if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
     cstRemoteWriter->sn=*lsn;
   if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
@@ -59,26 +87,28 @@ HeardBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
       SeqNumberDec(cstRemoteWriter->sn,*fsn);
     }
   }
+
   if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
     CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
   } else {
     CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
   }
+
   if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
     char queue=1;
     cstRemoteWriter->commStateACK=ACKPENDING;
     if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
       queue=2;
     eventDetach(cstReader->domain,
-        cstRemoteWriter->objectEntryOID->objectEntryAID,
+        cstRemoteWriter->spobject->objectEntryAID,
         &cstRemoteWriter->repeatActiveQueryTimer,
         queue); 
     eventDetach(cstReader->domain,
-        cstRemoteWriter->objectEntryOID->objectEntryAID,
+        cstRemoteWriter->spobject->objectEntryAID,
         &cstRemoteWriter->delayResponceTimer,
         queue);   //metatraffic timer
     eventAdd(cstReader->domain,
-        cstRemoteWriter->objectEntryOID->objectEntryAID,
+        cstRemoteWriter->spobject->objectEntryAID,
         &cstRemoteWriter->delayResponceTimer,
         queue,    //metatraffic timer
         "CSTReaderResponceTimer",
@@ -91,23 +121,45 @@ HeardBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
 
 /**********************************************************************************/
 void 
-RTPSHeardBeat(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi) {
+RTPSHeartBeat(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi) {
   GUID_RTPS          writerGUID;
-  ObjectId              roid,woid;
+  ObjectId          roid,woid;
   SequenceNumber     fsn,lsn;
-  char               e_bit,f_bit;
   CSTReader          *cstReader=NULL;
+  CDR_Endianness     data_endian;
+  CORBA_octet       flags;
+  char              f_bit;
+
+  /* restore flag possition in submessage */
+  cdrCodec->rptr-=3;
+
+  /* flags */
+  CDR_get_octet(cdrCodec,&flags);
+  f_bit=flags & 2;
+
+  /* move reading possition to begin of submessage */
+  cdrCodec->rptr+=2;
+
+  /* next data are sent in big endianing */
+  data_endian=cdrCodec->data_endian;
+  cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+  /* readerObjectId */
+  CDR_get_ulong(cdrCodec,&roid);
+  
+  /* writerObjectId */
+  CDR_get_ulong(cdrCodec,&woid);
+
+  cdrCodec->data_endian=data_endian;
+
+  /* firstSeqNumber */
+  CDR_get_ulong(cdrCodec,&fsn.high);
+  CDR_get_ulong(cdrCodec,&fsn.low);
+
+  /* lastSeqNumber */
+  CDR_get_ulong(cdrCodec,&lsn.high);
+  CDR_get_ulong(cdrCodec,&lsn.low);
 
-  e_bit=rtps_msg[1] & 0x01;
-  f_bit=(rtps_msg[1] & 0x02)>>1;
-  roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
-  conv_u32(&roid,0);
-  woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
-  conv_u32(&woid,0);
-  fsn=*((SequenceNumber*)(rtps_msg+12));        /* firstSeqNumber */
-  conv_sn(&fsn,e_bit);
-  lsn=*((SequenceNumber*)(rtps_msg+20));        /* lastSeqNumber  */
-  conv_sn(&lsn,e_bit);
   if (SeqNumberCmp(fsn,lsn)==1) return;                // lsn<fsn -> break
   writerGUID.hid=mi->sourceHostId;
   writerGUID.aid=mi->sourceAppId;
@@ -130,6 +182,7 @@ RTPSHeardBeat(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi) {
       cstReader=&d->readerApplications;
     }
   }
+
   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
     switch (writerGUID.oid) {
       case OID_WRITE_MGR:
@@ -149,18 +202,21 @@ RTPSHeardBeat(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi) {
         cstReader=&d->readerSubscriptions;
         break;
     }
+
     if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
       pthread_rwlock_rdlock(&d->subscriptions.lock);
       gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
         pthread_rwlock_wrlock(&cstReader->lock);
-        HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
+        HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
         pthread_rwlock_unlock(&cstReader->lock);    
       }
       pthread_rwlock_unlock(&d->subscriptions.lock);
       cstReader=NULL;
     }
   }  
-  HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
+
+  HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
+
   if (cstReader)
     pthread_rwlock_unlock(&cstReader->lock);
 }