rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSCSTWriterTimer.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSCSTWriterTimer.c
index 6736a37826c10747c61dfb01ae752edc9a5b423a..1f7717348ee64429803c9cf4fb6af1e35404d850 100644 (file)
 
 #include "orte_all.h"
 
+/*****************************************************************************/
+/*
+ * CSTWriterRegistrationTimer - timer callback driving registration retries
+ * of a CSTWriter.
+ *
+ * d           domain the writer belongs to
+ * vcstWriter  the CSTWriter, passed as void* by the event system
+ *
+ * On every expiry the timer is first detached. While
+ * cstWriter->registrationCounter is non-zero it is decremented, the CSChanges
+ * of every CSTRemoteReader of the writer are refreshed and the timer is
+ * re-armed with params.registrationPeriod. Once the counter has reached zero
+ * the timer is not re-armed and the domain's onRegFail callback (if one is
+ * set) is invoked with onRegFailParam.
+ *
+ * Always returns 0.
+ */
+int
+CSTWriterRegistrationTimer(ORTEDomain *d,void *vcstWriter) {
+  CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
+  CSTRemoteReader *cstRemoteReader;
+
+  debug(52,10) ("CSTWriterRegistrationTimer: start\n");
+
+  debug(52,5) ("CSTWriterRegistrationTimer: OID: 0x%x - retries = %d\n",
+               cstWriter->guid.oid,cstWriter->registrationCounter);
+  //remove the pending event; it is re-added below only while retries remain
+  eventDetach(d,
+      cstWriter->objectEntryOID->objectEntryAID,
+      &cstWriter->registrationTimer,
+      0);   //common timer
+
+  if (cstWriter->registrationCounter!=0) {
+    //retries left: consume one, refresh every remote reader, re-arm the timer
+    cstWriter->registrationCounter--;
+    gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
+      CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
+    }
+    eventAdd(d,
+        cstWriter->objectEntryOID->objectEntryAID,
+        &cstWriter->registrationTimer,
+        0,   //common timer
+        "CSTWriterRegistrationTimer",
+        CSTWriterRegistrationTimer,
+        &cstWriter->lock,
+        cstWriter,
+        &cstWriter->params.registrationPeriod);
+  } else {
+    //no retries left: report the failure through the optional user callback
+    if (d->domainEvents.onRegFail) {
+       d->domainEvents.onRegFail(d->domainEvents.onRegFailParam);
+    }
+  }
+
+  debug(52,10) ("CSTWriterRegistrationTimer: finished\n");
+  return 0;
+}
+
+
 /*****************************************************************************/
 int 
 CSTWriterRefreshTimer(ORTEDomain *d,void *vcstWriter) {
@@ -59,32 +99,29 @@ CSTWriterAnnounceTimer(ORTEDomain *d,void *vcstRemoteReader) {
       ((!cstRemoteReader->cstWriter->params.fullAcknowledge))) {// ||
 //       (cstRemoteReader->unacknowledgedCounter))) {
     //create HB
-    int len=RTPSHeardBeatCreate(
-        d->mbSend.cdrStream.bufferPtr,
-        getMaxMessageLength(d),
+    int len=RTPSHeartBeatCreate(
+        &d->taskSend.mb.cdrCodec,
         &cstRemoteReader->cstWriter->firstSN,
         &cstRemoteReader->cstWriter->lastSN,
-        cstRemoteReader->cstWriter->guid.oid,
         OID_UNKNOWN,
+        cstRemoteReader->cstWriter->guid.oid,
         ORTE_FALSE);
     if (len<0) {
       //not enought space in sending buffer
-      d->mbSend.needSend=ORTE_TRUE;
+      d->taskSend.mb.needSend=ORTE_TRUE;
       return 1;
     }
-    d->mbSend.cdrStream.bufferPtr+=len;
-    d->mbSend.cdrStream.length+=len;
     debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
                   cstRemoteReader->cstWriter->guid.oid,
                   cstRemoteReader->guid.hid,
                   cstRemoteReader->guid.aid);
   }
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       1);
   eventAdd(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       1,   //metatraffic timer
       "CSTWriterAnnounceTimer",
@@ -107,22 +144,19 @@ CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
   debug(52,10) ("CSTWriterAnnounceIssueTimer: start\n");
   pp=(ORTEPublProp*)cstRemoteReader->cstWriter->objectEntryOID->attributes;
   //create HB
-  d->mbSend.cdrStreamDirect=NULL;
-  len=RTPSHeardBeatCreate(
-      d->mbSend.cdrStream.bufferPtr,
-      getMaxMessageLength(d),
+  d->taskSend.mb.cdrCodecDirect=NULL;
+  len=RTPSHeartBeatCreate(
+      &d->taskSend.mb.cdrCodec,
       &cstRemoteReader->cstWriter->firstSN,
       &cstRemoteReader->cstWriter->lastSN,
-      cstRemoteReader->cstWriter->guid.oid,
       OID_UNKNOWN,
+      cstRemoteReader->cstWriter->guid.oid,
       ORTE_FALSE);
   if (len<0) {
     //not enought space in sending buffer
-    d->mbSend.needSend=ORTE_TRUE;
+    d->taskSend.mb.needSend=ORTE_TRUE;
     return 1;
   }
-  d->mbSend.cdrStream.bufferPtr+=len;
-  d->mbSend.cdrStream.length+=len;
   debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
                 cstRemoteReader->cstWriter->guid.oid,
                 cstRemoteReader->guid.hid,
@@ -135,12 +169,12 @@ CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
   }
   cstRemoteReader->HBRetriesCounter++;
   eventDetach(d,
-      cstRemoteReader->objectEntryOID->objectEntryAID,
+      cstRemoteReader->sobject->objectEntryAID,
       &cstRemoteReader->repeatAnnounceTimer,
       2);
   if (cstRemoteReader->HBRetriesCounter<pp->HBMaxRetries) {              
     eventAdd(d,
-        cstRemoteReader->objectEntryOID->objectEntryAID,
+        cstRemoteReader->sobject->objectEntryAID,
         &cstRemoteReader->repeatAnnounceTimer,
         2,   //metatraffic timer
         "CSTWriterAnnounceIssueTimer",
@@ -152,7 +186,7 @@ CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
     //destroy all csChangesForReader
     CSChangeForReader *csChangeForReader;
     while ((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
-      CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+      CSTWriterDestroyCSChangeForReader(
           csChangeForReader,ORTE_TRUE);
     }
     debug(52,3) ("CSTWriterAnnounceIssueTimer: HB RR(0x%x-0x%x) ritch MaxRetries\n",
@@ -174,37 +208,51 @@ CSChangeForReaderUnderwayTimer(ORTEDomain *d,void *vcsChangeForReader) {
 int
 CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
-  ORTESubsProp      *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
+  ORTESubsProp      *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
   CSChangeForReader *csChangeForReader=NULL;
         
   debug(52,10) ("CSTWriterSendBestEffortTimer: start\n");
-  d->mbSend.cdrStreamDirect=NULL;
+  d->taskSend.mb.cdrCodecDirect=NULL;
   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
       if (csChangeForReader->commStateChFReader==TOSEND) {
         CSChange *csChange=csChangeForReader->csChange;
+
         csChangeForReader->commStateChFReader=UNDERWAY;
         cstRemoteReader->commStateSend=MUSTSENDDATA;
         cstRemoteReader->lastSentIssueTime=getActualNtpTime();
-        d->mbSend.cdrStreamDirect=&csChange->cdrStream;
-        debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x\n",
-                    cstRemoteReader->cstWriter->guid.oid,
-                    cstRemoteReader->guid.hid,
-                    cstRemoteReader->guid.aid);
+        d->taskSend.mb.cdrCodecDirect=&csChange->cdrCodec;
+
+        if (cstRemoteReader->sobject) {
+          debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x-0x%x\n",
+                        cstRemoteReader->cstWriter->guid.oid,
+                        GUID_PRINTF(cstRemoteReader->sobject->guid));
+        }
+
         ORTESendData(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             ORTE_FALSE);
-        CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+
+       //NewState is not strictly necessary here: it only sets the new state, and the csChangeForReader is destroyed right below
+        CSTWriterCSChangeForReaderNewState(csChangeForReader);
+
+       /* mark multicast messages like processed */
+        CSTWriterMulticast(csChangeForReader);
+
+        CSTWriterDestroyCSChangeForReader(
             csChangeForReader,ORTE_TRUE);
+
         eventDetach(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             2);   
+
         //when is no csChange -> break processing 
         if (cstRemoteReader->cstWriter->csChangesCounter==0) 
           break;
+
         eventAdd(d,
-            cstRemoteReader->objectEntryOID->objectEntryAID,
+            cstRemoteReader->sobject->objectEntryAID,
             &cstRemoteReader->delayResponceTimer,
             2,   
             "CSTWriterSendBestEffortTimer",
@@ -213,10 +261,10 @@ CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
             cstRemoteReader,
             &sp->minimumSeparation);
         return 0;
+
       }
     }
   }
-  cstRemoteReader->commStateSend=NOTHNIGTOSEND;
   debug(52,10) ("CSTWriterSendBestEffortTimer: finished\n");
   return 0;
 }
@@ -226,48 +274,56 @@ int
 CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
   CSChangeForReader *csChangeForReader=NULL;
-  int               max_msg_len,len;
+  int               len,data_offset,wptr_max;
   CSChange          *csChange;
   Boolean           firstTrace=ORTE_TRUE;
   
   debug(52,10) ("CSTWriterSendStrictTimer: start\n");
-  max_msg_len=getMaxMessageLength(d);
   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
       csChange=csChangeForReader->csChange;
       if (csChangeForReader->commStateChFReader==TOSEND) {
         cstRemoteReader->commStateSend=MUSTSENDDATA;
+
+        wptr_max=d->taskSend.mb.cdrCodec.wptr_max;
+        d->taskSend.mb.cdrCodec.wptr_max=csChange->cdrCodec.wptr_max;
+       /* infoReply */
         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
-            !d->mbSend.containsInfoReply) {
+            !d->taskSend.mb.containsInfoReply) {
+          AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
           firstTrace=ORTE_FALSE;
-          len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
+          len=RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
               IPADDRESS_INVALID,
-              ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->userdataUnicastPort);
+              ap->userdataUnicastPort);
           if (len<0) {
-            d->mbSend.needSend=ORTE_TRUE;
+            d->taskSend.mb.needSend=ORTE_TRUE;
+           d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
             return 1;
           }
