]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriterTimer.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSCSTWriterTimer.c
1 /*
2  *  $Id: RTPSCSTWriterTimer.c,v 0.0.0.1 2003/10/19 
3  *
4  *  DEBUG:  section 52                  CSTWriter timer functions
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 int 
26 CSTWriterRegistrationTimer(ORTEDomain *d,void *vcstWriter) {
27   CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
28   CSTRemoteReader *cstRemoteReader;
29
30   debug(52,10) ("CSTWriterRegistrationTimer: start\n");
31
32   debug(52,5) ("CSTWriterRegistrationTimer: OID: 0xx%x - retries = %d\n",
33                 cstWriter->guid.oid,cstWriter->registrationCounter);
34   eventDetach(d,
35       cstWriter->objectEntryOID->objectEntryAID,
36       &cstWriter->registrationTimer,
37       0);   //common timer
38
39   if (cstWriter->registrationCounter!=0) {
40     cstWriter->registrationCounter--;
41     gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
42       CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
43     }
44     eventAdd(d,
45         cstWriter->objectEntryOID->objectEntryAID,
46         &cstWriter->registrationTimer,
47         0,   //common timer
48         "CSTWriterRegistrationTimer",
49         CSTWriterRegistrationTimer,
50         &cstWriter->lock,
51         cstWriter,
52         &cstWriter->params.registrationPeriod);               
53   } else {
54     if (d->domainEvents.onRegFail) {
55         d->domainEvents.onRegFail(d->domainEvents.onRegFailParam);
56     }
57   }
58
59   debug(52,10) ("CSTWriterRegistrationTimer: finished\n");
60   return 0;
61 }
62
63
64 /*****************************************************************************/
65 int 
66 CSTWriterRefreshTimer(ORTEDomain *d,void *vcstWriter) {
67   CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
68   CSTRemoteReader *cstRemoteReader;
69   
70   debug(52,10) ("CSTWriterRefreshTimer: start\n");
71   
72   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
73     CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
74   }
75   eventDetach(d,
76       cstWriter->objectEntryOID->objectEntryAID,
77       &cstWriter->refreshPeriodTimer,
78       0);   //common timer
79   eventAdd(d,
80       cstWriter->objectEntryOID->objectEntryAID,
81       &cstWriter->refreshPeriodTimer,
82       0,   //common timer
83       "CSTWriterRefreshTimer",
84       CSTWriterRefreshTimer,
85       &cstWriter->lock,
86       cstWriter,
87       &cstWriter->params.refreshPeriod);               
88   debug(52,10) ("CSTWriterRefreshTimer: finished\n");
89   return 0;
90 }
91
92 /*****************************************************************************/
93 int 
94 CSTWriterAnnounceTimer(ORTEDomain *d,void *vcstRemoteReader) {
95   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
96
97   debug(52,10) ("CSTWriterAnnounceTimer: start\n");
98   if ((cstRemoteReader->commStateHB==MAYSENDHB) &&
99       ((!cstRemoteReader->cstWriter->params.fullAcknowledge))) {// ||
100 //       (cstRemoteReader->unacknowledgedCounter))) {
101     //create HB
102     int len=RTPSHeartBeatCreate(
103         &d->taskSend.mb.cdrCodec,
104         &cstRemoteReader->cstWriter->firstSN,
105         &cstRemoteReader->cstWriter->lastSN,
106         OID_UNKNOWN,
107         cstRemoteReader->cstWriter->guid.oid,
108         ORTE_FALSE);
109     if (len<0) {
110       //not enought space in sending buffer
111       d->taskSend.mb.needSend=ORTE_TRUE;
112       return 1;
113     }
114     debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
115                   cstRemoteReader->cstWriter->guid.oid,
116                   cstRemoteReader->guid.hid,
117                   cstRemoteReader->guid.aid);
118   }
119   eventDetach(d,
120       cstRemoteReader->sobject->objectEntryAID,
121       &cstRemoteReader->repeatAnnounceTimer,
122       1);
123   eventAdd(d,
124       cstRemoteReader->sobject->objectEntryAID,
125       &cstRemoteReader->repeatAnnounceTimer,
126       1,   //metatraffic timer
127       "CSTWriterAnnounceTimer",
128       CSTWriterAnnounceTimer,
129       &cstRemoteReader->cstWriter->lock,
130       cstRemoteReader,
131       &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
132   debug(52,10) ("CSTWriterAnnounceTimer: finished\n");
133   return 0;
134 }
135
136 /*****************************************************************************/
137 int 
138 CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
139   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
140   NtpTime         nextHB;
141   ORTEPublProp    *pp;
142   int             len;
143
144   debug(52,10) ("CSTWriterAnnounceIssueTimer: start\n");
145   pp=(ORTEPublProp*)cstRemoteReader->cstWriter->objectEntryOID->attributes;
146   //create HB
147   d->taskSend.mb.cdrCodecDirect=NULL;
148   len=RTPSHeartBeatCreate(
149       &d->taskSend.mb.cdrCodec,
150       &cstRemoteReader->cstWriter->firstSN,
151       &cstRemoteReader->cstWriter->lastSN,
152       OID_UNKNOWN,
153       cstRemoteReader->cstWriter->guid.oid,
154       ORTE_FALSE);
155   if (len<0) {
156     //not enought space in sending buffer
157     d->taskSend.mb.needSend=ORTE_TRUE;
158     return 1;
159   }
160   debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
161                 cstRemoteReader->cstWriter->guid.oid,
162                 cstRemoteReader->guid.hid,
163                 cstRemoteReader->guid.aid);
164   //next HB
165   if (cstRemoteReader->cstWriter->csChangesCounter>=pp->criticalQueueLevel) {
166     nextHB=pp->HBCQLRate;
167   } else {
168     nextHB=pp->HBNornalRate;
169   }
170   cstRemoteReader->HBRetriesCounter++;
171   eventDetach(d,
172       cstRemoteReader->sobject->objectEntryAID,
173       &cstRemoteReader->repeatAnnounceTimer,
174       2);
175   if (cstRemoteReader->HBRetriesCounter<pp->HBMaxRetries) {              
176     eventAdd(d,
177         cstRemoteReader->sobject->objectEntryAID,
178         &cstRemoteReader->repeatAnnounceTimer,
179         2,   //metatraffic timer
180         "CSTWriterAnnounceIssueTimer",
181         CSTWriterAnnounceIssueTimer,
182         &cstRemoteReader->cstWriter->lock,
183         cstRemoteReader,
184         &nextHB);
185   } else {
186     //destroy all csChangesForReader
187     CSChangeForReader *csChangeForReader;
188     while ((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
189       CSTWriterDestroyCSChangeForReader(
190           csChangeForReader,ORTE_TRUE);
191     }
192     debug(52,3) ("CSTWriterAnnounceIssueTimer: HB RR(0x%x-0x%x) ritch MaxRetries\n",
193                   cstRemoteReader->guid.hid,cstRemoteReader->guid.aid);
194   }
195   debug(52,10) ("CSTWriterAnnounceIssueTimer: finished\n");
196   return 0;
197 }
198
199 /**********************************************************************************/
200 int
201 CSChangeForReaderUnderwayTimer(ORTEDomain *d,void *vcsChangeForReader) {
202   CSChangeForReader *csChangeForReader=(CSChangeForReader*)vcsChangeForReader;
203   csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
204   return 0;
205 }
206
207 /**********************************************************************************/
208 int
209 CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
210   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
211   ORTESubsProp      *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
212   CSChangeForReader *csChangeForReader=NULL;
213         
214   debug(52,10) ("CSTWriterSendBestEffortTimer: start\n");
215   d->taskSend.mb.cdrCodecDirect=NULL;
216   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
217     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
218       if (csChangeForReader->commStateChFReader==TOSEND) {
219         CSChange *csChange=csChangeForReader->csChange;
220
221         csChangeForReader->commStateChFReader=UNDERWAY;
222         cstRemoteReader->commStateSend=MUSTSENDDATA;
223         cstRemoteReader->lastSentIssueTime=getActualNtpTime();
224         d->taskSend.mb.cdrCodecDirect=&csChange->cdrCodec;
225
226         if (cstRemoteReader->sobject) {
227           debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x-0x%x\n",
228                         cstRemoteReader->cstWriter->guid.oid,
229                         GUID_PRINTF(cstRemoteReader->sobject->guid));
230         }
231
232         ORTESendData(d,
233             cstRemoteReader->sobject->objectEntryAID,
234             ORTE_FALSE);
235
236         //it's not nessecary to NewState, there is setuped only new state & after is deleted
237         CSTWriterCSChangeForReaderNewState(csChangeForReader);
238
239         /* mark multicast messages like processed */
240         CSTWriterMulticast(csChangeForReader);
241
242         CSTWriterDestroyCSChangeForReader(
243             csChangeForReader,ORTE_TRUE);
244
245         eventDetach(d,
246             cstRemoteReader->sobject->objectEntryAID,
247             &cstRemoteReader->delayResponceTimer,
248             2);   
249
250         //when is no csChange -> break processing 
251         if (cstRemoteReader->cstWriter->csChangesCounter==0) 
252           break;
253
254         eventAdd(d,
255             cstRemoteReader->sobject->objectEntryAID,
256             &cstRemoteReader->delayResponceTimer,
257             2,   
258             "CSTWriterSendBestEffortTimer",
259             CSTWriterSendBestEffortTimer,
260             &cstRemoteReader->cstWriter->lock,
261             cstRemoteReader,
262             &sp->minimumSeparation);
263         return 0;
264
265       }
266     }
267   }
268   debug(52,10) ("CSTWriterSendBestEffortTimer: finished\n");
269   return 0;
270 }
271
272 /**********************************************************************************/
273 int
274 CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
275   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
276   CSChangeForReader *csChangeForReader=NULL;
277   int               len,data_offset,wptr_max;
278   CSChange          *csChange;
279   Boolean           firstTrace=ORTE_TRUE;
280   
281   debug(52,10) ("CSTWriterSendStrictTimer: start\n");
282   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
283     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
284       csChange=csChangeForReader->csChange;
285       if (csChangeForReader->commStateChFReader==TOSEND) {
286         cstRemoteReader->commStateSend=MUSTSENDDATA;
287
288         wptr_max=d->taskSend.mb.cdrCodec.wptr_max;
289         d->taskSend.mb.cdrCodec.wptr_max=csChange->cdrCodec.wptr_max;
290         /* infoReply */
291         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
292             !d->taskSend.mb.containsInfoReply) {
293           AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
294           firstTrace=ORTE_FALSE;
295           len=RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
296               IPADDRESS_INVALID,
297               ap->userdataUnicastPort);
298           if (len<0) {
299             d->taskSend.mb.needSend=ORTE_TRUE;
300             d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
301             return 1;
302           }
303           d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
304           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
305                        cstRemoteReader->cstWriter->guid.oid,
306                        cstRemoteReader->guid.hid,
307                        cstRemoteReader->guid.aid);
308         }
309
310         data_offset=RTPS_HEADER_LENGTH+12;
311         if (CDR_buffer_puts(&d->taskSend.mb.cdrCodec,
312                             csChange->cdrCodec.buffer+data_offset, //src
313                             csChange->cdrCodec.wptr-data_offset)==CORBA_FALSE) {
314             d->taskSend.mb.needSend=ORTE_TRUE;
315             d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
316             return 1;
317         }
318     
319         d->taskSend.mb.cdrCodec.wptr_max=wptr_max;
320
321         /* setup new state for csChangeForReader */
322         CSTWriterCSChangeForReaderNewState(csChangeForReader);
323
324         /* mark multicast messages like processed */
325         CSTWriterMulticast(csChangeForReader);
326
327         debug(52,3) ("sent: RTPS_ISSUE_STRICT(0x%x) to 0x%x-0x%x\n",
328                     cstRemoteReader->cstWriter->guid.oid,
329                     cstRemoteReader->guid.hid,
330                     cstRemoteReader->guid.aid);
331       }
332     }
333   }
334   debug(52,10) ("CSTWriterSendStrictTimer: finished\n");
335   //add HeardBeat  
336   return CSTWriterAnnounceIssueTimer(d,cstRemoteReader);
337 }
338
339 /**********************************************************************************/
340 int
341 CSTWriterSendTimer(ORTEDomain *d,void *vcstRemoteReader) {
342   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
343   CSChangeForReader *csChangeForReader=NULL;
344   Boolean           firstTrace=ORTE_TRUE,f_bit=ORTE_TRUE;
345   
346   debug(52,10) ("CSTWriterSendTimer: start\n");
347
348   /* setup f_bit of object */
349   if (cstRemoteReader->cstWriter->params.fullAcknowledge)
350     f_bit=ORTE_FALSE;
351
352   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
353
354     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
355
356       if (csChangeForReader->commStateChFReader==TOSEND) {
357         cstRemoteReader->commStateSend=MUSTSENDDATA;
358
359         /* infoReply */
360         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
361             !d->taskSend.mb.containsInfoReply) {
362           AppParams *ap=cstRemoteReader->cstWriter->objectEntryOID->attributes;
363           firstTrace=ORTE_FALSE;
364           if (RTPSInfoREPLYCreate(&d->taskSend.mb.cdrCodec,
365                                   IPADDRESS_INVALID,
366                                   ap->metatrafficUnicastPort) < 0) {
367             d->taskSend.mb.needSend=ORTE_TRUE;
368             return 1;
369           }
370           d->taskSend.mb.containsInfoReply=ORTE_TRUE;  
371           debug(52,3) ("sent: RTPS_InfoREPLY from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
372                         GUID_PRINTF(cstRemoteReader->cstWriter->guid),
373                         GUID_PRINTF(cstRemoteReader->guid));
374         }
375
376         /* VAR */
377         if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
378           debug(52,3) ("sent: RTPS_VAR from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
379                         GUID_PRINTF(cstRemoteReader->cstWriter->guid),
380                         GUID_PRINTF(cstRemoteReader->guid));
381
382           if (RTPSVarCreate(&d->taskSend.mb.cdrCodec,
383                             OID_UNKNOWN,
384                             cstRemoteReader->cstWriter->guid.oid,
385                             csChangeForReader->csChange) < 0) {
386             d->taskSend.mb.needSend=ORTE_TRUE;
387             return 1;
388           }
389
390         } else {
391         /* GAP */
392           debug(52,3) ("sent: RTPS_GAP from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
393                         GUID_PRINTF(cstRemoteReader->cstWriter->guid),
394                         GUID_PRINTF(cstRemoteReader->guid));
395
396           if (RTPSGapCreate(&d->taskSend.mb.cdrCodec,
397                             OID_UNKNOWN,
398                             cstRemoteReader->cstWriter->guid.oid,
399                             csChangeForReader->csChange) < 0) {
400             d->taskSend.mb.needSend=ORTE_TRUE;
401             return 1;
402           }
403         }
404
405         /* setup new state for csChangeForReader */
406         CSTWriterCSChangeForReaderNewState(csChangeForReader);
407
408         /* mark multicast messages like processed */
409         CSTWriterMulticast(csChangeForReader);
410
411       }
412     } /* gavl_cust_for_each */
413
414     cstRemoteReader->commStateHB=MUSTSENDHB;
415
416   }
417
418   if (cstRemoteReader->commStateHB==MUSTSENDHB) {
419     //add HeartBeat
420     if (RTPSHeartBeatCreate(
421         &d->taskSend.mb.cdrCodec,
422         &cstRemoteReader->cstWriter->firstSN,
423         &cstRemoteReader->cstWriter->lastSN,
424         OID_UNKNOWN,
425         cstRemoteReader->cstWriter->guid.oid,
426         f_bit)<0) {
427       d->taskSend.mb.needSend=ORTE_TRUE;
428       return 1;
429     } else {
430       //schedule new time for Announce timer
431       eventDetach(d,
432            cstRemoteReader->sobject->objectEntryAID,
433            &cstRemoteReader->repeatAnnounceTimer,
434            1);
435       eventAdd(d,
436            cstRemoteReader->sobject->objectEntryAID,
437            &cstRemoteReader->repeatAnnounceTimer,
438            1,   //metatraffic timer
439            "CSTWriterAnnounceTimer",
440            CSTWriterAnnounceTimer,
441            &cstRemoteReader->cstWriter->lock,
442            cstRemoteReader,
443            &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
444     }
445
446     debug(52,3) ("sent: RTPS_HB from 0x%x-0x%x-0x%x to 0x%x-0x%x-0x%x\n",
447                   GUID_PRINTF(cstRemoteReader->cstWriter->guid),
448                   GUID_PRINTF(cstRemoteReader->guid));
449
450     cstRemoteReader->commStateHB=MAYSENDHB;
451   }
452  
453   debug(52,10) ("CSTWriterSendTimer: finished\n");
454   return 0;
455 }