rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSCSTWriter.c
new version 0.2.3
[orte.git] / orte / liborte / RTPSCSTWriter.c
index 5f20ded8f0c2d9d72b3e7c00d735a6c3b4632201..56cacd0bfbb711cda1d3bc0838013f8f92719d71 100644 (file)
@@ -19,7 +19,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
                        CSTPublications, CSTWriter, GUID_RTPS,
@@ -56,7 +56,9 @@ CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
   cstWriter->domain=d;
   cstWriter->typeRegister=typeRegister;
   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+    pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
+    cstWriter->condValueCSChangeDestroyed=0;
   }
   //add event for refresh
   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
@@ -95,6 +97,7 @@ CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
       &cstWriter->refreshPeriodTimer,
       0);
   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+    pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
   }
   pthread_rwlock_destroy(&cstWriter->lock);
@@ -253,6 +256,8 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   CSTRemoteReader     *cstRemoteReader;
   CSChange            *csChangeFSN;
   
+  debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
+               cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
   cstWriter->csChangesCounter++;
   //look for old cschange
   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
@@ -262,8 +267,8 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   csChange->sn=cstWriter->lastSN;
   SEQUENCE_NUMBER_NONE(csChange->gapSN);
   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
-  csChange->remoteReaderProcBest=0;
-  csChange->remoteReaderProcStrict=0;
+  csChange->remoteReaderBest=0;
+  csChange->remoteReaderStrict=0;
   CSTWriterCSChange_insert(cstWriter,csChange);
   //update FirstSN
   csChangeFSN=CSTWriterCSChange_first(cstWriter);
@@ -282,6 +287,7 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
     cstRemoteReader->csChangesCounter++;
+    cstRemoteReader->HBRetriesCounter=0;
     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
       cstRemoteReader->commStateSend=MUSTSENDDATA;
       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
@@ -303,6 +309,7 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
         
         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
           //Strict reliable subscription
+          csChange->remoteReaderStrict++;
           eventDetach(d,
               cstRemoteReader->objectEntryOID->objectEntryAID,
               &cstRemoteReader->delayResponceTimer,
@@ -320,7 +327,8 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
             //best efforts subscription
             NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
-
+            
+            csChange->remoteReaderBest++;
             NtpTimeAdd(nextIssueTime,
                       cstRemoteReader->lastSentIssueTime,
                       sp->minimumSeparation);
@@ -337,7 +345,7 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
               //direct sent issue, for case zero time
               CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
             } else {
-              //shedule sent issue (future)
+              //schedule sent issue (future)
               eventAdd(d,
                   cstRemoteReader->objectEntryOID->objectEntryAID,
                   &cstRemoteReader->delayResponceTimer,
@@ -352,12 +360,15 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
             //!Best_Effort & !Strict_Reliable
             CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
               ORTE_TRUE);
+            debug(51,5) ("CSTWriterAddCSChange: destryed\n");
+             
           }
         }
       }
     }
     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
   }
+  debug(51,5) ("CSTWriterAddCSChange: finished\n");
 }
 
 /*****************************************************************************/
@@ -369,6 +380,19 @@ CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
   csChange=csChangeForReader->csChange;
   csChange->remoteReaderCount--;  
   cstRemoteReader->csChangesCounter--;
+  if (!cstRemoteReader->csChangesCounter) {
+    cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+  }
+  if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
+    ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+    if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
+        csChange->remoteReaderStrict--;
+    } else {
+      if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
+        csChange->remoteReaderBest--;
+      }
+    }
+  }  
   eventDetach(cstRemoteReader->cstWriter->domain,
       cstRemoteReader->objectEntryOID->objectEntryAID,
       &csChangeForReader->waitWhileDataUnderwayTimer,
@@ -376,12 +400,14 @@ CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
   FREE(csChangeForReader);
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
-    if (csChange->remoteReaderCount<=
-        (csChange->remoteReaderProcBest+csChange->remoteReaderProcStrict)) {
+    if (!csChange->remoteReaderCount) {
       if (destroyCSChange) {
         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
             cstRemoteReader->cstWriter,csChange);
       }
+      pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
+      cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
+      pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
       debug(51,5) ("Publication: new queue level (%d)\n",
                   cstRemoteReader->cstWriter->csChangesCounter);
@@ -394,6 +420,7 @@ void
 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   CSTRemoteReader     *cstRemoteReader;
   CSChangeForReader   *csChangeForReader;
+  CSChange            *csChangeFSN;
 
   if (!csChange) return;
   cstWriter->csChangesCounter--;
@@ -407,6 +434,12 @@ CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange)
     FREE(csChange->cdrStream.buffer);
   parameterDelete(csChange);
   FREE(csChange);
+  //update first SN
+  csChangeFSN=CSTWriterCSChange_first(cstWriter);
+  if (csChangeFSN)
+    cstWriter->firstSN=csChangeFSN->sn;
+  else
+    cstWriter->firstSN=cstWriter->lastSN;
 }
 
 /*****************************************************************************/
@@ -414,7 +447,7 @@ Boolean
 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
   CSChange *csChange;
   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
-    if (!csChange->remoteReaderProcStrict) {
+    if (!csChange->remoteReaderStrict) {
       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
       return ORTE_TRUE;
     }
@@ -431,22 +464,25 @@ CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
     timerQueue=2; //userdata timer queue
   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
-    csChangeForReader->commStateChFReader=TOSEND;
-    if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
-      cstRemoteReader->commStateSend=MUSTSENDDATA;
-      eventDetach(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
-          &cstRemoteReader->delayResponceTimer,
-          timerQueue);
-      eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
-          &cstRemoteReader->delayResponceTimer,
-          timerQueue,  
-          "CSTWriterSendTimer",
-          CSTWriterSendTimer,
-          &cstRemoteReader->cstWriter->lock,
-          cstRemoteReader,
-          &cstRemoteReader->cstWriter->params.delayResponceTime);               
+    //refresh only VAR's
+    if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
+      csChangeForReader->commStateChFReader=TOSEND;
+      if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
+        cstRemoteReader->commStateSend=MUSTSENDDATA;
+        eventDetach(d,
+            cstRemoteReader->objectEntryOID->objectEntryAID,
+            &cstRemoteReader->delayResponceTimer,
+            timerQueue);
+        eventAdd(d,
+            cstRemoteReader->objectEntryOID->objectEntryAID,
+            &cstRemoteReader->delayResponceTimer,
+            timerQueue,  
+            "CSTWriterSendTimer",
+            CSTWriterSendTimer,
+            &cstRemoteReader->cstWriter->lock,
+            cstRemoteReader,
+            &cstRemoteReader->cstWriter->params.delayResponceTime);               
+      }
     }
   }
 }