]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSCSTWriter.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSCSTWriter.c
index eaa1d971910641ee016056117cd20fe899efc75b..16f189bda243156a2e8f8b54fe5761620cc863ea 100644 (file)
@@ -43,10 +43,13 @@ CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
   cstWriter->guid.oid=oid;
   cstWriter->objectEntryOID=object;
   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
+  cstWriter->registrationCounter=0;
+  ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
   cstWriter->strictReliableCounter=0;
   cstWriter->bestEffortsCounter=0;
   cstWriter->csChangesCounter=0;
   cstWriter->cstRemoteReaderCounter=0;
+  cstWriter->registrationCounter=cstWriter->params.registrationRetries;
   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
   CSTWriterCSChange_init_head(cstWriter);
@@ -64,10 +67,12 @@ CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
     CSTWriterRefreshTimer(d,(void*)cstWriter);
   }
+  //add event for registration 
+  if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
+    CSTWriterRegistrationTimer(d,(void*)cstWriter);
+  }
   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
-                cstWriter->guid.hid,
-                cstWriter->guid.aid,
-                cstWriter->guid.oid);
+                GUID_PRINTF(cstWriter->guid));
   debug(51,10) ("CSTWriterInit: finished\n");
 }
 
@@ -80,9 +85,7 @@ CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
   debug(51,10) ("CSTWriterDelete: start\n");
   
   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
-                cstWriter->guid.hid,
-                cstWriter->guid.aid,
-                cstWriter->guid.oid);
+                GUID_PRINTF(cstWriter->guid));
   //Destroy all cstRemoteReader connected on cstWriter
   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
@@ -96,6 +99,10 @@ CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
       cstWriter->objectEntryOID->objectEntryAID,
       &cstWriter->refreshPeriodTimer,
       0);
+  eventDetach(d,
+      cstWriter->objectEntryOID->objectEntryAID,
+      &cstWriter->registrationTimer,
+      0);
   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
     pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
@@ -105,30 +112,40 @@ CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
 }
 
 /*****************************************************************************/
-void
-CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
-                         ObjectId oid) {
+CSTRemoteReader *
+CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
+                        ObjectId oid,ObjectEntryOID *sobject) {
   CSTRemoteReader     *cstRemoteReader;
   CSChangeForReader   *csChangeForReader;
   CSChange            *csChange=NULL;
   
   cstWriter->cstRemoteReaderCounter++;
   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
-  cstRemoteReader->guid.hid=object->objectEntryHID->hid;
-  cstRemoteReader->guid.aid=object->objectEntryAID->aid;
+  cstRemoteReader->guid.hid=pobject->guid.hid;
+  cstRemoteReader->guid.aid=pobject->guid.aid;
   cstRemoteReader->guid.oid=oid;
-  cstRemoteReader->objectEntryOID=object;
+  cstRemoteReader->sobject=sobject;
+  cstRemoteReader->pobject=pobject;
   cstRemoteReader->cstWriter=cstWriter;
   CSChangeForReader_init_root_field(cstRemoteReader);
   cstRemoteReader->commStateHB=MAYSENDHB;
   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
   cstRemoteReader->HBRetriesCounter=0;
   cstRemoteReader->csChangesCounter=0;
+  cstRemoteReader->commStateToSentCounter=0;
   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
   //insert remote reader 
   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
+  //multicast case
+  if (cstRemoteReader->sobject->multicastPort) {
+    debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
+                 GUID_PRINTF(cstRemoteReader->guid),
+                  GUID_PRINTF(cstRemoteReader->sobject->guid));
+   ObjectEntryMulticast_insert(cstRemoteReader->sobject,
+       cstRemoteReader);
+  }
   //copy all csChanges (not for publication)
   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
@@ -136,14 +153,17 @@ CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *obje
       cstRemoteReader->csChangesCounter++;
       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
       csChangeForReader->commStateChFReader=TOSEND;
+      cstRemoteReader->commStateToSentCounter++;
       csChangeForReader->csChange=csChange;
+      csChangeForReader->cstRemoteReader=cstRemoteReader;
       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
