2 * $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 54 CSChanges processing
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesManager
 *
 * Processes a single CSChange taken from a remote writer's queue.
 * CSTReaderProcCSChanges invokes it when (d->guid.aid & 0x03)==MANAGER.
 *
 *   d                  - domain; used for the object lookup and holds the
 *                        writerManagers / writerApplications CSTWriters
 *   cstRemoteWriter    - remote writer the change was received from
 *   csChangeFromWriter - queue entry whose csChange is processed
 *
 * Returns early when no object entry matches the GUID of the change.
 *
 * NOTE(review): parts of this function (return type, the csChange
 * declaration, the names of the timer calls, the first case label and the
 * closing braces) are not visible in this listing; the comments below are
 * hedged where they depend on those parts.
 */
26 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
27 CSChangeFromWriter *csChangeFromWriter) {
29 ObjectEntryOID *objectEntryOID;
// Work on the CSChange carried by this queue entry.
32 csChange=csChangeFromWriter->csChange;
// Look up the object entry by the GUID of the change; unknown object -> done.
33 objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
34 if (!objectEntryOID) return;
// The change reports the object as not alive. The argument lists below pass
// the entry's expirationPurgeTimer to two calls whose names are not visible
// (presumably a detach followed by a re-add with objectEntryExpirationTimer
// as handler - TODO confirm against the full source).
35 if (!csChange->alive) {
37 objectEntryOID->objectEntryAID,
38 &objectEntryOID->expirationPurgeTimer,
41 objectEntryOID->objectEntryAID,
42 &objectEntryOID->expirationPurgeTimer,
45 objectEntryExpirationTimer,
// Dispatch on the low two bits of guid.aid of the change.
51 switch (csChange->guid.aid & 0x03) {
// NOTE(review): the case label of this first branch is not visible
// (presumably MANAGER - verify).
53 //update parameters of object
54 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
55 //copy csChange to writerManagers
// The remaining arguments of this call are not visible here.
56 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
// writerManagers is modified only while holding its write lock.
59 pthread_rwlock_wrlock(&d->writerManagers.lock);
60 CSTWriterAddCSChange(d,&d->writerManagers,csChange);
61 pthread_rwlock_unlock(&d->writerManagers.lock);
63 case MANAGEDAPPLICATION:
64 //update parameters of object
65 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
66 //changes can make only local Apps
// Forwarding to writerApplications is guarded by the appMOM flag of the
// remote writer's object entry. Presumably the whole block down to the
// unlock sits inside this if - the closing brace is not visible.
67 if (cstRemoteWriter->objectEntryOID->appMOM) {
// The remaining arguments of this call are not visible here.
68 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
// writerApplications is modified only while holding its write lock.
71 pthread_rwlock_wrlock(&d->writerApplications.lock);
72 CSTWriterAddCSChange(d,&d->writerApplications,csChange);
73 pthread_rwlock_unlock(&d->writerApplications.lock);
79 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesApp
 *
 * Processes a single CSChange taken from a remote writer's queue.
 * CSTReaderProcCSChanges invokes it when
 * (d->guid.aid & 0x03)==MANAGEDAPPLICATION.
 *
 *   d                  - domain used for the object lookup
 *   cstRemoteWriter    - remote writer the change was received from
 *                        (not referenced in the visible lines)
 *   csChangeFromWriter - queue entry whose csChange is processed
 *
 * Returns early when no object entry matches the GUID of the change.
 *
 * NOTE(review): the return type, the csChange declaration, the names of the
 * timer calls, the body of the OID_APPLICATION case and the closing braces
 * are not visible in this listing.
 */
81 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
82 CSChangeFromWriter *csChangeFromWriter) {
84 ObjectEntryOID *objectEntryOID;
// Work on the CSChange carried by this queue entry.
86 csChange=csChangeFromWriter->csChange;
// Look up the object entry by the GUID of the change; unknown object -> done.
87 objectEntryOID=objectEntryFind(d,&csChange->guid);
88 if (!objectEntryOID) return;
// The change reports the object as not alive. The argument lists below pass
// the entry's expirationPurgeTimer to two calls whose names are not visible
// (presumably a detach followed by a re-add with objectEntryExpirationTimer
// as handler - TODO confirm against the full source).
89 if (!csChange->alive) {
91 objectEntryOID->objectEntryAID,
92 &objectEntryOID->expirationPurgeTimer,
95 objectEntryOID->objectEntryAID,
96 &objectEntryOID->expirationPurgeTimer,
99 objectEntryExpirationTimer,
// Dispatch on the low three bits of guid.oid of the change and refresh the
// matching property structure stored in the entry's attributes.
105 switch (csChange->guid.oid & 0x07) {
// NOTE(review): the statements of this case are not visible here.
106 case OID_APPLICATION:
108 case OID_PUBLICATION:
109 parameterUpdatePublication(csChange,
110 (ORTEPublProp*)objectEntryOID->attributes);
112 case OID_SUBSCRIPTION:
113 parameterUpdateSubscription(csChange,
114 (ORTESubsProp*)objectEntryOID->attributes);
119 /*****************************************************************************/
/*
 * CSTReaderProcCSChanges
 *
 * Walks the queue of CSChangeFromWriter entries of one remote writer and
 * processes them, dispatching to CSTReaderProcCSChangesManager or
 * CSTReaderProcCSChangesApp depending on (d->guid.aid & 0x03).
 * Finishes by calling CSTReaderSetupState on the remote writer.
 *
 *   d               - local domain; its guid.aid selects the handler
 *   cstRemoteWriter - remote writer whose queue is processed; NULL is
 *                     accepted and makes the function return immediately
 *
 * NOTE(review): the return type, the loop header, the else branches and the
 * closing braces are not visible in this listing; the `break` below implies
 * an enclosing loop.
 */
121 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
122 CSChangeFromWriter *csChangeFromWriter;
123 SequenceNumber snNext;
125 debug(54,10) ("CSTReaderProcCSChanges: start\n");
126 if (!cstRemoteWriter) return;
// Always look at the head of the queue; stop when the queue is empty.
128 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
129 if (!csChangeFromWriter) break;
// Branch taken when SeqNumberCmp(change sn, firstSN) is non-negative
// (presumably: the change is not older than firstSN - verify).
130 if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
131 cstRemoteWriter->firstSN)>=0) {
// snNext is derived from the writer's current sn (presumably sn+1 - confirm
// against the SeqNumberInc macro).
132 SeqNumberInc(snNext,cstRemoteWriter->sn);
133 debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
134 csChangeFromWriter->csChange->sn.low);
// Only the change carrying exactly snNext, and already in state RECEIVED,
// is processed.
135 if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
136 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
// gapSN equal to noneSN: hand the change to the handler for the local role.
137 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
138 if ((d->guid.aid & 0x03)==MANAGER)
139 CSTReaderProcCSChangesManager(d,cstRemoteWriter,
141 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION)
142 CSTReaderProcCSChangesApp(d,cstRemoteWriter,
// Advance the writer's sequence number past the processed change.
144 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
// Presumably the else branch (gapSN set): the writer's sn is updated using
// gapSN; the middle argument of the call is not visible.
147 SeqNumberAdd(cstRemoteWriter->sn,
149 csChangeFromWriter->csChange->gapSN);
151 CSTReaderDestroyCSChange(cstRemoteWriter, //note: csChange can be copied to another CSTWriter!!!
// NOTE(review): the lines between the call above and this one are not
// visible; this call presumably belongs to the branch for changes rejected
// by the firstSN comparison - TODO confirm.
156 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
157 csChangeFromWriter,ORTE_FALSE);
160 CSTReaderSetupState(cstRemoteWriter);
161 debug(54,10) ("CSTReaderProcCSChanges: finished\n");
164 /*****************************************************************************/
/*
 * CSTReaderNewData
 *
 * Delivers one received change to the receive callback registered on the
 * object entry of the reader that cstRemoteWriter belongs to, then updates
 * the reader's deadline timer according to the subscription mode.
 *
 *   cstRemoteWriter    - remote writer the data came from; NULL is accepted
 *                        and makes the function return immediately
 *   csChangeFromWriter - queue entry whose csChange holds the data
 *
 * NOTE(review): the return type and some local declarations (info, sp,
 * length, timeNext) are not visible in this listing.
 */
166 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
167 CSChangeFromWriter *csChangeFromWriter) {
170 ObjectEntryOID *objectEntryOID;
\r
173 if (cstRemoteWriter==NULL) return;
// Object entry of the owning reader; its attributes are read as the
// subscription properties.
174 objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
175 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
// Everything below that is visible in this listing happens only when a
// receive callback is registered (closing brace not visible).
176 if (objectEntryOID->recvCallBack) {
177 //deserialization routine
// Use the type's deserialize hook when one is registered.
178 if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
179 cstRemoteWriter->cstReader->typeRegister->deserialize(
180 &csChangeFromWriter->csChange->cdrStream,
181 objectEntryOID->instance);
// Presumably the else branch: raw copy of the stream buffer, with the length
// capped at the type register's getMaxSize value.
183 length=csChangeFromWriter->csChange->cdrStream.length;
184 if (cstRemoteWriter->cstReader->typeRegister->getMaxSize<length)
185 length=cstRemoteWriter->cstReader->typeRegister->getMaxSize;
186 //no deserialization -> memcpy
// The length argument of this memcpy is not visible (presumably `length`).
187 memcpy(objectEntryOID->instance,
188 csChangeFromWriter->csChange->cdrStream.buffer,
// Fill the receive-info record handed to the callback.
191 info.status=NEW_DATA;
192 info.topic=sp->topic;
193 info.type=sp->typeName;
194 info.senderGUID=csChangeFromWriter->csChange->guid;
195 info.localTimeReceived=csChangeFromWriter->csChange->localTimeReceived;
196 info.remoteTimePublished=csChangeFromWriter->csChange->remoteTimePublished;
197 info.sn=csChangeFromWriter->csChange->sn;
// Invoke the callback with the info record, the instance buffer and the
// user parameter stored on the object entry.
198 objectEntryOID->recvCallBack(&info,
199 objectEntryOID->instance,
200 objectEntryOID->callBackParam);
// IMMEDIATE mode: the deadline timer is detached and added again with
// CSTReaderDeadlineTimer as handler (some arguments are not visible).
201 if (sp->mode==IMMEDIATE) {
202 //setup new time for deadline timer
203 eventDetach(cstRemoteWriter->cstReader->domain,
204 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
205 &cstRemoteWriter->cstReader->deadlineTimer,
207 eventAdd(cstRemoteWriter->cstReader->domain,
208 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
209 &cstRemoteWriter->cstReader->deadlineTimer,
211 "CSTReaderDeadlineTimer",
212 CSTReaderDeadlineTimer,
213 &cstRemoteWriter->cstReader->lock,
214 cstRemoteWriter->cstReader,
// PULLED mode: only the expiry of the deadline timer is set, to timeNext.
// timeNext is computed from getActualNtpTime() and operands that are not
// visible here (presumably the subscription deadline - TODO confirm).
217 if (sp->mode==PULLED) {
220 (getActualNtpTime()),
222 htimerUnicastCommon_set_expire(&cstRemoteWriter->
223 cstReader->deadlineTimer,timeNext);
228 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesIssue
 *
 * Processes the queued issue changes of one remote writer according to the
 * reliability requested by the subscription (read from the attributes of
 * the reader's object entry), delivering data through CSTReaderNewData.
 * Finishes by calling CSTReaderSetupState on the remote writer.
 *
 *   cstRemoteWriter - remote writer whose queue is processed; NULL is
 *                     accepted and makes the function return immediately
 *   pullCalled      - when the subscription mode is PULLED, processing is
 *                     skipped while this is ORTE_FALSE
 *
 * NOTE(review): the return type, the declaration of sp, the loop header of
 * the strict branch, several call arguments and the closing braces are not
 * visible in this listing. The debug strings use the name
 * "CSTReaderProcIssue", which differs from the function name.
 */
230 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
232 CSChangeFromWriter *csChangeFromWriter;
233 SequenceNumber snNext;
235 debug(54,10) ("CSTReaderProcIssue: start\n");
236 if (cstRemoteWriter==NULL) return;
237 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
// ---- strict reliability: changes are consumed in sequence-number order ----
238 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
// A PULLED subscription is only served when the caller asked for it.
240 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
// Always look at the head of the queue; stop when the queue is empty
// (the `break` implies an enclosing loop whose header is not visible).
242 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
243 if (!csChangeFromWriter) break;
// Branch taken when SeqNumberCmp(change sn, firstSN) is non-negative
// (presumably: the change is not older than firstSN - verify).
244 if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
245 cstRemoteWriter->firstSN)>=0) {
// snNext is derived from the writer's current sn (presumably sn+1 - confirm
// against the SeqNumberInc macro).
246 SeqNumberInc(snNext,cstRemoteWriter->sn);
247 debug(54,10) ("CSTReaderProcChangesIssue: processing sn:%u,Change sn:%u\n",snNext.low,
248 csChangeFromWriter->csChange->sn.low);
// Only the change carrying exactly snNext, and already in state RECEIVED,
// is processed.
249 if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
250 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
251 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
// Data is delivered only when this remote writer is the one the reader is
// currently subscribed to.
252 if ((cstRemoteWriter==
253 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
254 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
256 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
// Advance the writer's sequence number past the processed change.
258 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
// Presumably the else branch (gapSN set): the writer's sn is updated using
// gapSN; the middle argument of the call is not visible.
261 SeqNumberAdd(cstRemoteWriter->sn,
263 csChangeFromWriter->csChange->gapSN);
// The remaining arguments of this call are not visible here.
265 CSTReaderDestroyCSChange(cstRemoteWriter,
// NOTE(review): the lines between the call above and this one are not
// visible; this call presumably belongs to the branch for changes rejected
// by the firstSN comparison - TODO confirm.
270 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
271 csChangeFromWriter,ORTE_FALSE);
// ---- best-effort reliability: every queued change is delivered ----
276 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
// Condition is true when this remote writer is not the subscribed one, or
// when nothing is subscribed. The statement it guards is not visible
// (presumably a return - TODO confirm).
277 if ((cstRemoteWriter!=
278 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
279 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL))
// A PULLED subscription is only served when the caller asked for it.
281 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
// Drain the queue: deliver the head entry, then destroy it (the arguments
// of the destroy call are not visible).
282 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
284 CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
285 CSTReaderDestroyCSChangeFromWriter(
292 CSTReaderSetupState(cstRemoteWriter);
293 debug(54,10) ("CSTReaderProcIssue: finished\n");