-          d->mbSend.containsInfoReply=ORTE_TRUE;  
-          d->mbSend.cdrStream.bufferPtr+=len;
-          d->mbSend.cdrStream.length+=len;
-          max_msg_len-=len;
+          d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
                        cstRemoteReader->cstWriter->guid.oid,
                        cstRemoteReader->guid.hid,
                        cstRemoteReader->guid.aid);
         }
-        len=20+cstRemoteReader->cstWriter->typeRegister->getMaxSize;
-        if (max_msg_len<len) {
-          d->mbSend.needSend=ORTE_TRUE;
-          return 1;
+
+       data_offset=RTPS_HEADER_LENGTH+12;
+        if (CDR_buffer_puts(&d->taskSend.mb.cdrCodec,
+                           csChange->cdrCodec.buffer+data_offset, //src
+                           csChange->cdrCodec.wptr-data_offset)==CORBA_FALSE) {
+            d->taskSend.mb.needSend=ORTE_TRUE;
+           d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
+            return 1;
         }
-        memcpy(d->mbSend.cdrStream.bufferPtr,     //dest
-//               csChange->cdrStream.bufferPtr-len, //src
-               csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12, //src
-               len);                              //length
-        d->mbSend.cdrStream.bufferPtr+=len;
-        d->mbSend.cdrStream.length+=len;
-        max_msg_len-=len;
+    
+        d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
+
+       /* setup new state for csChangeForReader */
+        CSTWriterCSChangeForReaderNewState(csChangeForReader);
+
+       /* mark multicast messages like processed */
+        CSTWriterMulticast(csChangeForReader);
+
         debug(52,3) ("sent: RTPS_ISSUE_STRICT(0x%x) to 0x%x-0x%x\n",
                     cstRemoteReader->cstWriter->guid.oid,
                     cstRemoteReader->guid.hid,
@@ -275,7 +331,6 @@ CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
       }
     }
   }