+      CSChangeParticipant_insert(csChange,csChangeForReader);
       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
       cstRemoteReader->commStateSend=MUSTSENDDATA;
     }
     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
       eventAdd(d,
-          cstRemoteReader->objectEntryOID->objectEntryAID,
+          cstRemoteReader->sobject->objectEntryAID,
           &cstRemoteReader->delayResponceTimer,
           1,   
           "CSTWriterSendTimer",
@@ -154,7 +174,7 @@ CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *obje
     }
   } else {
     //Publication
-    ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
+    ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
       cstWriter->strictReliableCounter++;
     else {
@@ -163,9 +183,8 @@ CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *obje
     }
   }
   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
-                cstRemoteReader->guid.hid,
-                cstRemoteReader->guid.aid,
-                cstRemoteReader->guid.oid);
+                GUID_PRINTF(cstRemoteReader->guid));
+  return cstRemoteReader;
 }
 
 /*****************************************************************************/
@@ -176,12 +195,10 @@ CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
   if (!cstRemoteReader) return;
   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
-                cstRemoteReader->guid.hid,
-                cstRemoteReader->guid.aid,
-                cstRemoteReader->guid.oid);
+                GUID_PRINTF(cstRemoteReader->guid));
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
     ORTESubsProp *sp;
-    sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+    sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
       cstRemoteReader->cstWriter->strictReliableCounter--;
     else {
@@ -190,25 +207,40 @@ CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
     }
   }  
   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
-    CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+    CSTWriterDestroyCSChangeForReader(
         csChangeForReader,ORTE_TRUE);
   }
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->delayResponceTimer,
       1);   //metatraffic timer
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->delayResponceTimer,
       2);   //userdata timer
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       1);   //metatraffic timer
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       2);   //userdata timer
+  //multicast case
+  if (cstRemoteReader->sobject->multicastPort) {
+    ObjectEntryOID *object;
+
+    object=cstRemoteReader->sobject;
+
+    ObjectEntryMulticast_delete(object,cstRemoteReader);
+    debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
+                 GUID_PRINTF(cstRemoteReader->guid),
+                  GUID_PRINTF(object->guid));
+
+    if (ObjectEntryMulticast_is_empty(object)) {
+      objectEntryDelete(d,object,ORTE_TRUE);
+    }
+  }
   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
   FREE(cstRemoteReader);
 }
@@ -257,7 +289,7 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   CSChange            *csChangeFSN;
   
   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
-               cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
+                GUID_PRINTF(cstWriter->guid));
   cstWriter->csChangesCounter++;
   //look for old cschange
   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
@@ -269,6 +301,7 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
   csChange->remoteReaderBest=0;
   csChange->remoteReaderStrict=0;
+  CSChangeParticipant_init_head(csChange);
   CSTWriterCSChange_insert(cstWriter,csChange);
   debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
                csChange->sn.low);
@@ -283,10 +316,15 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
   //insert new cschange for each reader
   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
     //csChangeForReader
+    debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
+                  GUID_PRINTF(cstRemoteReader->guid));
     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
     csChangeForReader->commStateChFReader=TOSEND;
+    cstRemoteReader->commStateToSentCounter++;
     csChangeForReader->csChange=csChange;
+    csChangeForReader->cstRemoteReader=cstRemoteReader;
     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
+    CSChangeParticipant_insert(csChange,csChangeForReader);
     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
     cstRemoteReader->csChangesCounter++;
     cstRemoteReader->HBRetriesCounter=0;
