2 * $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 54 CSChanges processing
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
26 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
27 CSChangeFromWriter *csChangeFromWriter) {
29 ObjectEntryOID *objectEntryOID;
32 csChange=csChangeFromWriter->csChange;
33 objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
34 if (!objectEntryOID) return;
35 if (!csChange->alive) {
37 objectEntryOID->objectEntryAID,
38 &objectEntryOID->expirationPurgeTimer,
41 objectEntryOID->objectEntryAID,
42 &objectEntryOID->expirationPurgeTimer,
45 objectEntryExpirationTimer,
51 switch (csChange->guid.aid & 0x03) {
53 //update parameters of object
54 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
55 //copy csChange to writerManagers
56 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
59 pthread_rwlock_wrlock(&d->writerManagers.lock);
60 CSTWriterAddCSChange(d,&d->writerManagers,csChange);
61 pthread_rwlock_unlock(&d->writerManagers.lock);
63 case MANAGEDAPPLICATION:
64 //update parameters of object
65 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
66 //changes can make only local Apps
67 if (cstRemoteWriter->spobject->appMOM) {
68 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
71 pthread_rwlock_wrlock(&d->writerApplications.lock);
72 CSTWriterAddCSChange(d,&d->writerApplications,csChange);
73 pthread_rwlock_unlock(&d->writerApplications.lock);
79 /*****************************************************************************/
81 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
82 CSChangeFromWriter *csChangeFromWriter) {
84 ObjectEntryOID *objectEntryOID;
86 csChange=csChangeFromWriter->csChange;
87 objectEntryOID=objectEntryFind(d,&csChange->guid);
88 if (!objectEntryOID) return;
89 if (!csChange->alive) {
91 objectEntryOID->objectEntryAID,
92 &objectEntryOID->expirationPurgeTimer,
95 objectEntryOID->objectEntryAID,
96 &objectEntryOID->expirationPurgeTimer,
99 objectEntryExpirationTimer,
105 switch (csChange->guid.oid & 0x07) {
106 case OID_APPLICATION:
108 case OID_PUBLICATION:
109 parameterUpdatePublication(csChange,
110 (ORTEPublProp*)objectEntryOID->attributes);
112 case OID_SUBSCRIPTION:
113 parameterUpdateSubscription(csChange,
114 (ORTESubsProp*)objectEntryOID->attributes);
119 /*****************************************************************************/
121 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
122 CSChangeFromWriter *csChangeFromWriter;
123 SequenceNumber snNext;
125 debug(54,10) ("CSTReaderProcCSChanges: start\n");
126 if (!cstRemoteWriter) return;
128 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
129 if (!csChangeFromWriter) break;
130 if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
131 cstRemoteWriter->firstSN)>=0) {
132 SeqNumberInc(snNext,cstRemoteWriter->sn);
133 debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
134 csChangeFromWriter->csChange->sn.low);
135 if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
136 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
137 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
138 if ((d->guid.aid & 0x03)==MANAGER)
139 CSTReaderProcCSChangesManager(d,cstRemoteWriter,
141 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION)
142 CSTReaderProcCSChangesApp(d,cstRemoteWriter,
144 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
147 SeqNumberAdd(cstRemoteWriter->sn,
149 csChangeFromWriter->csChange->gapSN);
151 CSTReaderDestroyCSChange(cstRemoteWriter, //note:csChange can be coped to another CSTWriter!!!
156 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
157 csChangeFromWriter,ORTE_FALSE);
160 CSTReaderSetupState(cstRemoteWriter);
161 debug(54,10) ("CSTReaderProcCSChanges: finished\n");
164 /*****************************************************************************/
166 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
167 CSChangeFromWriter *csChangeFromWriter) {
168 CSChange *csChange=csChangeFromWriter->csChange;
171 ObjectEntryOID *objectEntryOID;
\r
172 unsigned int max_size;
174 if (cstRemoteWriter==NULL) return;
175 objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
176 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
177 if (objectEntryOID->recvCallBack) {
178 //deserialization routine
179 if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
180 cstRemoteWriter->cstReader->typeRegister->deserialize(
182 objectEntryOID->instance);
184 //no deserialization -> memcpy
185 ORTEGetMaxSizeParam gms;
187 /* determine maximal size */
188 gms.host_endian=csChange->cdrCodec.host_endian;
189 gms.data_endian=csChange->cdrCodec.data_endian;
190 gms.data=csChange->cdrCodec.buffer;
191 gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
192 gms.recv_size=csChange->cdrCodec.buf_len;
194 if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
195 max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms);
197 max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
198 if (max_size>csChange->cdrCodec.buf_len)
199 max_size=csChange->cdrCodec.buf_len;
200 memcpy(objectEntryOID->instance,
201 csChange->cdrCodec.buffer,
204 info.status=NEW_DATA;
205 info.topic=sp->topic;
206 info.type=sp->typeName;
207 info.senderGUID=csChange->guid;
208 info.localTimeReceived=csChange->localTimeReceived;
209 info.remoteTimePublished=csChange->remoteTimePublished;
210 info.sn=csChange->sn;
211 objectEntryOID->recvCallBack(&info,
212 objectEntryOID->instance,
213 objectEntryOID->callBackParam);
214 if (sp->mode==IMMEDIATE) {
215 //setup new time for deadline timer
216 eventDetach(cstRemoteWriter->cstReader->domain,
217 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
218 &cstRemoteWriter->cstReader->deadlineTimer,
220 eventAdd(cstRemoteWriter->cstReader->domain,
221 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
222 &cstRemoteWriter->cstReader->deadlineTimer,
224 "CSTReaderDeadlineTimer",
225 CSTReaderDeadlineTimer,
226 &cstRemoteWriter->cstReader->lock,
227 cstRemoteWriter->cstReader,
230 if (sp->mode==PULLED) {
233 (getActualNtpTime()),
235 htimerUnicastCommon_set_expire(&cstRemoteWriter->
236 cstReader->deadlineTimer,timeNext);
241 /*****************************************************************************/
243 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
245 CSChangeFromWriter *csChangeFromWriter;
246 SequenceNumber snNext;
248 debug(54,10) ("CSTReaderProcIssue: start\n");
249 if (cstRemoteWriter==NULL) return;
250 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
251 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
253 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
255 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
256 if (!csChangeFromWriter) break;
257 if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
258 cstRemoteWriter->firstSN)>=0) {
259 SeqNumberInc(snNext,cstRemoteWriter->sn);
260 if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
261 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
262 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
263 if ((cstRemoteWriter==
264 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
265 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
267 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
269 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
272 SeqNumberAdd(cstRemoteWriter->sn,
274 csChangeFromWriter->csChange->gapSN);
277 CSTReaderDestroyCSChange(cstRemoteWriter,
282 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
283 csChangeFromWriter,ORTE_FALSE);
288 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
289 if ((cstRemoteWriter!=
290 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
291 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL))
293 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
294 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
296 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
298 cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
300 CSTReaderDestroyCSChangeFromWriter(
307 CSTReaderSetupState(cstRemoteWriter);
308 debug(54,10) ("CSTReaderProcIssue: finished\n");