* $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
*
* DEBUG: section 51 CSTWriter
- * AUTHOR: Petr Smolik petr.smolik@wo.cz
*
- * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
+ * -------------------------------------------------------------------
+ * ORTE
+ * Open Real-Time Ethernet
+ *
+ * Copyright (C) 2001-2006
+ * Department of Control Engineering FEE CTU Prague, Czech Republic
+ * http://dce.felk.cvut.cz
+ * http://www.ocera.org
+ *
+ * Author: Petr Smolik petr@smoliku.cz
+ * Advisor: Pavel Pisa
+ * Project Responsible: Zdenek Hanzalek
* --------------------------------------------------------------------
*
* This program is free software; you can redistribute it and/or modify
*
*/
-#include "orte.h"
+#include "orte_all.h"
GAVL_CUST_NODE_INT_IMP(CSTWriter,
CSTPublications, CSTWriter, GUID_RTPS,
cstWriter->guid.oid=oid;
cstWriter->objectEntryOID=object;
memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
+ cstWriter->registrationCounter=0;
+ ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
cstWriter->strictReliableCounter=0;
cstWriter->bestEffortsCounter=0;
cstWriter->csChangesCounter=0;
cstWriter->cstRemoteReaderCounter=0;
+ cstWriter->registrationCounter=cstWriter->params.registrationRetries;
SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
CSTWriterCSChange_init_head(cstWriter);
cstWriter->domain=d;
cstWriter->typeRegister=typeRegister;
if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+ pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
+ cstWriter->condValueCSChangeDestroyed=0;
}
//add event for refresh
if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
CSTWriterRefreshTimer(d,(void*)cstWriter);
}
+ //add event for registration
+ if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
+ CSTWriterRegistrationTimer(d,(void*)cstWriter);
+ }
debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
- cstWriter->guid.hid,
- cstWriter->guid.aid,
- cstWriter->guid.oid);
+ GUID_PRINTF(cstWriter->guid));
debug(51,10) ("CSTWriterInit: finished\n");
}
debug(51,10) ("CSTWriterDelete: start\n");
debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
- cstWriter->guid.hid,
- cstWriter->guid.aid,
- cstWriter->guid.oid);
+ GUID_PRINTF(cstWriter->guid));
//Destroy all cstRemoteReader connected on cstWriter
while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
CSTWriterDestroyRemoteReader(d,cstRemoteReader);
cstWriter->objectEntryOID->objectEntryAID,
&cstWriter->refreshPeriodTimer,
0);
+ eventDetach(d,
+ cstWriter->objectEntryOID->objectEntryAID,
+ &cstWriter->registrationTimer,
+ 0);
if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+ pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
}
pthread_rwlock_destroy(&cstWriter->lock);
}
/*****************************************************************************/
-void
-CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
- ObjectId oid) {
+CSTRemoteReader *
+CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
+ ObjectId oid,ObjectEntryOID *sobject) {
CSTRemoteReader *cstRemoteReader;
CSChangeForReader *csChangeForReader;
CSChange *csChange=NULL;
cstWriter->cstRemoteReaderCounter++;
cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
- cstRemoteReader->guid.hid=object->objectEntryHID->hid;
- cstRemoteReader->guid.aid=object->objectEntryAID->aid;
+ cstRemoteReader->guid.hid=pobject->guid.hid;
+ cstRemoteReader->guid.aid=pobject->guid.aid;
cstRemoteReader->guid.oid=oid;
- cstRemoteReader->objectEntryOID=object;
+ cstRemoteReader->sobject=sobject;
+ cstRemoteReader->pobject=pobject;
cstRemoteReader->cstWriter=cstWriter;
CSChangeForReader_init_root_field(cstRemoteReader);
cstRemoteReader->commStateHB=MAYSENDHB;
cstRemoteReader->commStateSend=NOTHNIGTOSEND;
cstRemoteReader->HBRetriesCounter=0;
cstRemoteReader->csChangesCounter=0;
+ cstRemoteReader->commStateToSentCounter=0;
NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
//insert remote reader
CSTRemoteReader_insert(cstWriter,cstRemoteReader);
+ //multicast case
+ if (cstRemoteReader->sobject->multicastPort) {
+ debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->guid),
+ GUID_PRINTF(cstRemoteReader->sobject->guid));
+ ObjectEntryMulticast_insert(cstRemoteReader->sobject,
+ cstRemoteReader);
+ }
//copy all csChanges (not for publication)
if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
cstRemoteReader->csChangesCounter++;
csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
csChangeForReader->commStateChFReader=TOSEND;
+ cstRemoteReader->commStateToSentCounter++;
csChangeForReader->csChange=csChange;
+ csChangeForReader->cstRemoteReader=cstRemoteReader;
ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
+ CSChangeParticipant_insert(csChange,csChangeForReader);
CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
cstRemoteReader->commStateSend=MUSTSENDDATA;
}
if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
1,
"CSTWriterSendTimer",
}
} else {
//Publication
- ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
+ ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
cstWriter->strictReliableCounter++;
else {
}
}
debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid,
- cstRemoteReader->guid.oid);
+ GUID_PRINTF(cstRemoteReader->guid));
+ return cstRemoteReader;
}
/*****************************************************************************/
if (!cstRemoteReader) return;
cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
- cstRemoteReader->guid.hid,
- cstRemoteReader->guid.aid,
- cstRemoteReader->guid.oid);
+ GUID_PRINTF(cstRemoteReader->guid));
if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
ORTESubsProp *sp;
- sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+ sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
cstRemoteReader->cstWriter->strictReliableCounter--;
else {
}
}
while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
- CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+ CSTWriterDestroyCSChangeForReader(
csChangeForReader,ORTE_TRUE);
}
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
1); //metatraffic timer
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
2); //userdata timer
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
1); //metatraffic timer
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->repeatAnnounceTimer,
2); //userdata timer
+ //multicast case
+ if (cstRemoteReader->sobject->multicastPort) {
+ ObjectEntryOID *object;
+
+ object=cstRemoteReader->sobject;
+
+ ObjectEntryMulticast_delete(object,cstRemoteReader);
+ debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->guid),
+ GUID_PRINTF(object->guid));
+
+ if (ObjectEntryMulticast_is_empty(object)) {
+ objectEntryDelete(d,object,ORTE_TRUE);
+ }
+ }
CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
FREE(cstRemoteReader);
}
CSTRemoteReader *cstRemoteReader;
CSChange *csChangeFSN;
+ debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstWriter->guid));
cstWriter->csChangesCounter++;
//look for old cschange
if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
csChange->sn=cstWriter->lastSN;
SEQUENCE_NUMBER_NONE(csChange->gapSN);
csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
- csChange->remoteReaderProcBest=0;
- csChange->remoteReaderProcStrict=0;
+ csChange->remoteReaderBest=0;
+ csChange->remoteReaderStrict=0;
+ CSChangeParticipant_init_head(csChange);
CSTWriterCSChange_insert(cstWriter,csChange);
+ debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
+ csChange->sn.low);
//update FirstSN
csChangeFSN=CSTWriterCSChange_first(cstWriter);
if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
//insert new cschange for each reader
gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
//csChangeForReader
+ debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
+ GUID_PRINTF(cstRemoteReader->guid));
csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
csChangeForReader->commStateChFReader=TOSEND;
+ cstRemoteReader->commStateToSentCounter++;
csChangeForReader->csChange=csChange;
+ csChangeForReader->cstRemoteReader=cstRemoteReader;
ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
+ CSChangeParticipant_insert(csChange,csChangeForReader);
CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
cstRemoteReader->csChangesCounter++;
- if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
- cstRemoteReader->commStateSend=MUSTSENDDATA;
- if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
+ cstRemoteReader->HBRetriesCounter=0;
+ cstRemoteReader->commStateSend=MUSTSENDDATA;
+ if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
+ eventDetach(d,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ 1);
+ eventAdd(d,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ 1,
+ "CSTWriterSendTimer",
+ CSTWriterSendTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ NULL);
+ } else {
+ ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
+
+ if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
+ //Strict reliable subscription
+ csChange->remoteReaderStrict++;
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
- 1);
+ 2);
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
- 1,
- "CSTWriterSendTimer",
- CSTWriterSendTimer,
+ 2,
+ "CSTWriterSendStrictTimer",
+ CSTWriterSendStrictTimer,
&cstRemoteReader->cstWriter->lock,
cstRemoteReader,
NULL);
} else {
- ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
-
- if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
- //Strict reliable subscription
+ if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
+ //best efforts subscription
+ NtpTime nextIssueTime,nextIssueDelay,actTime;
+
+ actTime=getActualNtpTime();
+ csChange->remoteReaderBest++;
+ NtpTimeAdd(nextIssueTime,
+ cstRemoteReader->lastSentIssueTime,
+ sp->minimumSeparation);
+ NtpTimeSub(nextIssueDelay,
+ nextIssueTime,
+ actTime);
+ if (NtpTimeCmp(actTime,nextIssueTime)>=0)
+ NTPTIME_ZERO(nextIssueDelay);
eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
2);
+ //schedule sent issue
eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&cstRemoteReader->delayResponceTimer,
2,
- "CSTWriterSendStrictTimer",
- CSTWriterSendStrictTimer,
+ "CSTWriterSendBestEffortTimer",
+ CSTWriterSendBestEffortTimer,
&cstRemoteReader->cstWriter->lock,
cstRemoteReader,
- NULL);
+ &nextIssueDelay);
} else {
- if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
- //best efforts subscription
- NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
-
- NtpTimeAdd(nextIssueTime,
- cstRemoteReader->lastSentIssueTime,
- sp->minimumSeparation);
- NtpTimeSub(nextIssueDelay,
- nextIssueTime,
- actTime);
- if (NtpTimeCmp(actTime,nextIssueTime)>=0)
- NTPTIME_ZERO(nextIssueDelay);
- eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- 2);
- if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
- //direct sent issue, for case zero time
- CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
- } else {
- //schedule sent issue (future)
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- 2,
- "CSTWriterSendBestEffortTimer",
- CSTWriterSendBestEffortTimer,
- &cstRemoteReader->cstWriter->lock,
- cstRemoteReader,
- &nextIssueDelay);
- }
- } else {
- //!Best_Effort & !Strict_Reliable
- CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
- ORTE_TRUE);
- }
+ //!Best_Effort & !Strict_Reliable
+ CSTWriterDestroyCSChangeForReader(csChangeForReader,
+ ORTE_TRUE);
+ debug(51,5) ("CSTWriterAddCSChange: destroyed\n");
}
}
}
debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
}
+ debug(51,5) ("CSTWriterAddCSChange: finished\n");
}
/*****************************************************************************/
void
-CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
- CSChangeForReader *csChangeForReader,Boolean destroyCSChange) {
+CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
+ Boolean destroyCSChange) {
+ CSTRemoteReader *cstRemoteReader;
CSChange *csChange;
+
if (!csChangeForReader) return;
+ cstRemoteReader=csChangeForReader->cstRemoteReader;
csChange=csChangeForReader->csChange;
csChange->remoteReaderCount--;
cstRemoteReader->csChangesCounter--;
+ if (!cstRemoteReader->csChangesCounter) {
+ cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+ }
+ if (csChangeForReader->commStateChFReader==TOSEND) {
+ cstRemoteReader->commStateToSentCounter--;
+ }
+ if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+ ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
+ if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
+ csChange->remoteReaderStrict--;
+ } else {
+ if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
+ csChange->remoteReaderBest--;
+ }
+ }
+ }
eventDetach(cstRemoteReader->cstWriter->domain,
- cstRemoteReader->objectEntryOID->objectEntryAID,
+ cstRemoteReader->sobject->objectEntryAID,
&csChangeForReader->waitWhileDataUnderwayTimer,
0);
+ CSChangeParticipant_delete(csChange,csChangeForReader);
CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
FREE(csChangeForReader);
+
if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
- if (csChange->remoteReaderCount<=
- (csChange->remoteReaderProcBest+csChange->remoteReaderProcStrict)) {
+ if (!csChange->remoteReaderCount) {
if (destroyCSChange) {
CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
cstRemoteReader->cstWriter,csChange);
}
+ pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
+ cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
+ pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
debug(51,5) ("Publication: new queue level (%d)\n",
cstRemoteReader->cstWriter->csChangesCounter);
CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
CSTRemoteReader *cstRemoteReader;
CSChangeForReader *csChangeForReader;
+ CSChange *csChangeFSN;
if (!csChange) return;
+
cstWriter->csChangesCounter--;
CSTWriterCSChange_delete(cstWriter,csChange);
gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
- CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+ CSTWriterDestroyCSChangeForReader(
csChangeForReader,ORTE_FALSE);
}
- if (csChange->cdrStream.buffer)
- FREE(csChange->cdrStream.buffer);
+
+ if (csChange->cdrCodec.buffer)
+ FREE(csChange->cdrCodec.buffer);
parameterDelete(csChange);
FREE(csChange);
+
+ //update first SN
+ csChangeFSN=CSTWriterCSChange_first(cstWriter);
+ if (csChangeFSN)
+ cstWriter->firstSN=csChangeFSN->sn;
+ else
+ cstWriter->firstSN=cstWriter->lastSN;
}
/*****************************************************************************/
Boolean
CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
CSChange *csChange;
+
ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
- if (!csChange->remoteReaderProcStrict) {
+
+ if (!csChange->remoteReaderStrict) {
CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
return ORTE_TRUE;
}
if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
timerQueue=2; //userdata timer queue
+
gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
- csChangeForReader->commStateChFReader=TOSEND;
- if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
- cstRemoteReader->commStateSend=MUSTSENDDATA;
- eventDetach(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- timerQueue);
- eventAdd(d,
- cstRemoteReader->objectEntryOID->objectEntryAID,
- &cstRemoteReader->delayResponceTimer,
- timerQueue,
- "CSTWriterSendTimer",
- CSTWriterSendTimer,
- &cstRemoteReader->cstWriter->lock,
- cstRemoteReader,
- &cstRemoteReader->cstWriter->params.delayResponceTime);
+
+ //refresh only VAR's
+ if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
+
+ if (csChangeForReader->commStateChFReader!=TOSEND) {
+ csChangeForReader->commStateChFReader=TOSEND;
+ cstRemoteReader->commStateToSentCounter++;
+ }
+
+ if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
+ cstRemoteReader->commStateSend=MUSTSENDDATA;
+ eventDetach(d,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ timerQueue);
+ eventAdd(d,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ timerQueue,
+ "CSTWriterSendTimer",
+ CSTWriterSendTimer,
+ &cstRemoteReader->cstWriter->lock,
+ cstRemoteReader,
+ &cstRemoteReader->cstWriter->params.delayResponceTime);
+ }
}
}
}
+
+/*****************************************************************************/
+int
+CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader)
+{
+ CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
+
+ //setup new state for csChangeForReader
+ if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
+ cstRemoteReader->commStateToSentCounter--;
+
+ if (!cstRemoteReader->commStateToSentCounter)
+ cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+
+ if (NtpTimeCmp(zNtpTime,
+ cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
+ csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
+ } else {
+ csChangeForReader->commStateChFReader=UNDERWAY;
+ eventDetach(cstRemoteReader->cstWriter->domain,
+ cstRemoteReader->sobject->objectEntryAID,
+ &csChangeForReader->waitWhileDataUnderwayTimer,
+ 0);
+ eventAdd(cstRemoteReader->cstWriter->domain,
+ cstRemoteReader->sobject->objectEntryAID,
+ &csChangeForReader->waitWhileDataUnderwayTimer,
+ 0, //common timer
+ "CSChangeForReaderUnderwayTimer",
+ CSChangeForReaderUnderwayTimer,
+ &cstRemoteReader->cstWriter->lock,
+ csChangeForReader,
+ &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
+ }
+ return 0;
+}
+
+/*****************************************************************************/
+void
+CSTWriterMulticast(CSChangeForReader *csChangeForReader)
+{
+ CSTRemoteReader *cstRemoteReader;
+ ObjectEntryOID *objectEntryOID;
+ CSChangeForReader *csChangeForReader1;
+ char queue=1;
+
+ cstRemoteReader=csChangeForReader->cstRemoteReader;
+ objectEntryOID=cstRemoteReader->sobject;
+
+ //multicast can do an application with multicast interface
+ if (!objectEntryOID->multicastPort)
+ return;
+
+ ul_list_for_each(CSChangeParticipant,
+ csChangeForReader->csChange,
+ csChangeForReader1) {
+ ObjectEntryOID *objectEntryOID1;
+ CSTRemoteReader *cstRemoteReader1;
+
+ cstRemoteReader1=csChangeForReader1->cstRemoteReader;
+ objectEntryOID1=cstRemoteReader1->sobject;
+
+ /* are RRs from same GROUP */
+ if (objectEntryOID!=objectEntryOID1)
+ continue;
+
+ /* is the csChange in state TOSEND ? If yes, marks like proc. */
+ CSTWriterCSChangeForReaderNewState(csChangeForReader1);
+
+ /* if there are no messages, detach sending timer */
+ if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
+ !(cstRemoteReader->commStateHB==MAYSENDHB))
+ continue;
+
+ if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
+ queue=2;
+ eventDetach(cstRemoteReader->cstWriter->domain,
+ cstRemoteReader->sobject->objectEntryAID,
+ &cstRemoteReader->delayResponceTimer,
+ queue);
+ }
+}