]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSCSTWriter.c
updated email address - petr@smoliku.cz
[orte.git] / orte / liborte / RTPSCSTWriter.c
index b7d6851f4233cf58209c6d75f95bdcdd65d402ac..7cd43a730b7f8c82467f5e38b3d13ab963518dbe 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
  *
  *  DEBUG:  section 51                  CSTWriter
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr@smoliku.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -19,7 +29,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
                        CSTPublications, CSTWriter, GUID_RTPS,
@@ -43,10 +53,13 @@ CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
   cstWriter->guid.oid=oid;
   cstWriter->objectEntryOID=object;
   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
+  cstWriter->registrationCounter=0;
+  ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
   cstWriter->strictReliableCounter=0;
   cstWriter->bestEffortsCounter=0;
   cstWriter->csChangesCounter=0;
   cstWriter->cstRemoteReaderCounter=0;
+  cstWriter->registrationCounter=cstWriter->params.registrationRetries;
   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
   CSTWriterCSChange_init_head(cstWriter);
@@ -56,16 +69,20 @@ CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
   cstWriter->domain=d;
   cstWriter->typeRegister=typeRegister;
   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
-    sem_init(&cstWriter->semCSChangeDestroyed, 0, 0);
+    pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
+    pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
+    cstWriter->condValueCSChangeDestroyed=0;
   }
   //add event for refresh
   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
     CSTWriterRefreshTimer(d,(void*)cstWriter);
   }
+  //add event for registration 
+  if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
+    CSTWriterRegistrationTimer(d,(void*)cstWriter);
+  }
   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
-                cstWriter->guid.hid,
-                cstWriter->guid.aid,
-                cstWriter->guid.oid);
+                GUID_PRINTF(cstWriter->guid));
   debug(51,10) ("CSTWriterInit: finished\n");
 }
 
@@ -78,9 +95,7 @@ CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
   debug(51,10) ("CSTWriterDelete: start\n");
   
   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
-                cstWriter->guid.hid,
-                cstWriter->guid.aid,
-                cstWriter->guid.oid);
+                GUID_PRINTF(cstWriter->guid));
   //Destroy all cstRemoteReader connected on cstWriter
   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
@@ -94,38 +109,53 @@ CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
       cstWriter->objectEntryOID->objectEntryAID,
       &cstWriter->refreshPeriodTimer,
       0);
+  eventDetach(d,
+      cstWriter->objectEntryOID->objectEntryAID,
+      &cstWriter->registrationTimer,
+      0);
   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
-    sem_destroy(&cstWriter->semCSChangeDestroyed);
+    pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
+    pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
   }
   pthread_rwlock_destroy(&cstWriter->lock);
   debug(51,10) ("CSTWriterDelete: finished\n");
 }
 
 /*****************************************************************************/
-void
-CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
-                         ObjectId oid) {
+CSTRemoteReader *
+CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
+                        ObjectId oid,ObjectEntryOID *sobject) {
   CSTRemoteReader     *cstRemoteReader;
   CSChangeForReader   *csChangeForReader;
   CSChange            *csChange=NULL;
   
   cstWriter->cstRemoteReaderCounter++;
   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
-  cstRemoteReader->guid.hid=object->objectEntryHID->hid;
-  cstRemoteReader->guid.aid=object->objectEntryAID->aid;
+  cstRemoteReader->guid.hid=pobject->guid.hid;
+  cstRemoteReader->guid.aid=pobject->guid.aid;
   cstRemoteReader->guid.oid=oid;
-  cstRemoteReader->objectEntryOID=object;
+  cstRemoteReader->sobject=sobject;
+  cstRemoteReader->pobject=pobject;
   cstRemoteReader->cstWriter=cstWriter;
   CSChangeForReader_init_root_field(cstRemoteReader);
   cstRemoteReader->commStateHB=MAYSENDHB;
   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
   cstRemoteReader->HBRetriesCounter=0;
   cstRemoteReader->csChangesCounter=0;
+  cstRemoteReader->commStateToSentCounter=0;
   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
   //insert remote reader 
   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
+  //multicast case
+  if (cstRemoteReader->sobject->multicastPort) {
+    debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
+                 GUID_PRINTF(cstRemoteReader->guid),
+                  GUID_PRINTF(cstRemoteReader->sobject->guid));
+   ObjectEntryMulticast_insert(cstRemoteReader->sobject,
+       cstRemoteReader);
+  }
   //copy all csChanges (not for publication)
   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
@@ -133,14 +163,17 @@ CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *obje
       cstRemoteReader->csChangesCounter++;
       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
       csChangeForReader->commStateChFReader=TOSEND;
