2 * $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 54 CSChanges processing
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesManager
 *
 * Processes one CSChange; CSTReaderProcCSChanges() calls this function when
 * the low two bits of the local domain's guid.aid equal MANAGER.
 *
 *   d                  - domain used for objectEntryFind() and
 *                        CSTWriterAddCSChange(); its writerManagers and
 *                        writerApplications locks are taken here
 *   cstRemoteWriter    - passed to CSTReaderDestroyCSChangeFromWriter();
 *                        its objectEntryOID->appMOM flag gates the
 *                        MANAGEDAPPLICATION branch
 *   csChangeFromWriter - wrapper whose csChange member is processed
 *
 * NOTE(review): several lines of this function (return type, local
 * declarations, the first case label, continuation lines of multi-line
 * calls, closing braces) are not visible in this view. The comments
 * describe only the statements that are visible.
 */
26 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
27 CSChangeFromWriter *csChangeFromWriter) {
29 ObjectEntryOID *objectEntryOID;
32 csChange=csChangeFromWriter->csChange;
/* Look up the object entry by the change's GUID; return when none is found. */
33 objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
34 if (!objectEntryOID) return;
/*
 * Branch for a change whose alive flag is not set.
 * NOTE(review): the calls that take the arguments below are not visible;
 * they reference expirationPurgeTimer and objectEntryExpirationTimer -
 * presumably a timer detach followed by a re-add. Confirm in the full file.
 */
35 if (!csChange->alive) {
37 objectEntryOID->objectEntryAID,
38 &objectEntryOID->expirationPurgeTimer,
41 objectEntryOID->objectEntryAID,
42 &objectEntryOID->expirationPurgeTimer,
45 objectEntryExpirationTimer,
/*
 * Dispatch on the low two bits of the change's application id.
 * NOTE(review): the label of the first case is not visible in this view.
 */
51 switch (csChange->guid.aid & 0x03) {
53 //update parameters of object
54 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
55 //copy csChange to writerManagers: CSTReaderDestroyCSChangeFromWriter() is
   //called first, then csChange is added under the writerManagers write lock
56 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
59 pthread_rwlock_wrlock(&d->writerManagers.lock);
60 CSTWriterAddCSChange(d,&d->writerManagers,csChange);
61 pthread_rwlock_unlock(&d->writerManagers.lock);
63 case MANAGEDAPPLICATION:
64 //update parameters of object
65 parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
66 //only local Apps can make changes: the csChange is forwarded to
   //writerApplications (under its write lock) only when appMOM is set
67 if (cstRemoteWriter->objectEntryOID->appMOM) {
68 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
71 pthread_rwlock_wrlock(&d->writerApplications.lock);
72 CSTWriterAddCSChange(d,&d->writerApplications,csChange);
73 pthread_rwlock_unlock(&d->writerApplications.lock);
79 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesApp
 *
 * Processes one CSChange; CSTReaderProcCSChanges() calls this function when
 * the low two bits of the local domain's guid.aid equal MANAGEDAPPLICATION.
 *
 *   d                  - domain used for objectEntryFind()
 *   cstRemoteWriter    - remote writer context (not referenced by any
 *                        statement visible in this view)
 *   csChangeFromWriter - wrapper whose csChange member is processed
 *
 * NOTE(review): several lines of this function (return type, local
 * declarations, the bodies of all three cases, continuation lines of
 * multi-line calls, closing braces) are not visible in this view.
 */
81 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
82 CSChangeFromWriter *csChangeFromWriter) {
84 ObjectEntryOID *objectEntryOID;
86 csChange=csChangeFromWriter->csChange;
/* Look up the object entry by the change's GUID; return when none is found. */
87 objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
88 if (!objectEntryOID) return;
/*
 * Branch for a change whose alive flag is not set.
 * NOTE(review): the calls that take the arguments below are not visible;
 * they reference expirationPurgeTimer and objectEntryExpirationTimer -
 * presumably a timer detach followed by a re-add. Confirm in the full file.
 */
89 if (!csChange->alive) {
91 objectEntryOID->objectEntryAID,
92 &objectEntryOID->expirationPurgeTimer,
95 objectEntryOID->objectEntryAID,
96 &objectEntryOID->expirationPurgeTimer,
99 objectEntryExpirationTimer,
/*
 * Dispatch on the low three bits of the change's object id.
 * NOTE(review): the case bodies are not visible in this view.
 */
105 switch (csChangeFromWriter->csChange->guid.oid & 0x07) {
106 case OID_APPLICATION:
108 case OID_PUBLICATION:
110 case OID_SUBSCRIPTION:
115 /*****************************************************************************/
/*
 * CSTReaderProcCSChanges
 *
 * Walks the CSChanges queued on a remote writer and processes those that
 * arrive in sequence-number order, dispatching each to the manager or the
 * managed-application handler according to the local domain's guid.aid.
 *
 *   d               - local domain; (guid.aid & 0x03) selects the handler
 *   cstRemoteWriter - remote writer whose queued changes are processed;
 *                     a NULL pointer makes the function return immediately
 *
 * NOTE(review): several lines of this function (return type, loop header,
 * else headers, continuation lines of multi-line calls, closing braces) are
 * not visible in this view.
 */
117 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
118 CSChangeFromWriter *csChangeFromWriter;
119 SequenceNumber snNext;
121 debug(54,10) ("CSTReaderProcCSChanges: start\n");
122 if (!cstRemoteWriter) return;
/*
 * Take the first queued change; stop when the queue is empty.
 * NOTE(review): the enclosing loop header is not visible; the `break`
 * below implies one.
 */
124 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
125 if (!csChangeFromWriter) break;
/* This branch handles changes whose sn is not below the writer's firstSN. */
126 if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
127 cstRemoteWriter->firstSN)>=0) {
/*
 * snNext is derived from cstRemoteWriter->sn - presumably sn+1, i.e. the
 * sequence number expected next. TODO confirm against SeqNumberInc().
 */
128 SeqNumberInc(snNext,cstRemoteWriter->sn);
129 debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
130 csChangeFromWriter->csChange->sn.low);
/* Process only a change that matches snNext and is in state RECEIVED. */
131 if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
132 (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
/* gapSN equal to noneSN: dispatch to the handler for the local app kind. */
133 if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
134 if ((d->guid.aid & 0x03)==MANAGER)
135 CSTReaderProcCSChangesManager(d,cstRemoteWriter,
137 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION)
138 CSTReaderProcCSChangesApp(d,cstRemoteWriter,
140 SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
/*
 * NOTE(review): the branch header for the code below is not visible;
 * presumably this is the gap case, where sn is advanced using gapSN.
 */
143 SeqNumberAdd(cstRemoteWriter->sn,
145 csChangeFromWriter->csChange->gapSN);
147 CSTReaderDestroyCSChange(cstRemoteWriter, //note: csChange can be copied to another CSTWriter!!!
152 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
153 csChangeFromWriter,ORTE_FALSE);
/* Let CSTReaderSetupState() update the state kept for this remote writer. */
156 CSTReaderSetupState(cstRemoteWriter);
157 debug(54,10) ("CSTReaderProcCSChanges: finished\n");
160 /*****************************************************************************/
/*
 * CSTReaderProcCSChangesIssue
 *
 * Delivers the issues queued on a remote writer to the subscription's
 * receive callback and maintains the reader's deadline timer.
 *
 *   cstRemoteWriter - remote writer whose queued changes are delivered;
 *                     a NULL pointer makes the function return immediately
 *   pullCalled      - when the subscription mode is PULLED, nothing is
 *                     delivered unless this is not ORTE_FALSE
 *
 * NOTE(review): several lines of this function (return type, some local
 * declarations such as sp/info/timeNext, else headers, continuation lines
 * of multi-line calls, closing braces) are not visible in this view.
 */
162 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
164 CSChangeFromWriter *csChangeFromWriter;
167 debug(54,10) ("CSTReaderProcIssue: start\n");
168 if (cstRemoteWriter==NULL) return;
/* sp: the reader's object-entry attributes viewed as subscription properties. */
169 sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
/* NOTE(review): the body of the strict-reliability branch is not visible. */
171 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
/*
 * Best-effort reliability: the test below is true when this writer is not
 * the reader's cstRemoteWriterSubscribed, or when none is subscribed.
 * NOTE(review): the guarded statement is not visible - presumably a return.
 */
174 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
175 if ((cstRemoteWriter!=
176 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
177 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL))
/* In PULLED mode nothing is delivered unless the caller requested a pull. */
179 if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
/* Loop while the remote writer still has a first queued change. */
180 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
181 ObjectEntryOID *objectEntryOID;
182 objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
/* Data are unpacked and delivered only when a receive callback is set. */
183 if (objectEntryOID->recvCallBack) {
184 //deserialization routine
185 if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
186 cstRemoteWriter->cstReader->typeRegister->deserialize(
187 &csChangeFromWriter->csChange->cdrStream,
188 objectEntryOID->instance);
/*
 * NOTE(review): the else header is not visible in this view. The code below
 * is the path without a deserializer: the stream length is clamped to
 * typeRegister->getMaxSize and the raw buffer is copied into the instance.
 */
190 int length=csChangeFromWriter->csChange->cdrStream.length;
191 if (cstRemoteWriter->cstReader->typeRegister->getMaxSize<length)
192 length=cstRemoteWriter->cstReader->typeRegister->getMaxSize;
193 //no deserialization -> memcpy
194 memcpy(objectEntryOID->instance,
195 csChangeFromWriter->csChange->cdrStream.buffer,
198 info.status=NEW_DATA;
199 info.topic=sp->topic;
200 info.type=sp->typeName;
201 info.senderGUID=csChangeFromWriter->csChange->guid;
202 info.localTimeReceived=csChangeFromWriter->csChange->localTimeReceived;
203 info.remoteTimePublished=csChangeFromWriter->csChange->remoteTimePublished;
204 info.sn=csChangeFromWriter->csChange->sn;
/* Call the receive callback with the info, the instance and callBackParam. */
205 objectEntryOID->recvCallBack(&info,
206 objectEntryOID->instance,
207 objectEntryOID->callBackParam);
/*
 * IMMEDIATE mode: the deadline timer is detached and then added again with
 * CSTReaderDeadlineTimer, so it gets a new expiration time.
 */
208 if (sp->mode==IMMEDIATE) {
209 //setup new time for deadline timer
210 eventDetach(cstRemoteWriter->cstReader->domain,
211 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
212 &cstRemoteWriter->cstReader->deadlineTimer,
214 eventAdd(cstRemoteWriter->cstReader->domain,
215 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
216 &cstRemoteWriter->cstReader->deadlineTimer,
218 "CSTReaderDeadlineTimer",
219 CSTReaderDeadlineTimer,
220 &cstRemoteWriter->cstReader->lock,
221 cstRemoteWriter->cstReader,
/*
 * PULLED mode: timeNext is stored as the deadline timer's expire time.
 * NOTE(review): the computation of timeNext is only partly visible; it
 * involves getActualNtpTime().
 */
224 if (sp->mode==PULLED) {
227 (getActualNtpTime()),
229 htimerUnicastCommon_set_expire(&cstRemoteWriter->
230 cstReader->deadlineTimer,timeNext);
/* NOTE(review): the arguments of the call below are not visible here. */
233 CSTReaderDestroyCSChangeFromWriter(
240 CSTReaderSetupState(cstRemoteWriter);
241 debug(54,10) ("CSTReaderProcIssue: finished\n");