]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriterTimer.c
new version 0.2.3
[orte.git] / orte / liborte / RTPSCSTWriterTimer.c
1 /*
2  *  $Id: RTPSCSTWriterTimer.c,v 0.0.0.1 2003/10/19 
3  *
4  *  DEBUG:  section 52                  CSTWriter timer functions
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 int 
26 CSTWriterRefreshTimer(ORTEDomain *d,void *vcstWriter) {
27   CSTWriter *cstWriter=(CSTWriter*)vcstWriter;
28   CSTRemoteReader *cstRemoteReader;
29   
30   debug(52,10) ("CSTWriterRefreshTimer: start\n");
31   
32   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
33     CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
34   }
35   eventDetach(d,
36       cstWriter->objectEntryOID->objectEntryAID,
37       &cstWriter->refreshPeriodTimer,
38       0);   //common timer
39   eventAdd(d,
40       cstWriter->objectEntryOID->objectEntryAID,
41       &cstWriter->refreshPeriodTimer,
42       0,   //common timer
43       "CSTWriterRefreshTimer",
44       CSTWriterRefreshTimer,
45       &cstWriter->lock,
46       cstWriter,
47       &cstWriter->params.refreshPeriod);               
48   debug(52,10) ("CSTWriterRefreshTimer: finished\n");
49   return 0;
50 }
51
52 /*****************************************************************************/
53 int 
54 CSTWriterAnnounceTimer(ORTEDomain *d,void *vcstRemoteReader) {
55   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
56
57   debug(52,10) ("CSTWriterAnnounceTimer: start\n");
58   if ((cstRemoteReader->commStateHB==MAYSENDHB) &&
59       ((!cstRemoteReader->cstWriter->params.fullAcknowledge))) {// ||
60 //       (cstRemoteReader->unacknowledgedCounter))) {
61     //create HB
62     int len=RTPSHeardBeatCreate(
63         d->mbSend.cdrStream.bufferPtr,
64         getMaxMessageLength(d),
65         &cstRemoteReader->cstWriter->firstSN,
66         &cstRemoteReader->cstWriter->lastSN,
67         cstRemoteReader->cstWriter->guid.oid,
68         OID_UNKNOWN,
69         ORTE_FALSE);
70     if (len<0) {
71       //not enought space in sending buffer
72       d->mbSend.needSend=ORTE_TRUE;
73       return 1;
74     }
75     d->mbSend.cdrStream.bufferPtr+=len;
76     d->mbSend.cdrStream.length+=len;
77     debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
78                   cstRemoteReader->cstWriter->guid.oid,
79                   cstRemoteReader->guid.hid,
80                   cstRemoteReader->guid.aid);
81   }
82   eventDetach(d,
83       cstRemoteReader->objectEntryOID->objectEntryAID,
84       &cstRemoteReader->repeatAnnounceTimer,
85       1);
86   eventAdd(d,
87       cstRemoteReader->objectEntryOID->objectEntryAID,
88       &cstRemoteReader->repeatAnnounceTimer,
89       1,   //metatraffic timer
90       "CSTWriterAnnounceTimer",
91       CSTWriterAnnounceTimer,
92       &cstRemoteReader->cstWriter->lock,
93       cstRemoteReader,
94       &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
95   debug(52,10) ("CSTWriterAnnounceTimer: finished\n");
96   return 0;
97 }
98
99 /*****************************************************************************/
100 int 
101 CSTWriterAnnounceIssueTimer(ORTEDomain *d,void *vcstRemoteReader) {
102   CSTRemoteReader *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
103   NtpTime         nextHB;
104   ORTEPublProp    *pp;
105   int             len;
106
107   debug(52,10) ("CSTWriterAnnounceIssueTimer: start\n");
108   pp=(ORTEPublProp*)cstRemoteReader->cstWriter->objectEntryOID->attributes;
109   //create HB
110   d->mbSend.cdrStreamDirect=NULL;
111   len=RTPSHeardBeatCreate(
112       d->mbSend.cdrStream.bufferPtr,
113       getMaxMessageLength(d),
114       &cstRemoteReader->cstWriter->firstSN,
115       &cstRemoteReader->cstWriter->lastSN,
116       cstRemoteReader->cstWriter->guid.oid,
117       OID_UNKNOWN,
118       ORTE_FALSE);
119   if (len<0) {
120     //not enought space in sending buffer
121     d->mbSend.needSend=ORTE_TRUE;
122     return 1;
123   }
124   d->mbSend.cdrStream.bufferPtr+=len;
125   d->mbSend.cdrStream.length+=len;
126   debug(52,3) ("sent: RTPS_HBF(0x%x) to 0x%x-0x%x\n",
127                 cstRemoteReader->cstWriter->guid.oid,
128                 cstRemoteReader->guid.hid,
129                 cstRemoteReader->guid.aid);
130   //next HB
131   if (cstRemoteReader->cstWriter->csChangesCounter>=pp->criticalQueueLevel) {
132     nextHB=pp->HBCQLRate;
133   } else {
134     nextHB=pp->HBNornalRate;
135   }
136   cstRemoteReader->HBRetriesCounter++;
137   eventDetach(d,
138       cstRemoteReader->objectEntryOID->objectEntryAID,
139       &cstRemoteReader->repeatAnnounceTimer,
140       2);
141   if (cstRemoteReader->HBRetriesCounter<pp->HBMaxRetries) {              
142     eventAdd(d,
143         cstRemoteReader->objectEntryOID->objectEntryAID,
144         &cstRemoteReader->repeatAnnounceTimer,
145         2,   //metatraffic timer
146         "CSTWriterAnnounceIssueTimer",
147         CSTWriterAnnounceIssueTimer,
148         &cstRemoteReader->cstWriter->lock,
149         cstRemoteReader,
150         &nextHB);
151   } else {
152     //destroy all csChangesForReader
153     CSChangeForReader *csChangeForReader;
154     while ((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
155       CSTWriterDestroyCSChangeForReader(cstRemoteReader,
156           csChangeForReader,ORTE_TRUE);
157     }
158     debug(52,3) ("CSTWriterAnnounceIssueTimer: HB RR(0x%x-0x%x) ritch MaxRetries\n",
159                   cstRemoteReader->guid.hid,cstRemoteReader->guid.aid);
160   }
161   debug(52,10) ("CSTWriterAnnounceIssueTimer: finished\n");
162   return 0;
163 }
164
165 /**********************************************************************************/
166 int
167 CSChangeForReaderUnderwayTimer(ORTEDomain *d,void *vcsChangeForReader) {
168   CSChangeForReader *csChangeForReader=(CSChangeForReader*)vcsChangeForReader;
169   csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
170   return 0;
171 }
172
173 /**********************************************************************************/
174 int
175 CSTWriterSendBestEffortTimer(ORTEDomain *d,void *vcstRemoteReader) {
176   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
177   ORTESubsProp      *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
178   CSChangeForReader *csChangeForReader=NULL;
179         
180   debug(52,10) ("CSTWriterSendBestEffortTimer: start\n");
181   d->mbSend.cdrStreamDirect=NULL;
182   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
183     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
184       if (csChangeForReader->commStateChFReader==TOSEND) {
185         CSChange *csChange=csChangeForReader->csChange;
186         csChangeForReader->commStateChFReader=UNDERWAY;
187         cstRemoteReader->commStateSend=MUSTSENDDATA;
188         cstRemoteReader->lastSentIssueTime=getActualNtpTime();
189         d->mbSend.cdrStreamDirect=&csChange->cdrStream;
190         debug(52,3) ("sent: RTPS_ISSUE_BEST(0x%x) to 0x%x-0x%x\n",
191                     cstRemoteReader->cstWriter->guid.oid,
192                     cstRemoteReader->guid.hid,
193                     cstRemoteReader->guid.aid);
194         ORTESendData(d,
195             cstRemoteReader->objectEntryOID->objectEntryAID,
196             ORTE_FALSE);
197         CSTWriterDestroyCSChangeForReader(cstRemoteReader,
198             csChangeForReader,ORTE_TRUE);
199         eventDetach(d,
200             cstRemoteReader->objectEntryOID->objectEntryAID,
201             &cstRemoteReader->delayResponceTimer,
202             2);   
203         //when is no csChange -> break processing 
204         if (cstRemoteReader->cstWriter->csChangesCounter==0) 
205           break;
206         eventAdd(d,
207             cstRemoteReader->objectEntryOID->objectEntryAID,
208             &cstRemoteReader->delayResponceTimer,
209             2,   
210             "CSTWriterSendBestEffortTimer",
211             CSTWriterSendBestEffortTimer,
212             &cstRemoteReader->cstWriter->lock,
213             cstRemoteReader,
214             &sp->minimumSeparation);
215         return 0;
216       }
217     }
218   }
219   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
220   debug(52,10) ("CSTWriterSendBestEffortTimer: finished\n");
221   return 0;
222 }
223
224 /**********************************************************************************/
225 int
226 CSTWriterSendStrictTimer(ORTEDomain *d,void *vcstRemoteReader) {
227   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
228   CSChangeForReader *csChangeForReader=NULL;
229   int               max_msg_len,len;
230   CSChange          *csChange;
231   Boolean           firstTrace=ORTE_TRUE;
232   
233   debug(52,10) ("CSTWriterSendStrictTimer: start\n");
234   max_msg_len=getMaxMessageLength(d);
235   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
236     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
237       csChange=csChangeForReader->csChange;
238       if (csChangeForReader->commStateChFReader==TOSEND) {
239         cstRemoteReader->commStateSend=MUSTSENDDATA;
240         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
241             !d->mbSend.containsInfoReply) {
242           firstTrace=ORTE_FALSE;
243           len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
244               IPADDRESS_INVALID,
245               ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->userdataUnicastPort);
246           if (len<0) {
247             d->mbSend.needSend=ORTE_TRUE;
248             return 1;
249           }
250           d->mbSend.containsInfoReply=ORTE_TRUE;  
251           d->mbSend.cdrStream.bufferPtr+=len;
252           d->mbSend.cdrStream.length+=len;
253           max_msg_len-=len;
254           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
255                        cstRemoteReader->cstWriter->guid.oid,
256                        cstRemoteReader->guid.hid,
257                        cstRemoteReader->guid.aid);
258         }
259         len=20+cstRemoteReader->cstWriter->typeRegister->getMaxSize;
260         if (max_msg_len<len) {
261           d->mbSend.needSend=ORTE_TRUE;
262           return 1;
263         }
264         memcpy(d->mbSend.cdrStream.bufferPtr,     //dest
265 //               csChange->cdrStream.bufferPtr-len, //src
266                csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12, //src
267                len);                              //length
268         d->mbSend.cdrStream.bufferPtr+=len;
269         d->mbSend.cdrStream.length+=len;
270         max_msg_len-=len;
271         debug(52,3) ("sent: RTPS_ISSUE_STRICT(0x%x) to 0x%x-0x%x\n",
272                     cstRemoteReader->cstWriter->guid.oid,
273                     cstRemoteReader->guid.hid,
274                     cstRemoteReader->guid.aid);
275       }
276     }
277   }
278   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
279   debug(52,10) ("CSTWriterSendStrictTimer: finished\n");
280   //add HeardBeat  
281   return CSTWriterAnnounceIssueTimer(d,cstRemoteReader);
282 }
283
284 /**********************************************************************************/
285 int
286 CSTWriterSendTimer(ORTEDomain *d,void *vcstRemoteReader) {
287   CSTRemoteReader   *cstRemoteReader=(CSTRemoteReader*)vcstRemoteReader;
288   CSChangeForReader *csChangeForReader=NULL;
289   unsigned int      max_msg_len;
290   int               len,off;
291   Boolean           firstTrace=ORTE_TRUE,f_bit=ORTE_TRUE;
292   
293   debug(52,10) ("CSTWriterSendTimer: start\n");
294   max_msg_len=getMaxMessageLength(d);
295   //setup f_bit of object
296   if (cstRemoteReader->cstWriter->params.fullAcknowledge)
297     f_bit=ORTE_FALSE;
298   if (cstRemoteReader->commStateSend!=NOTHNIGTOSEND) {
299     gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
300       if (csChangeForReader->commStateChFReader==TOSEND) {
301         cstRemoteReader->commStateSend=MUSTSENDDATA;
302         if ((firstTrace) && (cstRemoteReader->cstWriter->params.fullAcknowledge) &&
303             !d->mbSend.containsInfoReply) {
304           firstTrace=ORTE_FALSE;
305           len=RTPSInfoREPLYCreate(d->mbSend.cdrStream.bufferPtr,max_msg_len,
306               IPADDRESS_INVALID,
307               ((AppParams*)cstRemoteReader->cstWriter->objectEntryOID->attributes)->metatrafficUnicastPort);
308           if (len<0) {
309             d->mbSend.needSend=ORTE_TRUE;
310             return 1;
311           }
312           d->mbSend.containsInfoReply=ORTE_TRUE;  
313           d->mbSend.cdrStream.bufferPtr+=len;
314           d->mbSend.cdrStream.length+=len;
315           max_msg_len-=len;
316           debug(52,3) ("sent: RTPS_InfoREPLY(0x%x) to 0x%x-0x%x\n",
317                        cstRemoteReader->cstWriter->guid.oid,
318                        cstRemoteReader->guid.hid,
319                        cstRemoteReader->guid.aid);
320         }
321         if (max_msg_len<32) {
322           d->mbSend.needSend=ORTE_TRUE;
323           return 1;
324         }
325         off=0;
326         //VAR ???
327         if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
328           debug(52,3) ("sent: RTPS_VAR(0x%x) to 0x%x-0x%x\n",
329                        cstRemoteReader->cstWriter->guid.oid,
330                        cstRemoteReader->guid.hid,
331                        cstRemoteReader->guid.aid);
332           len=32;
333           d->mbSend.cdrStream.bufferPtr[0]=(uint8_t)VAR;
334           d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
335           if (csChangeForReader->csChange->alive) 
336             d->mbSend.cdrStream.bufferPtr[1]|=4;
337           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
338           conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
339           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
340             cstRemoteReader->cstWriter->guid.oid;
341           conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
342           if (csChangeForReader->csChange->guid.oid==OID_APP) {
343             d->mbSend.cdrStream.bufferPtr[1]|=8;
344             *((HostId*)(d->mbSend.cdrStream.bufferPtr+12))=
345               csChangeForReader->csChange->guid.hid;
346             conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+12),0);
347             *((AppId*)(d->mbSend.cdrStream.bufferPtr+16))=
348               csChangeForReader->csChange->guid.aid;
349             conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+16),0);
350           } else {
351             len-=8;
352             off=-8;
353           }
354           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+20+off))=
355             csChangeForReader->csChange->guid.oid;
356           conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+20+off),0);
357           *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+24+off))=
358             csChangeForReader->csChange->sn;
359           if (!CSChangeAttributes_is_empty(csChangeForReader->csChange)) {
360             int plen;
361             plen=parameterCodeStreamFromCSChange(csChangeForReader->csChange,
362                  d->mbSend.cdrStream.bufferPtr+32+off,max_msg_len-len);
363             if (plen<0) {
364               d->mbSend.needSend=ORTE_TRUE;
365               return 1;
366             }
367             d->mbSend.cdrStream.bufferPtr[1]|=2;
368             len+=plen;
369           }
370         } else {  //GAP ???
371           debug(52,3) ("sent: RTPS_GAP(0x%x) to 0x%x-0x%x\n",
372                        cstRemoteReader->cstWriter->guid.oid,
373                        cstRemoteReader->guid.hid,
374                        cstRemoteReader->guid.aid);
375           len=32;
376           d->mbSend.cdrStream.bufferPtr[0]=(uint8_t)GAP;
377           d->mbSend.cdrStream.bufferPtr[1]=ORTE_MY_MBO;
378           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+4))=OID_UNKNOWN;
379           conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+4),0);
380           *((ObjectId*)(d->mbSend.cdrStream.bufferPtr+8))=
381             cstRemoteReader->cstWriter->guid.oid;
382           conv_u32((uint32_t*)(d->mbSend.cdrStream.bufferPtr+8),0);
383           *((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12))=
384             csChangeForReader->csChange->sn;
385           conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+12),ORTE_MY_MBO);
386           SeqNumberAdd(*((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20)),  
387                        csChangeForReader->csChange->sn,
388                        csChangeForReader->csChange->gapSN);
389           conv_sn((SequenceNumber*)(d->mbSend.cdrStream.bufferPtr+20),ORTE_MY_MBO);
390           *((uint32_t*)(d->mbSend.cdrStream.bufferPtr+28))=0;    //NumBits 
391         }
392         *((ParameterLength*)(d->mbSend.cdrStream.bufferPtr+2))=len-4; 
393         d->mbSend.cdrStream.bufferPtr+=len;
394         d->mbSend.cdrStream.length+=len;
395         max_msg_len-=len;
396         //setup new state for csChangeForReader
397         if (NtpTimeCmp(zNtpTime,
398                 cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
399           csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
400         } else {
401           csChangeForReader->commStateChFReader=UNDERWAY;
402           eventDetach(d,
403               cstRemoteReader->objectEntryOID->objectEntryAID,
404               &csChangeForReader->waitWhileDataUnderwayTimer,
405               0);
406           eventAdd(d,
407               cstRemoteReader->objectEntryOID->objectEntryAID,
408               &csChangeForReader->waitWhileDataUnderwayTimer,
409               0,   //common timer
410               "CSChangeForReaderUnderwayTimer",
411               CSChangeForReaderUnderwayTimer,
412               &cstRemoteReader->cstWriter->lock,
413               csChangeForReader,
414               &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
415         }
416       }
417     }
418   }
419   //add HeardBeat
420   len=RTPSHeardBeatCreate(
421       d->mbSend.cdrStream.bufferPtr,max_msg_len,
422       &cstRemoteReader->cstWriter->firstSN,
423       &cstRemoteReader->cstWriter->lastSN,
424       cstRemoteReader->cstWriter->guid.oid,
425       OID_UNKNOWN,
426       f_bit);
427   if (len<0) {
428     d->mbSend.needSend=ORTE_TRUE;
429     return 1;
430   } else {
431     //schedule new time for Announce timer
432     eventDetach(d,
433          cstRemoteReader->objectEntryOID->objectEntryAID,
434          &cstRemoteReader->repeatAnnounceTimer,
435          1);
436     eventAdd(d,
437          cstRemoteReader->objectEntryOID->objectEntryAID,
438          &cstRemoteReader->repeatAnnounceTimer,
439          1,   //metatraffic timer
440          "CSTWriterAnnounceTimer",
441          CSTWriterAnnounceTimer,
442          &cstRemoteReader->cstWriter->lock,
443          cstRemoteReader,
444          &cstRemoteReader->cstWriter->params.repeatAnnounceTime);
445   }
446   debug(52,3) ("sent: RTPS_HB(0x%x) to 0x%x-0x%x\n",
447                 cstRemoteReader->cstWriter->guid.oid,
448                 cstRemoteReader->guid.hid,
449                 cstRemoteReader->guid.aid);
450   if (cstRemoteReader->commStateHB==MUSTSENDHB) {
451     cstRemoteReader->commStateHB=MAYSENDHB;
452   }
453   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
454   d->mbSend.cdrStream.bufferPtr+=len;
455   d->mbSend.cdrStream.length+=len;
456   debug(52,10) ("CSTWriterSendTimer: finished\n");
457   return 0;
458 }