+      cstRemoteReader->commStateToSentCounter++;
       csChangeForReader->csChange=csChange;
+      csChangeForReader->cstRemoteReader=cstRemoteReader;
       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
+      CSChangeParticipant_insert(csChange,csChangeForReader);
       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
       cstRemoteReader->commStateSend=MUSTSENDDATA;
     }
     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
       eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           1,   
           "CSTWriterSendTimer",
@@ -151,7 +184,7 @@ CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *obje
     }
   } else {
     //Publication
-    ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
+    ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
       cstWriter->strictReliableCounter++;
     else {
@@ -160,9 +193,8 @@ CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *obje
     }
   }
   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
-                cstRemoteReader->guid.hid,
-                cstRemoteReader->guid.aid,
-                cstRemoteReader->guid.oid);
+                GUID_PRINTF(cstRemoteReader->guid));
+  return cstRemoteReader;
 }
 
 /*****************************************************************************/
@@ -173,12 +205,10 @@ CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
   if (!cstRemoteReader) return;
   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
-                cstRemoteReader->guid.hid,
-                cstRemoteReader->guid.aid,
-                cstRemoteReader->guid.oid);
+                GUID_PRINTF(cstRemoteReader->guid));
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
     ORTESubsProp *sp;
-    sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+    sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
       cstRemoteReader->cstWriter->strictReliableCounter--;
     else {
@@ -187,25 +217,40 @@ CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
     }
   }  
   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
-    CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+    CSTWriterDestroyCSChangeForReader(
         csChangeForReader,ORTE_TRUE);
   }
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->delayResponceTimer,
       1);   //metatraffic timer
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->delayResponceTimer,
       2);   //userdata timer
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       1);   //metatraffic timer
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       2);   //userdata timer
+  //multicast case
+  if (cstRemoteReader->sobject->multicastPort) {
+    ObjectEntryOID *object;
+
+    object=cstRemoteReader->sobject;
+
+    ObjectEntryMulticast_delete(object,cstRemoteReader);
+    debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
+                 GUID_PRINTF(cstRemoteReader->guid),
+                  GUID_PRINTF(object->guid));
+
+    if (ObjectEntryMulticast_is_empty(object)) {
+      objectEntryDelete(d,object,ORTE_TRUE);
+    }
+  }
   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
   FREE(cstRemoteReader);
 }
@@ -254,7 +299,7 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   CSChange            *csChangeFSN;
   
   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
-               cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
+                GUID_PRINTF(cstWriter->guid));
   cstWriter->csChangesCounter++;
   //look for old cschange
   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
@@ -266,7 +311,10 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
   csChange->remoteReaderBest=0;
   csChange->remoteReaderStrict=0;
+  CSChangeParticipant_init_head(csChange);
   CSTWriterCSChange_insert(cstWriter,csChange);
+  debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
+               csChange->sn.low);
   //update FirstSN
   csChangeFSN=CSTWriterCSChange_first(cstWriter);
   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
@@ -278,88 +326,86 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   //insert new cschange for each reader
   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
     //csChangeForReader
+    debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
+                  GUID_PRINTF(cstRemoteReader->guid));
     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
     csChangeForReader->commStateChFReader=TOSEND;
+    cstRemoteReader->commStateToSentCounter++;
     csChangeForReader->csChange=csChange;
+    csChangeForReader->cstRemoteReader=cstRemoteReader;
     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
+    CSChangeParticipant_insert(csChange,csChangeForReader);
     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
     cstRemoteReader->csChangesCounter++;
     cstRemoteReader->HBRetriesCounter=0;
