2 * $Id: RTPSCSTReader.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 53 CSTReader
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(CSTReader,
25 CSTSubscriptions, CSTReader, GUID_RTPS,
26 cstReader, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteWriter,
28 CSTReader, CSTRemoteWriter, GUID_RTPS,
29 cstRemoteWriter, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeFromWriter,
31 CSTRemoteWriter, CSChangeFromWriter, SequenceNumber,
32 csChangeFromWriter, node, csChange->sn, gavl_cmp_sn);
34 /*****************************************************************************/
36 CSTReaderInit(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
37 ObjectId oid,CSTReaderParams *params,ORTETypeRegister *typeRegister) {
39 debug(53,10) ("CSTReaderInit: start\n");
40 //init values of cstReader
41 cstReader->guid.hid=object->objectEntryHID->hid;
42 cstReader->guid.aid=object->objectEntryAID->aid;
43 cstReader->guid.oid=oid;
44 cstReader->objectEntryOID=object;
45 memcpy(&cstReader->params,params,sizeof(CSTReaderParams));
46 cstReader->strictReliableCounter=0;
47 cstReader->bestEffortsCounter=0;
48 cstReader->cstRemoteWriterCounter=0;
49 cstReader->createdByPattern=ORTE_FALSE;
50 CSTReaderCSChange_init_head(cstReader);
51 CSTRemoteWriter_init_root_field(cstReader);
52 pthread_rwlock_init(&cstReader->lock,NULL);
54 cstReader->typeRegister=typeRegister;
55 ul_htim_queue_init_detached(&cstReader->deadlineTimer.htim);
56 ul_htim_queue_init_detached(&cstReader->persistenceTimer.htim);
57 cstReader->cstRemoteWriterSubscribed=NULL;
58 if ((oid & 0x07) == OID_SUBSCRIPTION) {
60 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
61 if (NtpTimeCmp(sp->deadline,zNtpTime)!=0) {
62 if (sp->mode==IMMEDIATE) {
64 cstReader->objectEntryOID->objectEntryAID,
65 &cstReader->deadlineTimer,
67 "CSTReaderDeadlineTimer",
68 CSTReaderDeadlineTimer,
73 if (sp->mode==PULLED) {
78 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
82 debug(53,4) ("CSTReaderInit: 0x%x-0x%x-0x%x\n",
86 debug(53,10) ("CSTReaderInit: finished\n");
89 /*****************************************************************************/
91 CSTReaderDelete(ORTEDomain *d,CSTReader *cstReader) {
92 CSTRemoteWriter *cstRemoteWriter;
94 debug(53,10)("CSTReaderDelete: start\n");
95 debug(53,4) ("CSTReaderDelete: 0x%x-0x%x-0x%x\n",
99 //Destroy all cstRemoteReader connected on cstWriter
100 while((cstRemoteWriter=CSTRemoteWriter_first(cstReader))) {
101 CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
104 cstReader->objectEntryOID->objectEntryAID,
105 &cstReader->deadlineTimer,
108 cstReader->objectEntryOID->objectEntryAID,
109 &cstReader->persistenceTimer,
111 pthread_rwlock_destroy(&cstReader->lock);
112 debug(53,10) ("CSTReaderDelete: finished\n");
115 /*****************************************************************************/
117 CSTReaderAddRemoteWriter(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
119 CSTRemoteWriter *cstRemoteWriter;
121 cstReader->cstRemoteWriterCounter++;
122 cstRemoteWriter=(CSTRemoteWriter*)MALLOC(sizeof(CSTRemoteWriter));
123 cstRemoteWriter->guid.hid=object->objectEntryHID->hid;
124 cstRemoteWriter->guid.aid=object->objectEntryAID->aid;
125 cstRemoteWriter->guid.oid=oid;
126 cstRemoteWriter->spobject=object;
127 cstRemoteWriter->cstReader=cstReader;
128 cstRemoteWriter->csChangesCounter=0;
129 cstRemoteWriter->ACKRetriesCounter=0;
130 cstRemoteWriter->commStateACK=WAITING;
131 CSChangeFromWriter_init_root_field(cstRemoteWriter);
132 SEQUENCE_NUMBER_NONE(cstRemoteWriter->sn);
133 SEQUENCE_NUMBER_NONE(cstRemoteWriter->firstSN);
134 SEQUENCE_NUMBER_NONE(cstRemoteWriter->lastSN);
135 ul_htim_queue_init_detached(&cstRemoteWriter->delayResponceTimer.htim);
136 ul_htim_queue_init_detached(&cstRemoteWriter->repeatActiveQueryTimer.htim);
137 CSTRemoteWriter_insert(cstReader,cstRemoteWriter);
138 //add event for repeatActiveTime
139 if (NtpTimeCmp(cstReader->params.repeatActiveQueryTime,iNtpTime)!=0) {
141 cstRemoteWriter->spobject->objectEntryAID,
142 &cstRemoteWriter->repeatActiveQueryTimer,
143 1, //metatraffic timer
144 "CSTReaderQueryTimer",
146 &cstRemoteWriter->cstReader->lock,
150 if ((cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
151 ORTEPublProp *pp=(ORTEPublProp*)object->attributes;
152 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
153 cstReader->strictReliableCounter++;
155 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
156 cstReader->bestEffortsCounter++;
159 debug(53,4) ("CSTReaderAddRemoteWriter: 0x%x-0x%x-0x%x\n",
160 cstRemoteWriter->guid.hid,
161 cstRemoteWriter->guid.aid,
162 cstRemoteWriter->guid.oid);
163 return cstRemoteWriter;
166 /*****************************************************************************/
168 CSTReaderDestroyRemoteWriter(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
169 CSChangeFromWriter *csChangeFromWriter;
171 if (!cstRemoteWriter) return;
172 cstRemoteWriter->cstReader->cstRemoteWriterCounter--;
173 debug(53,4) ("CSTReaderDestroyRemoteWriter: 0x%x-0x%x-0x%x\n",
174 cstRemoteWriter->guid.hid,
175 cstRemoteWriter->guid.aid,
176 cstRemoteWriter->guid.oid);
177 if ((cstRemoteWriter->cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
179 pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
180 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
181 cstRemoteWriter->cstReader->strictReliableCounter++;
183 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
184 cstRemoteWriter->cstReader->bestEffortsCounter++;
187 if (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==cstRemoteWriter)
188 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed=NULL;
189 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
190 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
191 csChangeFromWriter,ORTE_FALSE);
194 cstRemoteWriter->spobject->objectEntryAID,
195 &cstRemoteWriter->delayResponceTimer,
196 1); //metatraffic timer
198 cstRemoteWriter->spobject->objectEntryAID,
199 &cstRemoteWriter->repeatActiveQueryTimer,
200 1); //metatraffic timer
201 CSTRemoteWriter_delete(cstRemoteWriter->cstReader,cstRemoteWriter);
202 FREE(cstRemoteWriter);
205 /*****************************************************************************/
207 CSTReaderAddCSChange(CSTRemoteWriter *cstRemoteWriter,CSChange *csChange) {
208 CSChangeFromWriter *csChangeFromWriter;
210 cstRemoteWriter->csChangesCounter++;
211 cstRemoteWriter->ACKRetriesCounter=0;
212 csChangeFromWriter=(CSChangeFromWriter*)MALLOC(sizeof(CSChangeFromWriter));
213 csChangeFromWriter->csChange=csChange;
214 csChangeFromWriter->commStateChFWriter=RECEIVED;
215 CSChangeFromWriter_insert(cstRemoteWriter,csChangeFromWriter);
216 CSTReaderCSChange_insert(cstRemoteWriter->cstReader,csChange);
219 /*****************************************************************************/
221 CSTReaderDestroyCSChangeFromWriter(CSTRemoteWriter *cstRemoteWriter,
222 CSChangeFromWriter *csChangeFromWriter,Boolean keepCSChange) {
224 if ((!csChangeFromWriter) || (!cstRemoteWriter)) return;
225 CSTReaderCSChange_delete(cstRemoteWriter->cstReader,
226 csChangeFromWriter->csChange);
228 if (csChangeFromWriter->csChange->cdrCodec.buffer)
229 FREE(csChangeFromWriter->csChange->cdrCodec.buffer);
230 parameterDelete(csChangeFromWriter->csChange);
231 FREE(csChangeFromWriter->csChange);
233 CSChangeFromWriter_delete(cstRemoteWriter,csChangeFromWriter);
234 FREE(csChangeFromWriter);
235 cstRemoteWriter->csChangesCounter--;
238 /*****************************************************************************/
240 CSTReaderDestroyCSChange(CSTRemoteWriter *cstRemoteWriter,SequenceNumber *sn,
241 Boolean keepCSChange) {
242 CSChangeFromWriter *csChangeFromWriter;
244 csChangeFromWriter=CSChangeFromWriter_find(cstRemoteWriter,sn);
245 if (csChangeFromWriter) {
246 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
247 csChangeFromWriter,keepCSChange);
251 /*****************************************************************************/
253 CSTReaderSetupState(CSTRemoteWriter *cstRemoteWriter) {
255 if (CSChangeFromWriter_first(cstRemoteWriter)==NULL) { //no csChanges
256 if (SeqNumberCmp(cstRemoteWriter->sn,cstRemoteWriter->lastSN)!=0) {
257 if (cstRemoteWriter->commStateACK!=WAITING) {
258 cstRemoteWriter->commStateACK=PULLING;
259 cstRemoteWriter->ACKRetriesCounter=0;
260 eventDetach(cstRemoteWriter->cstReader->domain,
261 cstRemoteWriter->spobject->objectEntryAID,
262 &cstRemoteWriter->repeatActiveQueryTimer,
263 1); //metatraffic timer
264 eventDetach(cstRemoteWriter->cstReader->domain,
265 cstRemoteWriter->spobject->objectEntryAID,
266 &cstRemoteWriter->delayResponceTimer,
267 1); //metatraffic timer
268 eventAdd(cstRemoteWriter->cstReader->domain,
269 cstRemoteWriter->spobject->objectEntryAID,
270 &cstRemoteWriter->delayResponceTimer,
271 1, //metatraffic timer
272 "CSTReaderResponceTimer",
273 CSTReaderResponceTimer,
274 &cstRemoteWriter->cstReader->lock,
276 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
279 if (cstRemoteWriter->commStateACK==PULLING) {
280 cstRemoteWriter->commStateACK=WAITING;
281 cstRemoteWriter->ACKRetriesCounter=0;
282 eventDetach(cstRemoteWriter->cstReader->domain,
283 cstRemoteWriter->spobject->objectEntryAID,
284 &cstRemoteWriter->delayResponceTimer,
285 1); //metatraffic timer
286 if (NtpTimeCmp(cstRemoteWriter->cstReader->params.repeatActiveQueryTime,
288 eventDetach(cstRemoteWriter->cstReader->domain,
289 cstRemoteWriter->spobject->objectEntryAID,
290 &cstRemoteWriter->repeatActiveQueryTimer,
291 1); //metatraffic timer
292 eventAdd(cstRemoteWriter->cstReader->domain,
293 cstRemoteWriter->spobject->objectEntryAID,
294 &cstRemoteWriter->repeatActiveQueryTimer,
295 1, //metatraffic timer
296 "CSTReaderQueryTimer",
298 &cstRemoteWriter->cstReader->lock,
300 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
305 if (cstRemoteWriter->commStateACK==WAITING) {
306 cstRemoteWriter->commStateACK=PULLING;
307 cstRemoteWriter->ACKRetriesCounter=0;
308 eventDetach(cstRemoteWriter->cstReader->domain,
309 cstRemoteWriter->spobject->objectEntryAID,
310 &cstRemoteWriter->repeatActiveQueryTimer,
311 1); //metatraffic timer
312 eventDetach(cstRemoteWriter->cstReader->domain,
313 cstRemoteWriter->spobject->objectEntryAID,
314 &cstRemoteWriter->delayResponceTimer,
315 1); //metatraffic timer
316 eventAdd(cstRemoteWriter->cstReader->domain,
317 cstRemoteWriter->spobject->objectEntryAID,
318 &cstRemoteWriter->delayResponceTimer,
319 1, //metatraffic timer
320 "CSTReaderResponceTimer",
321 CSTReaderResponceTimer,
322 &cstRemoteWriter->cstReader->lock,
324 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);