]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
version 0.2.2 (mac, solaris patch)
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
25                        CSTPublications, CSTWriter, GUID_RTPS,
26                        cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader, 
28                        CSTWriter, CSTRemoteReader, GUID_RTPS,
29                        cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31                        CSTRemoteReader, CSChangeForReader, SequenceNumber,
32                        csChangeForReader, node, csChange->sn, gavl_cmp_sn);
33
34 /*****************************************************************************/
35 void 
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37               ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
38
39   debug(51,10) ("CSTWriterInit: start\n");
40   //init values of cstwriter
41   cstWriter->guid.hid=object->objectEntryHID->hid;
42   cstWriter->guid.aid=object->objectEntryAID->aid;
43   cstWriter->guid.oid=oid;
44   cstWriter->objectEntryOID=object;
45   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46   cstWriter->strictReliableCounter=0;
47   cstWriter->bestEffortsCounter=0;
48   cstWriter->csChangesCounter=0;
49   cstWriter->cstRemoteReaderCounter=0;
50   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
52   CSTWriterCSChange_init_head(cstWriter);
53   CSTRemoteReader_init_root_field(cstWriter);
54   pthread_rwlock_init(&cstWriter->lock,NULL);
55   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
56   cstWriter->domain=d;
57   cstWriter->typeRegister=typeRegister;
58   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59     pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
60     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
61     cstWriter->condValueCSChangeDestroyed=0;
62   }
63   //add event for refresh
64   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
65     CSTWriterRefreshTimer(d,(void*)cstWriter);
66   }
67   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
68                 cstWriter->guid.hid,
69                 cstWriter->guid.aid,
70                 cstWriter->guid.oid);
71   debug(51,10) ("CSTWriterInit: finished\n");
72 }
73
74 /*****************************************************************************/
75 void 
76 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
77   CSTRemoteReader       *cstRemoteReader;
78   CSChange              *csChange;
79
80   debug(51,10) ("CSTWriterDelete: start\n");
81   
82   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
83                 cstWriter->guid.hid,
84                 cstWriter->guid.aid,
85                 cstWriter->guid.oid);
86   //Destroy all cstRemoteReader connected on cstWriter
87   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
88     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
89   }
90   //Destroy all csChnages connected on cstWriter
91   while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
92     parameterDelete(csChange);
93     FREE(csChange);
94   }
95   eventDetach(d,
96       cstWriter->objectEntryOID->objectEntryAID,
97       &cstWriter->refreshPeriodTimer,
98       0);
99   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
100     pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
101     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
102   }
103   pthread_rwlock_destroy(&cstWriter->lock);
104   debug(51,10) ("CSTWriterDelete: finished\n");
105 }
106
107 /*****************************************************************************/
108 void
109 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
110                          ObjectId oid) {
111   CSTRemoteReader     *cstRemoteReader;
112   CSChangeForReader   *csChangeForReader;
113   CSChange            *csChange=NULL;
114   
115   cstWriter->cstRemoteReaderCounter++;
116   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
117   cstRemoteReader->guid.hid=object->objectEntryHID->hid;
118   cstRemoteReader->guid.aid=object->objectEntryAID->aid;
119   cstRemoteReader->guid.oid=oid;
120   cstRemoteReader->objectEntryOID=object;
121   cstRemoteReader->cstWriter=cstWriter;
122   CSChangeForReader_init_root_field(cstRemoteReader);
123   cstRemoteReader->commStateHB=MAYSENDHB;
124   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
125   cstRemoteReader->HBRetriesCounter=0;
126   cstRemoteReader->csChangesCounter=0;
127   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
128   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
129   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
130   //insert remote reader 
131   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
132   //copy all csChanges (not for publication)
133   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
134     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
135       csChange->remoteReaderCount++;  
136       cstRemoteReader->csChangesCounter++;
137       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
138       csChangeForReader->commStateChFReader=TOSEND;
139       csChangeForReader->csChange=csChange;
140       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
141       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
142       cstRemoteReader->commStateSend=MUSTSENDDATA;
143     }
144     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
145       eventAdd(d,
146           cstRemoteReader->objectEntryOID->objectEntryAID,
147           &cstRemoteReader->delayResponceTimer,
148           1,   
149           "CSTWriterSendTimer",
150           CSTWriterSendTimer,
151           &cstRemoteReader->cstWriter->lock,
152           cstRemoteReader,
153           &cstRemoteReader->cstWriter->params.delayResponceTime);               
154     }
155   } else {
156     //Publication
157     ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
158     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
159       cstWriter->strictReliableCounter++;
160     else {
161       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
162         cstWriter->bestEffortsCounter++;
163     }
164   }
165   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
166                 cstRemoteReader->guid.hid,
167                 cstRemoteReader->guid.aid,
168                 cstRemoteReader->guid.oid);
169 }
170
171 /*****************************************************************************/
172 void 
173 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
174   CSChangeForReader   *csChangeForReader;
175   
176   if (!cstRemoteReader) return;
177   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
178   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
179                 cstRemoteReader->guid.hid,
180                 cstRemoteReader->guid.aid,
181                 cstRemoteReader->guid.oid);
182   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
183     ORTESubsProp *sp;
184     sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
185     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
186       cstRemoteReader->cstWriter->strictReliableCounter--;
187     else {
188       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
189         cstRemoteReader->cstWriter->bestEffortsCounter--;
190     }
191   }  
192   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
193     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
194         csChangeForReader,ORTE_TRUE);
195   }
196   eventDetach(d,
197       cstRemoteReader->objectEntryOID->objectEntryAID,
198       &cstRemoteReader->delayResponceTimer,
199       1);   //metatraffic timer
200   eventDetach(d,
201       cstRemoteReader->objectEntryOID->objectEntryAID,
202       &cstRemoteReader->delayResponceTimer,
203       2);   //userdata timer
204   eventDetach(d,
205       cstRemoteReader->objectEntryOID->objectEntryAID,
206       &cstRemoteReader->repeatAnnounceTimer,
207       1);   //metatraffic timer
208   eventDetach(d,
209       cstRemoteReader->objectEntryOID->objectEntryAID,
210       &cstRemoteReader->repeatAnnounceTimer,
211       2);   //userdata timer
212   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
213   FREE(cstRemoteReader);
214 }
215
216 /*****************************************************************************/
217 void
218 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
219   CSChange            *csChange,*csChange1;
220
221   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
222     if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
223         (!gavl_cmp_guid(&csChange->guid,guid))) {  //equal? (VAR)
224       //VAR->GAP - inc gap_sn_no
225       SeqNumberInc(csChange->gapSN,csChange->gapSN);  
226       parameterDelete(csChange);
227       //is Gap in prior or next position?
228       csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
229       if (csChange1) {
230         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
231           SeqNumberAdd(csChange1->gapSN,
232                        csChange1->gapSN,
233                        csChange->gapSN);
234           CSTWriterDestroyCSChange(d,cstWriter,csChange);
235           csChange=csChange1;
236         }
237       }
238       csChange1=CSTWriterCSChange_next(cstWriter,csChange);
239       if (csChange1) {
240         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
241           SeqNumberAdd(csChange->gapSN,
242                        csChange->gapSN,
243                        csChange1->gapSN);
244           CSTWriterDestroyCSChange(d,cstWriter,csChange1);
245         }
246       }
247       break;
248     }
249   }
250 }
251
252 /*****************************************************************************/
253 void
254 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
255   CSChangeForReader   *csChangeForReader;
256   CSTRemoteReader     *cstRemoteReader;
257   CSChange            *csChangeFSN;
258   
259   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
260                cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
261   cstWriter->csChangesCounter++;
262   //look for old cschange
263   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
264     CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
265   //insert cschange into database changes
266   SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
267   csChange->sn=cstWriter->lastSN;
268   SEQUENCE_NUMBER_NONE(csChange->gapSN);
269   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
270   csChange->remoteReaderBest=0;
271   csChange->remoteReaderStrict=0;
272   CSTWriterCSChange_insert(cstWriter,csChange);
273   //update FirstSN
274   csChangeFSN=CSTWriterCSChange_first(cstWriter);
275   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
276     //minimal are 2 SNs (GAP,VAR) ...
277 //    CSTWriterDestroyCSChange(cstWriter,csChange);
278   }
279   csChangeFSN=CSTWriterCSChange_first(cstWriter);
280   cstWriter->firstSN=csChangeFSN->sn;
281   //insert new cschange for each reader
282   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
283     //csChangeForReader
284     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
285     csChangeForReader->commStateChFReader=TOSEND;
286     csChangeForReader->csChange=csChange;
287     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
288     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
289     cstRemoteReader->csChangesCounter++;
290     cstRemoteReader->HBRetriesCounter=0;
291     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
292       cstRemoteReader->commStateSend=MUSTSENDDATA;
293       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
294         eventDetach(d,
295             cstRemoteReader->objectEntryOID->objectEntryAID,
296             &cstRemoteReader->delayResponceTimer,
297             1);
298         eventAdd(d,
299             cstRemoteReader->objectEntryOID->objectEntryAID,
300             &cstRemoteReader->delayResponceTimer,
301             1,   
302             "CSTWriterSendTimer",
303             CSTWriterSendTimer,
304             &cstRemoteReader->cstWriter->lock,
305             cstRemoteReader,
306             NULL);
307       } else {
308         ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
309         
310         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
311           //Strict reliable subscription
312           csChange->remoteReaderStrict++;
313           eventDetach(d,
314               cstRemoteReader->objectEntryOID->objectEntryAID,
315               &cstRemoteReader->delayResponceTimer,
316               2);
317           eventAdd(d,
318               cstRemoteReader->objectEntryOID->objectEntryAID,
319               &cstRemoteReader->delayResponceTimer,
320               2,   
321               "CSTWriterSendStrictTimer",
322               CSTWriterSendStrictTimer,
323               &cstRemoteReader->cstWriter->lock,
324               cstRemoteReader,
325               NULL);
326         } else {
327           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
328             //best efforts subscription
329             NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
330             
331             csChange->remoteReaderBest++;
332             NtpTimeAdd(nextIssueTime,
333                       cstRemoteReader->lastSentIssueTime,
334                       sp->minimumSeparation);
335             NtpTimeSub(nextIssueDelay,
336                       nextIssueTime,
337                       actTime);
338             if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
339               NTPTIME_ZERO(nextIssueDelay);
340             eventDetach(d,
341                 cstRemoteReader->objectEntryOID->objectEntryAID,
342                 &cstRemoteReader->delayResponceTimer,
343                 2);
344             if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
345               //direct sent issue, for case zero time
346               CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
347             } else {
348               //schedule sent issue (future)
349               eventAdd(d,
350                   cstRemoteReader->objectEntryOID->objectEntryAID,
351                   &cstRemoteReader->delayResponceTimer,
352                   2,   
353                   "CSTWriterSendBestEffortTimer",
354                   CSTWriterSendBestEffortTimer,
355                   &cstRemoteReader->cstWriter->lock,
356                   cstRemoteReader,
357                   &nextIssueDelay);
358             }
359           } else {
360             //!Best_Effort & !Strict_Reliable
361             CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
362               ORTE_TRUE);
363             debug(51,5) ("CSTWriterAddCSChange: destryed\n");
364               
365           }
366         }
367       }
368     }
369     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
370   }
371   debug(51,5) ("CSTWriterAddCSChange: finished\n");
372 }
373
374 /*****************************************************************************/
375 void 
376 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
377     CSChangeForReader   *csChangeForReader,Boolean destroyCSChange) {
378   CSChange *csChange;
379   if (!csChangeForReader) return;
380   csChange=csChangeForReader->csChange;
381   csChange->remoteReaderCount--;  
382   cstRemoteReader->csChangesCounter--;
383   if (!cstRemoteReader->csChangesCounter) {
384     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
385   }
386   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
387     ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
388     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
389         csChange->remoteReaderStrict--;
390     } else {
391       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
392         csChange->remoteReaderBest--;
393       }
394     }
395   }  
396   eventDetach(cstRemoteReader->cstWriter->domain,
397       cstRemoteReader->objectEntryOID->objectEntryAID,
398       &csChangeForReader->waitWhileDataUnderwayTimer,
399       0);
400   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
401   FREE(csChangeForReader);
402   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
403     if (!csChange->remoteReaderCount) {
404       if (destroyCSChange) {
405         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
406             cstRemoteReader->cstWriter,csChange);
407       }
408       pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
409       cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
410       pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
411       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
412       debug(51,5) ("Publication: new queue level (%d)\n",
413                   cstRemoteReader->cstWriter->csChangesCounter);
414     }
415   }
416 }
417
418 /*****************************************************************************/
419 void 
420 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
421   CSTRemoteReader     *cstRemoteReader;
422   CSChangeForReader   *csChangeForReader;
423   CSChange            *csChangeFSN;
424
425   if (!csChange) return;
426   cstWriter->csChangesCounter--;
427   CSTWriterCSChange_delete(cstWriter,csChange);
428   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
429     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
430     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
431         csChangeForReader,ORTE_FALSE);
432   }
433   if (csChange->cdrStream.buffer)
434     FREE(csChange->cdrStream.buffer);
435   parameterDelete(csChange);
436   FREE(csChange);
437   //update first SN
438   csChangeFSN=CSTWriterCSChange_first(cstWriter);
439   if (csChangeFSN)
440     cstWriter->firstSN=csChangeFSN->sn;
441   else
442     cstWriter->firstSN=cstWriter->lastSN;
443 }
444
445 /*****************************************************************************/
446 Boolean
447 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
448   CSChange *csChange;
449   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
450     if (!csChange->remoteReaderStrict) {
451       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
452       return ORTE_TRUE;
453     }
454   }
455   return ORTE_FALSE;
456 }
457
458 /*****************************************************************************/
459 void
460 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
461   CSChangeForReader   *csChangeForReader;
462   int32_t             timerQueue=1;
463   
464   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
465     timerQueue=2; //userdata timer queue
466   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
467     //refresh only VAR's
468     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
469       csChangeForReader->commStateChFReader=TOSEND;
470       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
471         cstRemoteReader->commStateSend=MUSTSENDDATA;
472         eventDetach(d,
473             cstRemoteReader->objectEntryOID->objectEntryAID,
474             &cstRemoteReader->delayResponceTimer,
475             timerQueue);
476         eventAdd(d,
477             cstRemoteReader->objectEntryOID->objectEntryAID,
478             &cstRemoteReader->delayResponceTimer,
479             timerQueue,  
480             "CSTWriterSendTimer",
481             CSTWriterSendTimer,
482             &cstRemoteReader->cstWriter->lock,
483             cstRemoteReader,
484             &cstRemoteReader->cstWriter->params.delayResponceTime);               
485       }
486     }
487   }
488 }