void
CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
CSChangeFromWriter *csChangeFromWriter;
- SequenceNumber snNext;
+ SequenceNumber sn,snNext,lastGapSN;
debug(54,10) ("CSTReaderProcCSChanges: start\n");
if (!cstRemoteWriter) return;
while (1) {
csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
if (!csChangeFromWriter) break;
- if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
- cstRemoteWriter->firstSN)>=0) {
+ sn=csChangeFromWriter->csChange->sn;
+ if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
SeqNumberInc(snNext,cstRemoteWriter->sn);
- debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
- csChangeFromWriter->csChange->sn.low);
- if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+ debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
+ csChangeFromWriter->csChange->sn.low,
+ csChangeFromWriter->csChange->gapSN.low);
+ if ((SeqNumberCmp(sn,snNext)==0) &&
(csChangeFromWriter->commStateChFWriter==RECEIVED)) {
if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
if ((d->guid.aid & 0x03)==MANAGER)
}
CSTReaderDestroyCSChange(cstRemoteWriter, //note:csChange can be coped to another CSTWriter!!!
&snNext,ORTE_FALSE);
- } else
- break;
+ } else {
+ if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+ //GAP
+ SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+ SeqNumberDec(lastGapSN,lastGapSN);
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ //does the gapped range [sn, lastGapSN] include cstRemoteWriter->sn (first gapped sn lower than or equal to it, last gapped sn higher than or equal to it)?
+ if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+ (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+ cstRemoteWriter->sn=lastGapSN;
+ }
+ } else {
+ if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ } else
+ /* stop processing of csChanges */
+ break;
+ }
+ }
} else {
CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
csChangeFromWriter,ORTE_FALSE);
CSChange *csChange=csChangeFromWriter->csChange;
ORTERecvInfo info;
ORTESubsProp *sp;
- ObjectEntryOID *objectEntryOID;\r
+ ObjectEntryOID *objectEntryOID;
unsigned int max_size;
if (cstRemoteWriter==NULL) return;
max_size);
}
info.status=NEW_DATA;
- info.topic=sp->topic;
- info.type=sp->typeName;
+ info.topic=(char*)sp->topic;
+ info.type=(char*)sp->typeName;
info.senderGUID=csChange->guid;
info.localTimeReceived=csChange->localTimeReceived;
info.remoteTimePublished=csChange->remoteTimePublished;
CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
ORTESubsProp *sp;
CSChangeFromWriter *csChangeFromWriter;
- SequenceNumber snNext;
+ SequenceNumber sn,snNext,lastGapSN;
debug(54,10) ("CSTReaderProcIssue: start\n");
if (cstRemoteWriter==NULL) return;
while (1) {
csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
if (!csChangeFromWriter) break;
- if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
- cstRemoteWriter->firstSN)>=0) {
+ sn=csChangeFromWriter->csChange->sn;
+ if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
SeqNumberInc(snNext,cstRemoteWriter->sn);
- if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
+ if ((SeqNumberCmp(sn,snNext)==0) &&
(csChangeFromWriter->commStateChFWriter==RECEIVED)) {
if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
if ((cstRemoteWriter==
cstRemoteWriter->sn,
csChangeFromWriter->csChange->gapSN);
}
-
CSTReaderDestroyCSChange(cstRemoteWriter,
&snNext,ORTE_FALSE);
- } else
- break;
+ } else {
+ if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
+ //GAP
+ SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
+ SeqNumberDec(lastGapSN,lastGapSN);
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ //does the gapped range [sn, lastGapSN] include cstRemoteWriter->sn (first gapped sn lower than or equal to it, last gapped sn higher than or equal to it)?
+ if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
+ (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
+ cstRemoteWriter->sn=lastGapSN;
+ }
+ } else {
+ if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
+ CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
+ } else
+ /* stop processing of csChanges */
+ break;
+ }
+ }
} else {
CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
csChangeFromWriter,ORTE_FALSE);