2 * $Id: RTPSCSTReader.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 53 CSTReader
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
/*
 * Instantiations of GAVL_CUST_NODE_INT_IMP for the three containers this
 * file works with. The code further down calls
 * CSTRemoteWriter_{init_root_field,first,insert,delete} and
 * CSChangeFromWriter_{init_root_field,first,find,insert,delete}; these are
 * presumably generated by the second and third instantiation below.
 * Argument roles (name prefix, root type, item type, key type, root field,
 * node field, key field, comparator) are assumed from the argument order --
 * TODO confirm against the GAVL header.
 */
/* CSTReader items held in CSTSubscriptions; key `guid` (GUID_RTPS), compared by gavl_cmp_guid. */
34 GAVL_CUST_NODE_INT_IMP(CSTReader,
35 CSTSubscriptions, CSTReader, GUID_RTPS,
36 cstReader, node, guid, gavl_cmp_guid);
/* CSTRemoteWriter items held in a CSTReader; key `guid` (GUID_RTPS), compared by gavl_cmp_guid. */
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteWriter,
38 CSTReader, CSTRemoteWriter, GUID_RTPS,
39 cstRemoteWriter, node, guid, gavl_cmp_guid);
/* CSChangeFromWriter items held in a CSTRemoteWriter; key `csChange->sn` (SequenceNumber), compared by gavl_cmp_sn. */
40 GAVL_CUST_NODE_INT_IMP(CSChangeFromWriter,
41 CSTRemoteWriter, CSChangeFromWriter, SequenceNumber,
42 csChangeFromWriter, node, csChange->sn, gavl_cmp_sn);
44 /*****************************************************************************/
/*
 * CSTReaderInit - fill in a caller-supplied CSTReader.
 *
 * d            - domain; not referenced in the lines visible in this listing
 * cstReader    - reader structure to initialise
 * object       - object entry the reader belongs to; supplies the host and
 *                application ids of the reader's GUID
 * oid          - object id stored as the third GUID component
 * params       - reader parameters, copied by value into cstReader->params
 * typeRegister - stored in cstReader->typeRegister
 *
 * NOTE(review): the embedded line numbers skip values, so some source lines
 * (return type, local declarations, call openers, closing braces) are not
 * part of this listing. Comments below describe only what is visible.
 */
