2 * $Id: RTPSCSTReaderTimer.c,v 0.0.0.1 2003/11/03
4 * DEBUG: section 55 CSTReader timer functions
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
26 CSTReaderResponceTimer(ORTEDomain *d,void *vcstRemoteWriter) {
27 CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
31 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
33 if (!d->mbSend.containsInfoReply) {
35 len=RTPSInfoREPLYCreate(
36 d->mbSend.cdrStream.bufferPtr,
37 getMaxMessageLength(d),
39 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
41 len=RTPSInfoREPLYCreate(
42 d->mbSend.cdrStream.bufferPtr,
43 getMaxMessageLength(d),
45 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
48 d->mbSend.needSend=ORTE_TRUE;
51 d->mbSend.containsInfoReply=ORTE_TRUE;
52 d->mbSend.cdrStream.bufferPtr+=len;
53 d->mbSend.cdrStream.length+=len;
54 debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
55 cstRemoteWriter->cstReader->guid.oid,
56 cstRemoteWriter->guid.hid,
57 cstRemoteWriter->guid.aid);
60 d->mbSend.cdrStream.bufferPtr,
61 getMaxMessageLength(d),
63 cstRemoteWriter->cstReader->guid.oid,
64 cstRemoteWriter->guid.oid,
67 //not enought space in sending buffer
68 d->mbSend.needSend=ORTE_TRUE;
71 d->mbSend.cdrStream.bufferPtr+=len;
72 d->mbSend.cdrStream.length+=len;
73 debug(55,3) ("sent: RTPS_ACKF(0x%x) to 0x%x-0x%x\n",
74 cstRemoteWriter->cstReader->guid.oid,
75 cstRemoteWriter->guid.hid,
76 cstRemoteWriter->guid.aid);
77 if (cstRemoteWriter->commStateACK==PULLING) {
79 cstRemoteWriter->objectEntryOID->objectEntryAID,
80 &cstRemoteWriter->delayResponceTimer,
82 if (cstRemoteWriter->ACKRetriesCounter<
83 cstRemoteWriter->cstReader->params.ACKMaxRetries) {
84 cstRemoteWriter->ACKRetriesCounter++;
86 cstRemoteWriter->objectEntryOID->objectEntryAID,
87 &cstRemoteWriter->delayResponceTimer,
89 "CSTReaderResponceTimer",
90 CSTReaderResponceTimer,
91 &cstRemoteWriter->cstReader->lock,
93 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
95 debug(55,3) ("sent: maxRetries ritch upper level (%d).\n",
96 cstRemoteWriter->cstReader->params.ACKMaxRetries);
99 if (cstRemoteWriter->commStateACK==ACKPENDING) {
100 cstRemoteWriter->commStateACK=WAITING;
102 cstRemoteWriter->objectEntryOID->objectEntryAID,
103 &cstRemoteWriter->repeatActiveQueryTimer,
105 if (NtpTimeCmp(cstRemoteWriter->cstReader->
106 params.repeatActiveQueryTime,iNtpTime)!=0) {
108 cstRemoteWriter->objectEntryOID->objectEntryAID,
109 &cstRemoteWriter->repeatActiveQueryTimer,
111 "CSTReaderQueryTimer",
113 &cstRemoteWriter->cstReader->lock,
115 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
121 /*****************************************************************************/
123 CSTReaderQueryTimer(ORTEDomain *d,void *vcstRemoteWriter) {
124 CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
128 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
130 if (!d->mbSend.containsInfoReply) {
132 len=RTPSInfoREPLYCreate(
133 d->mbSend.cdrStream.bufferPtr,
134 getMaxMessageLength(d),
136 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
138 len=RTPSInfoREPLYCreate(
139 d->mbSend.cdrStream.bufferPtr,
140 getMaxMessageLength(d),
142 ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
145 d->mbSend.needSend=ORTE_TRUE;
148 d->mbSend.containsInfoReply=ORTE_TRUE;
149 d->mbSend.cdrStream.bufferPtr+=len;
150 d->mbSend.cdrStream.length+=len;
151 debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
152 cstRemoteWriter->cstReader->guid.oid,
153 cstRemoteWriter->guid.hid,
154 cstRemoteWriter->guid.aid);
157 d->mbSend.cdrStream.bufferPtr,
158 getMaxMessageLength(d),
159 &cstRemoteWriter->sn,
160 cstRemoteWriter->cstReader->guid.oid,
161 cstRemoteWriter->guid.oid,
164 d->mbSend.needSend=ORTE_TRUE;
167 debug(55,3) ("sent: RTPS_ACKf(0x%x) to 0x%x-0x%x\n",
168 cstRemoteWriter->cstReader->guid.oid,
169 cstRemoteWriter->guid.hid,
170 cstRemoteWriter->guid.aid);
171 d->mbSend.cdrStream.bufferPtr+=len;
172 d->mbSend.cdrStream.length+=len;
174 cstRemoteWriter->objectEntryOID->objectEntryAID,
175 &cstRemoteWriter->repeatActiveQueryTimer,
177 if (NtpTimeCmp(cstRemoteWriter->cstReader->
178 params.repeatActiveQueryTime,iNtpTime)!=0) {
180 cstRemoteWriter->objectEntryOID->objectEntryAID,
181 &cstRemoteWriter->repeatActiveQueryTimer,
183 "CSTReaderQueryTimer",
185 &cstRemoteWriter->cstReader->lock,
187 &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
193 /*****************************************************************************/
195 CSTReaderDeadlineTimer(ORTEDomain *d,void *vcstReader) {
196 CSTReader *cstReader=(CSTReader*)vcstReader;
200 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
201 memset(&info,0,sizeof(info));
202 info.status=DEADLINE;
203 info.topic=sp->topic;
204 info.type=sp->typeName;
205 cstReader->objectEntryOID->recvCallBack(&info,
206 cstReader->objectEntryOID->instance,
207 cstReader->objectEntryOID->callBackParam);
209 cstReader->objectEntryOID->objectEntryAID,
210 &cstReader->deadlineTimer,
213 cstReader->objectEntryOID->objectEntryAID,
214 &cstReader->deadlineTimer,
216 "CSTReaderDeadlineTimer",
217 CSTReaderDeadlineTimer,
224 /*****************************************************************************/
226 CSTReaderPersistenceTimer(ORTEDomain *d,void *vcstReader) {
227 CSTReader *cstReader=(CSTReader*)vcstReader;
228 CSTRemoteWriter *cstRemoteWriter;
229 CSChangeFromWriter *csChangeFromWriter;
234 if (cstReader->cstRemoteWriterSubscribed!=NULL) {
235 //keep only one csChange (last)
236 while (cstReader->cstRemoteWriterSubscribed->csChangesCounter>1) {
238 CSChangeFromWriter_first(cstReader->cstRemoteWriterSubscribed);
239 if (csChangeFromWriter) {
240 CSTReaderDestroyCSChangeFromWriter(
241 cstReader->cstRemoteWriterSubscribed,
247 cstReader->cstRemoteWriterSubscribed=NULL;
248 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
250 gavl_cust_for_each(CSTRemoteWriter,cstReader,cstRemoteWriter) {
251 pp=(ORTEPublProp*)cstRemoteWriter->objectEntryOID->attributes;
252 csChangeFromWriter=CSChangeFromWriter_last(cstRemoteWriter);
253 if ((pp->strength>strength) && (csChangeFromWriter!=NULL)){
254 NtpTime persistence,persistenceExpired,actTime;
255 actTime=getActualNtpTime();
256 NtpTimeAdd(persistenceExpired,
257 csChangeFromWriter->csChange->localTimeReceived,
259 if (NtpTimeCmp(persistenceExpired,actTime)>0) {
260 NtpTimeSub(persistence,
264 cstReader->objectEntryOID->objectEntryAID,
265 &cstReader->persistenceTimer,
268 cstReader->objectEntryOID->objectEntryAID,
269 &cstReader->persistenceTimer,
271 "CSTReaderPersistenceTimer",
272 CSTReaderPersistenceTimer,
276 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
280 if ((cstReader->cstRemoteWriterSubscribed!=NULL) &&
281 (sp->mode==IMMEDIATE)) {
282 CSTReaderProcCSChangesIssue(
283 cstReader->cstRemoteWriterSubscribed,ORTE_FALSE);