/**********************************************************************************/
/*
 * RTPSAckCreate - serialize an ACK submessage into rtps_msg.
 *
 * rtps_msg    - destination buffer; bytes 0..27 are addressed below
 * max_msg_len - space available in rtps_msg; must be at least 28
 * seqNumber   - sequence number handed to SeqNumberInc() before being stored
 * roid, woid  - reader / writer object ids
 * f_bit       - when non-zero, bit 0x02 is OR-ed into the flags byte
 *
 * Returns 28 on success, or -1 when max_msg_len is smaller than 28.
 *
 * This text is a patch excerpt: '-' lines are removed, '+' lines are added.
 * The visible change replaces u_int8_t/u_int32_t with uint8_t/uint32_t.
 */
int32_t
-RTPSAckCreate(u_int8_t *rtps_msg,u_int32_t max_msg_len,
+RTPSAckCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
SequenceNumber *seqNumber,
ObjectId roid,ObjectId woid,Boolean f_bit) {
SequenceNumber sn_tmp;
/* The stores below reach offset 27, so smaller buffers are rejected. */
if (max_msg_len<28) return -1;
/* Byte 0: submessage id. Byte 1: flags, starting from ORTE_MY_MBO. */
- rtps_msg[0]=(u_int8_t)ACK;
+ rtps_msg[0]=(uint8_t)ACK;
rtps_msg[1]=ORTE_MY_MBO;
if (f_bit) rtps_msg[1]|=2;
/* Length field at offset 2 is 24, i.e. the 28 bytes minus the first 4. */
*((ParameterLength*)(rtps_msg+2))=24;
/* NOTE(review): roid is not stored by any line shown here and bytes 4..7 are
 * never written in this excerpt - confirm whether a store to rtps_msg+4 was
 * lost from the listing. */
*((ObjectId*)(rtps_msg+8))=woid;
/* sn_tmp is filled in by SeqNumberInc from *seqNumber (presumably the next
 * sequence number - SeqNumberInc is defined elsewhere) and stored at 12. */
SeqNumberInc(sn_tmp,*seqNumber);
*((SequenceNumber*)(rtps_msg+12))=sn_tmp;
/* Two trailing 32-bit words: 32 at offset 20 and 0 at offset 24. Presumably a
 * bitmap bit-count followed by one empty bitmap word - TODO confirm against
 * the wire-format description. */
- *((u_int32_t*)(rtps_msg+20))=32;
- *((u_int32_t*)(rtps_msg+24))=0;
+ *((uint32_t*)(rtps_msg+20))=32;
+ *((uint32_t*)(rtps_msg+24))=0;
return 28;
}
/**********************************************************************************/
/*
 * RTPSAck - handle a received ACK submessage for one of this domain's writers.
 *
 * Visible flow: read the two flag bits from rtps_msg[1], select and lock the
 * addressed CSTWriter, find the remote reader identified by readerGUID, walk
 * that reader's changes marking each one ACKNOWLEDGED or TOSEND relative to
 * sn, then re-arm or detach the reader's timers and release the locks.
 *
 * d        - domain providing writerSubscriptions and publications
 * rtps_msg - raw submessage; only byte 1 (flags) is read in the lines shown
 * mi, senderIPAddress - not referenced by any line visible in this excerpt
 *
 * This text is a patch excerpt ('-' removed, '+' added) with gaps: the code
 * that decodes roid/woid/sn, fills readerGUID, opens the switch and declares
 * csChangeForReader is not shown.
 */
void
-RTPSAck(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
+RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
GUID_RTPS readerGUID;
CSTWriter *cstWriter=NULL;
CSTRemoteReader *cstRemoteReader;
StateMachineSend stateMachineSendNew;
ObjectId roid,woid;
SequenceNumber sn;
- int8_t e_bit,f_bit;
+ char e_bit,f_bit;
/* queue is the event-queue index passed to eventDetach/eventAdd below:
 * 1 is used with the "metatraffic timer", 2 with the "userdata timer". */
+ char queue=1;
/* Flag byte: bit 0 -> e_bit, bit 1 -> f_bit (normalised to 0/1). */
e_bit=rtps_msg[1] & 0x01;
f_bit=(rtps_msg[1] & 0x02)>>1;
/* (lines omitted from this excerpt; the following is the tail of a case that
 * selects the writerSubscriptions writer and takes its write lock) */
pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
cstWriter=&d->writerSubscriptions;
break;
/* Added by the patch: when the low three bits of woid equal OID_PUBLICATION,
 * look the writer up among d->publications (under a read lock on that
 * container), write-lock it, and switch to queue 2. */
+ default:
+ if ((woid & 0x07) == OID_PUBLICATION) {
+ GUID_RTPS guid=d->guid;
+ guid.oid=woid;
+ pthread_rwlock_rdlock(&d->publications.lock);
+ cstWriter=CSTWriter_find(&d->publications,&guid);
/* NOTE(review): cstWriter is used here before the NULL check that follows
 * the switch; if CSTWriter_find can return NULL this locks through a null
 * pointer - confirm and guard if so. */
+ pthread_rwlock_wrlock(&cstWriter->lock);
+ queue=2;
+ }
+ break;
}
}
/* No writer matched: release the publications read lock if it was taken. */
- if (!cstWriter) return;
+ if (!cstWriter) {
+ if ((woid & 0x07) == OID_PUBLICATION)
+ pthread_rwlock_unlock(&d->publications.lock);
+ return;
+ }
cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
/* Unknown reader: undo the locks in reverse order and give up. */
if (!cstRemoteReader) {
pthread_rwlock_unlock(&cstWriter->lock);
+ if ((woid & 0x07) == OID_PUBLICATION)
+ pthread_rwlock_unlock(&d->publications.lock);
return;
}
stateMachineSendNew=NOTHNIGTOSEND;
/* The patch replaces the for-each macro with an explicit first/next walk so
 * the successor can be fetched before the current entry is destroyed. */
- gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
+ csChangeForReader=CSChangeForReader_first(cstRemoteReader);
+ while(csChangeForReader) {
if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0) { //ACK
if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
+ CSChangeForReader *csChangeForReaderDestroyed;
+ csChangeForReaderDestroyed=csChangeForReader;
csChangeForReader->commStateChFReader=ACKNOWLEDGED;
-// csChangeForReader->csChange->acknowledgedCounter++;
- }
/* Advance first; for publication writers the just-acknowledged entry is then
 * handed to CSTWriterDestroyCSChangeForReader. */
+ csChangeForReader=
+ CSChangeForReader_next(cstRemoteReader,csChangeForReader);
+ if ((woid & 0x07) == OID_PUBLICATION) {
+ CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+ csChangeForReaderDestroyed,ORTE_TRUE);
+ }
+ } else {
+ csChangeForReader=
+ CSChangeForReader_next(cstRemoteReader,csChangeForReader);
+ }
} else { //NACK
/* Change at or after sn: mark it for (re)sending. */
csChangeForReader->commStateChFReader=TOSEND;
stateMachineSendNew=MUSTSENDDATA;
+ csChangeForReader=
+ CSChangeForReader_next(cstRemoteReader,csChangeForReader);
}
}
/* NOTE(review): the remainder of this condition and any statements between
 * it and the eventDetach call are not part of this excerpt. What is visible:
 * the delay-response timer is detached and re-added on the selected queue,
 * with CSTWriterSendTimer for queue 1 and CSTWriterSendStrictTimer
 * otherwise. */
if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
eventDetach(d,
cstRemoteReader->objectEntryOID->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
- 1);
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- 1, //metatraffic timer
- "CSTWriterSendTimer",
- CSTWriterSendTimer,
- &cstRemoteReader->cstWriter->lock,
- cstRemoteReader,
- &cstRemoteReader->cstWriter->params.delayResponceTime);
+ queue);
+ if (queue==1) {
+ eventAdd(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ queue, //metatraffic timer
+ "CSTWriterSendTimer",
+ CSTWriterSendTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.delayResponceTime);
+ } else {
+ eventAdd(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ queue, //userdata timer
+ "CSTWriterSendStrictTimer",
+ CSTWriterSendStrictTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.delayResponceTime);
+ }
}
/* Nothing was NACKed. The old code only reset the state when it had been
 * MUSTSENDDATA; the patched code resets it unconditionally and detaches a
 * timer chosen by queue. */
- if ((cstRemoteReader->commStateSend==MUSTSENDDATA) &&
- (stateMachineSendNew==NOTHNIGTOSEND)) {
- cstRemoteReader->commStateSend=stateMachineSendNew;
- eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- 1);
+ if (stateMachineSendNew==NOTHNIGTOSEND) {
+ cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+ if (queue==1) {
+ eventDetach(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ queue);
+ } else {
/* NOTE(review): this branch detaches repeatAnnounceTimer, while every
 * eventAdd shown in this function arms delayResponceTimer - confirm that the
 * different timer is intended for queue 2. */
+ eventDetach(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->repeatAnnounceTimer,
+ queue);
+ }
}
/* f_bit clear and nothing pending: the patch additionally flags a heartbeat
 * (commStateHB=MUSTSENDHB) and then re-arms the delay-response timer on the
 * selected queue, as above. */
if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
+ cstRemoteReader->commStateHB=MUSTSENDHB;
eventDetach(d,
cstRemoteReader->objectEntryOID->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
- 1);
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- 1, //metatraffic timer
- "CSTWriterSendTimer",
- CSTWriterSendTimer,
- &cstRemoteReader->cstWriter->lock,
- cstRemoteReader,
- &cstRemoteReader->cstWriter->params.delayResponceTime);
+ queue);
+ if (queue==1) {
+ eventAdd(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ queue, //metatraffic timer
+ "CSTWriterSendTimer",
+ CSTWriterSendTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.delayResponceTime);
+ } else {
+ eventAdd(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ queue, //userdata timer
+ "CSTWriterSendStrictTimer",
+ CSTWriterSendStrictTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.delayResponceTime);
+ }
}
/* Release the writer lock, then the publications container lock if the
 * publication path took it. */
pthread_rwlock_unlock(&cstWriter->lock);
+ if ((woid & 0x07) == OID_PUBLICATION)
+ pthread_rwlock_unlock(&d->publications.lock);
}