@@ -294,11 +332,11 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
       cstRemoteReader->commStateSend=MUSTSENDDATA;
       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
         eventDetach(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             1);
         eventAdd(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             1,   
             "CSTWriterSendTimer",
@@ -307,17 +345,17 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
             cstRemoteReader,
             NULL);
       } else {
-        ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+        ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
         
         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
           //Strict reliable subscription
           csChange->remoteReaderStrict++;
           eventDetach(d,
-              cstRemoteReader->objectEntryOID->objectEntryAID,
+              cstRemoteReader->sobject->objectEntryAID,
               &cstRemoteReader->delayResponceTimer,
               2);
           eventAdd(d,
-              cstRemoteReader->objectEntryOID->objectEntryAID,
+              cstRemoteReader->sobject->objectEntryAID,
               &cstRemoteReader->delayResponceTimer,
               2,   
               "CSTWriterSendStrictTimer",
@@ -328,8 +366,9 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
         } else {
           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
             //best efforts subscription
-            NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
-            
+            NtpTime nextIssueTime,nextIssueDelay,actTime;
+
+           actTime=getActualNtpTime();
             csChange->remoteReaderBest++;
             NtpTimeAdd(nextIssueTime,
                       cstRemoteReader->lastSentIssueTime,
@@ -340,30 +379,24 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
             if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
               NTPTIME_ZERO(nextIssueDelay);
             eventDetach(d,
-                cstRemoteReader->objectEntryOID->objectEntryAID,
+                cstRemoteReader->sobject->objectEntryAID,
                 &cstRemoteReader->delayResponceTimer,
                 2);
-            if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
-              //direct sent issue, for case zero time
-              CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
-            } else {
-              //schedule sent issue (future)
-              eventAdd(d,
-                  cstRemoteReader->objectEntryOID->objectEntryAID,
-                  &cstRemoteReader->delayResponceTimer,
-                  2,   
-                  "CSTWriterSendBestEffortTimer",
-                  CSTWriterSendBestEffortTimer,
-                  &cstRemoteReader->cstWriter->lock,
-                  cstRemoteReader,
-                  &nextIssueDelay);
-            }
+            //schedule sent issue 
+            eventAdd(d,
+                cstRemoteReader->sobject->objectEntryAID,
+                &cstRemoteReader->delayResponceTimer,
+                2,   
+                "CSTWriterSendBestEffortTimer",
+                CSTWriterSendBestEffortTimer,
+                &cstRemoteReader->cstWriter->lock,
+                cstRemoteReader,
+                &nextIssueDelay);
           } else {
             //!Best_Effort & !Strict_Reliable
-            CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
+            CSTWriterDestroyCSChangeForReader(csChangeForReader,
               ORTE_TRUE);
-            debug(51,5) ("CSTWriterAddCSChange: destryed\n");
-             
+            debug(51,5) ("CSTWriterAddCSChange: destroyed\n");       
           }
         }
       }
@@ -375,18 +408,24 @@ CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
 
 /*****************************************************************************/
 void 
-CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
-    CSChangeForReader   *csChangeForReader,Boolean destroyCSChange) {
+CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
+    Boolean destroyCSChange) {
+  CSTRemoteReader *cstRemoteReader;
   CSChange *csChange;
+  
   if (!csChangeForReader) return;
+  cstRemoteReader=csChangeForReader->cstRemoteReader;
   csChange=csChangeForReader->csChange;
   csChange->remoteReaderCount--;  
   cstRemoteReader->csChangesCounter--;
   if (!cstRemoteReader->csChangesCounter) {
     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
   }
+  if (csChangeForReader->commStateChFReader==TOSEND) {
+    cstRemoteReader->commStateToSentCounter--;
+  }
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
-    ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+    ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
         csChange->remoteReaderStrict--;
     } else {
@@ -396,11 +435,13 @@ CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
     }
   }  
   eventDetach(cstRemoteReader->cstWriter->domain,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &csChangeForReader->waitWhileDataUnderwayTimer,
       0);
+  CSChangeParticipant_delete(csChange,csChangeForReader);
   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
   FREE(csChangeForReader);
+
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
     if (!csChange->remoteReaderCount) {
       if (destroyCSChange) {
@@ -425,17 +466,20 @@ CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange)
   CSChange            *csChangeFSN;
 
   if (!csChange) return;
+
   cstWriter->csChangesCounter--;
   CSTWriterCSChange_delete(cstWriter,csChange);
   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
-    CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+    CSTWriterDestroyCSChangeForReader(
         csChangeForReader,ORTE_FALSE);
   }
-  if (csChange->cdrStream.buffer)
-    FREE(csChange->cdrStream.buffer);
+
+  if (csChange->cdrCodec.buffer)
+    FREE(csChange->cdrCodec.buffer);
   parameterDelete(csChange);
   FREE(csChange);