-  cstRemoteReader->commStateSend=NOTHNIGTOSEND;
   debug(52,10) ("CSTWriterSendStrictTimer: finished\n");
   //add HeardBeat  
   return CSTWriterAnnounceIssueTimer(d,cstRemoteReader);
@@ -286,173 +341,115 @@ int
 CSTWriterSendTimer(ORTEDomain *d,void *vcstRemoteReader) {
   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
   CSChangeForReader *csChangeForReader=NULL;
-  unsigned int      max_msg_len;
-  int               len,off;
   Boolean           firstTrace=ORTE_TRUE,f_bit=ORTE_TRUE;
   
   debug(52,10) ("CSTWriterSendTimer: start\n");
-  max_msg_len=getMaxMessageLength(d);
-  //setup f_bit of object
+
+  /* setup f_bit of object */
   if (cstRemoteReader->cstWriter->params.fullAcknowledge)
     f_bit=ORTE_FALSE;
+
   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
+
     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
+
       if (csChangeForReader->commStateChFReader==TOSEND) {
         cstRemoteReader->commStateSend=MUSTSENDDATA;
+
+        /* infoReply */
         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
-            !d->mbSend.containsInfoReply) {
+            !d->taskSend.mb.containsInfoReply) {
+          AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
           firstTrace=ORTE_FALSE;
-          len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
-              IPADDRESS_INVALID,
-              ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->metatrafficUnicastPort);
-          if (len<0) {
-            d->mbSend.needSend=ORTE_TRUE;
+          if (RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
+                                 IPADDRESS_INVALID,
+                                 ap->metatrafficUnicastPort) < 0) {
+            d->taskSend.mb.needSend=ORTE_TRUE;
             return 1;
           }
-          d->mbSend.containsInfoReply=ORTE_TRUE;  
-          d->mbSend.cdrStream.bufferPtr+=len;
-          d->mbSend.cdrStream.length+=len;
-          max_msg_len-=len;
-          debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
-                       cstRemoteReader->cstWriter->guid.oid,
-                       cstRemoteReader->guid.hid,
-                       cstRemoteReader->guid.aid);
-        }
-        if (max_msg_len<32) {
-          d->mbSend.needSend=ORTE_TRUE;
-          return 1;
+          d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
+          debug(52,3) ("sent: RTPS_InfoREPLY from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+                        GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+                        GUID_PRINTF(cstRemoteReader->guid));
         }
-        off=0;
-        //VAR ???
+
+        /* VAR */
         if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
-          debug(52,3) ("sent: RTPS_VAR(0x%x) to 0x%x-0x%x\n",
-                       cstRemoteReader->cstWriter->guid.oid,
-                       cstRemoteReader->guid.hid,
-                       cstRemoteReader->guid.aid);
-          len=32;
-          d->mbSend.cdrStream.bufferPtr[0]=(uint8_t)VAR;
-          d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
-          if (csChangeForReader->csChange->alive) 
-            d->mbSend.cdrStream.bufferPtr[1]|=4;
-          *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
-          conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
-          *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
-            cstRemoteReader->cstWriter->guid.oid;
-          conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
-          if (csChangeForReader->csChange->guid.oid==OID_APP) {
-            d->mbSend.cdrStream.bufferPtr[1]|=8;
-            *((HostId*)(d->mbSend.cdrStream.bufferPtr+12))=
-              csChangeForReader->csChange->guid.hid;
-            conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+12),0);
-            *((AppId*)(d->mbSend.cdrStream.bufferPtr+16))=
-              csChangeForReader->csChange->guid.aid;
-            conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+16),0);
-          } else {
-            len-=8;
-            off=-8;
-          }
-          *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+20+off))=
-            csChangeForReader->csChange->guid.oid;
-          conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+20+off),0);
-          *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+24+off))=
-            csChangeForReader->csChange->sn;
-          if (!CSChangeAttributes_is_empty(csChangeForReader->csChange)) {
-            int plen;
-            plen=parameterCodeStreamFromCSChange(csChangeForReader->csChange,
-                 d->mbSend.cdrStream.bufferPtr+32+off,max_msg_len-len);
-            if (plen<0) {
-              d->mbSend.needSend=ORTE_TRUE;
-              return 1;
-            }
-            d->mbSend.cdrStream.bufferPtr[1]|=2;
-            len+=plen;
+          debug(52,3) ("sent: RTPS_VAR from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+                        GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+                        GUID_PRINTF(cstRemoteReader->guid));
+
+          if (RTPSVarCreate(&d->taskSend.mb.cdrCodec,
+                           OID_UNKNOWN,
+                            cstRemoteReader->cstWriter->guid.oid,
+                           csChangeForReader->csChange) < 0) {
+            d->taskSend.mb.needSend=ORTE_TRUE;
+            return 1;
           }
-        } else {  //GAP ???
-          debug(52,3) ("sent: RTPS_GAP(0x%x) to 0x%x-0x%x\n",
-                       cstRemoteReader->cstWriter->guid.oid,
-                       cstRemoteReader->guid.hid,
-                       cstRemoteReader->guid.aid);
-          len=32;
-          d->mbSend.cdrStream.bufferPtr[0]=(uint8_t)GAP;
-          d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
-          *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
-          conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
-          *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
-            cstRemoteReader->cstWriter->guid.oid;
-          conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
-          *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12))=
-            csChangeForReader->csChange->sn;
-          conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12),ORTE_MY_MBO);
-          SeqNumberAdd(*((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20)),  
-                       csChangeForReader->csChange->sn,
-                       csChangeForReader->csChange->gapSN);
-          conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20),ORTE_MY_MBO);
-          *((uint32_t*)(d->mbSend.cdrStream.bufferPtr+28))=0;    //NumBits 
-        }
-        *((ParameterLength*)(d->mbSend.cdrStream.bufferPtr+2))=len-4; 
-        d->mbSend.cdrStream.bufferPtr+=len;
-        d->mbSend.cdrStream.length+=len;
-        max_msg_len-=len;
-        //setup new state for csChangeForReader
-        if (NtpTimeCmp(zNtpTime,
-                cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
-          csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
+
         } else {
-          csChangeForReader->commStateChFReader=UNDERWAY;
-          eventDetach(d,
-              cstRemoteReader->objectEntryOID->objectEntryAID,
-              &csChangeForReader->waitWhileDataUnderwayTimer,
-              0);
-          eventAdd(d,
-              cstRemoteReader->objectEntryOID->objectEntryAID,
-              &csChangeForReader->waitWhileDataUnderwayTimer,
-              0,   //common timer
-              "CSChangeForReaderUnderwayTimer",
-              CSChangeForReaderUnderwayTimer,
-              &cstRemoteReader->cstWriter->lock,
-              csChangeForReader,
-              &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
+        /* GAP */
+          debug(52,3) ("sent: RTPS_GAP from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+                        GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+                        GUID_PRINTF(cstRemoteReader->guid));
+
+          if (RTPSGapCreate(&d->taskSend.mb.cdrCodec,
+                           OID_UNKNOWN,
+                           cstRemoteReader->cstWriter->guid.oid,
+                           csChangeForReader->csChange) < 0) {
+            d->taskSend.mb.needSend=ORTE_TRUE;
+            return 1;
+          }
         }
+
+       /* setup new state for csChangeForReader */
+       CSTWriterCSChangeForReaderNewState(csChangeForReader);
+
+       /* mark multicast messages like processed */
+        CSTWriterMulticast(csChangeForReader);
+
       }
-    }
-  }
-  //add HeardBeat
-  len=RTPSHeardBeatCreate(
-      d->mbSend.cdrStream.bufferPtr,max_msg_len,
-      &cstRemoteReader->cstWriter->firstSN,
-      &cstRemoteReader->cstWriter->lastSN,
-      cstRemoteReader->cstWriter->guid.oid,
-      OID_UNKNOWN,
-      f_bit);
-  if (len<0) {
-    d->mbSend.needSend=ORTE_TRUE;
-    return 1;
-  } else {
-    //schedule new time for Announce timer
-    eventDetach(d,
-         cstRemoteReader->objectEntryOID->objectEntryAID,
-         &cstRemoteReader->repeatAnnounceTimer,
-         1);
-    eventAdd(d,
-         cstRemoteReader->objectEntryOID->objectEntryAID,
-         &cstRemoteReader->repeatAnnounceTimer,
-         1,   //metatraffic timer
-         "CSTWriterAnnounceTimer",
-         CSTWriterAnnounceTimer,
-         &cstRemoteReader->cstWriter->lock,
-         cstRemoteReader,
-         &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
+    } /* gavl_cust_for_each */
+
+    cstRemoteReader->commStateHB=MUSTSENDHB;
+
   }
