]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderTimer.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSCSTReaderTimer.c
1 /*
2  *  $Id: RTPSCSTReaderTimer.c,v 0.0.0.1 2003/11/03 
3  *
4  *  DEBUG:  section 55                  CSTReader timer functions
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 int 
26 CSTReaderResponceTimer(ORTEDomain *d,void *vcstRemoteWriter) {
27   CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
28   int             len;
29   char            queue=1;
30   
31   if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
32     queue=2;
33   if (!d->taskSend.mb.containsInfoReply) { 
34     if (queue==1) {
35       len=RTPSInfoREPLYCreate(
36           &d->taskSend.mb.cdrCodec,
37           IPADDRESS_INVALID,
38           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
39     } else {
40       len=RTPSInfoREPLYCreate(
41           &d->taskSend.mb.cdrCodec,
42           IPADDRESS_INVALID,
43           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
44     }
45     if (len<0) {
46       d->taskSend.mb.needSend=ORTE_TRUE;
47       return 1;
48     }
49     d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
50     debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
51                   cstRemoteWriter->cstReader->guid.oid,
52                   cstRemoteWriter->guid.hid,
53                   cstRemoteWriter->guid.aid);
54   }
55   len=RTPSAckCreate(
56        &d->taskSend.mb.cdrCodec,
57        &cstRemoteWriter->sn,
58        cstRemoteWriter->cstReader->guid.oid,
59        cstRemoteWriter->guid.oid,
60        ORTE_TRUE);
61   if (len<0) {
62     //not enought space in sending buffer
63     d->taskSend.mb.needSend=ORTE_TRUE;
64     return 1;
65   }
66   debug(55,3) ("sent: RTPS_ACKF(0x%x) to 0x%x-0x%x\n",
67                 cstRemoteWriter->cstReader->guid.oid,
68                 cstRemoteWriter->guid.hid,
69                 cstRemoteWriter->guid.aid);
70   if (cstRemoteWriter->commStateACK==PULLING) {
71     eventDetach(d,
72         cstRemoteWriter->spobject->objectEntryAID,
73         &cstRemoteWriter->delayResponceTimer,
74         queue); 
75     if (cstRemoteWriter->ACKRetriesCounter<
76         cstRemoteWriter->cstReader->params.ACKMaxRetries) {
77       cstRemoteWriter->ACKRetriesCounter++;
78       eventAdd(d,
79           cstRemoteWriter->spobject->objectEntryAID,
80           &cstRemoteWriter->delayResponceTimer,
81           queue,
82           "CSTReaderResponceTimer",
83           CSTReaderResponceTimer,
84           &cstRemoteWriter->cstReader->lock,
85           cstRemoteWriter,
86           &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
87     } else {
88       debug(55,3) ("sent: maxRetries ritch upper level (%d).\n",
89                     cstRemoteWriter->cstReader->params.ACKMaxRetries);
90     }
91   }
92   if (cstRemoteWriter->commStateACK==ACKPENDING) { 
93     cstRemoteWriter->commStateACK=WAITING;
94     eventDetach(d,
95         cstRemoteWriter->spobject->objectEntryAID,
96         &cstRemoteWriter->repeatActiveQueryTimer,
97         queue); 
98     if (NtpTimeCmp(cstRemoteWriter->cstReader->
99                    params.repeatActiveQueryTime,iNtpTime)!=0) {
100       eventAdd(d,
101           cstRemoteWriter->spobject->objectEntryAID,
102           &cstRemoteWriter->repeatActiveQueryTimer,
103           queue,
104           "CSTReaderQueryTimer",
105           CSTReaderQueryTimer,
106           &cstRemoteWriter->cstReader->lock,
107           cstRemoteWriter,
108           &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
109     }
110   }
111   return 0;
112 }
113
114 /*****************************************************************************/
115 int
116 CSTReaderQueryTimer(ORTEDomain *d,void *vcstRemoteWriter) {
117   CSTRemoteWriter *cstRemoteWriter=(CSTRemoteWriter*)vcstRemoteWriter;
118   int             len;
119   char            queue=1;
120   
121   if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
122     queue=2;  
123   if (!d->taskSend.mb.containsInfoReply) { 
124     if (queue==1) {
125       len=RTPSInfoREPLYCreate(
126           &d->taskSend.mb.cdrCodec,
127           IPADDRESS_INVALID,
128           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->metatrafficUnicastPort);
129     } else {
130       len=RTPSInfoREPLYCreate(
131           &d->taskSend.mb.cdrCodec,
132           IPADDRESS_INVALID,
133           ((AppParams*)cstRemoteWriter->cstReader->objectEntryOID->attributes)->userdataUnicastPort);
134     }
135     if (len<0) {
136       d->taskSend.mb.needSend=ORTE_TRUE;
137       return 1;
138     }
139     d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
140     debug(55,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
141                   cstRemoteWriter->cstReader->guid.oid,
142                   cstRemoteWriter->guid.hid,
143                   cstRemoteWriter->guid.aid);
144   }
145   len=RTPSAckCreate(
146       &d->taskSend.mb.cdrCodec,
147       &cstRemoteWriter->sn,
148       cstRemoteWriter->cstReader->guid.oid,
149       cstRemoteWriter->guid.oid,
150       ORTE_FALSE);
151   if (len<0) {
152     d->taskSend.mb.needSend=ORTE_TRUE;
153     return 1;
154   }
155   debug(55,3) ("sent: RTPS_ACKf(0x%x) to 0x%x-0x%x\n",
156                 cstRemoteWriter->cstReader->guid.oid,
157                 cstRemoteWriter->guid.hid,
158                 cstRemoteWriter->guid.aid);
159   eventDetach(d,
160       cstRemoteWriter->spobject->objectEntryAID,
161       &cstRemoteWriter->repeatActiveQueryTimer,
162       queue);   
163   if (NtpTimeCmp(cstRemoteWriter->cstReader->
164                  params.repeatActiveQueryTime,iNtpTime)!=0) {
165     eventAdd(d,
166         cstRemoteWriter->spobject->objectEntryAID,
167         &cstRemoteWriter->repeatActiveQueryTimer,
168         queue,
169         "CSTReaderQueryTimer",
170         CSTReaderQueryTimer,
171         &cstRemoteWriter->cstReader->lock,
172         cstRemoteWriter,
173         &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
174   }
175   return 0; 
176 }
177
178
179 /*****************************************************************************/
180 int
181 CSTReaderDeadlineTimer(ORTEDomain *d,void *vcstReader) {
182   CSTReader            *cstReader=(CSTReader*)vcstReader;
183   ORTESubsProp         *sp;
184   ORTERecvInfo         info;  
185   
186   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
187   memset(&info,0,sizeof(info));
188   info.status=DEADLINE;
189   info.topic=sp->topic;
190   info.type=sp->typeName;
191   if (cstReader->objectEntryOID->recvCallBack) {
192     cstReader->objectEntryOID->recvCallBack(&info,
193         cstReader->objectEntryOID->instance,
194         cstReader->objectEntryOID->callBackParam);
195   }
196   eventDetach(d,
197       cstReader->objectEntryOID->objectEntryAID,
198       &cstReader->deadlineTimer,
199       0);
200   eventAdd(d,
201       cstReader->objectEntryOID->objectEntryAID,
202       &cstReader->deadlineTimer,
203       0,   //common timer
204       "CSTReaderDeadlineTimer",
205       CSTReaderDeadlineTimer,
206       &cstReader->lock,
207       cstReader,
208       &sp->deadline);
209   return 0;
210 }
211
212 /*****************************************************************************/
213 int
214 CSTReaderPersistenceTimer(ORTEDomain *d,void *vcstReader) {
215   CSTReader            *cstReader=(CSTReader*)vcstReader;
216   CSTRemoteWriter      *cstRemoteWriter;
217   CSChangeFromWriter   *csChangeFromWriter;
218   ORTESubsProp         *sp;
219   ORTEPublProp         *pp;
220   int32_t            strength;
221   
222   if (cstReader->cstRemoteWriterSubscribed!=NULL) {
223     //keep only one csChange (last)
224     while (cstReader->cstRemoteWriterSubscribed->csChangesCounter>1) {
225       csChangeFromWriter=
226         CSChangeFromWriter_first(cstReader->cstRemoteWriterSubscribed);
227       if (csChangeFromWriter) {  
228         CSTReaderDestroyCSChangeFromWriter(
229             cstReader->cstRemoteWriterSubscribed,
230             csChangeFromWriter,
231             ORTE_FALSE);
232       }
233     }
234   }
235   cstReader->cstRemoteWriterSubscribed=NULL;
236   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
237   strength=0;
238   gavl_cust_for_each(CSTRemoteWriter,cstReader,cstRemoteWriter) {
239     pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
240     csChangeFromWriter=CSChangeFromWriter_last(cstRemoteWriter);
241     if ((pp->strength>strength) && (csChangeFromWriter!=NULL)){
242       NtpTime persistence,persistenceExpired,actTime;
243       actTime=getActualNtpTime();
244       NtpTimeAdd(persistenceExpired,
245                  csChangeFromWriter->csChange->localTimeReceived,
246                  pp->persistence);
247       if (NtpTimeCmp(persistenceExpired,actTime)>0) {
248         NtpTimeSub(persistence,
249                    persistenceExpired,
250                    actTime);
251         eventDetach(d,
252             cstReader->objectEntryOID->objectEntryAID,
253             &cstReader->persistenceTimer,
254             0);   //common timer
255         eventAdd(d,
256             cstReader->objectEntryOID->objectEntryAID,
257             &cstReader->persistenceTimer,
258             0,   //common timer
259             "CSTReaderPersistenceTimer",
260             CSTReaderPersistenceTimer,
261             &cstReader->lock,
262             cstReader,
263             &persistence);
264         cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
265       }
266     }
267   }
268   if ((cstReader->cstRemoteWriterSubscribed!=NULL) && 
269       (sp->mode==IMMEDIATE)) {
270     CSTReaderProcCSChangesIssue(
271         cstReader->cstRemoteWriterSubscribed,ORTE_FALSE);
272   }
273   return 0;
274 }