]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderTimer.c
ad3ccb5c10efabe61497c667e6c65001ae10ad4d
[orte.git] / orte / liborte / RTPSCSTReaderTimer.c
1 /*
2  *  $Id: RTPSCSTReaderTimer.c,v 0.0.0.1 2003/11/03 
3  *
4  *  DEBUG:  section 55                  CSTReader timer functions
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 int 
36 CSTReaderResponceTimer(ORTEDomain *d,void *vcstRemoteWriter) {
37   CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
38   int             len;
39   char            queue=1;
40   
41   if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
42     queue=2;
43   if (!d->taskSend.mb.containsInfoReply) { 
44     if (queue==1) {
45       len=RTPSInfoREPLYCreate(
46           &d->taskSend.mb.cdrCodec,
47           IPADDRESS_INVALID,
48           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
49     } else {
50       len=RTPSInfoREPLYCreate(
51           &d->taskSend.mb.cdrCodec,
52           IPADDRESS_INVALID,
53           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
54     }
55     if (len<0) {
56       d->taskSend.mb.needSend=ORTE_TRUE;
57       return 1;
58     }
59     d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
60     debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
61                   cstRemoteWriter->cstReader->guid.oid,
62                   cstRemoteWriter->guid.hid,
63                   cstRemoteWriter->guid.aid);
64   }
65   len=RTPSAckCreate(
66        &d->taskSend.mb.cdrCodec,
67        &cstRemoteWriter->sn,
68        cstRemoteWriter->cstReader->guid.oid,
69        cstRemoteWriter->guid.oid,
70        ORTE_TRUE);
71   if (len<0) {
72     //not enought space in sending buffer
73     d->taskSend.mb.needSend=ORTE_TRUE;
74     return 1;
75   }
76   debug(55,3) ("sent: RTPS_ACKF(0x%x) to 0x%x-0x%x\n",
77                 cstRemoteWriter->cstReader->guid.oid,
78                 cstRemoteWriter->guid.hid,
79                 cstRemoteWriter->guid.aid);
80   if (cstRemoteWriter->commStateACK==PULLING) {
81     eventDetach(d,
82         cstRemoteWriter->spobject->objectEntryAID,
83         &cstRemoteWriter->delayResponceTimer,
84         queue); 
85     if (cstRemoteWriter->ACKRetriesCounter<
86         cstRemoteWriter->cstReader->params.ACKMaxRetries) {
87       cstRemoteWriter->ACKRetriesCounter++;
88       eventAdd(d,
89           cstRemoteWriter->spobject->objectEntryAID,
90           &cstRemoteWriter->delayResponceTimer,
91           queue,
92           "CSTReaderResponceTimer",
93           CSTReaderResponceTimer,
94           &cstRemoteWriter->cstReader->lock,
95           cstRemoteWriter,
96           &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
97     } else {
98       debug(55,3) ("sent: maxRetries ritch upper level (%d).\n",
99                     cstRemoteWriter->cstReader->params.ACKMaxRetries);
100     }
101   }
102   if (cstRemoteWriter->commStateACK==ACKPENDING) { 
103     cstRemoteWriter->commStateACK=WAITING;
104     eventDetach(d,
105         cstRemoteWriter->spobject->objectEntryAID,
106         &cstRemoteWriter->repeatActiveQueryTimer,
107         queue); 
108     if (NtpTimeCmp(cstRemoteWriter->cstReader->
109                    params.repeatActiveQueryTime,iNtpTime)!=0) {
110       eventAdd(d,
111           cstRemoteWriter->spobject->objectEntryAID,
112           &cstRemoteWriter->repeatActiveQueryTimer,
113           queue,
114           "CSTReaderQueryTimer",
115           CSTReaderQueryTimer,
116           &cstRemoteWriter->cstReader->lock,
117           cstRemoteWriter,
118           &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
119     }
120   }
121   return 0;
122 }
123
124 /*****************************************************************************/
125 int
126 CSTReaderQueryTimer(ORTEDomain *d,void *vcstRemoteWriter) {
127   CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
128   int             len;
129   char            queue=1;
130   
131   if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
132     queue=2;  
133   if (!d->taskSend.mb.containsInfoReply) { 
134     if (queue==1) {
135       len=RTPSInfoREPLYCreate(
136           &d->taskSend.mb.cdrCodec,
137           IPADDRESS_INVALID,
138           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
139     } else {
140       len=RTPSInfoREPLYCreate(
141           &d->taskSend.mb.cdrCodec,
142           IPADDRESS_INVALID,
143           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
144     }
145     if (len<0) {
146       d->taskSend.mb.needSend=ORTE_TRUE;
147       return 1;
148     }
149     d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
150     debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
151                   cstRemoteWriter->cstReader->guid.oid,
152                   cstRemoteWriter->guid.hid,
153                   cstRemoteWriter->guid.aid);
154   }
155   len=RTPSAckCreate(
156       &d->taskSend.mb.cdrCodec,
157       &cstRemoteWriter->sn,
158       cstRemoteWriter->cstReader->guid.oid,
159       cstRemoteWriter->guid.oid,
160       ORTE_FALSE);
161   if (len<0) {
162     d->taskSend.mb.needSend=ORTE_TRUE;
163     return 1;
164   }
165   debug(55,3) ("sent: RTPS_ACKf(0x%x) to 0x%x-0x%x\n",
166                 cstRemoteWriter->cstReader->guid.oid,
167                 cstRemoteWriter->guid.hid,
168                 cstRemoteWriter->guid.aid);
169   eventDetach(d,
170       cstRemoteWriter->spobject->objectEntryAID,
171       &cstRemoteWriter->repeatActiveQueryTimer,
172       queue);   
173   if (NtpTimeCmp(cstRemoteWriter->cstReader->
174                  params.repeatActiveQueryTime,iNtpTime)!=0) {
175     eventAdd(d,
176         cstRemoteWriter->spobject->objectEntryAID,
177         &cstRemoteWriter->repeatActiveQueryTimer,
178         queue,
179         "CSTReaderQueryTimer",
180         CSTReaderQueryTimer,
181         &cstRemoteWriter->cstReader->lock,
182         cstRemoteWriter,
183         &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
184   }
185   return 0; 
186 }
187
188
189 /*****************************************************************************/
190 int
191 CSTReaderDeadlineTimer(ORTEDomain *d,void *vcstReader) {
192   CSTReader            *cstReader=(CSTReader*)vcstReader;
193   ORTESubsProp         *sp;
194   ORTERecvInfo         info;  
195   
196   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
197   memset(&info,0,sizeof(info));
198   info.status=DEADLINE;
199   info.topic=(char*)sp->topic;
200   info.type=(char*)sp->typeName;
201   if (cstReader->objectEntryOID->recvCallBack) {
202     cstReader->objectEntryOID->recvCallBack(&info,
203         cstReader->objectEntryOID->instance,
204         cstReader->objectEntryOID->callBackParam);
205   }
206   eventDetach(d,
207       cstReader->objectEntryOID->objectEntryAID,
208       &cstReader->deadlineTimer,
209       0);
210   eventAdd(d,
211       cstReader->objectEntryOID->objectEntryAID,
212       &cstReader->deadlineTimer,
213       0,   //common timer
214       "CSTReaderDeadlineTimer",
215       CSTReaderDeadlineTimer,
216       &cstReader->lock,
217       cstReader,
218       &sp->deadline);
219   return 0;
220 }
221
222 /*****************************************************************************/
223 int
224 CSTReaderPersistenceTimer(ORTEDomain *d,void *vcstReader) {
225   CSTReader            *cstReader=(CSTReader*)vcstReader;
226   CSTRemoteWriter      *cstRemoteWriter;
227   CSChangeFromWriter   *csChangeFromWriter;
228   ORTESubsProp         *sp;
229   ORTEPublProp         *pp;
230   int32_t            strength;
231   
232   if (cstReader->cstRemoteWriterSubscribed!=NULL) {
233     //keep only one csChange (last)
234     while (cstReader->cstRemoteWriterSubscribed->csChangesCounter>1) {
235       csChangeFromWriter=
236         CSChangeFromWriter_first(cstReader->cstRemoteWriterSubscribed);
237       if (csChangeFromWriter) {  
238         CSTReaderDestroyCSChangeFromWriter(
239             cstReader->cstRemoteWriterSubscribed,
240             csChangeFromWriter,
241             ORTE_FALSE);
242       }
243     }
244   }
245   cstReader->cstRemoteWriterSubscribed=NULL;
246   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
247   strength=0;
248   gavl_cust_for_each(CSTRemoteWriter,cstReader,cstRemoteWriter) {
249     pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
250     csChangeFromWriter=CSChangeFromWriter_last(cstRemoteWriter);
251     if ((pp->strength>strength) && (csChangeFromWriter!=NULL)){
252       NtpTime persistence,persistenceExpired,actTime;
253       actTime=getActualNtpTime();
254       NtpTimeAdd(persistenceExpired,
255                  csChangeFromWriter->csChange->localTimeReceived,
256                  pp->persistence);
257       if (NtpTimeCmp(persistenceExpired,actTime)>0) {
258         NtpTimeSub(persistence,
259                    persistenceExpired,
260                    actTime);
261         eventDetach(d,
262             cstReader->objectEntryOID->objectEntryAID,
263             &cstReader->persistenceTimer,
264             0);   //common timer
265         eventAdd(d,
266             cstReader->objectEntryOID->objectEntryAID,
267             &cstReader->persistenceTimer,
268             0,   //common timer
269             "CSTReaderPersistenceTimer",
270             CSTReaderPersistenceTimer,
271             &cstReader->lock,
272             cstReader,
273             &persistence);
274         cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
275       }
276     }
277   }
278   if ((cstReader->cstRemoteWriterSubscribed!=NULL) && 
279       (sp->mode==IMMEDIATE)) {
280     CSTReaderProcCSChangesIssue(
281         cstReader->cstRemoteWriterSubscribed,ORTE_FALSE);
282   }
283   return 0;
284 }