]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriterTimer.c
f9d9ce4912a1e31e9e69f47b9f6c9d1bded67fdb
[orte.git] / orte / liborte / RTPSCSTWriterTimer.c
1 /*
2  *  $Id: RTPSCSTWriterTimer.c,v 0.0.0.1 2003/10/19 
3  *
4  *  DEBUG:  section 52                  CSTWriter timer functions
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 int 
36 CSTWriterRegistrationTimer(ORTEDomain *d,void *vcstWriter) {
37   CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
38   CSTRemoteReader *cstRemoteReader;
39
40   debug(52,10) ("CSTWriterRegistrationTimer: start\n");
41
42   debug(52,5) ("CSTWriterRegistrationTimer: OID: 0xx%x - retries = %d\n",
43                 cstWriter->guid.oid,cstWriter->registrationCounter);
44   eventDetach(d,
45       cstWriter->objectEntryOID->objectEntryAID,
46       &cstWriter->registrationTimer,
47       0);   //common timer
48
49   if (cstWriter->registrationCounter!=0) {
50     cstWriter->registrationCounter--;
51     gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
52       CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
53     }
54     eventAdd(d,
55         cstWriter->objectEntryOID->objectEntryAID,
56         &cstWriter->registrationTimer,
57         0,   //common timer
58         "CSTWriterRegistrationTimer",
59         CSTWriterRegistrationTimer,
60         &cstWriter->lock,
61         cstWriter,
62         &cstWriter->params.registrationPeriod);               
63   } else {
64     if (d->domainEvents.onRegFail) {
65         d->domainEvents.onRegFail(d->domainEvents.onRegFailParam);
66     }
67   }
68
69   debug(52,10) ("CSTWriterRegistrationTimer: finished\n");
70   return 0;
71 }
72
73
74 /*****************************************************************************/
75 int 
76 CSTWriterRefreshTimer(ORTEDomain *d,void *vcstWriter) {
77   CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
78   CSTRemoteReader *cstRemoteReader;
79   
80   debug(52,10) ("CSTWriterRefreshTimer: start\n");
81   
82   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
83     CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
84   }
85   eventDetach(d,
86       cstWriter->objectEntryOID->objectEntryAID,
87       &cstWriter->refreshPeriodTimer,
88       0);   //common timer
89   eventAdd(d,
90       cstWriter->objectEntryOID->objectEntryAID,
91       &cstWriter->refreshPeriodTimer,
92       0,   //common timer
93       "CSTWriterRefreshTimer",
94       CSTWriterRefreshTimer,
95       &cstWriter->lock,
96       cstWriter,
97       &cstWriter->params.refreshPeriod);               
98   debug(52,10) ("CSTWriterRefreshTimer: finished\n");
99   return 0;
100 }
101
102 /*****************************************************************************/
103 int 
104 CSTWriterAnnounceTimer(ORTEDomain *d,void *vcstRemoteReader) {
105   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
106
107   debug(52,10) ("CSTWriterAnnounceTimer: start\n");
108   if ((cstRemoteReader->commStateHB==MAYSENDHB) &&
109       ((!cstRemoteReader->cstWriter->params.fullAcknowledge))) {// ||
110 //       (cstRemoteReader->unacknowledgedCounter))) {
111     //create HB
112     int len=RTPSHeartBeatCreate(
113         &d->taskSend.mb.cdrCodec,
114         &cstRemoteReader->cstWriter->firstSN,
115         &cstRemoteReader->cstWriter->lastSN,
116         OID_UNKNOWN,
117         cstRemoteReader->cstWriter->guid.oid,
118         ORTE_FALSE);
119     if (len<0) {
120       //not enought space in sending buffer
121       d->taskSend.mb.needSend=ORTE_TRUE;
122       return 1;
123     }
124     debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
125                   cstRemoteReader->cstWriter->guid.oid,
126                   cstRemoteReader->guid.hid,
127                   cstRemoteReader->guid.aid);
128   }
129   eventDetach(d,
130       cstRemoteReader->sobject->objectEntryAID,
131       &cstRemoteReader->repeatAnnounceTimer,
132       1);
133   eventAdd(d,
134       cstRemoteReader->sobject->objectEntryAID,
135       &cstRemoteReader->repeatAnnounceTimer,
136       1,   //metatraffic timer
137       "CSTWriterAnnounceTimer",
138       CSTWriterAnnounceTimer,
139       &cstRemoteReader->cstWriter->lock,
140       cstRemoteReader,
141       &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
142   debug(52,10) ("CSTWriterAnnounceTimer: finished\n");
143   return 0;
144 }
145
146 /*****************************************************************************/
147 int 
148 CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
149   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
150   NtpTime         nextHB;
151   ORTEPublProp    *pp;
152   int             len;
153
154   debug(52,10) ("CSTWriterAnnounceIssueTimer: start\n");
155   pp=(ORTEPublProp*)cstRemoteReader->cstWriter->objectEntryOID->attributes;
156   //create HB
157   d->taskSend.mb.cdrCodecDirect=NULL;
158   len=RTPSHeartBeatCreate(
159       &d->taskSend.mb.cdrCodec,
160       &cstRemoteReader->cstWriter->firstSN,
161       &cstRemoteReader->cstWriter->lastSN,
162       OID_UNKNOWN,
163       cstRemoteReader->cstWriter->guid.oid,
164       ORTE_FALSE);
165   if (len<0) {
166     //not enought space in sending buffer
167     d->taskSend.mb.needSend=ORTE_TRUE;
168     return 1;
169   }
170   debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
171                 cstRemoteReader->cstWriter->guid.oid,
172                 cstRemoteReader->guid.hid,
173                 cstRemoteReader->guid.aid);
174   //next HB
175   if (cstRemoteReader->cstWriter->csChangesCounter>=pp->criticalQueueLevel) {
176     nextHB=pp->HBCQLRate;
177   } else {
178     nextHB=pp->HBNornalRate;
179   }
180   cstRemoteReader->HBRetriesCounter++;
181   eventDetach(d,
182       cstRemoteReader->sobject->objectEntryAID,
183       &cstRemoteReader->repeatAnnounceTimer,
184       2);
185   if (cstRemoteReader->HBRetriesCounter<pp->HBMaxRetries) {              
186     eventAdd(d,
187         cstRemoteReader->sobject->objectEntryAID,
188         &cstRemoteReader->repeatAnnounceTimer,
189         2,   //metatraffic timer
190         "CSTWriterAnnounceIssueTimer",
191         CSTWriterAnnounceIssueTimer,
192         &cstRemoteReader->cstWriter->lock,
193         cstRemoteReader,
194         &nextHB);
195   } else {
196     //destroy all csChangesForReader
197     CSChangeForReader *csChangeForReader;
198     while ((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
199       CSTWriterDestroyCSChangeForReader(
200           csChangeForReader,ORTE_TRUE);
201     }
202     debug(52,3) ("CSTWriterAnnounceIssueTimer: HB RR(0x%x-0x%x) ritch MaxRetries\n",
203                   cstRemoteReader->guid.hid,cstRemoteReader->guid.aid);
204   }
205   debug(52,10) ("CSTWriterAnnounceIssueTimer: finished\n");
206   return 0;
207 }
208
209 /**********************************************************************************/
210 int
211 CSChangeForReaderUnderwayTimer(ORTEDomain *d,void *vcsChangeForReader) {
212   CSChangeForReader *csChangeForReader=(CSChangeForReader*)vcsChangeForReader;
213   csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
214   return 0;
215 }
216
217 /**********************************************************************************/
218 int
219 CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
220   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
221   ORTESubsProp      *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
222   CSChangeForReader *csChangeForReader=NULL;
223         
224   debug(52,10) ("CSTWriterSendBestEffortTimer: start\n");
225   d->taskSend.mb.cdrCodecDirect=NULL;
226   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
227     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
228       if (csChangeForReader->commStateChFReader==TOSEND) {
229         CSChange *csChange=csChangeForReader->csChange;
230
231         csChangeForReader->commStateChFReader=UNDERWAY;
232         cstRemoteReader->commStateSend=MUSTSENDDATA;
233         cstRemoteReader->lastSentIssueTime=getActualNtpTime();
234         d->taskSend.mb.cdrCodecDirect=&csChange->cdrCodec;
235
236         if (cstRemoteReader->sobject) {
237           debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x-0x%x\n",
238                         cstRemoteReader->cstWriter->guid.oid,
239                         GUID_PRINTF(cstRemoteReader->sobject->guid));
240         }
241
242         ORTESendData(d,
243             cstRemoteReader->sobject->objectEntryAID,
244             ORTE_FALSE);
245
246         //it's not nessecary to NewState, there is setuped only new state & after is deleted
247         CSTWriterCSChangeForReaderNewState(csChangeForReader);
248
249         /* mark multicast messages like processed */
250         CSTWriterMulticast(csChangeForReader);
251
252         CSTWriterDestroyCSChangeForReader(
253             csChangeForReader,ORTE_TRUE);
254
255         eventDetach(d,
256             cstRemoteReader->sobject->objectEntryAID,
257             &cstRemoteReader->delayResponceTimer,
258             2);   
259
260         //when is no csChange -> break processing 
261         if (cstRemoteReader->cstWriter->csChangesCounter==0) 
262           break;
263
264         eventAdd(d,
265             cstRemoteReader->sobject->objectEntryAID,
266             &cstRemoteReader->delayResponceTimer,
267             2,   
268             "CSTWriterSendBestEffortTimer",
269             CSTWriterSendBestEffortTimer,
270             &cstRemoteReader->cstWriter->lock,
271             cstRemoteReader,
272             &sp->minimumSeparation);
273         return 0;
274
275       }
276     }
277   }
278   debug(52,10) ("CSTWriterSendBestEffortTimer: finished\n");
279   return 0;
280 }
281
282 /**********************************************************************************/
283 int
284 CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
285   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
286   CSChangeForReader *csChangeForReader=NULL;
287   int               len,data_offset,wptr_max;
288   CSChange          *csChange;
289   Boolean           firstTrace=ORTE_TRUE;
290   
291   debug(52,10) ("CSTWriterSendStrictTimer: start\n");
292   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
293     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
294       csChange=csChangeForReader->csChange;
295       if (csChangeForReader->commStateChFReader==TOSEND) {
296         cstRemoteReader->commStateSend=MUSTSENDDATA;
297
298         wptr_max=d->taskSend.mb.cdrCodec.wptr_max;
299         d->taskSend.mb.cdrCodec.wptr_max=csChange->cdrCodec.wptr_max;
300         /* infoReply */
301         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
302             !d->taskSend.mb.containsInfoReply) {
303           AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
304           firstTrace=ORTE_FALSE;
305           len=RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
306               IPADDRESS_INVALID,
307               ap->userdataUnicastPort);
308           if (len<0) {
309             d->taskSend.mb.needSend=ORTE_TRUE;
310             d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
311             return 1;
312           }
313           d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
314           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
315                        cstRemoteReader->cstWriter->guid.oid,
316                        cstRemoteReader->guid.hid,
317                        cstRemoteReader->guid.aid);
318         }
319
320         data_offset=RTPS_HEADER_LENGTH+12;
321         if (CDR_buffer_puts(&d->taskSend.mb.cdrCodec,
322                             csChange->cdrCodec.buffer+data_offset, //src
323                             csChange->cdrCodec.wptr-data_offset)==CORBA_FALSE) {
324             d->taskSend.mb.needSend=ORTE_TRUE;
325             d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
326             return 1;
327         }
328     
329         d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
330
331         /* setup new state for csChangeForReader */
332         CSTWriterCSChangeForReaderNewState(csChangeForReader);
333
334         /* mark multicast messages like processed */
335         CSTWriterMulticast(csChangeForReader);
336
337         debug(52,3) ("sent: RTPS_ISSUE_STRICT(0x%x) to 0x%x-0x%x\n",
338                     cstRemoteReader->cstWriter->guid.oid,
339                     cstRemoteReader->guid.hid,
340                     cstRemoteReader->guid.aid);
341       }
342     }
343   }
344   debug(52,10) ("CSTWriterSendStrictTimer: finished\n");
345   //add HeardBeat  
346   return CSTWriterAnnounceIssueTimer(d,cstRemoteReader);
347 }
348
349 /**********************************************************************************/
350 int
351 CSTWriterSendTimer(ORTEDomain *d,void *vcstRemoteReader) {
352   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
353   CSChangeForReader *csChangeForReader=NULL;
354   Boolean           firstTrace=ORTE_TRUE,f_bit=ORTE_TRUE;
355   
356   debug(52,10) ("CSTWriterSendTimer: start\n");
357
358   /* setup f_bit of object */
359   if (cstRemoteReader->cstWriter->params.fullAcknowledge)
360     f_bit=ORTE_FALSE;
361
362   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
363
364     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
365
366       if (csChangeForReader->commStateChFReader==TOSEND) {
367         cstRemoteReader->commStateSend=MUSTSENDDATA;
368
369         /* infoReply */
370         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
371             !d->taskSend.mb.containsInfoReply) {
372           AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
373           firstTrace=ORTE_FALSE;
374           if (RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
375                                   IPADDRESS_INVALID,
376                                   ap->metatrafficUnicastPort) < 0) {
377             d->taskSend.mb.needSend=ORTE_TRUE;
378             return 1;
379           }
380           d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
381           debug(52,3) ("sent: RTPS_InfoREPLY from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
382                         GUID_PRINTF(cstRemoteReader->cstWriter->guid),
383                         GUID_PRINTF(cstRemoteReader->guid));
384         }
385
386         /* VAR */
387         if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
388           debug(52,3) ("sent: RTPS_VAR from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
389                         GUID_PRINTF(cstRemoteReader->cstWriter->guid),
390                         GUID_PRINTF(cstRemoteReader->guid));
391
392           if (RTPSVarCreate(&d->taskSend.mb.cdrCodec,
393                             OID_UNKNOWN,
394                             cstRemoteReader->cstWriter->guid.oid,
395                             csChangeForReader->csChange) < 0) {
396             d->taskSend.mb.needSend=ORTE_TRUE;
397             return 1;
398           }
399
400         } else {
401         /* GAP */
402           debug(52,3) ("sent: RTPS_GAP from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
403                         GUID_PRINTF(cstRemoteReader->cstWriter->guid),
404                         GUID_PRINTF(cstRemoteReader->guid));
405
406           if (RTPSGapCreate(&d->taskSend.mb.cdrCodec,
407                             OID_UNKNOWN,
408                             cstRemoteReader->cstWriter->guid.oid,
409                             csChangeForReader->csChange) < 0) {
410             d->taskSend.mb.needSend=ORTE_TRUE;
411             return 1;
412           }
413         }
414
415         /* setup new state for csChangeForReader */
416         CSTWriterCSChangeForReaderNewState(csChangeForReader);
417
418         /* mark multicast messages like processed */
419         CSTWriterMulticast(csChangeForReader);
420
421       }
422     } /* gavl_cust_for_each */
423
424     cstRemoteReader->commStateHB=MUSTSENDHB;
425
426   }
427
428   if (cstRemoteReader->commStateHB==MUSTSENDHB) {
429     //add HeartBeat
430     if (RTPSHeartBeatCreate(
431         &d->taskSend.mb.cdrCodec,
432         &cstRemoteReader->cstWriter->firstSN,
433         &cstRemoteReader->cstWriter->lastSN,
434         OID_UNKNOWN,
435         cstRemoteReader->cstWriter->guid.oid,
436         f_bit)<0) {
437       d->taskSend.mb.needSend=ORTE_TRUE;
438       return 1;
439     } else {
440       //schedule new time for Announce timer
441       eventDetach(d,
442            cstRemoteReader->sobject->objectEntryAID,
443            &cstRemoteReader->repeatAnnounceTimer,
444            1);
445       eventAdd(d,
446            cstRemoteReader->sobject->objectEntryAID,
447            &cstRemoteReader->repeatAnnounceTimer,
448            1,   //metatraffic timer
449            "CSTWriterAnnounceTimer",
450            CSTWriterAnnounceTimer,
451            &cstRemoteReader->cstWriter->lock,
452            cstRemoteReader,
453            &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
454     }
455
456     debug(52,3) ("sent: RTPS_HB from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
457                   GUID_PRINTF(cstRemoteReader->cstWriter->guid),
458                   GUID_PRINTF(cstRemoteReader->guid));
459
460     cstRemoteReader->commStateHB=MAYSENDHB;
461   }
462  
463   debug(52,10) ("CSTWriterSendTimer: finished\n");
464   return 0;
465 }