]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderTimer.c
documentation patch
[orte.git] / orte / liborte / RTPSCSTReaderTimer.c
1 /*
2  *  $Id: RTPSCSTReaderTimer.c,v 0.0.0.1 2003/11/03 
3  *
4  *  DEBUG:  section 55                  CSTReader timer functions
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 int 
26 CSTReaderResponceTimer(ORTEDomain *d,void *vcstRemoteWriter) {
27   CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
28   int             len;
29   char            queue=1;
30   
31   if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
32     queue=2;
33   if (!d->mbSend.containsInfoReply) { 
34     if (queue==1) {
35       len=RTPSInfoREPLYCreate(
36           d->mbSend.cdrStream.bufferPtr,
37           getMaxMessageLength(d),
38           IPADDRESS_INVALID,
39           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
40     } else {
41       len=RTPSInfoREPLYCreate(
42           d->mbSend.cdrStream.bufferPtr,
43           getMaxMessageLength(d),
44           IPADDRESS_INVALID,
45           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
46     }
47     if (len<0) {
48       d->mbSend.needSend=ORTE_TRUE;
49       return 1;
50     }
51     d->mbSend.containsInfoReply=ORTE_TRUE;  
52     d->mbSend.cdrStream.bufferPtr+=len;
53     d->mbSend.cdrStream.length+=len;
54     debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
55                   cstRemoteWriter->cstReader->guid.oid,
56                   cstRemoteWriter->guid.hid,
57                   cstRemoteWriter->guid.aid);
58   }
59   len=RTPSAckCreate(
60        d->mbSend.cdrStream.bufferPtr,
61        getMaxMessageLength(d),
62        &cstRemoteWriter->sn,
63        cstRemoteWriter->cstReader->guid.oid,
64        cstRemoteWriter->guid.oid,
65        ORTE_TRUE);
66   if (len<0) {
67     //not enought space in sending buffer
68     d->mbSend.needSend=ORTE_TRUE;
69     return 1;
70   }
71   d->mbSend.cdrStream.bufferPtr+=len;
72   d->mbSend.cdrStream.length+=len;
73   debug(55,3) ("sent: RTPS_ACKF(0x%x) to 0x%x-0x%x\n",
74                 cstRemoteWriter->cstReader->guid.oid,
75                 cstRemoteWriter->guid.hid,
76                 cstRemoteWriter->guid.aid);
77   if (cstRemoteWriter->commStateACK==PULLING) {
78     eventDetach(d,
79         cstRemoteWriter->objectEntryOID->objectEntryAID,
80         &cstRemoteWriter->delayResponceTimer,
81         queue); 
82     if (cstRemoteWriter->ACKRetriesCounter<
83         cstRemoteWriter->cstReader->params.ACKMaxRetries) {
84       cstRemoteWriter->ACKRetriesCounter++;
85       eventAdd(d,
86           cstRemoteWriter->objectEntryOID->objectEntryAID,
87           &cstRemoteWriter->delayResponceTimer,
88           queue,
89           "CSTReaderResponceTimer",
90           CSTReaderResponceTimer,
91           &cstRemoteWriter->cstReader->lock,
92           cstRemoteWriter,
93           &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
94     } else {
95       debug(55,3) ("sent: maxRetries ritch upper level (%d).\n",
96                     cstRemoteWriter->cstReader->params.ACKMaxRetries);
97     }
98   }
99   if (cstRemoteWriter->commStateACK==ACKPENDING) { 
100     cstRemoteWriter->commStateACK=WAITING;
101     eventDetach(d,
102         cstRemoteWriter->objectEntryOID->objectEntryAID,
103         &cstRemoteWriter->repeatActiveQueryTimer,
104         queue); 
105     if (NtpTimeCmp(cstRemoteWriter->cstReader->
106                    params.repeatActiveQueryTime,iNtpTime)!=0) {
107       eventAdd(d,
108           cstRemoteWriter->objectEntryOID->objectEntryAID,
109           &cstRemoteWriter->repeatActiveQueryTimer,
110           queue,
111           "CSTReaderQueryTimer",
112           CSTReaderQueryTimer,
113           &cstRemoteWriter->cstReader->lock,
114           cstRemoteWriter,
115           &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
116     }
117   }
118   return 0;
119 }
120
121 /*****************************************************************************/
122 int
123 CSTReaderQueryTimer(ORTEDomain *d,void *vcstRemoteWriter) {
124   CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
125   int             len;
126   char            queue=1;
127   
128   if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
129     queue=2;  
130   if (!d->mbSend.containsInfoReply) { 
131     if (queue==1) {
132       len=RTPSInfoREPLYCreate(
133           d->mbSend.cdrStream.bufferPtr,
134           getMaxMessageLength(d),
135           IPADDRESS_INVALID,
136           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
137     } else {
138       len=RTPSInfoREPLYCreate(
139           d->mbSend.cdrStream.bufferPtr,
140           getMaxMessageLength(d),
141           IPADDRESS_INVALID,
142           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
143     }
144     if (len<0) {
145       d->mbSend.needSend=ORTE_TRUE;
146       return 1;
147     }
148     d->mbSend.containsInfoReply=ORTE_TRUE;  
149     d->mbSend.cdrStream.bufferPtr+=len;
150     d->mbSend.cdrStream.length+=len;
151     debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
152                   cstRemoteWriter->cstReader->guid.oid,
153                   cstRemoteWriter->guid.hid,
154                   cstRemoteWriter->guid.aid);
155   }
156   len=RTPSAckCreate(
157       d->mbSend.cdrStream.bufferPtr,
158       getMaxMessageLength(d),
159       &cstRemoteWriter->sn,
160       cstRemoteWriter->cstReader->guid.oid,
161       cstRemoteWriter->guid.oid,
162       ORTE_FALSE);
163   if (len<0) {
164     d->mbSend.needSend=ORTE_TRUE;
165     return 1;
166   }
167   debug(55,3) ("sent: RTPS_ACKf(0x%x) to 0x%x-0x%x\n",
168                 cstRemoteWriter->cstReader->guid.oid,
169                 cstRemoteWriter->guid.hid,
170                 cstRemoteWriter->guid.aid);
171   d->mbSend.cdrStream.bufferPtr+=len;
172   d->mbSend.cdrStream.length+=len;
173   eventDetach(d,
174       cstRemoteWriter->objectEntryOID->objectEntryAID,
175       &cstRemoteWriter->repeatActiveQueryTimer,
176       queue);   
177   if (NtpTimeCmp(cstRemoteWriter->cstReader->
178                  params.repeatActiveQueryTime,iNtpTime)!=0) {
179     eventAdd(d,
180         cstRemoteWriter->objectEntryOID->objectEntryAID,
181         &cstRemoteWriter->repeatActiveQueryTimer,
182         queue,
183         "CSTReaderQueryTimer",
184         CSTReaderQueryTimer,
185         &cstRemoteWriter->cstReader->lock,
186         cstRemoteWriter,
187         &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
188   }
189   return 0; 
190 }
191
192
193 /*****************************************************************************/
194 int
195 CSTReaderDeadlineTimer(ORTEDomain *d,void *vcstReader) {
196   CSTReader            *cstReader=(CSTReader*)vcstReader;
197   ORTESubsProp         *sp;
198   ORTERecvInfo         info;  
199   
200   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
201   memset(&info,0,sizeof(info));
202   info.status=DEADLINE;
203   info.topic=sp->topic;
204   info.type=sp->typeName;
205   cstReader->objectEntryOID->recvCallBack(&info,
206       cstReader->objectEntryOID->instance,
207       cstReader->objectEntryOID->callBackParam);
208   eventDetach(d,
209       cstReader->objectEntryOID->objectEntryAID,
210       &cstReader->deadlineTimer,
211       0);
212   eventAdd(d,
213       cstReader->objectEntryOID->objectEntryAID,
214       &cstReader->deadlineTimer,
215       0,   //common timer
216       "CSTReaderDeadlineTimer",
217       CSTReaderDeadlineTimer,
218       &cstReader->lock,
219       cstReader,
220       &sp->deadline);
221   return 0;
222 }
223
224 /*****************************************************************************/
225 int
226 CSTReaderPersistenceTimer(ORTEDomain *d,void *vcstReader) {
227   CSTReader            *cstReader=(CSTReader*)vcstReader;
228   CSTRemoteWriter      *cstRemoteWriter;
229   CSChangeFromWriter   *csChangeFromWriter;
230   ORTESubsProp         *sp;
231   ORTEPublProp         *pp;
232   int32_t            strength;
233   
234   if (cstReader->cstRemoteWriterSubscribed!=NULL) {
235     //keep only one csChange (last)
236     while (cstReader->cstRemoteWriterSubscribed->csChangesCounter>1) {
237       csChangeFromWriter=
238         CSChangeFromWriter_first(cstReader->cstRemoteWriterSubscribed);
239       if (csChangeFromWriter) {  
240         CSTReaderDestroyCSChangeFromWriter(
241             cstReader->cstRemoteWriterSubscribed,
242             csChangeFromWriter,
243             ORTE_FALSE);
244       }
245     }
246   }
247   cstReader->cstRemoteWriterSubscribed=NULL;
248   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
249   strength=0;
250   gavl_cust_for_each(CSTRemoteWriter,cstReader,cstRemoteWriter) {
251     pp=(ORTEPublProp*)cstRemoteWriter->objectEntryOID->attributes;
252     csChangeFromWriter=CSChangeFromWriter_last(cstRemoteWriter);
253     if ((pp->strength>strength) && (csChangeFromWriter!=NULL)){
254       NtpTime persistence,persistenceExpired,actTime;
255       actTime=getActualNtpTime();
256       NtpTimeAdd(persistenceExpired,
257                  csChangeFromWriter->csChange->localTimeReceived,
258                  pp->persistence);
259       if (NtpTimeCmp(persistenceExpired,actTime)>0) {
260         NtpTimeSub(persistence,
261                    persistenceExpired,
262                    actTime);
263         eventDetach(d,
264             cstReader->objectEntryOID->objectEntryAID,
265             &cstReader->persistenceTimer,
266             0);   //common timer
267         eventAdd(d,
268             cstReader->objectEntryOID->objectEntryAID,
269             &cstReader->persistenceTimer,
270             0,   //common timer
271             "CSTReaderPersistenceTimer",
272             CSTReaderPersistenceTimer,
273             &cstReader->lock,
274             cstReader,
275             &persistence);
276         cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
277       }
278     }
279   }
280   if ((cstReader->cstRemoteWriterSubscribed!=NULL) && 
281       (sp->mode==IMMEDIATE)) {
282     CSTReaderProcCSChangesIssue(
283         cstReader->cstRemoteWriterSubscribed,ORTE_FALSE);
284   }
285   return 0;
286 }