/**********************************************************************************/
int
-RTPSHeardBeatCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
- SequenceNumber *firstSeqNumber,SequenceNumber *lastSeqNumber,
- ObjectId woid,ObjectId roid,Boolean f_bit) {
- if (max_msg_len<28) return -1;
- rtps_msg[0]=(uint8_t)HEARTBEAT;
- rtps_msg[1]=ORTE_MY_MBO;
- if (f_bit) rtps_msg[1]|=2;
- *((ParameterLength*)(rtps_msg+2))=24;
- conv_u32(&roid,0);
- *((ObjectId*)(rtps_msg+4))=roid;
- conv_u32(&woid,0);
- *((ObjectId*)(rtps_msg+8))=woid;
- *((SequenceNumber*)(rtps_msg+12))=*firstSeqNumber;
- *((SequenceNumber*)(rtps_msg+20))=*lastSeqNumber;
+RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
+ SequenceNumber *fsn,SequenceNumber *lsn,
+ ObjectId roid,ObjectId woid,Boolean f_bit)
+{
+ CDR_Endianness data_endian;
+ CORBA_octet flags;
+ /* Write a HEARTBEAT submessage through cdrCodec: id octet, flags octet,
+  * 16-bit length (24), reader and writer object ids, then the first and
+  * last sequence numbers (high word before low word).
+  * Returns 28 on success, or -1 before writing anything when buf_len is
+  * smaller than wptr+28.
+  * NOTE(review): the results of the CDR_put_* calls are not checked. */
+ if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
+
+ /* submessage id */
+ CDR_put_octet(cdrCodec,HEARTBEAT);
+
+ /* flags: the codec's data_endian value, with bit 0x02 set when f_bit is set */
+ flags=cdrCodec->data_endian;
+ if (f_bit) flags|=2;
+ CDR_put_octet(cdrCodec,flags);
+
+ /* submessage length field, written as the constant 24 */
+ CDR_put_ushort(cdrCodec,24);
+
+ /* the two object ids that follow are written in big-endian order;
+  * the codec's current endianness is saved so it can be restored below */
+ data_endian=cdrCodec->data_endian;
+ cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+ /* readerObjectId */
+ CDR_put_ulong(cdrCodec,roid);
+
+ /* writerObjectId */
+ CDR_put_ulong(cdrCodec,woid);
+ /* restore the saved endianness; the sequence numbers are written with it */
+ cdrCodec->data_endian=data_endian;
+
+ /* firstSeqNumber: high word, then low word */
+ CDR_put_ulong(cdrCodec,fsn->high);
+ CDR_put_ulong(cdrCodec,fsn->low);
+
+ /* lastSeqNumber: high word, then low word */
+ CDR_put_ulong(cdrCodec,lsn->high);
+ CDR_put_ulong(cdrCodec,lsn->low);
+
return 28;
}
/**********************************************************************************/
void
-HeardBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
+HeartBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
CSTRemoteWriter *cstRemoteWriter;
if (!cstReader) return;
cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
if (!cstRemoteWriter) return;
+
cstRemoteWriter->firstSN=*fsn;
cstRemoteWriter->lastSN=*lsn;
cstRemoteWriter->ACKRetriesCounter=0;
+
if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
cstRemoteWriter->sn=*lsn;
if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
SeqNumberDec(cstRemoteWriter->sn,*fsn);
}
}
+
if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
} else {
CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
}
+
if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
char queue=1;
cstRemoteWriter->commStateACK=ACKPENDING;
if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
queue=2;
eventDetach(cstReader->domain,
- cstRemoteWriter->objectEntryOID->objectEntryAID,
+ cstRemoteWriter->spobject->objectEntryAID,
&cstRemoteWriter->repeatActiveQueryTimer,
queue);
eventDetach(cstReader->domain,
- cstRemoteWriter->objectEntryOID->objectEntryAID,
+ cstRemoteWriter->spobject->objectEntryAID,
&cstRemoteWriter->delayResponceTimer,
queue); //metatraffic timer
eventAdd(cstReader->domain,
- cstRemoteWriter->objectEntryOID->objectEntryAID,
+ cstRemoteWriter->spobject->objectEntryAID,
&cstRemoteWriter->delayResponceTimer,
queue, //metatraffic timer
"CSTReaderResponceTimer",
/**********************************************************************************/
void
-RTPSHeardBeat(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi) {
+RTPSHeartBeat(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi) {
GUID_RTPS writerGUID;
- ObjectId roid,woid;
+ ObjectId roid,woid;
SequenceNumber fsn,lsn;
- char e_bit,f_bit;
CSTReader *cstReader=NULL;
+ CDR_Endianness data_endian;
+ CORBA_octet flags;
+ char f_bit;
+ /* Read a HEARTBEAT submessage from cdrCodec (flags, reader and writer
+  * object ids, first and last sequence numbers) and hand it to
+  * HeartBeatProc for the selected reader(s).
+  * NOTE(review): part of the reader-selection code is not visible in this
+  * excerpt, so only the visible steps are described here. */
+ /* step the read position back 3 bytes, to the flags octet of the submessage */
+ cdrCodec->rptr-=3;
+
+ /* flags */
+ CDR_get_octet(cdrCodec,&flags);
+ f_bit=flags & 2; /* non-zero when flag bit 0x02 is set (value is 0 or 2, not 0/1) */
+
+ /* advance the read position by 2 bytes; presumably this skips the length
+  * field and lands on the readerObjectId - TODO confirm against the caller */
+ cdrCodec->rptr+=2;
+
+ /* the two object ids that follow are read in big-endian order;
+  * the codec's current endianness is saved so it can be restored below */
+ data_endian=cdrCodec->data_endian;
+ cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+ /* readerObjectId */
+ CDR_get_ulong(cdrCodec,&roid);
+
+ /* writerObjectId */
+ CDR_get_ulong(cdrCodec,&woid);
+
+ cdrCodec->data_endian=data_endian; /* restore; sequence numbers are read with the saved endianness */
+
+ /* firstSeqNumber: high word, then low word */
+ CDR_get_ulong(cdrCodec,&fsn.high);
+ CDR_get_ulong(cdrCodec,&fsn.low);
+
+ /* lastSeqNumber: high word, then low word */
+ CDR_get_ulong(cdrCodec,&lsn.high);
+ CDR_get_ulong(cdrCodec,&lsn.low);
- e_bit=rtps_msg[1] & 0x01;
- f_bit=(rtps_msg[1] & 0x02)>>1;
- roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
- conv_u32(&roid,0);
- woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
- conv_u32(&woid,0);
- fsn=*((SequenceNumber*)(rtps_msg+12)); /* firstSeqNumber */
- conv_sn(&fsn,e_bit);
- lsn=*((SequenceNumber*)(rtps_msg+20)); /* lastSeqNumber */
- conv_sn(&lsn,e_bit);
if (SeqNumberCmp(fsn,lsn)==1) return; // lsn<fsn -> ignore this heartbeat
writerGUID.hid=mi->sourceHostId;
writerGUID.aid=mi->sourceAppId;
cstReader=&d->readerApplications;
}
}
+
if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
switch (writerGUID.oid) {
case OID_WRITE_MGR:
cstReader=&d->readerSubscriptions;
break;
}
+ /* publication writers: run HeartBeatProc for every reader in
+  * d->subscriptions, holding the list's read lock and each reader's
+  * write lock, then clear cstReader */
if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
pthread_rwlock_rdlock(&d->subscriptions.lock);
gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
pthread_rwlock_wrlock(&cstReader->lock);
- HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
+ HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
pthread_rwlock_unlock(&cstReader->lock);
}
pthread_rwlock_unlock(&d->subscriptions.lock);
cstReader=NULL;
}
}
- HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
+ /* process the heartbeat for the single selected reader; cstReader may be NULL here */
+ HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
+ /* NOTE(review): the lock acquisition matching this unlock is not visible in this excerpt */
if (cstReader)
pthread_rwlock_unlock(&cstReader->lock);
}