2 * $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 54 CSChanges processing
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesManager - apply one received CSChange to the local
 * object database and forward it to a local writer.
 *
 * d                  - domain; its writerManagers and writerApplications
 *                      (each with its own rwlock) are used below
 * cstRemoteWriter    - remote writer that delivered the change
 * csChangeFromWriter - queue record wrapping the CSChange
 *
 * Visible flow: the object entry for csChange->guid is looked up with
 * objectEntryFind(); nothing is done when it is not found. When the change
 * is not `alive`, the entry's expirationPurgeTimer is handled using
 * objectEntryExpirationTimer. The change is then dispatched on
 * (csChange->guid.aid & 0x03).
 *
 * NOTE(review): the embedded line numbers of this listing skip values, so
 * some lines (return type, local declarations, call names, closing braces)
 * are not visible here - confirm the details against the complete file.
 */
36 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
37 CSChangeFromWriter *csChangeFromWriter) {
39 ObjectEntryOID *objectEntryOID;
42 csChange=csChangeFromWriter->csChange;
43 objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
// unknown object -> nothing to update
44 if (!objectEntryOID) return;
// change reports the object as not alive: two calls on the entry's
// expirationPurgeTimer follow (call names not visible in this listing -
// presumably eventDetach/eventAdd; TODO confirm)
45 if (!csChange->alive) {
47 objectEntryOID->objectEntryAID,
48 &objectEntryOID->expirationPurgeTimer,
51 objectEntryOID->objectEntryAID,
52 &objectEntryOID->expirationPurgeTimer,
55 objectEntryExpirationTimer,
// dispatch on the two low bits of the application id of the change
61 switch (csChange->guid.aid & 0x03) {
// NOTE(review): the case label of this first branch is not visible here
63 //update parameters of object
64 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
65 //copy csChange to writerManagers
// the queue record is destroyed first (remaining arguments not visible);
// csChange itself is still used below
66 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
// writerManagers is modified under its write lock
69 pthread_rwlock_wrlock(&d->writerManagers.lock);
70 CSTWriterAddCSChange(d,&d->writerManagers,csChange);
71 pthread_rwlock_unlock(&d->writerManagers.lock);
73 case MANAGEDAPPLICATION:
74 //update parameters of object
75 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
76 //only local apps can make changes
// forwarded to writerApplications only when spobject->appMOM is set
77 if (cstRemoteWriter->spobject->appMOM) {
78 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
// writerApplications is modified under its write lock
81 pthread_rwlock_wrlock(&d->writerApplications.lock);
82 CSTWriterAddCSChange(d,&d->writerApplications,csChange);
83 pthread_rwlock_unlock(&d->writerApplications.lock);
89 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesApp - apply one received CSChange to the local
 * object database.
 *
 * d                  - domain used for the object lookup
 * cstRemoteWriter    - remote writer that delivered the change (not
 *                      referenced in the lines visible here)
 * csChangeFromWriter - queue record wrapping the CSChange
 *
 * Visible flow: the object entry for csChange->guid is looked up with
 * objectEntryFind(); nothing is done when it is not found. When the change
 * is not `alive`, the entry's expirationPurgeTimer is handled using
 * objectEntryExpirationTimer. The entry's attributes are then updated
 * according to (csChange->guid.oid & 0x07).
 *
 * NOTE(review): the embedded line numbers of this listing skip values, so
 * some lines (return type, local declarations, call names, break
 * statements, closing braces) are not visible here - confirm against the
 * complete file.
 */
91 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
92 CSChangeFromWriter *csChangeFromWriter) {
94 ObjectEntryOID *objectEntryOID;
96 csChange=csChangeFromWriter->csChange;
97 objectEntryOID=objectEntryFind(d,&csChange->guid);
// unknown object -> nothing to update
98 if (!objectEntryOID) return;
// change reports the object as not alive: two calls on the entry's
// expirationPurgeTimer follow (call names not visible in this listing -
// presumably eventDetach/eventAdd; TODO confirm)
99 if (!csChange->alive) {
101 objectEntryOID->objectEntryAID,
102 &objectEntryOID->expirationPurgeTimer,
105 objectEntryOID->objectEntryAID,
106 &objectEntryOID->expirationPurgeTimer,
109 objectEntryExpirationTimer,
// dispatch on the three low bits of the object id of the change
115 switch (csChange->guid.oid & 0x07) {
// NOTE(review): the body of this case is not visible in this listing
116 case OID_APPLICATION:
// attributes of a publication entry are treated as ORTEPublProp
118 case OID_PUBLICATION:
119 parameterUpdatePublication(csChange,
120 (ORTEPublProp*)objectEntryOID->attributes);
// attributes of a subscription entry are treated as ORTESubsProp
122 case OID_SUBSCRIPTION:
123 parameterUpdateSubscription(csChange,
124 (ORTESubsProp*)objectEntryOID->attributes);
129 /*****************************************************************************/
/*
 * CSTReaderProcCSChanges - process the changes queued for one remote writer.
 *
 * d               - domain; (d->guid.aid & 0x03) selects the manager or the
 *                   managed-application handler
 * cstRemoteWriter - remote writer whose queue is processed; NULL is
 *                   accepted and makes the function return immediately
 *
 * Visible flow: the first queued CSChangeFromWriter is taken repeatedly
 * until the queue is empty or processing is stopped. A change whose
 * sequence number equals cstRemoteWriter->sn + 1 and whose state is
 * RECEIVED is consumed: a change without a gap (gapSN == noneSN) is handed
 * to the handler and advances cstRemoteWriter->sn by one, otherwise
 * cstRemoteWriter->sn is advanced using gapSN. Other changes are either
 * destroyed or end the processing. CSTReaderSetupState() is called at the
 * end.
 *
 * NOTE(review): the embedded line numbers of this listing skip values; the
 * loop header, the `else` arms and several call arguments are not visible
 * here, so the nesting described by the comments below is presumed -
 * confirm against the complete file.
 */
131 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
132 CSChangeFromWriter *csChangeFromWriter;
133 SequenceNumber sn,snNext,lastGapSN;
135 debug(54,10) ("CSTReaderProcCSChanges: start\n");
136 if (!cstRemoteWriter) return;
// always look at the head of the queue; an empty queue ends the loop
138 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
139 if (!csChangeFromWriter) break;
140 sn=csChangeFromWriter->csChange->sn;
// only changes at or after firstSN are examined further
141 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
// snNext = sequence number expected next (presumably cstRemoteWriter->sn + 1)
142 SeqNumberInc(snNext,cstRemoteWriter->sn);
143 debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
144 csChangeFromWriter->csChange->sn.low,
145 csChangeFromWriter->csChange->gapSN.low);
// head of the queue is exactly the expected change and it has been received
146 if ((SeqNumberCmp(sn,snNext)==0) &&
147 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
// gapSN == noneSN: regular change, hand it to the handler for this node
148 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
149 if ((d->guid.aid & 0x03)==MANAGER)
150 CSTReaderProcCSChangesManager(d,cstRemoteWriter,
152 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION)
153 CSTReaderProcCSChangesApp(d,cstRemoteWriter,
155 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
// presumably the gap case: advance cstRemoteWriter->sn using gapSN
// (middle argument not visible in this listing)
158 SeqNumberAdd(cstRemoteWriter->sn,
160 csChangeFromWriter->csChange->gapSN);
162 CSTReaderDestroyCSChange(cstRemoteWriter, //note: csChange can be copied to another CSTWriter!!!
// presumably the branch for a change that is not the expected one:
// a gap record (gapSN > noneSN) covers sn .. sn+gapSN-1
165 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
167 SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
168 SeqNumberDec(lastGapSN,lastGapSN);
169 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
170 //is first gapped sn lower than cstRemoteWriter sn and last gapped sn higher than cstRemoteWriter sn?
171 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
172 (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
// the gap range contains cstRemoteWriter->sn: move it to the end of the gap
173 cstRemoteWriter->sn=lastGapSN;
// change at or before cstRemoteWriter->sn: destroy it
176 if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
177 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
179 /* stop processing of csChanges */
// presumably the branch for sn before firstSN: destroy the queue record
184 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
185 csChangeFromWriter,ORTE_FALSE);
188 CSTReaderSetupState(cstRemoteWriter);
189 debug(54,10) ("CSTReaderProcCSChanges: finished\n");
192 /*****************************************************************************/
/*
 * CSTReaderNewData - hand one received issue to the subscription's receive
 * callback and update the reader's deadline timer.
 *
 * cstRemoteWriter    - remote writer the issue came from; NULL makes the
 *                      function return after the declarations
 * csChangeFromWriter - queue record of the issue; it is dereferenced in the
 *                      initializer of csChange, before the NULL check of
 *                      cstRemoteWriter, and is not checked itself
 *
 * Visible flow: when the reader's object entry has a recvCallBack, the data
 * are put into objectEntryOID->instance either by the type's deserialize
 * routine or by memcpy from csChange->cdrCodec.buffer, `info` is filled from
 * the subscription properties and the change, and the callback is called
 * with status NEW_DATA. The deadline timer is then handled according to
 * sp->mode (IMMEDIATE or PULLED).
 *
 * NOTE(review): the embedded line numbers of this listing skip values; the
 * declarations of `info`, `sp` and `timeNext`, some call arguments, `else`
 * keywords and closing braces are not visible here - confirm against the
 * complete file.
 */
194 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
195 CSChangeFromWriter *csChangeFromWriter) {
196 CSChange *csChange=csChangeFromWriter->csChange;
199 ObjectEntryOID *objectEntryOID;
200 unsigned int max_size;
202 if (cstRemoteWriter==NULL) return;
203 objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
// attributes of the reader's object entry are treated as ORTESubsProp
204 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
205 if (objectEntryOID->recvCallBack) {
206 //deserialization routine
207 if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
208 cstRemoteWriter->cstReader->typeRegister->deserialize(
210 objectEntryOID->instance);
212 //no deserialization -> memcpy
213 ORTEGetMaxSizeParam gms;
215 /* determine maximal size */
216 gms.host_endian=csChange->cdrCodec.host_endian;
217 gms.data_endian=csChange->cdrCodec.data_endian;
218 gms.data=csChange->cdrCodec.buffer;
219 gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
220 gms.recv_size=csChange->cdrCodec.buf_len;
// use the type's getMaxSize callback when present, otherwise (presumably
// under an `else` not visible here) the registered maxSize
222 if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
223 max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms,1);
225 max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
// max_size is clamped to the received buffer length
226 if (max_size>csChange->cdrCodec.buf_len)
227 max_size=csChange->cdrCodec.buf_len;
// length argument not visible in this listing - presumably max_size
228 memcpy(objectEntryOID->instance,
229 csChange->cdrCodec.buffer,
// describe the issue for the callback
232 info.status=NEW_DATA;
233 info.topic=(char*)sp->topic;
234 info.type=(char*)sp->typeName;
235 info.senderGUID=csChange->guid;
236 info.localTimeReceived=csChange->localTimeReceived;
237 info.remoteTimePublished=csChange->remoteTimePublished;
238 info.sn=csChange->sn;
239 info.data_endian=csChange->cdrCodec.data_endian;
240 objectEntryOID->recvCallBack(&info,
241 objectEntryOID->instance,
242 objectEntryOID->callBackParam);
243 if (sp->mode==IMMEDIATE) {
244 //setup new time for deadline timer
// the deadline timer is detached and added again with CSTReaderDeadlineTimer
// as handler (the new expiration argument is not visible in this listing)
245 eventDetach(cstRemoteWriter->cstReader->domain,
246 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
247 &cstRemoteWriter->cstReader->deadlineTimer,
249 eventAdd(cstRemoteWriter->cstReader->domain,
250 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
251 &cstRemoteWriter->cstReader->deadlineTimer,
253 "CSTReaderDeadlineTimer",
254 CSTReaderDeadlineTimer,
255 &cstRemoteWriter->cstReader->lock,
256 cstRemoteWriter->cstReader,
// pulled mode: only the expiration time stored in the deadline timer is
// set to timeNext, which is computed from getActualNtpTime() (the rest of
// the computation is not visible in this listing)
259 if (sp->mode==PULLED) {
262 (getActualNtpTime()),
264 htimerUnicastCommon_set_expire(&cstRemoteWriter->
265 cstReader->deadlineTimer,timeNext);
270 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesIssue - process the issues queued for one remote
 * writer of a subscription.
 *
 * cstRemoteWriter - remote writer whose queue is processed; NULL is
 *                   accepted and makes the function return immediately
 * pullCalled      - for a subscription in PULLED mode nothing is processed
 *                   while this is ORTE_FALSE
 *
 * Visible flow: the subscription properties select one of two paths.
 * With PID_VALUE_RELIABILITY_STRICT requested, issues are consumed in
 * sequence-number order (expected number is cstRemoteWriter->sn + 1, state
 * must be RECEIVED, gap records advance cstRemoteWriter->sn) and are
 * delivered with CSTReaderNewData() only when this writer is the reader's
 * cstRemoteWriterSubscribed. With PID_VALUE_RELIABILITY_BEST_EFFORTS
 * requested, every queued issue is delivered and destroyed in queue order
 * and cstRemoteWriter->sn is set to the issue's sn. CSTReaderSetupState()
 * is called at the end.
 *
 * NOTE(review): the embedded line numbers of this listing skip values; the
 * declaration of `sp`, the loop header of the strict path, the `else` arms,
 * the statement controlled by the subscribed-writer test of the best-effort
 * path and several call arguments are not visible here, so the nesting
 * described by the comments below is presumed - confirm against the
 * complete file.
 */
272 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
274 CSChangeFromWriter *csChangeFromWriter;
275 SequenceNumber sn,snNext,lastGapSN;
277 debug(54,10) ("CSTReaderProcIssue: start\n");
278 if (cstRemoteWriter==NULL) return;
// attributes of the reader's object entry are treated as ORTESubsProp
279 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
// ---- strict reliability path ----
280 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
// pulled subscriptions are processed only on an explicit pull
282 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
// always look at the head of the queue; an empty queue ends the loop
284 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
285 if (!csChangeFromWriter) break;
286 sn=csChangeFromWriter->csChange->sn;
// only changes at or after firstSN are examined further
287 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
// snNext = sequence number expected next (presumably cstRemoteWriter->sn + 1)
288 SeqNumberInc(snNext,cstRemoteWriter->sn);
// head of the queue is exactly the expected change and it has been received
289 if ((SeqNumberCmp(sn,snNext)==0) &&
290 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
// gapSN == noneSN: regular issue
291 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
// deliver only issues of the writer this reader is subscribed to
292 if ((cstRemoteWriter==
293 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
294 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
296 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
298 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
// presumably the gap case: advance cstRemoteWriter->sn using gapSN
// (middle argument not visible in this listing)
301 SeqNumberAdd(cstRemoteWriter->sn,
303 csChangeFromWriter->csChange->gapSN);
305 CSTReaderDestroyCSChange(cstRemoteWriter,
// presumably the branch for a change that is not the expected one:
// a gap record (gapSN > noneSN) covers sn .. sn+gapSN-1
308 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
310 SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
311 SeqNumberDec(lastGapSN,lastGapSN);
312 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
313 //is first gapped sn lower than cstRemoteWriter sn and last gapped sn higher than cstRemoteWriter sn?
314 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
315 (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
// the gap range contains cstRemoteWriter->sn: move it to the end of the gap
316 cstRemoteWriter->sn=lastGapSN;
// change at or before cstRemoteWriter->sn: destroy it
319 if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
320 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
322 /* stop processing of csChanges */
// presumably the branch for sn before firstSN: destroy the queue record
327 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
328 csChangeFromWriter,ORTE_FALSE);
// ---- best-effort reliability path ----
333 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
// this writer is not the subscribed one (or none is subscribed);
// the controlled statement is not visible in this listing - TODO confirm
334 if ((cstRemoteWriter!=
335 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
336 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL))
// pulled subscriptions are processed only on an explicit pull
338 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
// deliver and drop every queued issue in queue order
339 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
341 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
343 cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
345 CSTReaderDestroyCSChangeFromWriter(
352 CSTReaderSetupState(cstRemoteWriter);
353 debug(54,10) ("CSTReaderProcIssue: finished\n");