* $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13
*
* DEBUG: section 54 CSChanges processing
- * AUTHOR: Petr Smolik petr.smolik@wo.cz
*
- * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
+ * -------------------------------------------------------------------
+ * ORTE
+ * Open Real-Time Ethernet
+ *
+ * Copyright (C) 2001-2006
+ * Department of Control Engineering FEE CTU Prague, Czech Republic
+ * http://dce.felk.cvut.cz
+ * http://www.ocera.org
+ *
+ * Author: Petr Smolik petr.smolik@wo.cz
+ * Advisor: Pavel Pisa
+ * Project Responsible: Zdenek Hanzalek
* --------------------------------------------------------------------
*
* This program is free software; you can redistribute it and/or modify
*
*/
-#include "orte.h"
+#include "orte_all.h"
/*****************************************************************************/
void
//update parameters of object
parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
//only local Apps can make changes
- if (cstRemoteWriter->objectEntryOID->appMOM) {
+ if (cstRemoteWriter->spobject->appMOM) {
CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
csChangeFromWriter,
ORTE_TRUE);
void
CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
CSChangeFromWriter *csChangeFromWriter;
- SequenceNumber snNext;
+ SequenceNumber sn,snNext,lastGapSN;
debug(54,10) ("CSTReaderProcCSChanges: start\n");
if (!cstRemoteWriter) return;
while (1) {
csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
if (!csChangeFromWriter) break;
- if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
- cstRemoteWriter->firstSN)>=0) {
+ sn=csChangeFromWriter->csChange->sn;
+ if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
SeqNumberInc(snNext,cstRemoteWriter->sn);
- debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
- csChangeFromWriter->csChange->sn.low);
- if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+ debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
+ csChangeFromWriter->csChange->sn.low,
+ csChangeFromWriter->csChange->gapSN.low);
+ if ((SeqNumberCmp(sn,snNext)==0) &&
(csChangeFromWriter->commStateChFWriter==RECEIVED)) {
if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
if ((d->guid.aid & 0x03)==MANAGER)
}
CSTReaderDestroyCSChange(cstRemoteWriter, //note: csChange can be copied to another CSTWriter!!!
&snNext,ORTE_FALSE);
- } else
- break;
+ } else {
+ if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+ //GAP
+ SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+ SeqNumberDec(lastGapSN,lastGapSN);
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ //is the first gapped sn lower than or equal to cstRemoteWriter->sn and the last gapped sn higher than or equal to it?
+ if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+ (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+ cstRemoteWriter->sn=lastGapSN;
+ }
+ } else {
+ if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ } else
+ /* stop processing of csChanges */
+ break;
+ }
+ }
} else {
CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
csChangeFromWriter,ORTE_FALSE);
void
CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
CSChangeFromWriter *csChangeFromWriter) {
+ CSChange *csChange=csChangeFromWriter->csChange;
ORTERecvInfo info;
ORTESubsProp *sp;
- ObjectEntryOID *objectEntryOID;\r
- unsigned int length;
+ ObjectEntryOID *objectEntryOID;
+ unsigned int max_size;
if (cstRemoteWriter==NULL) return;
objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
//deserialization routine
if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
cstRemoteWriter->cstReader->typeRegister->deserialize(
- &csChangeFromWriter->csChange->cdrStream,
+ &csChange->cdrCodec,
objectEntryOID->instance);
} else {
- length=csChangeFromWriter->csChange->cdrStream.length;
- if (cstRemoteWriter->cstReader->typeRegister->getMaxSize<length)
- length=cstRemoteWriter->cstReader->typeRegister->getMaxSize;
//no deserialization -> memcpy
+ ORTEGetMaxSizeParam gms;
+
+ /* determine maximal size */
+ gms.host_endian=csChange->cdrCodec.host_endian;
+ gms.data_endian=csChange->cdrCodec.data_endian;
+ gms.data=csChange->cdrCodec.buffer;
+ gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
+ gms.recv_size=csChange->cdrCodec.buf_len;
+ gms.csize=0;
+ if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
+ max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms);
+ else
+ max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
+ if (max_size>csChange->cdrCodec.buf_len)
+ max_size=csChange->cdrCodec.buf_len;
memcpy(objectEntryOID->instance,
- csChangeFromWriter->csChange->cdrStream.buffer,
- length);
+ csChange->cdrCodec.buffer,
+ max_size);
}
info.status=NEW_DATA;
- info.topic=sp->topic;
- info.type=sp->typeName;
- info.senderGUID=csChangeFromWriter->csChange->guid;
- info.localTimeReceived=csChangeFromWriter->csChange->localTimeReceived;
- info.remoteTimePublished=csChangeFromWriter->csChange->remoteTimePublished;
- info.sn=csChangeFromWriter->csChange->sn;
+ info.topic=(char*)sp->topic;
+ info.type=(char*)sp->typeName;
+ info.senderGUID=csChange->guid;
+ info.localTimeReceived=csChange->localTimeReceived;
+ info.remoteTimePublished=csChange->remoteTimePublished;
+ info.sn=csChange->sn;
objectEntryOID->recvCallBack(&info,
objectEntryOID->instance,
objectEntryOID->callBackParam);
CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
ORTESubsProp *sp;
CSChangeFromWriter *csChangeFromWriter;
- SequenceNumber snNext;
+ SequenceNumber sn,snNext,lastGapSN;
debug(54,10) ("CSTReaderProcIssue: start\n");
if (cstRemoteWriter==NULL) return;
while (1) {
csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
if (!csChangeFromWriter) break;
- if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
- cstRemoteWriter->firstSN)>=0) {
+ sn=csChangeFromWriter->csChange->sn;
+ if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
SeqNumberInc(snNext,cstRemoteWriter->sn);
- debug(54,10) ("CSTReaderProcChangesIssue: processing sn:%u,Change sn:%u\n",snNext.low,
- csChangeFromWriter->csChange->sn.low);
- if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+ if ((SeqNumberCmp(sn,snNext)==0) &&
(csChangeFromWriter->commStateChFWriter==RECEIVED)) {
if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
if ((cstRemoteWriter==
}
CSTReaderDestroyCSChange(cstRemoteWriter,
&snNext,ORTE_FALSE);
- } else
- break;
+ } else {
+ if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+ //GAP
+ SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+ SeqNumberDec(lastGapSN,lastGapSN);
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ //is the first gapped sn lower than or equal to cstRemoteWriter->sn and the last gapped sn higher than or equal to it?
+ if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+ (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+ cstRemoteWriter->sn=lastGapSN;
+ }
+ } else {
+ if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ } else
+ /* stop processing of csChanges */
+ break;
+ }
+ }
} else {
CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
csChangeFromWriter,ORTE_FALSE);
while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
//NewData
CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
+
+ cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
+
CSTReaderDestroyCSChangeFromWriter(
cstRemoteWriter,
csChangeFromWriter,