2 * $Id: RTPSCSTReader.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 53 CSTReader
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 GAVL_CUST_NODE_INT_IMP(CSTReader,
35 CSTSubscriptions, CSTReader, GUID_RTPS,
36 cstReader, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteWriter,
38 CSTReader, CSTRemoteWriter, GUID_RTPS,
39 cstRemoteWriter, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeFromWriter,
41 CSTRemoteWriter, CSChangeFromWriter, SequenceNumber,
42 csChangeFromWriter, node, csChange->sn, gavl_cmp_sn);
44 /*****************************************************************************/
46 CSTReaderInit(ORTEDomain *d, CSTReader *cstReader, ObjectEntryOID *object,
47 ObjectId oid, CSTReaderParams *params, ORTETypeRegister *typeRegister)
50 debug(53, 10) ("CSTReaderInit: start\n");
51 //init values of cstReader
52 cstReader->guid.hid = object->objectEntryHID->hid;
53 cstReader->guid.aid = object->objectEntryAID->aid;
54 cstReader->guid.oid = oid;
55 cstReader->objectEntryOID = object;
56 memcpy(&cstReader->params, params, sizeof(CSTReaderParams));
57 cstReader->strictReliableCounter = 0;
58 cstReader->bestEffortsCounter = 0;
59 cstReader->cstRemoteWriterCounter = 0;
60 cstReader->createdByPattern = ORTE_FALSE;
61 CSTReaderCSChange_init_head(cstReader);
62 CSTRemoteWriter_init_root_field(cstReader);
63 pthread_rwlock_init(&cstReader->lock, NULL);
64 cstReader->domain = d;
65 cstReader->typeRegister = typeRegister;
66 ul_htim_queue_init_detached(&cstReader->deadlineTimer.htim);
67 ul_htim_queue_init_detached(&cstReader->persistenceTimer.htim);
68 cstReader->cstRemoteWriterSubscribed = NULL;
69 if ((oid & 0x07) == OID_SUBSCRIPTION) {
71 sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
72 if (NtpTimeCmp(sp->deadline, zNtpTime) != 0) {
73 if (sp->mode == IMMEDIATE) {
75 cstReader->objectEntryOID->objectEntryAID,
76 &cstReader->deadlineTimer,
78 "CSTReaderDeadlineTimer",
79 CSTReaderDeadlineTimer,
84 if (sp->mode == PULLED) {
89 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer, timeNext);
93 debug(53, 4) ("CSTReaderInit: 0x%x-0x%x-0x%x\n",
97 debug(53, 10) ("CSTReaderInit: finished\n");
100 /*****************************************************************************/
102 CSTReaderDelete(ORTEDomain *d, CSTReader *cstReader)
104 CSTRemoteWriter *cstRemoteWriter;
106 debug(53, 10) ("CSTReaderDelete: start\n");
107 debug(53, 4) ("CSTReaderDelete: 0x%x-0x%x-0x%x\n",
110 cstReader->guid.oid);
111 //Destroy all cstRemoteReader connected on cstWriter
112 while ((cstRemoteWriter = CSTRemoteWriter_first(cstReader)))
113 CSTReaderDestroyRemoteWriter(d, cstRemoteWriter);
115 cstReader->objectEntryOID->objectEntryAID,
116 &cstReader->deadlineTimer,
119 cstReader->objectEntryOID->objectEntryAID,
120 &cstReader->persistenceTimer,
122 pthread_rwlock_destroy(&cstReader->lock);
123 debug(53, 10) ("CSTReaderDelete: finished\n");
126 /*****************************************************************************/
128 CSTReaderAddRemoteWriter(ORTEDomain *d, CSTReader *cstReader, ObjectEntryOID *object,
131 CSTRemoteWriter *cstRemoteWriter;
133 cstReader->cstRemoteWriterCounter++;
134 cstRemoteWriter = (CSTRemoteWriter *)MALLOC(sizeof(CSTRemoteWriter));
135 cstRemoteWriter->guid.hid = object->objectEntryHID->hid;
136 cstRemoteWriter->guid.aid = object->objectEntryAID->aid;
137 cstRemoteWriter->guid.oid = oid;
138 cstRemoteWriter->spobject = object;
139 cstRemoteWriter->cstReader = cstReader;
140 cstRemoteWriter->csChangesCounter = 0;
141 cstRemoteWriter->ACKRetriesCounter = 0;
142 cstRemoteWriter->commStateACK = WAITING;
143 CSChangeFromWriter_init_root_field(cstRemoteWriter);
144 SEQUENCE_NUMBER_NONE(cstRemoteWriter->sn);
145 SEQUENCE_NUMBER_NONE(cstRemoteWriter->firstSN);
146 SEQUENCE_NUMBER_NONE(cstRemoteWriter->lastSN);
147 ul_htim_queue_init_detached(&cstRemoteWriter->delayResponceTimer.htim);
148 ul_htim_queue_init_detached(&cstRemoteWriter->repeatActiveQueryTimer.htim);
149 CSTRemoteWriter_insert(cstReader, cstRemoteWriter);
150 //add event for repeatActiveTime
151 if (NtpTimeCmp(cstReader->params.repeatActiveQueryTime, iNtpTime) != 0) {
153 cstRemoteWriter->spobject->objectEntryAID,
154 &cstRemoteWriter->repeatActiveQueryTimer,
155 1, //metatraffic timer
156 "CSTReaderQueryTimer",
158 &cstRemoteWriter->cstReader->lock,
162 if ((cstReader->guid.oid & 0x07) == OID_SUBSCRIPTION) {
163 ORTEPublProp *pp = (ORTEPublProp *)object->attributes;
164 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT) != 0)
165 cstReader->strictReliableCounter++;
167 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
168 cstReader->bestEffortsCounter++;
171 debug(53, 4) ("CSTReaderAddRemoteWriter: 0x%x-0x%x-0x%x\n",
172 cstRemoteWriter->guid.hid,
173 cstRemoteWriter->guid.aid,
174 cstRemoteWriter->guid.oid);
175 return cstRemoteWriter;
178 /*****************************************************************************/
180 CSTReaderDestroyRemoteWriter(ORTEDomain *d, CSTRemoteWriter *cstRemoteWriter)
182 CSChangeFromWriter *csChangeFromWriter;
184 if (!cstRemoteWriter)
186 cstRemoteWriter->cstReader->cstRemoteWriterCounter--;
187 debug(53, 4) ("CSTReaderDestroyRemoteWriter: 0x%x-0x%x-0x%x\n",
188 cstRemoteWriter->guid.hid,
189 cstRemoteWriter->guid.aid,
190 cstRemoteWriter->guid.oid);
191 if ((cstRemoteWriter->cstReader->guid.oid & 0x07) == OID_SUBSCRIPTION) {
193 pp = (ORTEPublProp *)cstRemoteWriter->spobject->attributes;
194 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT) != 0)
195 cstRemoteWriter->cstReader->strictReliableCounter++;
197 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
198 cstRemoteWriter->cstReader->bestEffortsCounter++;
201 if (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed == cstRemoteWriter)
202 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed = NULL;
203 while ((csChangeFromWriter = CSChangeFromWriter_first(cstRemoteWriter)))
204 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
205 csChangeFromWriter, ORTE_FALSE);
207 cstRemoteWriter->spobject->objectEntryAID,
208 &cstRemoteWriter->delayResponceTimer,
209 1); //metatraffic timer
211 cstRemoteWriter->spobject->objectEntryAID,
212 &cstRemoteWriter->repeatActiveQueryTimer,
213 1); //metatraffic timer
214 CSTRemoteWriter_delete(cstRemoteWriter->cstReader, cstRemoteWriter);
215 FREE(cstRemoteWriter);
218 /*****************************************************************************/
220 CSTReaderAddCSChange(CSTRemoteWriter *cstRemoteWriter, CSChange *csChange)
222 CSChangeFromWriter *csChangeFromWriter;
224 cstRemoteWriter->csChangesCounter++;
225 cstRemoteWriter->ACKRetriesCounter = 0;
226 csChangeFromWriter = (CSChangeFromWriter *)MALLOC(sizeof(CSChangeFromWriter));
227 csChangeFromWriter->csChange = csChange;
228 csChangeFromWriter->commStateChFWriter = RECEIVED;
229 CSChangeFromWriter_insert(cstRemoteWriter, csChangeFromWriter);
230 CSTReaderCSChange_insert(cstRemoteWriter->cstReader, csChange);
233 /*****************************************************************************/
235 CSTReaderDestroyCSChangeFromWriter(CSTRemoteWriter *cstRemoteWriter,
236 CSChangeFromWriter *csChangeFromWriter, Boolean keepCSChange)
239 if ((!csChangeFromWriter) || (!cstRemoteWriter))
241 CSTReaderCSChange_delete(cstRemoteWriter->cstReader,
242 csChangeFromWriter->csChange);
244 if (csChangeFromWriter->csChange->cdrCodec.buffer)
245 FREE(csChangeFromWriter->csChange->cdrCodec.buffer);
246 parameterDelete(csChangeFromWriter->csChange);
247 FREE(csChangeFromWriter->csChange);
249 CSChangeFromWriter_delete(cstRemoteWriter, csChangeFromWriter);
250 FREE(csChangeFromWriter);
251 cstRemoteWriter->csChangesCounter--;
254 /*****************************************************************************/
256 CSTReaderDestroyCSChange(CSTRemoteWriter *cstRemoteWriter, SequenceNumber *sn,
257 Boolean keepCSChange)
259 CSChangeFromWriter *csChangeFromWriter;
261 csChangeFromWriter = CSChangeFromWriter_find(cstRemoteWriter, sn);
262 if (csChangeFromWriter) {
263 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
264 csChangeFromWriter, keepCSChange);
268 /*****************************************************************************/
270 CSTReaderSetupState(CSTRemoteWriter *cstRemoteWriter)
273 if (CSChangeFromWriter_first(cstRemoteWriter) == NULL) { //no csChanges
274 if (SeqNumberCmp(cstRemoteWriter->sn, cstRemoteWriter->lastSN) != 0) {
275 if (cstRemoteWriter->commStateACK != WAITING) {
276 cstRemoteWriter->commStateACK = PULLING;
277 cstRemoteWriter->ACKRetriesCounter = 0;
278 eventDetach(cstRemoteWriter->cstReader->domain,
279 cstRemoteWriter->spobject->objectEntryAID,
280 &cstRemoteWriter->repeatActiveQueryTimer,
281 1); //metatraffic timer
282 eventDetach(cstRemoteWriter->cstReader->domain,
283 cstRemoteWriter->spobject->objectEntryAID,
284 &cstRemoteWriter->delayResponceTimer,
285 1); //metatraffic timer
286 eventAdd(cstRemoteWriter->cstReader->domain,
287 cstRemoteWriter->spobject->objectEntryAID,
288 &cstRemoteWriter->delayResponceTimer,
289 1, //metatraffic timer
290 "CSTReaderResponceTimer",
291 CSTReaderResponceTimer,
292 &cstRemoteWriter->cstReader->lock,
294 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
297 if (cstRemoteWriter->commStateACK == PULLING) {
298 cstRemoteWriter->commStateACK = WAITING;
299 cstRemoteWriter->ACKRetriesCounter = 0;
300 eventDetach(cstRemoteWriter->cstReader->domain,
301 cstRemoteWriter->spobject->objectEntryAID,
302 &cstRemoteWriter->delayResponceTimer,
303 1); //metatraffic timer
304 if (NtpTimeCmp(cstRemoteWriter->cstReader->params.repeatActiveQueryTime,
306 eventDetach(cstRemoteWriter->cstReader->domain,
307 cstRemoteWriter->spobject->objectEntryAID,
308 &cstRemoteWriter->repeatActiveQueryTimer,
309 1); //metatraffic timer
310 eventAdd(cstRemoteWriter->cstReader->domain,
311 cstRemoteWriter->spobject->objectEntryAID,
312 &cstRemoteWriter->repeatActiveQueryTimer,
313 1, //metatraffic timer
314 "CSTReaderQueryTimer",
316 &cstRemoteWriter->cstReader->lock,
318 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
323 if (cstRemoteWriter->commStateACK == WAITING) {
324 cstRemoteWriter->commStateACK = PULLING;
325 cstRemoteWriter->ACKRetriesCounter = 0;
326 eventDetach(cstRemoteWriter->cstReader->domain,
327 cstRemoteWriter->spobject->objectEntryAID,
328 &cstRemoteWriter->repeatActiveQueryTimer,
329 1); //metatraffic timer
330 eventDetach(cstRemoteWriter->cstReader->domain,
331 cstRemoteWriter->spobject->objectEntryAID,
332 &cstRemoteWriter->delayResponceTimer,
333 1); //metatraffic timer
334 eventAdd(cstRemoteWriter->cstReader->domain,
335 cstRemoteWriter->spobject->objectEntryAID,
336 &cstRemoteWriter->delayResponceTimer,
337 1, //metatraffic timer
338 "CSTReaderResponceTimer",
339 CSTReaderResponceTimer,
340 &cstRemoteWriter->cstReader->lock,
342 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);