46 CSTReaderInit(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
47 ObjectId oid,CSTReaderParams *params,ORTETypeRegister *typeRegister) {
49 debug(53,10) ("CSTReaderInit: start\n");
50 //init values of cstReader
/* GUID = (host id, application id) of the owning object entry + given oid */
51 cstReader->guid.hid=object->objectEntryHID->hid;
52 cstReader->guid.aid=object->objectEntryAID->aid;
53 cstReader->guid.oid=oid;
54 cstReader->objectEntryOID=object;
55 memcpy(&cstReader->params,params,sizeof(CSTReaderParams));
/* no remote writers are attached yet, so all per-writer counters start at 0 */
56 cstReader->strictReliableCounter=0;
57 cstReader->bestEffortsCounter=0;
58 cstReader->cstRemoteWriterCounter=0;
59 cstReader->createdByPattern=ORTE_FALSE;
/* empty change list and empty remote-writer container */
60 CSTReaderCSChange_init_head(cstReader);
61 CSTRemoteWriter_init_root_field(cstReader);
/* default attributes; the same lock is handed to the timers registered for this reader */
62 pthread_rwlock_init(&cstReader->lock,NULL);
64 cstReader->typeRegister=typeRegister;
/* both timers start out not queued (per the helper's name -- TODO confirm) */
65 ul_htim_queue_init_detached(&cstReader->deadlineTimer.htim);
66 ul_htim_queue_init_detached(&cstReader->persistenceTimer.htim);
67 cstReader->cstRemoteWriterSubscribed=NULL;
/* deadline handling applies only when the low three oid bits equal OID_SUBSCRIPTION */
68 if ((oid & 0x07) == OID_SUBSCRIPTION) {
/* for such readers the object attributes are read as ORTESubsProp */
70 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
/* only when the deadline differs from zNtpTime (presumably the zero time -- confirm) */
71 if (NtpTimeCmp(sp->deadline,zNtpTime)!=0) {
/* IMMEDIATE mode: arguments of a call registering CSTReaderDeadlineTimer on
 * deadlineTimer; the call's opening line and trailing arguments are not visible here */
72 if (sp->mode==IMMEDIATE) {
74 cstReader->objectEntryOID->objectEntryAID,
75 &cstReader->deadlineTimer,
77 "CSTReaderDeadlineTimer",
78 CSTReaderDeadlineTimer,
/* PULLED mode: only the expiry time is set; how timeNext is computed is not visible here */
83 if (sp->mode==PULLED) {
88 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
92 debug(53,4) ("CSTReaderInit: 0x%x-0x%x-0x%x\n",
96 debug(53,10) ("CSTReaderInit: finished\n");
99 /*****************************************************************************/
/*
 * CSTReaderDelete - tear down a CSTReader initialised by CSTReaderInit.
 *
 * d         - domain, forwarded to CSTReaderDestroyRemoteWriter
 * cstReader - reader to tear down; the structure itself is not freed in the
 *             lines visible here
 *
 * Destroys every attached remote writer, performs an operation on the
 * deadline and persistence timers (the call openers are not part of this
 * listing; presumably the timers are detached -- confirm) and destroys the
 * reader's rwlock.
 */
101 CSTReaderDelete(ORTEDomain *d,CSTReader *cstReader) {
102 CSTRemoteWriter *cstRemoteWriter;
104 debug(53,10)("CSTReaderDelete: start\n");
105 debug(53,4) ("CSTReaderDelete: 0x%x-0x%x-0x%x\n",
108 cstReader->guid.oid);
109 //Destroy all cstRemoteWriters attached to this cstReader
/* CSTReaderDestroyRemoteWriter removes the entry from the container, so
 * fetching the first entry again each time drains it */
110 while((cstRemoteWriter=CSTRemoteWriter_first(cstReader))) {
111 CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
/* arguments of a call on the deadline timer (opening line not visible) */
114 cstReader->objectEntryOID->objectEntryAID,
115 &cstReader->deadlineTimer,
/* arguments of a call on the persistence timer (opening line not visible) */
118 cstReader->objectEntryOID->objectEntryAID,
119 &cstReader->persistenceTimer,
121 pthread_rwlock_destroy(&cstReader->lock);
122 debug(53,10) ("CSTReaderDelete: finished\n");
125 /*****************************************************************************/
/*
 * CSTReaderAddRemoteWriter - allocate a CSTRemoteWriter and attach it to a reader.
 *
 * d         - domain; not referenced in the lines visible in this listing
 * cstReader - reader the new remote writer is attached to
 * object    - object entry of the remote writer; supplies host/application id
 *             and, for subscription readers, the ORTEPublProp attributes
 * (the rest of the parameter list is on a line missing from this listing;
 *  the body uses an `oid` value as third GUID component)
 *
 * Returns the newly allocated remote writer. Release it with
 * CSTReaderDestroyRemoteWriter.
 *
 * NOTE(review): the MALLOC result is used without a NULL check.
 */
127 CSTReaderAddRemoteWriter(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
129 CSTRemoteWriter *cstRemoteWriter;
131 cstReader->cstRemoteWriterCounter++;
132 cstRemoteWriter=(CSTRemoteWriter*)MALLOC(sizeof(CSTRemoteWriter));
/* GUID = (host id, application id) of the remote object entry + oid */
133 cstRemoteWriter->guid.hid=object->objectEntryHID->hid;
134 cstRemoteWriter->guid.aid=object->objectEntryAID->aid;
135 cstRemoteWriter->guid.oid=oid;
136 cstRemoteWriter->spobject=object;
137 cstRemoteWriter->cstReader=cstReader;
/* fresh writer: no changes, no retries, ACK state WAITING */
138 cstRemoteWriter->csChangesCounter=0;
139 cstRemoteWriter->ACKRetriesCounter=0;
140 cstRemoteWriter->commStateACK=WAITING;
141 CSChangeFromWriter_init_root_field(cstRemoteWriter);
142 SEQUENCE_NUMBER_NONE(cstRemoteWriter->sn);
143 SEQUENCE_NUMBER_NONE(cstRemoteWriter->firstSN);
144 SEQUENCE_NUMBER_NONE(cstRemoteWriter->lastSN);
145 ul_htim_queue_init_detached(&cstRemoteWriter->delayResponceTimer.htim);
146 ul_htim_queue_init_detached(&cstRemoteWriter->repeatActiveQueryTimer.htim);
/* make the writer reachable from the reader before any timer is registered */
147 CSTRemoteWriter_insert(cstReader,cstRemoteWriter);
148 //register the repeatActiveQuery timer event
/* skipped when repeatActiveQueryTime equals iNtpTime (presumably "infinite" -- confirm);
 * the opening line and some arguments of the registering call are not visible here */
149 if (NtpTimeCmp(cstReader->params.repeatActiveQueryTime,iNtpTime)!=0) {
151 cstRemoteWriter->spobject->objectEntryAID,
152 &cstRemoteWriter->repeatActiveQueryTimer,
153 1, //metatraffic timer
154 "CSTReaderQueryTimer",
156 &cstRemoteWriter->cstReader->lock,
/* subscription readers count their writers per offered reliability;
 * the writer's attributes are read as ORTEPublProp */
160 if ((cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
161 ORTEPublProp *pp=(ORTEPublProp*)object->attributes;
162 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
163 cstReader->strictReliableCounter++;
/* the line between these two tests is missing from the listing, so whether
 * this is an else-branch cannot be seen here */
165 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
166 cstReader->bestEffortsCounter++;
169 debug(53,4) ("CSTReaderAddRemoteWriter: 0x%x-0x%x-0x%x\n",
170 cstRemoteWriter->guid.hid,
171 cstRemoteWriter->guid.aid,
172 cstRemoteWriter->guid.oid);
173 return cstRemoteWriter;
176 /*****************************************************************************/
/*
 * CSTReaderDestroyRemoteWriter - detach one remote writer from its reader
 * and free it.
 *
 * d               - domain; not referenced in the lines visible in this listing
 * cstRemoteWriter - writer to destroy; NULL is accepted and ignored
 *
 * Reverses the bookkeeping of CSTReaderAddRemoteWriter: the reader's
 * cstRemoteWriterCounter and, for subscription readers, the per-reliability
 * counter are decremented. All CSChangeFromWriter entries of the writer are
 * destroyed (keepCSChange = ORTE_FALSE), the writer is removed from the
 * reader's container and its memory is released.
 *
 * NOTE(review): the embedded line numbers skip values, so some source lines
 * (return type, declaration of `pp`, call openers, closing braces) are not
 * part of this listing.
 */
178 CSTReaderDestroyRemoteWriter(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
179 CSChangeFromWriter *csChangeFromWriter;
181 if (!cstRemoteWriter) return;
182 cstRemoteWriter->cstReader->cstRemoteWriterCounter--;
183 debug(53,4) ("CSTReaderDestroyRemoteWriter: 0x%x-0x%x-0x%x\n",
184 cstRemoteWriter->guid.hid,
185 cstRemoteWriter->guid.aid,
186 cstRemoteWriter->guid.oid);
/* subscription readers keep a per-reliability count of their writers */
187 if ((cstRemoteWriter->cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
189 pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
/* give back the count taken in CSTReaderAddRemoteWriter; incrementing here
 * would make the counters grow on every add AND every destroy */
190 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
191 cstRemoteWriter->cstReader->strictReliableCounter--;
193 if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
194 cstRemoteWriter->cstReader->bestEffortsCounter--;
/* do not leave the reader pointing at a writer that is about to be freed */
197 if (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==cstRemoteWriter)
198 cstRemoteWriter->cstReader->cstRemoteWriterSubscribed=NULL;
/* each call removes the entry it is given, so re-reading the first entry drains the container */
199 while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
200 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
201 csChangeFromWriter,ORTE_FALSE);
/* arguments of a call on the delayed-response timer (opening line not visible) */
204 cstRemoteWriter->spobject->objectEntryAID,
205 &cstRemoteWriter->delayResponceTimer,
206 1); //metatraffic timer
/* arguments of a call on the repeat-active-query timer (opening line not visible) */
208 cstRemoteWriter->spobject->objectEntryAID,
209 &cstRemoteWriter->repeatActiveQueryTimer,
210 1); //metatraffic timer
/* unlink from the reader before releasing the memory */
211 CSTRemoteWriter_delete(cstRemoteWriter->cstReader,cstRemoteWriter);
212 FREE(cstRemoteWriter);
215 /*****************************************************************************/
/*
 * CSTReaderAddCSChange - record a received change for a remote writer.
 *
 * cstRemoteWriter - writer the change came from
 * csChange        - the change; the pointer is stored, not copied
 *
 * Wraps csChange in a newly allocated CSChangeFromWriter (state RECEIVED),
 * inserts the wrapper into the writer's container and the change itself into
 * the owning reader's change list. Also resets the writer's ACK retry counter.
 *
 * NOTE(review): the MALLOC result is used without a NULL check.
 */
217 CSTReaderAddCSChange(CSTRemoteWriter *cstRemoteWriter,CSChange *csChange) {
218 CSChangeFromWriter *csChangeFromWriter;
220 cstRemoteWriter->csChangesCounter++;
221 cstRemoteWriter->ACKRetriesCounter=0;
222 csChangeFromWriter=(CSChangeFromWriter*)MALLOC(sizeof(CSChangeFromWriter));
223 csChangeFromWriter->csChange=csChange;
224 csChangeFromWriter->commStateChFWriter=RECEIVED;
/* the change is tracked twice: per writer and per reader */
225 CSChangeFromWriter_insert(cstRemoteWriter,csChangeFromWriter);
226 CSTReaderCSChange_insert(cstRemoteWriter->cstReader,csChange);
229 /*****************************************************************************/
/*
 * CSTReaderDestroyCSChangeFromWriter - remove one change record from a
 * remote writer and free the record.
 *
 * cstRemoteWriter    - writer owning the record; NULL makes the call a no-op
 * csChangeFromWriter - record to remove; NULL makes the call a no-op
 * keepCSChange       - not referenced in the lines visible in this listing;
 *                      presumably guards the release of the CSChange payload
 *                      below (the guarding line is missing -- TODO confirm)
 */
231 CSTReaderDestroyCSChangeFromWriter(CSTRemoteWriter *cstRemoteWriter,
232 CSChangeFromWriter *csChangeFromWriter,Boolean keepCSChange) {
234 if ((!csChangeFromWriter) || (!cstRemoteWriter)) return;
/* take the change out of the owning reader's change list */
235 CSTReaderCSChange_delete(cstRemoteWriter->cstReader,
236 csChangeFromWriter->csChange);
/* release the CSChange itself: codec buffer (if any), parameters, then the struct */
238 if (csChangeFromWriter->csChange->cdrCodec.buffer)
239 FREE(csChangeFromWriter->csChange->cdrCodec.buffer);
240 parameterDelete(csChangeFromWriter->csChange);
241 FREE(csChangeFromWriter->csChange);
/* unlink the record from the writer's container, free it, update the count */
243 CSChangeFromWriter_delete(cstRemoteWriter,csChangeFromWriter);
244 FREE(csChangeFromWriter);
245 cstRemoteWriter->csChangesCounter--;
248 /*****************************************************************************/
/*
 * CSTReaderDestroyCSChange - destroy the change record with a given
 * sequence number, if the remote writer has one.
 *
 * cstRemoteWriter - writer whose records are searched
 * sn              - sequence number to look up
 * keepCSChange    - forwarded unchanged to CSTReaderDestroyCSChangeFromWriter
 *
 * Does nothing when no record with that sequence number is found.
 */
250 CSTReaderDestroyCSChange(CSTRemoteWriter *cstRemoteWriter,SequenceNumber *sn,
251 Boolean keepCSChange) {
252 CSChangeFromWriter *csChangeFromWriter;
254 csChangeFromWriter=CSChangeFromWriter_find(cstRemoteWriter,sn);
255 if (csChangeFromWriter) {
256 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
257 csChangeFromWriter,keepCSChange);
261 /*****************************************************************************/
/*
 * CSTReaderSetupState - update a remote writer's ACK state (commStateACK)
 * and re-arm its timers accordingly.
 *
 * cstRemoteWriter - writer whose state is evaluated; the domain and the lock
 *                   are taken from cstRemoteWriter->cstReader
 *
 * Three blocks are visible, each switching commStateACK between WAITING and
 * PULLING, clearing ACKRetriesCounter and detaching/adding timer events:
 *   - to PULLING: detach both timers, add the "CSTReaderResponceTimer" event
 *     with delay params.delayResponceTimeMin;
 *   - to WAITING: detach the response timer and, under an NtpTimeCmp test on
 *     params.repeatActiveQueryTime, re-add the "CSTReaderQueryTimer" event.
 *
 * NOTE(review): the embedded line numbers skip values; the braces and else
 * keywords that separate the three blocks, some call arguments and the end
 * of the function are not part of this listing, so the exact nesting cannot
 * be confirmed from here.
 */
263 CSTReaderSetupState(CSTRemoteWriter *cstRemoteWriter) {
265 if (CSChangeFromWriter_first(cstRemoteWriter)==NULL) { //no csChanges
/* nothing queued, and the writer's sn differs from its lastSN */
266 if (SeqNumberCmp(cstRemoteWriter->sn,cstRemoteWriter->lastSN)!=0) {
/* NOTE(review): this guard tests != WAITING, whereas the otherwise identical
 * block near the end of this function tests == WAITING -- confirm which is intended */
267 if (cstRemoteWriter->commStateACK!=WAITING) {
268 cstRemoteWriter->commStateACK=PULLING;
269 cstRemoteWriter->ACKRetriesCounter=0;
/* drop any pending query/response events before scheduling a new response */
270 eventDetach(cstRemoteWriter->cstReader->domain,
271 cstRemoteWriter->spobject->objectEntryAID,
272 &cstRemoteWriter->repeatActiveQueryTimer,
273 1); //metatraffic timer
274 eventDetach(cstRemoteWriter->cstReader->domain,
275 cstRemoteWriter->spobject->objectEntryAID,
276 &cstRemoteWriter->delayResponceTimer,
277 1); //metatraffic timer
278 eventAdd(cstRemoteWriter->cstReader->domain,
279 cstRemoteWriter->spobject->objectEntryAID,
280 &cstRemoteWriter->delayResponceTimer,
281 1, //metatraffic timer
282 "CSTReaderResponceTimer",
283 CSTReaderResponceTimer,
284 &cstRemoteWriter->cstReader->lock,
286 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
/* PULLING -> WAITING (the lines introducing this block are not visible) */
289 if (cstRemoteWriter->commStateACK==PULLING) {
290 cstRemoteWriter->commStateACK=WAITING;
291 cstRemoteWriter->ACKRetriesCounter=0;
292 eventDetach(cstRemoteWriter->cstReader->domain,
293 cstRemoteWriter->spobject->objectEntryAID,
294 &cstRemoteWriter->delayResponceTimer,
295 1); //metatraffic timer
/* the second NtpTimeCmp operand and the comparison are on a line missing from the listing */
296 if (NtpTimeCmp(cstRemoteWriter->cstReader->params.repeatActiveQueryTime,
298 eventDetach(cstRemoteWriter->cstReader->domain,
299 cstRemoteWriter->spobject->objectEntryAID,
300 &cstRemoteWriter->repeatActiveQueryTimer,
301 1); //metatraffic timer
302 eventAdd(cstRemoteWriter->cstReader->domain,
303 cstRemoteWriter->spobject->objectEntryAID,
304 &cstRemoteWriter->repeatActiveQueryTimer,
305 1, //metatraffic timer
306 "CSTReaderQueryTimer",
308 &cstRemoteWriter->cstReader->lock,
310 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
/* WAITING -> PULLING (the lines introducing this block are not visible;
 * presumably the "csChanges present" case -- confirm) */
315 if (cstRemoteWriter->commStateACK==WAITING) {
316 cstRemoteWriter->commStateACK=PULLING;
317 cstRemoteWriter->ACKRetriesCounter=0;
318 eventDetach(cstRemoteWriter->cstReader->domain,
319 cstRemoteWriter->spobject->objectEntryAID,
320 &cstRemoteWriter->repeatActiveQueryTimer,
321 1); //metatraffic timer
322 eventDetach(cstRemoteWriter->cstReader->domain,
323 cstRemoteWriter->spobject->objectEntryAID,
324 &cstRemoteWriter->delayResponceTimer,
325 1); //metatraffic timer
326 eventAdd(cstRemoteWriter->cstReader->domain,
327 cstRemoteWriter->spobject->objectEntryAID,
328 &cstRemoteWriter->delayResponceTimer,
329 1, //metatraffic timer
330 "CSTReaderResponceTimer",
331 CSTReaderResponceTimer,
332 &cstRemoteWriter->cstReader->lock,
334 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);