]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSCSTReaderProc.c
orte 0.3.2 release
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
index 5f8e868a0b9795c3bf171b1db6324efd7f00450b..185033db009f6bc11788006dbd042fd3f2ceb435 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13 
  *
  *  DEBUG:  section 54                  CSChanges processing
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr.smolik@wo.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -19,7 +29,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 /*****************************************************************************/
 void
@@ -64,7 +74,7 @@ CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
       //update parameters of object
       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
       //changes can make only local Apps
-      if (cstRemoteWriter->objectEntryOID->appMOM) {
+      if (cstRemoteWriter->spobject->appMOM) {
         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
                                  csChangeFromWriter,
                                  ORTE_TRUE);
@@ -120,19 +130,20 @@ CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
 void 
 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
   CSChangeFromWriter *csChangeFromWriter;
-  SequenceNumber     snNext;
+  SequenceNumber     sn,snNext,lastGapSN;
 
   debug(54,10) ("CSTReaderProcCSChanges: start\n");
   if (!cstRemoteWriter) return;
   while (1) {
     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
     if (!csChangeFromWriter) break;
-    if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
-                     cstRemoteWriter->firstSN)>=0) {
+    sn=csChangeFromWriter->csChange->sn;
+    if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
       SeqNumberInc(snNext,cstRemoteWriter->sn);
-      debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
-                                             csChangeFromWriter->csChange->sn.low);
-      if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+      debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
+                                             csChangeFromWriter->csChange->sn.low,
+                                             csChangeFromWriter->csChange->gapSN.low);
+      if ((SeqNumberCmp(sn,snNext)==0) &&
           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
           if ((d->guid.aid & 0x03)==MANAGER) 
@@ -150,8 +161,25 @@ CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
         }
         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
             &snNext,ORTE_FALSE);
-      } else
-        break;
+      } else {
+        if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+          //GAP
+          SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+          SeqNumberDec(lastGapSN,lastGapSN);
+          CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+          //is first gapped sn lower then cstRemoteWrite sn and last gapped sn higher then cstRemoteWrite sn?
+          if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+              (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+            cstRemoteWriter->sn=lastGapSN;
+          } 
+        } else {
+          if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+            CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+          } else
+            /* stop processing of csChanges */
+            break;
+        }
+      }
     } else {
       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
           csChangeFromWriter,ORTE_FALSE);
@@ -165,10 +193,11 @@ CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
 void
 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
     CSChangeFromWriter *csChangeFromWriter) {
+  CSChange             *csChange=csChangeFromWriter->csChange;
   ORTERecvInfo         info;  
   ORTESubsProp         *sp;
-  ObjectEntryOID       *objectEntryOID;\r
-  unsigned int         length;
+  ObjectEntryOID       *objectEntryOID;
+  unsigned int                max_size;
         
   if (cstRemoteWriter==NULL) return;
   objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
@@ -177,24 +206,36 @@ CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
     //deserialization routine
     if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
       cstRemoteWriter->cstReader->typeRegister->deserialize(
-          &csChangeFromWriter->csChange->cdrStream,
+          &csChange->cdrCodec,
           objectEntryOID->instance);
     } else {
-      length=csChangeFromWriter->csChange->cdrStream.length;
-      if (cstRemoteWriter->cstReader->typeRegister->getMaxSize<length)
-        length=cstRemoteWriter->cstReader->typeRegister->getMaxSize;
       //no deserialization -> memcpy
+      ORTEGetMaxSizeParam gms;
+
+      /* determine maximal size */
+      gms.host_endian=csChange->cdrCodec.host_endian;
+      gms.data_endian=csChange->cdrCodec.data_endian;
+      gms.data=csChange->cdrCodec.buffer;
+      gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
+      gms.recv_size=csChange->cdrCodec.buf_len;
+      gms.csize=0;
+      if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
+        max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms);
+      else
+        max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
+      if (max_size>csChange->cdrCodec.buf_len)
+        max_size=csChange->cdrCodec.buf_len;
       memcpy(objectEntryOID->instance,
-            csChangeFromWriter->csChange->cdrStream.buffer,
-            length);
+             csChange->cdrCodec.buffer,
+             max_size);
     }
     info.status=NEW_DATA;
-    info.topic=sp->topic;
-    info.type=sp->typeName;
-    info.senderGUID=csChangeFromWriter->csChange->guid;
-    info.localTimeReceived=csChangeFromWriter->csChange->localTimeReceived;
-    info.remoteTimePublished=csChangeFromWriter->csChange->remoteTimePublished;
-    info.sn=csChangeFromWriter->csChange->sn;
+    info.topic=(char*)sp->topic;
+    info.type=(char*)sp->typeName;
+    info.senderGUID=csChange->guid;
+    info.localTimeReceived=csChange->localTimeReceived;
+    info.remoteTimePublished=csChange->remoteTimePublished;
+    info.sn=csChange->sn;
     objectEntryOID->recvCallBack(&info,
                             objectEntryOID->instance,
                             objectEntryOID->callBackParam);
@@ -230,7 +271,7 @@ void
 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
   ORTESubsProp         *sp;
   CSChangeFromWriter   *csChangeFromWriter;
-  SequenceNumber       snNext;
+  SequenceNumber       sn,snNext,lastGapSN;
  
   debug(54,10) ("CSTReaderProcIssue: start\n");
   if (cstRemoteWriter==NULL) return;
@@ -241,12 +282,10 @@ CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled)
     while (1) {
       csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
       if (!csChangeFromWriter) break;
-      if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
-                      cstRemoteWriter->firstSN)>=0) {
+      sn=csChangeFromWriter->csChange->sn;
+      if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
         SeqNumberInc(snNext,cstRemoteWriter->sn);
-        debug(54,10) ("CSTReaderProcChangesIssue: processing sn:%u,Change sn:%u\n",snNext.low,
-                                              csChangeFromWriter->csChange->sn.low);
-        if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+        if ((SeqNumberCmp(sn,snNext)==0) &&
             (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
             if ((cstRemoteWriter==
@@ -264,8 +303,25 @@ CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled)
           }
           CSTReaderDestroyCSChange(cstRemoteWriter,
               &snNext,ORTE_FALSE);
-        } else
-          break;
+        } else {
+          if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+            //GAP
+            SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+            SeqNumberDec(lastGapSN,lastGapSN);
+            CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+            //is first gapped sn lower then cstRemoteWrite sn and last gapped sn higher then cstRemoteWrite sn?
+            if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+                (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+              cstRemoteWriter->sn=lastGapSN;
+            }
+          } else {
+            if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+              CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+            } else
+              /* stop processing of csChanges */
+              break;
+          }
+       }
       } else {
         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
             csChangeFromWriter,ORTE_FALSE);
@@ -282,6 +338,9 @@ CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled)
       while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
         //NewData                
         CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
+
+        cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
+
         CSTReaderDestroyCSChangeFromWriter(
             cstRemoteWriter,
             csChangeFromWriter,