2 * $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 54 CSChanges processing
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
/*
 * Process one CSChange on the manager side. CSTReaderProcCSChanges calls this
 * function when (d->guid.aid & 0x03)==MANAGER.
 *
 *   d                  - domain used for the object lookup; it also holds the
 *                        writerManagers and writerApplications writers
 *   cstRemoteWriter    - remote writer the change was received from
 *   csChangeFromWriter - wrapper whose csChange member is processed
 *
 * Returns early when objectEntryFind finds no entry for the guid of the change.
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (return type, heads of the timer calls, first case label, closing braces).
 * Comments below describe only the visible lines.
 */
36 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
37 CSChangeFromWriter *csChangeFromWriter) {
39 ObjectEntryOID *objectEntryOID;
42 csChange=csChangeFromWriter->csChange;
/* find the local object entry that matches the guid carried by the change */
43 objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
44 if (!objectEntryOID) return;
/*
 * Change is marked as not alive: the two argument groups below both refer to
 * expirationPurgeTimer and the second one passes objectEntryExpirationTimer.
 * The call heads are not visible; presumably a timer detach followed by a
 * timer add - TODO confirm against the full file.
 */
45 if (!csChange->alive) {
47 objectEntryOID->objectEntryAID,
48 &objectEntryOID->expirationPurgeTimer,
51 objectEntryOID->objectEntryAID,
52 &objectEntryOID->expirationPurgeTimer,
55 objectEntryExpirationTimer,
/* dispatch on the low two bits of the aid of the changed object */
61 switch (csChange->guid.aid & 0x03) {
/* NOTE(review): the case label of this first branch is not visible - confirm */
63 //update parameters of object
64 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
65 //copy csChange to writerManagers
66 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
/* the change is added to d->writerManagers while its write lock is held */
69 pthread_rwlock_wrlock(&d->writerManagers.lock);
70 CSTWriterAddCSChange(d,&d->writerManagers,csChange);
71 pthread_rwlock_unlock(&d->writerManagers.lock);
73 case MANAGEDAPPLICATION:
74 //update parameters of object
75 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
76 //changes can make only local Apps
/* forwarding happens only when the appMOM flag of the writer spobject is set */
77 if (cstRemoteWriter->spobject->appMOM) {
78 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
/* the change is added to d->writerApplications while its write lock is held */
81 pthread_rwlock_wrlock(&d->writerApplications.lock);
82 CSTWriterAddCSChange(d,&d->writerApplications,csChange);
83 pthread_rwlock_unlock(&d->writerApplications.lock);
89 /*****************************************************************************/
/*
 * Process one CSChange on the managed-application side. CSTReaderProcCSChanges
 * calls this function when (d->guid.aid & 0x03)==MANAGEDAPPLICATION.
 *
 *   d                  - domain used for the object lookup
 *   cstRemoteWriter    - remote writer the change was received from (not
 *                        referenced in the lines visible here)
 *   csChangeFromWriter - wrapper whose csChange member is processed
 *
 * Returns early when objectEntryFind finds no entry for the guid of the change.
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (return type, heads of the timer calls, break statements, closing braces).
 */
91 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
92 CSChangeFromWriter *csChangeFromWriter) {
94 ObjectEntryOID *objectEntryOID;
96 csChange=csChangeFromWriter->csChange;
/* find the local object entry that matches the guid carried by the change */
97 objectEntryOID=objectEntryFind(d,&csChange->guid);
98 if (!objectEntryOID) return;
/*
 * Change is marked as not alive: the two argument groups below both refer to
 * expirationPurgeTimer and the second one passes objectEntryExpirationTimer.
 * The call heads are not visible; presumably a timer detach followed by a
 * timer add - TODO confirm against the full file.
 */
99 if (!csChange->alive) {
101 objectEntryOID->objectEntryAID,
102 &objectEntryOID->expirationPurgeTimer,
105 objectEntryOID->objectEntryAID,
106 &objectEntryOID->expirationPurgeTimer,
109 objectEntryExpirationTimer,
/* dispatch on the low three bits of the oid of the changed object */
115 switch (csChange->guid.oid & 0x07) {
/* NOTE(review): body of the OID_APPLICATION case is not visible here */
116 case OID_APPLICATION:
/* publication object: attributes are treated as ORTEPublProp and updated */
118 case OID_PUBLICATION:
119 parameterUpdatePublication(csChange,
120 (ORTEPublProp*)objectEntryOID->attributes);
/* subscription object: attributes are treated as ORTESubsProp and updated */
122 case OID_SUBSCRIPTION:
123 parameterUpdateSubscription(csChange,
124 (ORTESubsProp*)objectEntryOID->attributes);
129 /*****************************************************************************/
/*
 * Walk the csChanges queued for one remote writer and process them in
 * sequence-number order.
 *
 *   d               - local domain; (d->guid.aid & 0x03) selects the manager
 *                     or the managed-application handler
 *   cstRemoteWriter - remote writer whose queue is processed; a NULL pointer
 *                     makes the function return immediately
 *
 * CSTReaderSetupState is called on the writer before the final debug message.
 *
 * NOTE(review): the loop header, the else lines and the closing braces are
 * not visible in this view, so the exact nesting of the branches below must
 * be confirmed against the full file.
 */
131 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
132 CSChangeFromWriter *csChangeFromWriter;
133 SequenceNumber sn,snNext,lastGapSN;
135 debug(54,10) ("CSTReaderProcCSChanges: start\n");
136 if (!cstRemoteWriter) return;
/* take the first queued change; an empty queue ends the loop */
138 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
139 if (!csChangeFromWriter) break;
140 sn=csChangeFromWriter->csChange->sn;
/* only changes whose sn is not below firstSN of the writer are examined here */
141 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
/* snNext is derived from cstRemoteWriter->sn; presumably sn+1 - macro not shown */
142 SeqNumberInc(snNext,cstRemoteWriter->sn);
143 debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
144 csChangeFromWriter->csChange->sn.low,
145 csChangeFromWriter->csChange->gapSN.low);
/* the change carries exactly snNext and is in state RECEIVED */
146 if ((SeqNumberCmp(sn,snNext)==0) &&
147 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
/* gapSN equal to noneSN: hand the change to the handler matching the local aid */
148 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
149 if ((d->guid.aid & 0x03)==MANAGER)
150 CSTReaderProcCSChangesManager(d,cstRemoteWriter,
152 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION)
153 CSTReaderProcCSChangesApp(d,cstRemoteWriter,
155 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
/* NOTE(review): presumably the else branch - writer sn is advanced using gapSN */
158 SeqNumberAdd(cstRemoteWriter->sn,
160 csChangeFromWriter->csChange->gapSN);
162 CSTReaderDestroyCSChange(cstRemoteWriter, //note:csChange can be copied to another CSTWriter!!!
/* change with gapSN above noneSN that did not match the condition above */
165 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
/* lastGapSN is built from sn and gapSN, then decremented; presumably sn+gapSN-1 */
167 SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
168 SeqNumberDec(lastGapSN,lastGapSN);
169 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
170 //is first gapped sn lower than or equal to cstRemoteWriter sn and last gapped sn higher than or equal to cstRemoteWriter sn?
171 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
172 (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
173 cstRemoteWriter->sn=lastGapSN;
/* change whose sn is not above the writer sn is destroyed */
176 if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
177 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
179 /* stop processing of csChanges */
/* NOTE(review): presumably the branch for sn below firstSN - confirm */
184 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
185 csChangeFromWriter,ORTE_FALSE);
188 CSTReaderSetupState(cstRemoteWriter);
189 debug(54,10) ("CSTReaderProcCSChanges: finished\n");
192 /*****************************************************************************/
/*
 * Deliver the data of one received change to the receive callback of the
 * reader and handle the deadline timer of the reader.
 *
 *   cstRemoteWriter    - remote writer the change belongs to; a NULL pointer
 *                        makes the function return
 *   csChangeFromWriter - wrapper of the change; it is dereferenced in the
 *                        initializer below, before the NULL check on
 *                        cstRemoteWriter
 *
 * NOTE(review): some declarations (info, sp, timeNext), call arguments, else
 * lines and closing braces are not visible in this view; the nesting of the
 * mode checks relative to the callback check must be confirmed.
 */
194 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
195 CSChangeFromWriter *csChangeFromWriter) {
196 CSChange *csChange=csChangeFromWriter->csChange;
199 ObjectEntryOID *objectEntryOID;
200 unsigned int max_size;
202 if (cstRemoteWriter==NULL) return;
203 objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
/* attributes of the reader object entry are treated as subscription properties */
204 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
205 if (objectEntryOID->recvCallBack) {
206 //deserialization routine
/* a registered deserialize routine is called with objectEntryOID->instance */
207 if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
208 cstRemoteWriter->cstReader->typeRegister->deserialize(
210 objectEntryOID->instance);
212 //no deserialization -> memcpy
213 ORTEGetMaxSizeParam gms;
215 /* determine maximal size */
216 gms.host_endian=csChange->cdrCodec.host_endian;
217 gms.data_endian=csChange->cdrCodec.data_endian;
218 gms.data=csChange->cdrCodec.buffer;
219 gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
220 gms.recv_size=csChange->cdrCodec.buf_len;
/* max_size comes from getMaxSize when that routine is registered ... */
222 if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
223 max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms,1);
/* ... presumably otherwise (else line not visible) from typeRegister->maxSize */
225 max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
/* clamp max_size to the length of the codec buffer */
226 if (max_size>csChange->cdrCodec.buf_len)
227 max_size=csChange->cdrCodec.buf_len;
/* raw copy from the codec buffer into the instance; the length argument is not visible, presumably max_size */
228 memcpy(objectEntryOID->instance,
229 csChange->cdrCodec.buffer,
/* fill the receive info from the subscription properties and the change */
232 info.status=NEW_DATA;
233 info.topic=(char*)sp->topic;
234 info.type=(char*)sp->typeName;
235 info.senderGUID=csChange->guid;
236 info.localTimeReceived=csChange->localTimeReceived;
237 info.remoteTimePublished=csChange->remoteTimePublished;
238 info.sn=csChange->sn;
239 objectEntryOID->recvCallBack(&info,
240 objectEntryOID->instance,
241 objectEntryOID->callBackParam);
/* IMMEDIATE mode: deadlineTimer is detached and added again with the CSTReaderDeadlineTimer handler */
242 if (sp->mode==IMMEDIATE) {
243 //setup new time for deadline timer
244 eventDetach(cstRemoteWriter->cstReader->domain,
245 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
246 &cstRemoteWriter->cstReader->deadlineTimer,
248 eventAdd(cstRemoteWriter->cstReader->domain,
249 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
250 &cstRemoteWriter->cstReader->deadlineTimer,
252 "CSTReaderDeadlineTimer",
253 CSTReaderDeadlineTimer,
254 &cstRemoteWriter->cstReader->lock,
255 cstRemoteWriter->cstReader,
/*
 * PULLED mode: only the expire time of deadlineTimer is set, to timeNext.
 * The computation of timeNext uses getActualNtpTime() but is otherwise not
 * visible in this view.
 */
258 if (sp->mode==PULLED) {
261 (getActualNtpTime()),
263 htimerUnicastCommon_set_expire(&cstRemoteWriter->
264 cstReader->deadlineTimer,timeNext);
269 /*****************************************************************************/
/*
 * Process the queued issue changes of one remote writer according to the
 * reliability requested in the subscription properties of the reader.
 *
 *   cstRemoteWriter - remote writer whose queue is processed; a NULL pointer
 *                     makes the function return immediately
 *   pullCalled      - when the subscription mode is PULLED and this flag is
 *                     ORTE_FALSE, the function returns without processing
 *
 * CSTReaderSetupState is called on the writer before the final debug message.
 *
 * NOTE(review): the declaration of sp, the loop header of the strict branch,
 * else lines, some call arguments and closing braces are not visible in this
 * view; the exact nesting must be confirmed against the full file.
 */
271 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
273 CSChangeFromWriter *csChangeFromWriter;
274 SequenceNumber sn,snNext,lastGapSN;
276 debug(54,10) ("CSTReaderProcIssue: start\n");
277 if (cstRemoteWriter==NULL) return;
/* attributes of the reader object entry are treated as subscription properties */
278 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
/* strict reliability requested: changes are consumed in sequence-number order */
279 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
281 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
/* take the first queued change; an empty queue ends the loop */
283 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
284 if (!csChangeFromWriter) break;
285 sn=csChangeFromWriter->csChange->sn;
/* only changes whose sn is not below firstSN of the writer are examined here */
286 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
/* snNext is derived from cstRemoteWriter->sn; presumably sn+1 - macro not shown */
287 SeqNumberInc(snNext,cstRemoteWriter->sn);
/* the change carries exactly snNext and is in state RECEIVED */
288 if ((SeqNumberCmp(sn,snNext)==0) &&
289 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
290 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
/* data is delivered only when this writer is the non-NULL subscribed writer of the reader */
291 if ((cstRemoteWriter==
292 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
293 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
295 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
297 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
/* NOTE(review): presumably the else branch - writer sn is advanced using gapSN */
300 SeqNumberAdd(cstRemoteWriter->sn,
302 csChangeFromWriter->csChange->gapSN);
304 CSTReaderDestroyCSChange(cstRemoteWriter,
/* change with gapSN above noneSN that did not match the condition above */
307 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
/* lastGapSN is built from sn and gapSN, then decremented; presumably sn+gapSN-1 */
309 SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
310 SeqNumberDec(lastGapSN,lastGapSN);
311 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
312 //is first gapped sn lower than or equal to cstRemoteWriter sn and last gapped sn higher than or equal to cstRemoteWriter sn?
313 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
314 (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
315 cstRemoteWriter->sn=lastGapSN;
/* change whose sn is not above the writer sn is destroyed */
318 if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
319 CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
321 /* stop processing of csChanges */
/* NOTE(review): presumably the branch for sn below firstSN - confirm */
326 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
327 csChangeFromWriter,ORTE_FALSE);
/* best-effort reliability requested: every queued change is consumed without sequence checks */
332 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
/* writer is not the subscribed one, or none is subscribed; the statement taken here is not visible - presumably a return */
333 if ((cstRemoteWriter!=
334 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
335 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL))
337 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
/* per change: deliver it, record its sn as the writer sn, then destroy the wrapper */
338 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
340 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
342 cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
344 CSTReaderDestroyCSChangeFromWriter(
351 CSTReaderSetupState(cstRemoteWriter);
352 debug(54,10) ("CSTReaderProcIssue: finished\n");