-  debug(52,3) ("sent: RTPS_HB(0x%x) to 0x%x-0x%x\n",
-                cstRemoteReader->cstWriter->guid.oid,
-                cstRemoteReader->guid.hid,
-                cstRemoteReader->guid.aid);
+
   if (cstRemoteReader->commStateHB==MUSTSENDHB) {
+    //add HeartBeat
+    if (RTPSHeartBeatCreate(
+        &d->taskSend.mb.cdrCodec,
+        &cstRemoteReader->cstWriter->firstSN,
+        &cstRemoteReader->cstWriter->lastSN,
+        OID_UNKNOWN,
+        cstRemoteReader->cstWriter->guid.oid,
+        f_bit)<0) {
+      d->taskSend.mb.needSend=ORTE_TRUE;
+      return 1;
+    } else {
+      //schedule new time for Announce timer
+      eventDetach(d,
+           cstRemoteReader->sobject->objectEntryAID,
+           &cstRemoteReader->repeatAnnounceTimer,
+           1);
+      eventAdd(d,
+           cstRemoteReader->sobject->objectEntryAID,
+           &cstRemoteReader->repeatAnnounceTimer,
+           1,   //metatraffic timer
+           "CSTWriterAnnounceTimer",
+           CSTWriterAnnounceTimer,
+           &cstRemoteReader->cstWriter->lock,
+           cstRemoteReader,
+           &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
+    }
+
+    debug(52,3) ("sent: RTPS_HB from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
+                  GUID_PRINTF(cstRemoteReader->cstWriter->guid),
+                  GUID_PRINTF(cstRemoteReader->guid));
+
     cstRemoteReader->commStateHB=MAYSENDHB;
   }
-  cstRemoteReader->commStateSend=NOTHNIGTOSEND;
-  d->mbSend.cdrStream.bufferPtr+=len;
-  d->mbSend.cdrStream.length+=len;
   debug(52,10) ("CSTWriterSendTimer: finished\n");
   return 0;
 }