+
   //update first SN
   csChangeFSN=CSTWriterCSChange_first(cstWriter);
   if (csChangeFSN)
@@ -448,7 +492,9 @@ CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange)
 Boolean
 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
   CSChange *csChange;
+
   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
+
     if (!csChange->remoteReaderStrict) {
       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
       return ORTE_TRUE;
@@ -465,18 +511,25 @@ CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
   
   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
     timerQueue=2; //userdata timer queue
+
   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
+
     //refresh only VAR's
     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
-      csChangeForReader->commStateChFReader=TOSEND;
+      
+      if (csChangeForReader->commStateChFReader!=TOSEND) {
+        csChangeForReader->commStateChFReader=TOSEND;
+        cstRemoteReader->commStateToSentCounter++;
+      }
+
       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
         cstRemoteReader->commStateSend=MUSTSENDDATA;
         eventDetach(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             timerQueue);
         eventAdd(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             timerQueue,  
             "CSTWriterSendTimer",
@@ -488,3 +541,86 @@ CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
     }
   }
 }
+
+/*****************************************************************************/
+int
+CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader) 
+{
+  CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
+
+  //setup new state for csChangeForReader
+  if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
+  cstRemoteReader->commStateToSentCounter--;
+
+  if (!cstRemoteReader->commStateToSentCounter)
+       cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+
+  if (NtpTimeCmp(zNtpTime,
+        cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
+       csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
+  } else {
+    csChangeForReader->commStateChFReader=UNDERWAY;
+    eventDetach(cstRemoteReader->cstWriter->domain,
+               cstRemoteReader->sobject->objectEntryAID,
+               &csChangeForReader->waitWhileDataUnderwayTimer,
+               0);
+    eventAdd(cstRemoteReader->cstWriter->domain,
+            cstRemoteReader->sobject->objectEntryAID,
+             &csChangeForReader->waitWhileDataUnderwayTimer,
+             0,   //common timer
+             "CSChangeForReaderUnderwayTimer",
+             CSChangeForReaderUnderwayTimer,
+             &cstRemoteReader->cstWriter->lock,
+             csChangeForReader,
+             &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
+  }
+  return 0;
+}
+
+/*****************************************************************************/
+void
+CSTWriterMulticast(CSChangeForReader *csChangeForReader) 
+{
+    CSTRemoteReader     *cstRemoteReader;
+    ObjectEntryOID     *objectEntryOID;
+    CSChangeForReader   *csChangeForReader1;
+    char                queue=1;
+    
+    cstRemoteReader=csChangeForReader->cstRemoteReader;
+    objectEntryOID=cstRemoteReader->sobject;
+
+    //multicast can do an application with multicast interface
+    if (!objectEntryOID->multicastPort)
+        return;
+
+    ul_list_for_each(CSChangeParticipant,
+                    csChangeForReader->csChange,
+                    csChangeForReader1) {
+        ObjectEntryOID  *objectEntryOID1;
+        CSTRemoteReader *cstRemoteReader1;
+      
+        cstRemoteReader1=csChangeForReader1->cstRemoteReader;
+        objectEntryOID1=cstRemoteReader1->sobject;
+
+         /* are RRs from same host */
+        if (cstRemoteReader1->guid.hid!=cstRemoteReader->guid.hid) {
+          /* is the sending object from same multicast group ? */
+          if (objectEntryOID!=objectEntryOID1)
+              continue;
+        }
+
+        /* is the csChange in state TOSEND ? If yes, marks like proc. */
+        CSTWriterCSChangeForReaderNewState(csChangeForReader1);
+
+        /* if there are no messages, detach sending timer */
+        if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
+            !(cstRemoteReader->commStateHB==MAYSENDHB))
+           continue;
+        if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) 
+          queue=2;
+        eventDetach(cstRemoteReader->cstWriter->domain,
+                   cstRemoteReader->sobject->objectEntryAID,
+                   &cstRemoteReader->delayResponceTimer,
+                   queue);
+    }
+}