]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriterTimer.c
OCERA SF CVS tree of ORTE framework updated to
[orte.git] / orte / liborte / RTPSCSTWriterTimer.c
1 /*
2  *  $Id: RTPSCSTWriterTimer.c,v 0.0.0.1 2003/10/19 
3  *
4  *  DEBUG:  section 52                  CSTWriter timer functions
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 int 
26 CSTWriterRefreshTimer(ORTEDomain *d,void *vcstWriter) {
27   CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
28   CSTRemoteReader *cstRemoteReader;
29   
30   debug(52,10) ("CSTWriterRefreshTimer: start\n");
31   
32   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
33     CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
34   }
35   eventDetach(d,
36       cstWriter->objectEntryOID->objectEntryAID,
37       &cstWriter->refreshPeriodTimer,
38       0);   //common timer
39   eventAdd(d,
40       cstWriter->objectEntryOID->objectEntryAID,
41       &cstWriter->refreshPeriodTimer,
42       0,   //common timer
43       "CSTWriterRefreshTimer",
44       CSTWriterRefreshTimer,
45       &cstWriter->lock,
46       cstWriter,
47       &cstWriter->params.refreshPeriod);               
48   debug(52,10) ("CSTWriterRefreshTimer: finished\n");
49   return 0;
50 }
51
52 /*****************************************************************************/
53 int 
54 CSTWriterAnnounceTimer(ORTEDomain *d,void *vcstRemoteReader) {
55   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
56
57   debug(52,10) ("CSTWriterAnnounceTimer: start\n");
58   if ((cstRemoteReader->commStateHB==MAYSENDHB) &&
59       ((!cstRemoteReader->cstWriter->params.fullAcknowledge))) {// ||
60 //       (cstRemoteReader->unacknowledgedCounter))) {
61     //create HB
62     int len=RTPSHeardBeatCreate(
63         d->mbSend.cdrStream.bufferPtr,
64         getMaxMessageLength(d),
65         &cstRemoteReader->cstWriter->firstSN,
66         &cstRemoteReader->cstWriter->lastSN,
67         cstRemoteReader->cstWriter->guid.oid,
68         OID_UNKNOWN,
69         ORTE_FALSE);
70     if (len<0) {
71       //not enought space in sending buffer
72       d->mbSend.needSend=ORTE_TRUE;
73       return 1;
74     }
75     d->mbSend.cdrStream.bufferPtr+=len;
76     d->mbSend.cdrStream.length+=len;
77     debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
78                   cstRemoteReader->cstWriter->guid.oid,
79                   cstRemoteReader->guid.hid,
80                   cstRemoteReader->guid.aid);
81   }
82   eventDetach(d,
83       cstRemoteReader->objectEntryOID->objectEntryAID,
84       &cstRemoteReader->repeatAnnounceTimer,
85       1);
86   eventAdd(d,
87       cstRemoteReader->objectEntryOID->objectEntryAID,
88       &cstRemoteReader->repeatAnnounceTimer,
89       1,   //metatraffic timer
90       "CSTWriterAnnounceTimer",
91       CSTWriterAnnounceTimer,
92       &cstRemoteReader->cstWriter->lock,
93       cstRemoteReader,
94       &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
95   debug(52,10) ("CSTWriterAnnounceTimer: finished\n");
96   return 0;
97 }
98
99 /*****************************************************************************/
100 int 
101 CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
102   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
103   NtpTime         nextHB;
104   ORTEPublProp    *pp;
105
106   debug(52,10) ("CSTWriterAnnounceIssueTimer: start\n");
107   pp=(ORTEPublProp*)cstRemoteReader->cstWriter->objectEntryOID->attributes;
108   //create HB
109   d->mbSend.cdrStreamDirect=NULL;
110   int len=RTPSHeardBeatCreate(
111       d->mbSend.cdrStream.bufferPtr,
112       getMaxMessageLength(d),
113       &cstRemoteReader->cstWriter->firstSN,
114       &cstRemoteReader->cstWriter->lastSN,
115       cstRemoteReader->cstWriter->guid.oid,
116       OID_UNKNOWN,
117       ORTE_FALSE);
118   if (len<0) {
119     //not enought space in sending buffer
120     d->mbSend.needSend=ORTE_TRUE;
121     return 1;
122   }
123   d->mbSend.cdrStream.bufferPtr+=len;
124   d->mbSend.cdrStream.length+=len;
125   debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
126                 cstRemoteReader->cstWriter->guid.oid,
127                 cstRemoteReader->guid.hid,
128                 cstRemoteReader->guid.aid);
129   //next HB
130   if (cstRemoteReader->cstWriter->csChangesCounter>=pp->criticalQueueLevel) {
131     nextHB=pp->HBCQLRate;
132   } else {
133     nextHB=pp->HBNornalRate;
134   }
135   cstRemoteReader->HBRetriesCounter++;
136   eventDetach(d,
137       cstRemoteReader->objectEntryOID->objectEntryAID,
138       &cstRemoteReader->repeatAnnounceTimer,
139       2);
140   if (cstRemoteReader->HBRetriesCounter<pp->HBMaxRetries) {              
141     eventAdd(d,
142         cstRemoteReader->objectEntryOID->objectEntryAID,
143         &cstRemoteReader->repeatAnnounceTimer,
144         2,   //metatraffic timer
145         "CSTWriterAnnounceIssueTimer",
146         CSTWriterAnnounceIssueTimer,
147         &cstRemoteReader->cstWriter->lock,
148         cstRemoteReader,
149         &nextHB);
150   } else {
151     //destroy all csChangesForReader
152     CSChangeForReader *csChangeForReader;
153     while ((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
154       CSTWriterDestroyCSChangeForReader(cstRemoteReader,
155           csChangeForReader,ORTE_TRUE);
156     }
157     debug(52,3) ("CSTWriterAnnounceIssueTimer: HB RR(0x%x-0x%x) ritch MaxRetries\n",
158                   cstRemoteReader->guid.hid,cstRemoteReader->guid.aid);
159   }
160   debug(52,10) ("CSTWriterAnnounceIssueTimer: finished\n");
161   return 0;
162 }
163
164 /**********************************************************************************/
165 int
166 CSChangeForReaderUnderwayTimer(ORTEDomain *d,void *vcsChangeForReader) {
167   CSChangeForReader *csChangeForReader=(CSChangeForReader*)vcsChangeForReader;
168   csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
169   return 0;
170 }
171
172 /**********************************************************************************/
173 int
174 CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
175   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
176   ORTESubsProp      *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
177   CSChangeForReader *csChangeForReader=NULL;
178         
179   debug(52,10) ("CSTWriterSendBestEffortTimer: start\n");
180   d->mbSend.cdrStreamDirect=NULL;
181   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
182     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
183       if (csChangeForReader->commStateChFReader==TOSEND) {
184         CSChange *csChange=csChangeForReader->csChange;
185         csChangeForReader->commStateChFReader=UNDERWAY;
186         cstRemoteReader->commStateSend=MUSTSENDDATA;
187         cstRemoteReader->lastSentIssueTime=getActualNtpTime();
188         d->mbSend.cdrStreamDirect=&csChange->cdrStream;
189         debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x\n",
190                     cstRemoteReader->cstWriter->guid.oid,
191                     cstRemoteReader->guid.hid,
192                     cstRemoteReader->guid.aid);
193         ORTESendData(d,
194             cstRemoteReader->objectEntryOID->objectEntryAID,
195             ORTE_FALSE);
196         CSTWriterDestroyCSChangeForReader(cstRemoteReader,
197             csChangeForReader,ORTE_TRUE);
198         eventDetach(d,
199             cstRemoteReader->objectEntryOID->objectEntryAID,
200             &cstRemoteReader->delayResponceTimer,
201             2);   
202         //when is no csChange -> break processing 
203         if (cstRemoteReader->cstWriter->csChangesCounter==0) 
204           break;
205         eventAdd(d,
206             cstRemoteReader->objectEntryOID->objectEntryAID,
207             &cstRemoteReader->delayResponceTimer,
208             2,   
209             "CSTWriterSendBestEffortTimer",
210             CSTWriterSendBestEffortTimer,
211             &cstRemoteReader->cstWriter->lock,
212             cstRemoteReader,
213             &sp->minimumSeparation);
214         return 0;
215       }
216     }
217   }
218   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
219   debug(52,10) ("CSTWriterSendBestEffortTimer: finished\n");
220   return 0;
221 }
222
223 /**********************************************************************************/
224 int
225 CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
226   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
227   CSChangeForReader *csChangeForReader=NULL;
228   unsigned int      max_msg_len;
229   CSChange          *csChange;
230   int               len;
231   Boolean           firstTrace=ORTE_TRUE;
232   
233   debug(52,10) ("CSTWriterSendStrictTimer: start\n");
234   max_msg_len=getMaxMessageLength(d);
235   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
236     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
237       csChange=csChangeForReader->csChange;
238       if (csChangeForReader->commStateChFReader==TOSEND) {
239         cstRemoteReader->commStateSend=MUSTSENDDATA;
240         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
241             !d->mbSend.containsInfoReply) {
242           firstTrace=ORTE_FALSE;
243           len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
244               IPADDRESS_INVALID,
245               ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->userdataUnicastPort);
246           if (len<0) {
247             d->mbSend.needSend=ORTE_TRUE;
248             return 1;
249           }
250           d->mbSend.containsInfoReply=ORTE_TRUE;  
251           d->mbSend.cdrStream.bufferPtr+=len;
252           d->mbSend.cdrStream.length+=len;
253           max_msg_len-=len;
254           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
255                        cstRemoteReader->cstWriter->guid.oid,
256                        cstRemoteReader->guid.hid,
257                        cstRemoteReader->guid.aid);
258         }
259         len=20+cstRemoteReader->cstWriter->typeRegister->getMaxSize;
260         if (max_msg_len<len) {
261           d->mbSend.needSend=ORTE_TRUE;
262           return 1;
263         }
264         memcpy(d->mbSend.cdrStream.bufferPtr,     //dest
265                csChange->cdrStream.bufferPtr-len, //src
266                len);                              //length
267         d->mbSend.cdrStream.bufferPtr+=len;
268         d->mbSend.cdrStream.length+=len;
269         max_msg_len-=len;
270         debug(52,3) ("sent: RTPS_ISSUE_STRICT(0x%x) to 0x%x-0x%x\n",
271                     cstRemoteReader->cstWriter->guid.oid,
272                     cstRemoteReader->guid.hid,
273                     cstRemoteReader->guid.aid);
274       }
275     }
276   }
277   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
278   debug(52,10) ("CSTWriterSendStrictTimer: finished\n");
279   //add HeardBeat  
280   return CSTWriterAnnounceIssueTimer(d,cstRemoteReader);
281 }
282
283 /**********************************************************************************/
284 int
285 CSTWriterSendTimer(ORTEDomain *d,void *vcstRemoteReader) {
286   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
287   CSChangeForReader *csChangeForReader=NULL;
288   unsigned int      max_msg_len;
289   int               len,off;
290   Boolean           firstTrace=ORTE_TRUE,f_bit=ORTE_TRUE;
291   
292   debug(52,10) ("CSTWriterSendTimer: start\n");
293   max_msg_len=getMaxMessageLength(d);
294   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
295     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
296       if (csChangeForReader->commStateChFReader==TOSEND) {
297         cstRemoteReader->commStateSend=MUSTSENDDATA;
298         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
299             !d->mbSend.containsInfoReply) {
300           f_bit=ORTE_FALSE;
301           firstTrace=ORTE_FALSE;
302           len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
303               IPADDRESS_INVALID,
304               ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->metatrafficUnicastPort);
305           if (len<0) {
306             d->mbSend.needSend=ORTE_TRUE;
307             return 1;
308           }
309           d->mbSend.containsInfoReply=ORTE_TRUE;  
310           d->mbSend.cdrStream.bufferPtr+=len;
311           d->mbSend.cdrStream.length+=len;
312           max_msg_len-=len;
313           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
314                        cstRemoteReader->cstWriter->guid.oid,
315                        cstRemoteReader->guid.hid,
316                        cstRemoteReader->guid.aid);
317         }
318         if (max_msg_len<32) {
319           d->mbSend.needSend=ORTE_TRUE;
320           return 1;
321         }
322         off=0;
323         //VAR ???
324         if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
325           debug(52,3) ("sent: RTPS_VAR(0x%x) to 0x%x-0x%x\n",
326                        cstRemoteReader->cstWriter->guid.oid,
327                        cstRemoteReader->guid.hid,
328                        cstRemoteReader->guid.aid);
329           len=32;
330           d->mbSend.cdrStream.bufferPtr[0]=(u_int8_t)VAR;
331           d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
332           if (csChangeForReader->csChange->alive) 
333             d->mbSend.cdrStream.bufferPtr[1]|=4;
334           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
335           conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
336           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
337             cstRemoteReader->cstWriter->guid.oid;
338           conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
339           if (csChangeForReader->csChange->guid.oid==OID_APP) {
340             d->mbSend.cdrStream.bufferPtr[1]|=8;
341             *((HostId*)(d->mbSend.cdrStream.bufferPtr+12))=
342               csChangeForReader->csChange->guid.hid;
343             conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+12),0);
344             *((AppId*)(d->mbSend.cdrStream.bufferPtr+16))=
345               csChangeForReader->csChange->guid.aid;
346             conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+16),0);
347           } else {
348             len-=8;
349             off=-8;
350           }
351           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+20+off))=
352             csChangeForReader->csChange->guid.oid;
353           conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+20+off),0);
354           *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+24+off))=
355             csChangeForReader->csChange->sn;
356           if (!CSChangeAttributes_is_empty(csChangeForReader->csChange)) {
357             int plen;
358             plen=parameterCodeStreamFromCSChange(csChangeForReader->csChange,
359                  d->mbSend.cdrStream.bufferPtr+32+off,max_msg_len-len);
360             if (plen<0) {
361               d->mbSend.needSend=ORTE_TRUE;
362               return 1;
363             }
364             d->mbSend.cdrStream.bufferPtr[1]|=2;
365             len+=plen;
366           }
367         } else {  //GAP ???
368           debug(52,3) ("sent: RTPS_GAP(0x%x) to 0x%x-0x%x\n",
369                        cstRemoteReader->cstWriter->guid.oid,
370                        cstRemoteReader->guid.hid,
371                        cstRemoteReader->guid.aid);
372           len=32;
373           d->mbSend.cdrStream.bufferPtr[0]=(u_int8_t)GAP;
374           d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
375           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
376           conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
377           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
378             cstRemoteReader->cstWriter->guid.oid;
379           conv_u32((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
380           *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12))=
381             csChangeForReader->csChange->sn;
382           conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12),ORTE_MY_MBO);
383           SeqNumberAdd(*((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20)),  
384                        csChangeForReader->csChange->sn,
385                        csChangeForReader->csChange->gapSN);
386           conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20),ORTE_MY_MBO);
387           *((u_int32_t*)(d->mbSend.cdrStream.bufferPtr+28))=0;    //NumBits 
388         }
389         *((ParameterLength*)(d->mbSend.cdrStream.bufferPtr+2))=len-4; 
390         d->mbSend.cdrStream.bufferPtr+=len;
391         d->mbSend.cdrStream.length+=len;
392         max_msg_len-=len;
393         //setup new state for csChangeForReader
394         if (NtpTimeCmp(zNtpTime,
395                 cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
396           csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
397         } else {
398           csChangeForReader->commStateChFReader=UNDERWAY;
399           eventDetach(d,
400               cstRemoteReader->objectEntryOID->objectEntryAID,
401               &csChangeForReader->waitWhileDataUnderwayTimer,
402               0);
403           eventAdd(d,
404               cstRemoteReader->objectEntryOID->objectEntryAID,
405               &csChangeForReader->waitWhileDataUnderwayTimer,
406               0,   //common timer
407               "CSChangeForReaderUnderwayTimer",
408               CSChangeForReaderUnderwayTimer,
409               &cstRemoteReader->cstWriter->lock,
410               csChangeForReader,
411               &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
412         }
413       }
414     }
415   }
416   //add HeardBeat
417   len=RTPSHeardBeatCreate(
418       d->mbSend.cdrStream.bufferPtr,max_msg_len,
419       &cstRemoteReader->cstWriter->firstSN,
420       &cstRemoteReader->cstWriter->lastSN,
421       cstRemoteReader->cstWriter->guid.oid,
422       OID_UNKNOWN,
423       f_bit);
424   if (len<0) {
425     d->mbSend.needSend=ORTE_TRUE;
426     return 1;
427   } else {
428     //schedule new time for Announce timer
429     eventDetach(d,
430          cstRemoteReader->objectEntryOID->objectEntryAID,
431          &cstRemoteReader->repeatAnnounceTimer,
432          1);
433     eventAdd(d,
434          cstRemoteReader->objectEntryOID->objectEntryAID,
435          &cstRemoteReader->repeatAnnounceTimer,
436          1,   //metatraffic timer
437          "CSTWriterAnnounceTimer",
438          CSTWriterAnnounceTimer,
439          &cstRemoteReader->cstWriter->lock,
440          cstRemoteReader,
441          &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
442   }
443   debug(52,3) ("sent: RTPS_HB(0x%x) to 0x%x-0x%x\n",
444                 cstRemoteReader->cstWriter->guid.oid,
445                 cstRemoteReader->guid.hid,
446                 cstRemoteReader->guid.aid);
447   if (cstRemoteReader->commStateHB==MUSTSENDHB) {
448     cstRemoteReader->commStateHB=MAYSENDHB;
449   }
450   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
451   d->mbSend.cdrStream.bufferPtr+=len;
452   d->mbSend.cdrStream.length+=len;
453   debug(52,10) ("CSTWriterSendTimer: finished\n");
454   return 0;
455 }