-    if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
-      cstRemoteReader->commStateSend=MUSTSENDDATA;
-      if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
+    cstRemoteReader->commStateSend=MUSTSENDDATA;
+    if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
+      eventDetach(d,
+          cstRemoteReader->sobject->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          1);
+      eventAdd(d,
+          cstRemoteReader->sobject->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          1,   
+          "CSTWriterSendTimer",
+          CSTWriterSendTimer,
+          &cstRemoteReader->cstWriter->lock,
+          cstRemoteReader,
+          NULL);
+    } else {
+      ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
+
+      if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
+        //Strict reliable subscription
+        csChange->remoteReaderStrict++;
         eventDetach(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
-            1);
+            2);
         eventAdd(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
-            1,   
-            "CSTWriterSendTimer",
-            CSTWriterSendTimer,
+            2,   
+            "CSTWriterSendStrictTimer",
+            CSTWriterSendStrictTimer,
             &cstRemoteReader->cstWriter->lock,
             cstRemoteReader,
             NULL);
       } else {
-        ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
-        
-        if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
-          //Strict reliable subscription
-          csChange->remoteReaderStrict++;
+        if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
+          //best efforts subscription
+          NtpTime nextIssueTime,nextIssueDelay,actTime;
+
+         actTime=getActualNtpTime();
+          csChange->remoteReaderBest++;
+          NtpTimeAdd(nextIssueTime,
+                    cstRemoteReader->lastSentIssueTime,
+                    sp->minimumSeparation);
+          NtpTimeSub(nextIssueDelay,
+                    nextIssueTime,
+                    actTime);
+          if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
+            NTPTIME_ZERO(nextIssueDelay);
           eventDetach(d,
-              cstRemoteReader->objectEntryOID->objectEntryAID,
+              cstRemoteReader->sobject->objectEntryAID,
               &cstRemoteReader->delayResponceTimer,
               2);
+          //schedule sent issue 
           eventAdd(d,
-              cstRemoteReader->objectEntryOID->objectEntryAID,
+              cstRemoteReader->sobject->objectEntryAID,
               &cstRemoteReader->delayResponceTimer,
               2,   
-              "CSTWriterSendStrictTimer",
-              CSTWriterSendStrictTimer,
+              "CSTWriterSendBestEffortTimer",
+              CSTWriterSendBestEffortTimer,
               &cstRemoteReader->cstWriter->lock,
               cstRemoteReader,
-              NULL);
+              &nextIssueDelay);
         } else {
-          if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
-            //best efforts subscription
-            NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
-            
-            csChange->remoteReaderBest++;
-            NtpTimeAdd(nextIssueTime,
-                      cstRemoteReader->lastSentIssueTime,
-                      sp->minimumSeparation);
-            NtpTimeSub(nextIssueDelay,
-                      nextIssueTime,
-                      actTime);
-            if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
-              NTPTIME_ZERO(nextIssueDelay);
-            eventDetach(d,
-                cstRemoteReader->objectEntryOID->objectEntryAID,
-                &cstRemoteReader->delayResponceTimer,
-                2);
-            if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
-              //direct sent issue, for case zero time
-              CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
-            } else {
-              //schedule sent issue (future)
-              eventAdd(d,
-                  cstRemoteReader->objectEntryOID->objectEntryAID,
-                  &cstRemoteReader->delayResponceTimer,
-                  2,   
-                  "CSTWriterSendBestEffortTimer",
-                  CSTWriterSendBestEffortTimer,
-                  &cstRemoteReader->cstWriter->lock,
-                  cstRemoteReader,
-                  &nextIssueDelay);
-            }
-          } else {
-            //!Best_Effort & !Strict_Reliable
-            CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
-              ORTE_TRUE);
-            debug(51,5) ("CSTWriterAddCSChange: destryed\n");
-             
-          }
+          //!Best_Effort & !Strict_Reliable
+          CSTWriterDestroyCSChangeForReader(csChangeForReader,
+            ORTE_TRUE);
+          debug(51,5) ("CSTWriterAddCSChange: destroyed\n");         
         }
       }
     }
@@ -370,18 +416,24 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
 
 /*****************************************************************************/
 void 
-CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
-    CSChangeForReader   *csChangeForReader,Boolean destroyCSChange) {
+CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
+    Boolean destroyCSChange) {
+  CSTRemoteReader *cstRemoteReader;
   CSChange *csChange;
+  
   if (!csChangeForReader) return;
+  cstRemoteReader=csChangeForReader->cstRemoteReader;
   csChange=csChangeForReader->csChange;
   csChange->remoteReaderCount--;  
   cstRemoteReader->csChangesCounter--;
   if (!cstRemoteReader->csChangesCounter) {
     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
   }
+  if (csChangeForReader->commStateChFReader==TOSEND) {
+    cstRemoteReader->commStateToSentCounter--;
+  }
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
-    ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+    ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
         csChange->remoteReaderStrict--;
     } else {
@@ -391,18 +443,23 @@ CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
     }
   }  
   eventDetach(cstRemoteReader->cstWriter->domain,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &csChangeForReader->waitWhileDataUnderwayTimer,
       0);
+  CSChangeParticipant_delete(csChange,csChangeForReader);
   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
   FREE(csChangeForReader);
+
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
     if (!csChange->remoteReaderCount) {
       if (destroyCSChange) {
         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
             cstRemoteReader->cstWriter,csChange);
       }
-      sem_post(&cstRemoteReader->cstWriter->semCSChangeDestroyed);
+      pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
+      cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
+      pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
+      pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
       debug(51,5) ("Publication: new queue level (%d)\n",
                   cstRemoteReader->cstWriter->csChangesCounter);
     }
