]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSCSTReaderProc.c
orte 0.3.2 release
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
index e94ee5c4ff01050cd22d8c0a6adf57c9d47ea5bf..185033db009f6bc11788006dbd042fd3f2ceb435 100644 (file)
@@ -130,19 +130,20 @@ CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
 void 
 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
   CSChangeFromWriter *csChangeFromWriter;
-  SequenceNumber     snNext;
+  SequenceNumber     sn,snNext,lastGapSN;
 
   debug(54,10) ("CSTReaderProcCSChanges: start\n");
   if (!cstRemoteWriter) return;
   while (1) {
     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
     if (!csChangeFromWriter) break;
-    if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
-                     cstRemoteWriter->firstSN)>=0) {
+    sn=csChangeFromWriter->csChange->sn;
+    if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
       SeqNumberInc(snNext,cstRemoteWriter->sn);
-      debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
-                                             csChangeFromWriter->csChange->sn.low);
-      if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+      debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
+                                             csChangeFromWriter->csChange->sn.low,
+                                             csChangeFromWriter->csChange->gapSN.low);
+      if ((SeqNumberCmp(sn,snNext)==0) &&
           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
           if ((d->guid.aid & 0x03)==MANAGER) 
@@ -160,8 +161,25 @@ CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
         }
         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
             &snNext,ORTE_FALSE);
-      } else
-        break;
+      } else {
+        if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+          //GAP
+          SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+          SeqNumberDec(lastGapSN,lastGapSN);
+          CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+          //does the gapped range [sn,lastGapSN] contain cstRemoteWriter->sn? if so, advance it to the last gapped sn
+          if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+              (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+            cstRemoteWriter->sn=lastGapSN;
+          } 
+        } else {
+          if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+            CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+          } else
+            /* stop processing of csChanges */
+            break;
+        }
+      }
     } else {
       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
           csChangeFromWriter,ORTE_FALSE);
@@ -178,7 +196,7 @@ CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
   CSChange             *csChange=csChangeFromWriter->csChange;
   ORTERecvInfo         info;  
   ORTESubsProp         *sp;
-  ObjectEntryOID       *objectEntryOID;\r
+  ObjectEntryOID       *objectEntryOID;
   unsigned int                max_size;
         
   if (cstRemoteWriter==NULL) return;
@@ -212,8 +230,8 @@ CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
              max_size);
     }
     info.status=NEW_DATA;
-    info.topic=sp->topic;
-    info.type=sp->typeName;
+    info.topic=(char*)sp->topic;
+    info.type=(char*)sp->typeName;
     info.senderGUID=csChange->guid;
     info.localTimeReceived=csChange->localTimeReceived;
     info.remoteTimePublished=csChange->remoteTimePublished;
@@ -253,7 +271,7 @@ void
 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
   ORTESubsProp         *sp;
   CSChangeFromWriter   *csChangeFromWriter;
-  SequenceNumber       snNext;
+  SequenceNumber       sn,snNext,lastGapSN;
  
   debug(54,10) ("CSTReaderProcIssue: start\n");
   if (cstRemoteWriter==NULL) return;
@@ -264,10 +282,10 @@ CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled)
     while (1) {
       csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
       if (!csChangeFromWriter) break;
-      if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
-                      cstRemoteWriter->firstSN)>=0) {
+      sn=csChangeFromWriter->csChange->sn;
+      if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
         SeqNumberInc(snNext,cstRemoteWriter->sn);
-        if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+        if ((SeqNumberCmp(sn,snNext)==0) &&
             (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
             if ((cstRemoteWriter==
@@ -283,11 +301,27 @@ CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled)
                         cstRemoteWriter->sn,
                         csChangeFromWriter->csChange->gapSN);
           }
-
           CSTReaderDestroyCSChange(cstRemoteWriter,
               &snNext,ORTE_FALSE);
-        } else
-          break;
+        } else {
+          if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+            //GAP
+            SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+            SeqNumberDec(lastGapSN,lastGapSN);
+            CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+            //does the gapped range [sn,lastGapSN] contain cstRemoteWriter->sn? if so, advance it to the last gapped sn
+            if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+                (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+              cstRemoteWriter->sn=lastGapSN;
+            }
+          } else {
+            if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+              CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+            } else
+              /* stop processing of csChanges */
+              break;
+          }
+       }
       } else {
         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
             csChangeFromWriter,ORTE_FALSE);