2 * $Id: RTPSCSTReaderTimer.c,v 0.0.0.1 2003/11/03
4 * DEBUG: section 55 CSTReader timer functions
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
36 CSTReaderResponceTimer(ORTEDomain *d,void *vcstRemoteWriter) {
37 CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
41 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
43 if (!d->taskSend.mb.containsInfoReply) {
45 len=RTPSInfoREPLYCreate(
46 &d->taskSend.mb.cdrCodec,
48 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
50 len=RTPSInfoREPLYCreate(
51 &d->taskSend.mb.cdrCodec,
53 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
56 d->taskSend.mb.needSend=ORTE_TRUE;
59 d->taskSend.mb.containsInfoReply=ORTE_TRUE;
60 debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
61 cstRemoteWriter->cstReader->guid.oid,
62 cstRemoteWriter->guid.hid,
63 cstRemoteWriter->guid.aid);
66 &d->taskSend.mb.cdrCodec,
68 cstRemoteWriter->cstReader->guid.oid,
69 cstRemoteWriter->guid.oid,
72 //not enought space in sending buffer
73 d->taskSend.mb.needSend=ORTE_TRUE;
76 debug(55,3) ("sent: RTPS_ACKF(0x%x) to 0x%x-0x%x\n",
77 cstRemoteWriter->cstReader->guid.oid,
78 cstRemoteWriter->guid.hid,
79 cstRemoteWriter->guid.aid);
80 if (cstRemoteWriter->commStateACK==PULLING) {
82 cstRemoteWriter->spobject->objectEntryAID,
83 &cstRemoteWriter->delayResponceTimer,
85 if (cstRemoteWriter->ACKRetriesCounter<
86 cstRemoteWriter->cstReader->params.ACKMaxRetries) {
87 cstRemoteWriter->ACKRetriesCounter++;
89 cstRemoteWriter->spobject->objectEntryAID,
90 &cstRemoteWriter->delayResponceTimer,
92 "CSTReaderResponceTimer",
93 CSTReaderResponceTimer,
94 &cstRemoteWriter->cstReader->lock,
96 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
98 debug(55,3) ("sent: maxRetries ritch upper level (%d).\n",
99 cstRemoteWriter->cstReader->params.ACKMaxRetries);
102 if (cstRemoteWriter->commStateACK==ACKPENDING) {
103 cstRemoteWriter->commStateACK=WAITING;
105 cstRemoteWriter->spobject->objectEntryAID,
106 &cstRemoteWriter->repeatActiveQueryTimer,
108 if (NtpTimeCmp(cstRemoteWriter->cstReader->
109 params.repeatActiveQueryTime,iNtpTime)!=0) {
111 cstRemoteWriter->spobject->objectEntryAID,
112 &cstRemoteWriter->repeatActiveQueryTimer,
114 "CSTReaderQueryTimer",
116 &cstRemoteWriter->cstReader->lock,
118 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
124 /*****************************************************************************/
126 CSTReaderQueryTimer(ORTEDomain *d,void *vcstRemoteWriter) {
127 CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
131 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
133 if (!d->taskSend.mb.containsInfoReply) {
135 len=RTPSInfoREPLYCreate(
136 &d->taskSend.mb.cdrCodec,
138 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
140 len=RTPSInfoREPLYCreate(
141 &d->taskSend.mb.cdrCodec,
143 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
146 d->taskSend.mb.needSend=ORTE_TRUE;
149 d->taskSend.mb.containsInfoReply=ORTE_TRUE;
150 debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
151 cstRemoteWriter->cstReader->guid.oid,
152 cstRemoteWriter->guid.hid,
153 cstRemoteWriter->guid.aid);
156 &d->taskSend.mb.cdrCodec,
157 &cstRemoteWriter->sn,
158 cstRemoteWriter->cstReader->guid.oid,
159 cstRemoteWriter->guid.oid,
162 d->taskSend.mb.needSend=ORTE_TRUE;
165 debug(55,3) ("sent: RTPS_ACKf(0x%x) to 0x%x-0x%x\n",
166 cstRemoteWriter->cstReader->guid.oid,
167 cstRemoteWriter->guid.hid,
168 cstRemoteWriter->guid.aid);
170 cstRemoteWriter->spobject->objectEntryAID,
171 &cstRemoteWriter->repeatActiveQueryTimer,
173 if (NtpTimeCmp(cstRemoteWriter->cstReader->
174 params.repeatActiveQueryTime,iNtpTime)!=0) {
176 cstRemoteWriter->spobject->objectEntryAID,
177 &cstRemoteWriter->repeatActiveQueryTimer,
179 "CSTReaderQueryTimer",
181 &cstRemoteWriter->cstReader->lock,
183 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
189 /*****************************************************************************/
191 CSTReaderDeadlineTimer(ORTEDomain *d,void *vcstReader) {
192 CSTReader *cstReader=(CSTReader*)vcstReader;
196 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
197 memset(&info,0,sizeof(info));
198 info.status=DEADLINE;
199 info.topic=(char*)sp->topic;
200 info.type=(char*)sp->typeName;
201 if (cstReader->objectEntryOID->recvCallBack) {
202 cstReader->objectEntryOID->recvCallBack(&info,
203 cstReader->objectEntryOID->instance,
204 cstReader->objectEntryOID->callBackParam);
207 cstReader->objectEntryOID->objectEntryAID,
208 &cstReader->deadlineTimer,
211 cstReader->objectEntryOID->objectEntryAID,
212 &cstReader->deadlineTimer,
214 "CSTReaderDeadlineTimer",
215 CSTReaderDeadlineTimer,
222 /*****************************************************************************/
224 CSTReaderPersistenceTimer(ORTEDomain *d,void *vcstReader) {
225 CSTReader *cstReader=(CSTReader*)vcstReader;
226 CSTRemoteWriter *cstRemoteWriter;
227 CSChangeFromWriter *csChangeFromWriter;
232 if (cstReader->cstRemoteWriterSubscribed!=NULL) {
233 //keep only one csChange (last)
234 while (cstReader->cstRemoteWriterSubscribed->csChangesCounter>1) {
236 CSChangeFromWriter_first(cstReader->cstRemoteWriterSubscribed);
237 if (csChangeFromWriter) {
238 CSTReaderDestroyCSChangeFromWriter(
239 cstReader->cstRemoteWriterSubscribed,
245 cstReader->cstRemoteWriterSubscribed=NULL;
246 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
248 gavl_cust_for_each(CSTRemoteWriter,cstReader,cstRemoteWriter) {
249 pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
250 csChangeFromWriter=CSChangeFromWriter_last(cstRemoteWriter);
251 if ((pp->strength>strength) && (csChangeFromWriter!=NULL)){
252 NtpTime persistence,persistenceExpired,actTime;
253 actTime=getActualNtpTime();
254 NtpTimeAdd(persistenceExpired,
255 csChangeFromWriter->csChange->localTimeReceived,
257 if (NtpTimeCmp(persistenceExpired,actTime)>0) {
258 NtpTimeSub(persistence,
262 cstReader->objectEntryOID->objectEntryAID,
263 &cstReader->persistenceTimer,
266 cstReader->objectEntryOID->objectEntryAID,
267 &cstReader->persistenceTimer,
269 "CSTReaderPersistenceTimer",
270 CSTReaderPersistenceTimer,
274 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
278 if ((cstReader->cstRemoteWriterSubscribed!=NULL) &&
279 (sp->mode==IMMEDIATE)) {
280 CSTReaderProcCSChangesIssue(
281 cstReader->cstRemoteWriterSubscribed,ORTE_FALSE);