@@ -417,17 +474,20 @@ CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange)
   CSChange            *csChangeFSN;
 
   if (!csChange) return;
+
   cstWriter->csChangesCounter--;
   CSTWriterCSChange_delete(cstWriter,csChange);
   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
-    CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+    CSTWriterDestroyCSChangeForReader(
         csChangeForReader,ORTE_FALSE);
   }
-  if (csChange->cdrStream.buffer)
-    FREE(csChange->cdrStream.buffer);
+
+  if (csChange->cdrCodec.buffer)
+    FREE(csChange->cdrCodec.buffer);
   parameterDelete(csChange);
   FREE(csChange);
+
   //update first SN
   csChangeFSN=CSTWriterCSChange_first(cstWriter);
   if (csChangeFSN)
@@ -440,7 +500,9 @@ CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange)
 Boolean
 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
   CSChange *csChange;
+
   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
+
     if (!csChange->remoteReaderStrict) {
       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
       return ORTE_TRUE;
@@ -457,18 +519,25 @@ CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
   
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
     timerQueue=2; //userdata timer queue
+
   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
+
     //refresh only VAR's
     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
-      csChangeForReader->commStateChFReader=TOSEND;
+      
+      if (csChangeForReader->commStateChFReader!=TOSEND) {
+        csChangeForReader->commStateChFReader=TOSEND;
+        cstRemoteReader->commStateToSentCounter++;
+      }
+
       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
         cstRemoteReader->commStateSend=MUSTSENDDATA;
         eventDetach(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             timerQueue);
         eventAdd(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             timerQueue,  
             "CSTWriterSendTimer",
@@ -480,3 +549,84 @@ CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
     }
   }
 }
+
+/*****************************************************************************/
+int
+CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader) 
+{
+  CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
+
+  //setup new state for csChangeForReader
+  if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
+  cstRemoteReader->commStateToSentCounter--;
+
+  if (!cstRemoteReader->commStateToSentCounter)
+       cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+
+  if (NtpTimeCmp(zNtpTime,
+        cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
+       csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
+  } else {
+    csChangeForReader->commStateChFReader=UNDERWAY;
+    eventDetach(cstRemoteReader->cstWriter->domain,
+               cstRemoteReader->sobject->objectEntryAID,
+               &csChangeForReader->waitWhileDataUnderwayTimer,
+               0);
+    eventAdd(cstRemoteReader->cstWriter->domain,
+            cstRemoteReader->sobject->objectEntryAID,
+             &csChangeForReader->waitWhileDataUnderwayTimer,
+             0,   //common timer
+             "CSChangeForReaderUnderwayTimer",
+             CSChangeForReaderUnderwayTimer,
+             &cstRemoteReader->cstWriter->lock,
+             csChangeForReader,
+             &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
+  }
+  return 0;
+}
+
+/*****************************************************************************/
+void
+CSTWriterMulticast(CSChangeForReader *csChangeForReader) 
+{
+    CSTRemoteReader     *cstRemoteReader;
+    ObjectEntryOID     *objectEntryOID;
+    CSChangeForReader   *csChangeForReader1;
+    char                queue=1;
+    
+    cstRemoteReader=csChangeForReader->cstRemoteReader;
+    objectEntryOID=cstRemoteReader->sobject;
+
+    //multicast can do an application with multicast interface
+    if (!objectEntryOID->multicastPort)
+        return;
+
+    ul_list_for_each(CSChangeParticipant,
+                    csChangeForReader->csChange,
+                    csChangeForReader1) {
+        ObjectEntryOID  *objectEntryOID1;
+        CSTRemoteReader *cstRemoteReader1;
+      
+        cstRemoteReader1=csChangeForReader1->cstRemoteReader;
+        objectEntryOID1=cstRemoteReader1->sobject;
+                  
+        /* are RRs from same GROUP */
+        if (objectEntryOID!=objectEntryOID1)
+          continue;
+
+        /* is the csChange in state TOSEND ? If yes, marks like proc. */
+        CSTWriterCSChangeForReaderNewState(csChangeForReader1);
+
+        /* if there are no messages, detach sending timer */
+        if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
+            !(cstRemoteReader->commStateHB==MAYSENDHB))
+           continue;
+
+        if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) 
+          queue=2;
+        eventDetach(cstRemoteReader->cstWriter->domain,
+                   cstRemoteReader->sobject->objectEntryAID,
+                   &cstRemoteReader->delayResponceTimer,
+                   queue);
+    }
+}