#include "orte_all.h"
+/*****************************************************************************/
+/*
+ * CSTWriterRegistrationTimer - timer handler driving registration retries.
+ *
+ * d          - domain whose event queue and domainEvents callbacks are used
+ * vcstWriter - the CSTWriter this timer belongs to, passed as void*
+ *
+ * The writer's registrationTimer is detached first. Then:
+ *  - if registrationCounter is non-zero, one retry is consumed,
+ *    CSTWriterRefreshAllCSChanges() is called for every remote reader of
+ *    the writer, and this handler is re-added with
+ *    params.registrationPeriod;
+ *  - otherwise the domain's onRegFail callback is invoked (when one is set)
+ *    and the timer is not re-armed.
+ *
+ * Always returns 0.
+ */
+int
+CSTWriterRegistrationTimer(ORTEDomain *d,void *vcstWriter) {
+  CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
+  CSTRemoteReader *cstRemoteReader;
+
+  debug(52,10) ("CSTWriterRegistrationTimer: start\n");
+
+  /* format fixed: was "0xx%x", which printed a stray 'x' before the OID */
+  debug(52,5) ("CSTWriterRegistrationTimer: OID: 0x%x - retries = %d\n",
+               cstWriter->guid.oid,cstWriter->registrationCounter);
+
+  /* remove the pending timer entry before deciding whether to re-arm it */
+  eventDetach(d,
+      cstWriter->objectEntryOID->objectEntryAID,
+      &cstWriter->registrationTimer,
+      0);   //common timer
+
+  if (cstWriter->registrationCounter!=0) {
+    /* retries left: consume one and refresh every remote reader */
+    cstWriter->registrationCounter--;
+    gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
+      CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
+    }
+    /* schedule this handler again after registrationPeriod */
+    eventAdd(d,
+        cstWriter->objectEntryOID->objectEntryAID,
+        &cstWriter->registrationTimer,
+        0,   //common timer
+        "CSTWriterRegistrationTimer",
+        CSTWriterRegistrationTimer,
+        &cstWriter->lock,
+        cstWriter,
+        &cstWriter->params.registrationPeriod);
+  } else {
+    /* no retries left: report the failure through the optional callback */
+    if (d->domainEvents.onRegFail) {
+      d->domainEvents.onRegFail(d->domainEvents.onRegFailParam);
+    }
+  }
+
+  debug(52,10) ("CSTWriterRegistrationTimer: finished\n");
+  return 0;
+}
+
+
/*****************************************************************************/
int
CSTWriterRefreshTimer(ORTEDomain *d,void *vcstWriter) {
((!cstRemoteReader->cstWriter->params.fullAcknowledge))) {// ||
// (cstRemoteReader->unacknowledgedCounter))) {
//create HB
- int len=RTPSHeardBeatCreate(
- d->mbSend.cdrStream.bufferPtr,
- getMaxMessageLength(d),
+ int len=RTPSHeartBeatCreate(
+ &d->taskSend.mb.cdrCodec,
&cstRemoteReader->cstWriter->firstSN,
&cstRemoteReader->cstWriter->lastSN,
- cstRemoteReader->cstWriter->guid.oid,
OID_UNKNOWN,
+ cstRemoteReader->cstWriter->guid.oid,
ORTE_FALSE);
if (len<0) {
//not enought space in sending buffer
- d->mbSend.needSend=ORTE_TRUE;
+ d->taskSend.mb.needSend=ORTE_TRUE;
return 1;
}
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
cstRemoteReader->cstWriter->guid.oid,
cstRemoteReader->guid.hid,
cstRemoteReader->guid.aid);
}
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
1);
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
1, //metatraffic timer
"CSTWriterAnnounceTimer",
debug(52,10) ("CSTWriterAnnounceIssueTimer: start\n");
pp=(ORTEPublProp*)cstRemoteReader->cstWriter->objectEntryOID->attributes;
//create HB
- d->mbSend.cdrStreamDirect=NULL;
- len=RTPSHeardBeatCreate(
- d->mbSend.cdrStream.bufferPtr,
- getMaxMessageLength(d),
+ d->taskSend.mb.cdrCodecDirect=NULL;
+ len=RTPSHeartBeatCreate(
+ &d->taskSend.mb.cdrCodec,
&cstRemoteReader->cstWriter->firstSN,
&cstRemoteReader->cstWriter->lastSN,
- cstRemoteReader->cstWriter->guid.oid,
OID_UNKNOWN,
+ cstRemoteReader->cstWriter->guid.oid,
ORTE_FALSE);
if (len<0) {
//not enought space in sending buffer
- d->mbSend.needSend=ORTE_TRUE;
+ d->taskSend.mb.needSend=ORTE_TRUE;
return 1;
}
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
cstRemoteReader->cstWriter->guid.oid,
cstRemoteReader->guid.hid,
}
cstRemoteReader->HBRetriesCounter++;
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
2);
if (cstRemoteReader->HBRetriesCounter<pp->HBMaxRetries) {
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
2, //metatraffic timer
"CSTWriterAnnounceIssueTimer",
//destroy all csChangesForReader
CSChangeForReader *csChangeForReader;
while ((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
- CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+ CSTWriterDestroyCSChangeForReader(
csChangeForReader,ORTE_TRUE);
}
debug(52,3) ("CSTWriterAnnounceIssueTimer: HB RR(0x%x-0x%x) ritch MaxRetries\n",
int
CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
- ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+ ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
CSChangeForReader *csChangeForReader=NULL;
debug(52,10) ("CSTWriterSendBestEffortTimer: start\n");
- d->mbSend.cdrStreamDirect=NULL;
+ d->taskSend.mb.cdrCodecDirect=NULL;
if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
if (csChangeForReader->commStateChFReader==TOSEND) {
CSChange *csChange=csChangeForReader->csChange;
+
csChangeForReader->commStateChFReader=UNDERWAY;
cstRemoteReader->commStateSend=MUSTSENDDATA;
cstRemoteReader->lastSentIssueTime=getActualNtpTime();
- d->mbSend.cdrStreamDirect=&csChange->cdrStream;
- debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x\n",
- cstRemoteReader->cstWriter->guid.oid,
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid);
+ d->taskSend.mb.cdrCodecDirect=&csChange->cdrCodec;
+
+ if (cstRemoteReader->sobject) {
+ debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x-0x%x\n",
+ cstRemoteReader->cstWriter->guid.oid,
+ GUID_PRINTF(cstRemoteReader->sobject->guid));
+ }
+
ORTESendData(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
ORTE_FALSE);
- CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+
+ //it's not nessecary to NewState, there is setuped only new state & after is deleted
+ CSTWriterCSChangeForReaderNewState(csChangeForReader);
+
+ /* mark multicast messages like processed */
+ CSTWriterMulticast(csChangeForReader);
+
+ CSTWriterDestroyCSChangeForReader(
csChangeForReader,ORTE_TRUE);
+
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
2);
+
//when is no csChange -> break processing
if (cstRemoteReader->cstWriter->csChangesCounter==0)
break;
+
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
2,
"CSTWriterSendBestEffortTimer",
cstRemoteReader,
&sp->minimumSeparation);
return 0;
+
}
}
}
- cstRemoteReader->commStateSend=NOTHNIGTOSEND;
debug(52,10) ("CSTWriterSendBestEffortTimer: finished\n");
return 0;
}
CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
CSChangeForReader *csChangeForReader=NULL;
- int max_msg_len,len;
+ int len,data_offset,wptr_max;
CSChange *csChange;
Boolean firstTrace=ORTE_TRUE;
debug(52,10) ("CSTWriterSendStrictTimer: start\n");
- max_msg_len=getMaxMessageLength(d);
if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
csChange=csChangeForReader->csChange;
if (csChangeForReader->commStateChFReader==TOSEND) {
cstRemoteReader->commStateSend=MUSTSENDDATA;
+
+ wptr_max=d->taskSend.mb.cdrCodec.wptr_max;
+ d->taskSend.mb.cdrCodec.wptr_max=csChange->cdrCodec.wptr_max;
+ /* infoReply */
if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
- !d->mbSend.containsInfoReply) {
+ !d->taskSend.mb.containsInfoReply) {
+ AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
firstTrace=ORTE_FALSE;
- len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
+ len=RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
IPADDRESS_INVALID,
- ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->userdataUnicastPort);
+ ap->userdataUnicastPort);
if (len<0) {
- d->mbSend.needSend=ORTE_TRUE;
+ d->taskSend.mb.needSend=ORTE_TRUE;
+ d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
return 1;
}
- d->mbSend.containsInfoReply=ORTE_TRUE;
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
- max_msg_len-=len;
+ d->taskSend.mb.containsInfoReply=ORTE_TRUE;
debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
cstRemoteReader->cstWriter->guid.oid,
cstRemoteReader->guid.hid,
cstRemoteReader->guid.aid);
}
- len=20+cstRemoteReader->cstWriter->typeRegister->getMaxSize;
- if (max_msg_len<len) {
- d->mbSend.needSend=ORTE_TRUE;
- return 1;
+
+ data_offset=RTPS_HEADER_LENGTH+12;
+ if (CDR_buffer_puts(&d->taskSend.mb.cdrCodec,
+ csChange->cdrCodec.buffer+data_offset, //src
+ csChange->cdrCodec.wptr-data_offset)==CORBA_FALSE) {
+ d->taskSend.mb.needSend=ORTE_TRUE;
+ d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
+ return 1;
}
- memcpy(d->mbSend.cdrStream.bufferPtr, //dest
-// csChange->cdrStream.bufferPtr-len, //src
- csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12, //src
- len); //length
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
- max_msg_len-=len;
+
+ d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
+
+ /* setup new state for csChangeForReader */
+ CSTWriterCSChangeForReaderNewState(csChangeForReader);
+
+ /* mark multicast messages like processed */
+ CSTWriterMulticast(csChangeForReader);
+
debug(52,3) ("sent: RTPS_ISSUE_STRICT(0x%x) to 0x%x-0x%x\n",
cstRemoteReader->cstWriter->guid.oid,
cstRemoteReader->guid.hid,
}
}
}
- cstRemoteReader->commStateSend=NOTHNIGTOSEND;
debug(52,10) ("CSTWriterSendStrictTimer: finished\n");
//add HeardBeat
return CSTWriterAnnounceIssueTimer(d,cstRemoteReader);
CSTWriterSendTimer(ORTEDomain *d,void *vcstRemoteReader) {
CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
CSChangeForReader *csChangeForReader=NULL;
- unsigned int max_msg_len;
- int len,off;
Boolean firstTrace=ORTE_TRUE,f_bit=ORTE_TRUE;
debug(52,10) ("CSTWriterSendTimer: start\n");
- max_msg_len=getMaxMessageLength(d);
- //setup f_bit of object
+
+ /* setup f_bit of object */
if (cstRemoteReader->cstWriter->params.fullAcknowledge)
f_bit=ORTE_FALSE;
+
if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
+
gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
+
if (csChangeForReader->commStateChFReader==TOSEND) {
cstRemoteReader->commStateSend=MUSTSENDDATA;
+
+ /* infoReply */
if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
- !d->mbSend.containsInfoReply) {
+ !d->taskSend.mb.containsInfoReply) {
+ AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
firstTrace=ORTE_FALSE;
- len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
- IPADDRESS_INVALID,
- ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->metatrafficUnicastPort);
- if (len<0) {
- d->mbSend.needSend=ORTE_TRUE;
+ if (RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
+ IPADDRESS_INVALID,
+ ap->metatrafficUnicastPort) < 0) {
+ d->taskSend.mb.needSend=ORTE_TRUE;
return 1;
}
- d->mbSend.containsInfoReply=ORTE_TRUE;
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
- max_msg_len-=len;
- debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
- cstRemoteReader->cstWriter->guid.oid,
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid);
- }
- if (max_msg_len<32) {
- d->mbSend.needSend=ORTE_TRUE;
- return 1;
+ d->taskSend.mb.containsInfoReply=ORTE_TRUE;
+ debug(52,3) ("sent: RTPS_InfoREPLY from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+ GUID_PRINTF(cstRemoteReader->guid));
}
- off=0;
- //VAR ???
+
+ /* VAR */
if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
- debug(52,3) ("sent: RTPS_VAR(0x%x) to 0x%x-0x%x\n",
- cstRemoteReader->cstWriter->guid.oid,
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid);
- len=32;
- d->mbSend.cdrStream.bufferPtr[0]=(uint8_t)VAR;
- d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
- if (csChangeForReader->csChange->alive)
- d->mbSend.cdrStream.bufferPtr[1]|=4;
- *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
- *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
- cstRemoteReader->cstWriter->guid.oid;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
- if (csChangeForReader->csChange->guid.oid==OID_APP) {
- d->mbSend.cdrStream.bufferPtr[1]|=8;
- *((HostId*)(d->mbSend.cdrStream.bufferPtr+12))=
- csChangeForReader->csChange->guid.hid;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+12),0);
- *((AppId*)(d->mbSend.cdrStream.bufferPtr+16))=
- csChangeForReader->csChange->guid.aid;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+16),0);
- } else {
- len-=8;
- off=-8;
- }
- *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+20+off))=
- csChangeForReader->csChange->guid.oid;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+20+off),0);
- *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+24+off))=
- csChangeForReader->csChange->sn;
- if (!CSChangeAttributes_is_empty(csChangeForReader->csChange)) {
- int plen;
- plen=parameterCodeStreamFromCSChange(csChangeForReader->csChange,
- d->mbSend.cdrStream.bufferPtr+32+off,max_msg_len-len);
- if (plen<0) {
- d->mbSend.needSend=ORTE_TRUE;
- return 1;
- }
- d->mbSend.cdrStream.bufferPtr[1]|=2;
- len+=plen;
+ debug(52,3) ("sent: RTPS_VAR from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+ GUID_PRINTF(cstRemoteReader->guid));
+
+ if (RTPSVarCreate(&d->taskSend.mb.cdrCodec,
+ OID_UNKNOWN,
+ cstRemoteReader->cstWriter->guid.oid,
+ csChangeForReader->csChange) < 0) {
+ d->taskSend.mb.needSend=ORTE_TRUE;
+ return 1;
}
- } else { //GAP ???
- debug(52,3) ("sent: RTPS_GAP(0x%x) to 0x%x-0x%x\n",
- cstRemoteReader->cstWriter->guid.oid,
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid);
- len=32;
- d->mbSend.cdrStream.bufferPtr[0]=(uint8_t)GAP;
- d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
- *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
- *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
- cstRemoteReader->cstWriter->guid.oid;
- conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
- *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12))=
- csChangeForReader->csChange->sn;
- conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12),ORTE_MY_MBO);
- SeqNumberAdd(*((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20)),
- csChangeForReader->csChange->sn,
- csChangeForReader->csChange->gapSN);
- conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20),ORTE_MY_MBO);
- *((uint32_t*)(d->mbSend.cdrStream.bufferPtr+28))=0; //NumBits
- }
- *((ParameterLength*)(d->mbSend.cdrStream.bufferPtr+2))=len-4;
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
- max_msg_len-=len;
- //setup new state for csChangeForReader
- if (NtpTimeCmp(zNtpTime,
- cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
- csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
+
} else {
- csChangeForReader->commStateChFReader=UNDERWAY;
- eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &csChangeForReader->waitWhileDataUnderwayTimer,
- 0);
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &csChangeForReader->waitWhileDataUnderwayTimer,
- 0, //common timer
- "CSChangeForReaderUnderwayTimer",
- CSChangeForReaderUnderwayTimer,
- &cstRemoteReader->cstWriter->lock,
- csChangeForReader,
- &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
+ /* GAP */
+ debug(52,3) ("sent: RTPS_GAP from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+ GUID_PRINTF(cstRemoteReader->guid));
+
+ if (RTPSGapCreate(&d->taskSend.mb.cdrCodec,
+ OID_UNKNOWN,
+ cstRemoteReader->cstWriter->guid.oid,
+ csChangeForReader->csChange) < 0) {
+ d->taskSend.mb.needSend=ORTE_TRUE;
+ return 1;
+ }
}
+
+ /* setup new state for csChangeForReader */
+ CSTWriterCSChangeForReaderNewState(csChangeForReader);
+
+ /* mark multicast messages like processed */
+ CSTWriterMulticast(csChangeForReader);
+
}
- }
- }
- //add HeardBeat
- len=RTPSHeardBeatCreate(
- d->mbSend.cdrStream.bufferPtr,max_msg_len,
- &cstRemoteReader->cstWriter->firstSN,
- &cstRemoteReader->cstWriter->lastSN,
- cstRemoteReader->cstWriter->guid.oid,
- OID_UNKNOWN,
- f_bit);
- if (len<0) {
- d->mbSend.needSend=ORTE_TRUE;
- return 1;
- } else {
- //schedule new time for Announce timer
- eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->repeatAnnounceTimer,
- 1);
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->repeatAnnounceTimer,
- 1, //metatraffic timer
- "CSTWriterAnnounceTimer",
- CSTWriterAnnounceTimer,
- &cstRemoteReader->cstWriter->lock,
- cstRemoteReader,
- &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
+ } /* gavl_cust_for_each */
+
+ cstRemoteReader->commStateHB=MUSTSENDHB;
+
}
- debug(52,3) ("sent: RTPS_HB(0x%x) to 0x%x-0x%x\n",
- cstRemoteReader->cstWriter->guid.oid,
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid);
+
if (cstRemoteReader->commStateHB==MUSTSENDHB) {
+ //add HeartBeat
+ if (RTPSHeartBeatCreate(
+ &d->taskSend.mb.cdrCodec,
+ &cstRemoteReader->cstWriter->firstSN,
+ &cstRemoteReader->cstWriter->lastSN,
+ OID_UNKNOWN,
+ cstRemoteReader->cstWriter->guid.oid,
+ f_bit)<0) {
+ d->taskSend.mb.needSend=ORTE_TRUE;
+ return 1;
+ } else {
+ //schedule new time for Announce timer
+ eventDetach(d,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->repeatAnnounceTimer,
+ 1);
+ eventAdd(d,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->repeatAnnounceTimer,
+ 1, //metatraffic timer
+ "CSTWriterAnnounceTimer",
+ CSTWriterAnnounceTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
+ }
+
+ debug(52,3) ("sent: RTPS_HB from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+ GUID_PRINTF(cstRemoteReader->guid));
+
cstRemoteReader->commStateHB=MAYSENDHB;
}
- cstRemoteReader->commStateSend=NOTHNIGTOSEND;
- d->mbSend.cdrStream.bufferPtr+=len;
- d->mbSend.cdrStream.length+=len;
+
debug(52,10) ("CSTWriterSendTimer: finished\n");
return 0;
}