*
*/
-#include "orte.h"
+#include "orte_all.h"
GAVL_CUST_NODE_INT_IMP(CSTWriter,
CSTPublications, CSTWriter, GUID_RTPS,
cstWriter->domain=d;
cstWriter->typeRegister=typeRegister;
if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+ pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
+ cstWriter->condValueCSChangeDestroyed=0;
}
//add event for refresh
if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
&cstWriter->refreshPeriodTimer,
0);
if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+ pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
}
pthread_rwlock_destroy(&cstWriter->lock);
CSTRemoteReader *cstRemoteReader;
CSChange *csChangeFSN;
+ debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
+ cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
cstWriter->csChangesCounter++;
//look for old cschange
if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
csChange->sn=cstWriter->lastSN;
SEQUENCE_NUMBER_NONE(csChange->gapSN);
csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
- csChange->remoteReaderProcBest=0;
- csChange->remoteReaderProcStrict=0;
+ csChange->remoteReaderBest=0;
+ csChange->remoteReaderStrict=0;
CSTWriterCSChange_insert(cstWriter,csChange);
//update FirstSN
csChangeFSN=CSTWriterCSChange_first(cstWriter);
ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
cstRemoteReader->csChangesCounter++;
+ cstRemoteReader->HBRetriesCounter=0;
if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
cstRemoteReader->commStateSend=MUSTSENDDATA;
if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
//Strict reliable subscription
+ csChange->remoteReaderStrict++;
eventDetach(d,
cstRemoteReader->objectEntryOID->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
//best efforts subscription
NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
-
+
+ csChange->remoteReaderBest++;
NtpTimeAdd(nextIssueTime,
cstRemoteReader->lastSentIssueTime,
sp->minimumSeparation);
//direct sent issue, for case zero time
CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
} else {
- //shedule sent issue (future)
+ //schedule sent issue (future)
eventAdd(d,
cstRemoteReader->objectEntryOID->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
//!Best_Effort & !Strict_Reliable
CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
ORTE_TRUE);
+ debug(51,5) ("CSTWriterAddCSChange: destryed\n");
+
}
}
}
}
debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
}
+ debug(51,5) ("CSTWriterAddCSChange: finished\n");
}
/*****************************************************************************/
csChange=csChangeForReader->csChange;
csChange->remoteReaderCount--;
cstRemoteReader->csChangesCounter--;
+ if (!cstRemoteReader->csChangesCounter) {
+ cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+ }
+ if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+ ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+ if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
+ csChange->remoteReaderStrict--;
+ } else {
+ if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
+ csChange->remoteReaderBest--;
+ }
+ }
+ }
eventDetach(cstRemoteReader->cstWriter->domain,
cstRemoteReader->objectEntryOID->objectEntryAID,
&csChangeForReader->waitWhileDataUnderwayTimer,
CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
FREE(csChangeForReader);
if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
- if (csChange->remoteReaderCount<=
- (csChange->remoteReaderProcBest+csChange->remoteReaderProcStrict)) {
+ if (!csChange->remoteReaderCount) {
if (destroyCSChange) {
CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
cstRemoteReader->cstWriter,csChange);
}
+ pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
+ cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
+ pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
debug(51,5) ("Publication: new queue level (%d)\n",
cstRemoteReader->cstWriter->csChangesCounter);
CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
CSTRemoteReader *cstRemoteReader;
CSChangeForReader *csChangeForReader;
+ CSChange *csChangeFSN;
if (!csChange) return;
cstWriter->csChangesCounter--;
FREE(csChange->cdrStream.buffer);
parameterDelete(csChange);
FREE(csChange);
+ //update first SN
+ csChangeFSN=CSTWriterCSChange_first(cstWriter);
+ if (csChangeFSN)
+ cstWriter->firstSN=csChangeFSN->sn;
+ else
+ cstWriter->firstSN=cstWriter->lastSN;
}
/*****************************************************************************/
CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
CSChange *csChange;
ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
- if (!csChange->remoteReaderProcStrict) {
+ if (!csChange->remoteReaderStrict) {
CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
return ORTE_TRUE;
}
if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
timerQueue=2; //userdata timer queue
gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
- csChangeForReader->commStateChFReader=TOSEND;
- if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
- cstRemoteReader->commStateSend=MUSTSENDDATA;
- eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- timerQueue);
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- timerQueue,
- "CSTWriterSendTimer",
- CSTWriterSendTimer,
- &cstRemoteReader->cstWriter->lock,
- cstRemoteReader,
- &cstRemoteReader->cstWriter->params.delayResponceTime);
+ //refresh only VAR's
+ if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
+ csChangeForReader->commStateChFReader=TOSEND;
+ if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
+ cstRemoteReader->commStateSend=MUSTSENDDATA;
+ eventDetach(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ timerQueue);
+ eventAdd(d,
+ cstRemoteReader->objectEntryOID->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ timerQueue,
+ "CSTWriterSendTimer",
+ CSTWriterSendTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.delayResponceTime);
+ }
}
}
}