#include "orte_all.h"
/**********************************************************************************/
-int32_t
-RTPSAckCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
+int
+RTPSAckCreate(CDR_Codec *cdrCodec,
SequenceNumber *seqNumber,
- ObjectId roid,ObjectId woid,Boolean f_bit) {
- SequenceNumber sn_tmp;
-
- if (max_msg_len<28) return -1;
- rtps_msg[0]=(uint8_t)ACK;
- rtps_msg[1]=ORTE_MY_MBO;
- if (f_bit) rtps_msg[1]|=2;
- *((ParameterLength*)(rtps_msg+2))=24;
- conv_u32(&roid,0);
- *((ObjectId*)(rtps_msg+4))=roid;
- conv_u32(&woid,0);
- *((ObjectId*)(rtps_msg+8))=woid;
+ ObjectId roid,ObjectId woid,Boolean f_bit)
+{
+ SequenceNumber sn_tmp;
+ CDR_Endianness data_endian;
+ CORBA_octet flags;
+
+ if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
+
+ /* submessage id */
+ CDR_put_octet(cdrCodec,ACK);
+
+ /* flags */
+ flags=cdrCodec->data_endian;
+ if (f_bit) flags|=2;
+ CDR_put_octet(cdrCodec,flags);
+
+ /* length */
+ CDR_put_ushort(cdrCodec,24);
+
+ /* next data are sent in big endianing */
+ data_endian=cdrCodec->data_endian;
+ cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+ /* readerObjectId */
+ CDR_put_ulong(cdrCodec,roid);
+
+ /* writerObjectId */
+ CDR_put_ulong(cdrCodec,woid);
+
+ cdrCodec->data_endian=data_endian;
+
SeqNumberInc(sn_tmp,*seqNumber);
- *((SequenceNumber*)(rtps_msg+12))=sn_tmp;
- *((uint32_t*)(rtps_msg+20))=32;
- *((uint32_t*)(rtps_msg+24))=0;
+
+ /* SeqNumber */
+ CDR_put_ulong(cdrCodec,sn_tmp.high);
+ CDR_put_ulong(cdrCodec,sn_tmp.low);
+
+ /* bitmap - bits */
+ CDR_put_ulong(cdrCodec,32);
+ CDR_put_ulong(cdrCodec,0);
+
return 28;
}
/**********************************************************************************/
void
-RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
+RTPSAck(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
GUID_RTPS readerGUID;
CSTWriter *cstWriter=NULL;
CSTRemoteReader *cstRemoteReader;
CSChangeForReader *csChangeForReader;
StateMachineSend stateMachineSendNew;
- ObjectId roid,woid;
+ ObjectId roid,woid;
SequenceNumber sn;
- char e_bit,f_bit;
char queue=1;
+ CDR_Endianness data_endian;
+ CORBA_octet flags;
+ char f_bit;
+
+ /* restore flag possition in submessage */
+ cdrCodec->rptr-=3;
+
+ /* flags */
+ CDR_get_octet(cdrCodec,&flags);
+ f_bit=flags & 2;
+
+ /* move reading possition to begin of submessage */
+ cdrCodec->rptr+=2;
+
+ /* next data are sent in big endianing */
+ data_endian=cdrCodec->data_endian;
+ cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+ /* readerObjectId */
+ CDR_get_ulong(cdrCodec,&roid);
+
+ /* writerObjectId */
+ CDR_get_ulong(cdrCodec,&woid);
+
+ cdrCodec->data_endian=data_endian;
+
+ /* SeqNumber */
+ CDR_get_ulong(cdrCodec,&sn.high);
+ CDR_get_ulong(cdrCodec,&sn.low);
- e_bit=rtps_msg[1] & 0x01;
- f_bit=(rtps_msg[1] & 0x02)>>1;
- roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
- conv_u32(&roid,0);
- woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
- conv_u32(&woid,0);
- sn=*((SequenceNumber*)(rtps_msg+12)); /* Bitmap - SN */
- conv_sn(&sn,e_bit);
readerGUID.hid=mi->sourceHostId;
readerGUID.aid=mi->sourceAppId;
readerGUID.oid=roid;
f_bit ? 'F':'f',
woid,mi->sourceHostId,mi->sourceAppId);
- //Manager
+ /* Manager */
if ((d->guid.aid & 0x03)==MANAGER) {
switch (woid) {
case OID_WRITE_APPSELF:
readerGUID.hid=senderIPAddress;
readerGUID.aid=AID_UNKNOWN;
readerGUID.oid=roid;
+ eventDetach(d,
+ cstWriter->objectEntryOID->objectEntryAID,
+ &cstWriter->registrationTimer,
+ 0); //common timer
break;
case OID_WRITE_MGR:
pthread_rwlock_wrlock(&d->writerManagers.lock);
break;
}
}
- //Application
+
+ /* Application */
if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
switch (roid) {
case OID_READ_APP:
case OID_READ_APPSELF:
pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
cstWriter=&d->writerApplicationSelf;
+ eventDetach(d,
+ cstWriter->objectEntryOID->objectEntryAID,
+ &cstWriter->registrationTimer,
+ 0); //common timer
break;
case OID_READ_PUBL:
pthread_rwlock_wrlock(&d->writerPublications.lock);
break;
}
}
+
if (!cstWriter) {
if ((woid & 0x07) == OID_PUBLICATION)
pthread_rwlock_unlock(&d->publications.lock);
pthread_rwlock_unlock(&d->publications.lock);
return;
}
+
stateMachineSendNew=NOTHNIGTOSEND;
csChangeForReader=CSChangeForReader_first(cstRemoteReader);
while(csChangeForReader) {
csChangeForReader=
CSChangeForReader_next(cstRemoteReader,csChangeForReader);
if ((woid & 0x07) == OID_PUBLICATION) {
- CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+ CSTWriterDestroyCSChangeForReader(
csChangeForReaderDestroyed,ORTE_TRUE);
}
} else {
CSChangeForReader_next(cstRemoteReader,csChangeForReader);
}
} else { //NACK
- csChangeForReader->commStateChFReader=TOSEND;
+ if (csChangeForReader->commStateChFReader!=TOSEND) {
+ csChangeForReader->commStateChFReader=TOSEND;
+ cstRemoteReader->commStateToSentCounter++;
+ }
stateMachineSendNew=MUSTSENDDATA;
csChangeForReader=
CSChangeForReader_next(cstRemoteReader,csChangeForReader);
}
}
+
if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
(stateMachineSendNew==MUSTSENDDATA)) {
cstRemoteReader->commStateSend=stateMachineSendNew;
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue);
if (queue==1) {
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue, //metatraffic timer
"CSTWriterSendTimer",
&cstRemoteReader->cstWriter->params.delayResponceTime);
} else {
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue, //userdata timer
"CSTWriterSendStrictTimer",
&cstRemoteReader->cstWriter->params.delayResponceTime);
}
}
+
if (stateMachineSendNew==NOTHNIGTOSEND) {
cstRemoteReader->commStateSend=NOTHNIGTOSEND;
if (queue==1) {
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue);
} else {
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
queue);
}
}
+
if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
cstRemoteReader->commStateHB=MUSTSENDHB;
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue);
if (queue==1) {
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue, //metatraffic timer
"CSTWriterSendTimer",
&cstRemoteReader->cstWriter->params.delayResponceTime);
} else {
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
queue, //userdata timer
"CSTWriterSendStrictTimer",
&cstRemoteReader->cstWriter->params.delayResponceTime);
}
}
+
pthread_rwlock_unlock(&cstWriter->lock);
if ((woid & 0x07) == OID_PUBLICATION)
pthread_rwlock_unlock(&d->publications.lock);