rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
RTNet and MinGW compilation correction
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
/*
 * Instantiations of the GAVL_CUST_NODE_INT_IMP macro (defined outside this
 * file, reached through orte_all.h).
 * NOTE(review): presumably these generate the CSTWriter_*, CSTRemoteReader_*
 * and CSChangeForReader_* tree helpers (e.g. _first, _insert, _delete, _find,
 * _init_root_field) called further down in this file - confirm against the
 * macro definition.  Argument roles appear to be: prefix, root type, item
 * type, key type, root field, item node field, item key field, compare
 * function - TODO confirm.
 */
// CSTWriter items under a CSTPublications root, compared with gavl_cmp_guid
GAVL_CUST_NODE_INT_IMP(CSTWriter, 
                       CSTPublications, CSTWriter, GUID_RTPS,
                       cstWriter, node, guid, gavl_cmp_guid);
// CSTRemoteReader items under a CSTWriter root, compared with gavl_cmp_guid
GAVL_CUST_NODE_INT_IMP(CSTRemoteReader, 
                       CSTWriter, CSTRemoteReader, GUID_RTPS,
                       cstRemoteReader, node, guid, gavl_cmp_guid);
// CSChangeForReader items under a CSTRemoteReader root; the key is the
// sequence number of the referenced csChange, compared with gavl_cmp_sn
GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
                       CSTRemoteReader, CSChangeForReader, SequenceNumber,
                       csChangeForReader, node, csChange->sn, gavl_cmp_sn);
33
34 /*****************************************************************************/
35 void 
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37               ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
38
39   debug(51,10) ("CSTWriterInit: start\n");
40   //init values of cstwriter
41   cstWriter->guid.hid=object->objectEntryHID->hid;
42   cstWriter->guid.aid=object->objectEntryAID->aid;
43   cstWriter->guid.oid=oid;
44   cstWriter->objectEntryOID=object;
45   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46   cstWriter->strictReliableCounter=0;
47   cstWriter->bestEffortsCounter=0;
48   cstWriter->csChangesCounter=0;
49   cstWriter->cstRemoteReaderCounter=0;
50   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
52   CSTWriterCSChange_init_head(cstWriter);
53   CSTRemoteReader_init_root_field(cstWriter);
54   pthread_rwlock_init(&cstWriter->lock,NULL);
55   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
56   cstWriter->domain=d;
57   cstWriter->typeRegister=typeRegister;
58   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59     pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
60     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
61     cstWriter->condValueCSChangeDestroyed=0;
62   }
63   //add event for refresh
64   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
65     CSTWriterRefreshTimer(d,(void*)cstWriter);
66   }
67   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
68                 cstWriter->guid.hid,
69                 cstWriter->guid.aid,
70                 cstWriter->guid.oid);
71   debug(51,10) ("CSTWriterInit: finished\n");
72 }
73
74 /*****************************************************************************/
75 void 
76 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
77   CSTRemoteReader       *cstRemoteReader;
78   CSChange              *csChange;
79
80   debug(51,10) ("CSTWriterDelete: start\n");
81   
82   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
83                 cstWriter->guid.hid,
84                 cstWriter->guid.aid,
85                 cstWriter->guid.oid);
86   //Destroy all cstRemoteReader connected on cstWriter
87   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
88     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
89   }
90   //Destroy all csChnages connected on cstWriter
91   while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
92     parameterDelete(csChange);
93     FREE(csChange);
94   }
95   eventDetach(d,
96       cstWriter->objectEntryOID->objectEntryAID,
97       &cstWriter->refreshPeriodTimer,
98       0);
99   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
100     pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
101     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
102   }
103   pthread_rwlock_destroy(&cstWriter->lock);
104   debug(51,10) ("CSTWriterDelete: finished\n");
105 }
106
107 /*****************************************************************************/
108 void
109 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
110                          ObjectId oid) {
111   CSTRemoteReader     *cstRemoteReader;
112   CSChangeForReader   *csChangeForReader;
113   CSChange            *csChange=NULL;
114   
115   cstWriter->cstRemoteReaderCounter++;
116   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
117   cstRemoteReader->guid.hid=object->objectEntryHID->hid;
118   cstRemoteReader->guid.aid=object->objectEntryAID->aid;
119   cstRemoteReader->guid.oid=oid;
120   cstRemoteReader->objectEntryOID=object;
121   cstRemoteReader->cstWriter=cstWriter;
122   CSChangeForReader_init_root_field(cstRemoteReader);
123   cstRemoteReader->commStateHB=MAYSENDHB;
124   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
125   cstRemoteReader->HBRetriesCounter=0;
126   cstRemoteReader->csChangesCounter=0;
127   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
128   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
129   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
130   //insert remote reader 
131   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
132   //copy all csChanges (not for publication)
133   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
134     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
135       csChange->remoteReaderCount++;  
136       cstRemoteReader->csChangesCounter++;
137       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
138       csChangeForReader->commStateChFReader=TOSEND;
139       csChangeForReader->csChange=csChange;
140       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
141       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
142       cstRemoteReader->commStateSend=MUSTSENDDATA;
143     }
144     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
145       eventAdd(d,
146           cstRemoteReader->objectEntryOID->objectEntryAID,
147           &cstRemoteReader->delayResponceTimer,
148           1,   
149           "CSTWriterSendTimer",
150           CSTWriterSendTimer,
151           &cstRemoteReader->cstWriter->lock,
152           cstRemoteReader,
153           &cstRemoteReader->cstWriter->params.delayResponceTime);               
154     }
155   } else {
156     //Publication
157     ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
158     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
159       cstWriter->strictReliableCounter++;
160     else {
161       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
162         cstWriter->bestEffortsCounter++;
163     }
164   }
165   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
166                 cstRemoteReader->guid.hid,
167                 cstRemoteReader->guid.aid,
168                 cstRemoteReader->guid.oid);
169 }
170
171 /*****************************************************************************/
172 void 
173 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
174   CSChangeForReader   *csChangeForReader;
175   
176   if (!cstRemoteReader) return;
177   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
178   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
179                 cstRemoteReader->guid.hid,
180                 cstRemoteReader->guid.aid,
181                 cstRemoteReader->guid.oid);
182   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
183     ORTESubsProp *sp;
184     sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
185     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
186       cstRemoteReader->cstWriter->strictReliableCounter--;
187     else {
188       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
189         cstRemoteReader->cstWriter->bestEffortsCounter--;
190     }
191   }  
192   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
193     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
194         csChangeForReader,ORTE_TRUE);
195   }
196   eventDetach(d,
197       cstRemoteReader->objectEntryOID->objectEntryAID,
198       &cstRemoteReader->delayResponceTimer,
199       1);   //metatraffic timer
200   eventDetach(d,
201       cstRemoteReader->objectEntryOID->objectEntryAID,
202       &cstRemoteReader->delayResponceTimer,
203       2);   //userdata timer
204   eventDetach(d,
205       cstRemoteReader->objectEntryOID->objectEntryAID,
206       &cstRemoteReader->repeatAnnounceTimer,
207       1);   //metatraffic timer
208   eventDetach(d,
209       cstRemoteReader->objectEntryOID->objectEntryAID,
210       &cstRemoteReader->repeatAnnounceTimer,
211       2);   //userdata timer
212   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
213   FREE(cstRemoteReader);
214 }
215
216 /*****************************************************************************/
217 void
218 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
219   CSChange            *csChange,*csChange1;
220
221   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
222     if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
223         (!gavl_cmp_guid(&csChange->guid,guid))) {  //equal? (VAR)
224       //VAR->GAP - inc gap_sn_no
225       SeqNumberInc(csChange->gapSN,csChange->gapSN);  
226       parameterDelete(csChange);
227       //is Gap in prior or next position?
228       csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
229       if (csChange1) {
230         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
231           SeqNumberAdd(csChange1->gapSN,
232                        csChange1->gapSN,
233                        csChange->gapSN);
234           CSTWriterDestroyCSChange(d,cstWriter,csChange);
235           csChange=csChange1;
236         }
237       }
238       csChange1=CSTWriterCSChange_next(cstWriter,csChange);
239       if (csChange1) {
240         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
241           SeqNumberAdd(csChange->gapSN,
242                        csChange->gapSN,
243                        csChange1->gapSN);
244           CSTWriterDestroyCSChange(d,cstWriter,csChange1);
245         }
246       }
247       break;
248     }
249   }
250 }
251
252 /*****************************************************************************/
253 void
254 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
255   CSChangeForReader   *csChangeForReader;
256   CSTRemoteReader     *cstRemoteReader;
257   CSChange            *csChangeFSN;
258   
259   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
260                cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
261   cstWriter->csChangesCounter++;
262   //look for old cschange
263   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
264     CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
265   //insert cschange into database changes
266   SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
267   csChange->sn=cstWriter->lastSN;
268   SEQUENCE_NUMBER_NONE(csChange->gapSN);
269   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
270   csChange->remoteReaderBest=0;
271   csChange->remoteReaderStrict=0;
272   CSTWriterCSChange_insert(cstWriter,csChange);
273   debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
274                csChange->sn.low);
275   //update FirstSN
276   csChangeFSN=CSTWriterCSChange_first(cstWriter);
277   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
278     //minimal are 2 SNs (GAP,VAR) ...
279 //    CSTWriterDestroyCSChange(cstWriter,csChange);
280   }
281   csChangeFSN=CSTWriterCSChange_first(cstWriter);
282   cstWriter->firstSN=csChangeFSN->sn;
283   //insert new cschange for each reader
284   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
285     //csChangeForReader
286     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
287     csChangeForReader->commStateChFReader=TOSEND;
288     csChangeForReader->csChange=csChange;
289     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
290     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
291     cstRemoteReader->csChangesCounter++;
292     cstRemoteReader->HBRetriesCounter=0;
293     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
294       cstRemoteReader->commStateSend=MUSTSENDDATA;
295       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
296         eventDetach(d,
297             cstRemoteReader->objectEntryOID->objectEntryAID,
298             &cstRemoteReader->delayResponceTimer,
299             1);
300         eventAdd(d,
301             cstRemoteReader->objectEntryOID->objectEntryAID,
302             &cstRemoteReader->delayResponceTimer,
303             1,   
304             "CSTWriterSendTimer",
305             CSTWriterSendTimer,
306             &cstRemoteReader->cstWriter->lock,
307             cstRemoteReader,
308             NULL);
309       } else {
310         ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
311         
312         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
313           //Strict reliable subscription
314           csChange->remoteReaderStrict++;
315           eventDetach(d,
316               cstRemoteReader->objectEntryOID->objectEntryAID,
317               &cstRemoteReader->delayResponceTimer,
318               2);
319           eventAdd(d,
320               cstRemoteReader->objectEntryOID->objectEntryAID,
321               &cstRemoteReader->delayResponceTimer,
322               2,   
323               "CSTWriterSendStrictTimer",
324               CSTWriterSendStrictTimer,
325               &cstRemoteReader->cstWriter->lock,
326               cstRemoteReader,
327               NULL);
328         } else {
329           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
330             //best efforts subscription
331             NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
332             
333             csChange->remoteReaderBest++;
334             NtpTimeAdd(nextIssueTime,
335                       cstRemoteReader->lastSentIssueTime,
336                       sp->minimumSeparation);
337             NtpTimeSub(nextIssueDelay,
338                       nextIssueTime,
339                       actTime);
340             if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
341               NTPTIME_ZERO(nextIssueDelay);
342             eventDetach(d,
343                 cstRemoteReader->objectEntryOID->objectEntryAID,
344                 &cstRemoteReader->delayResponceTimer,
345                 2);
346             if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
347               //direct sent issue, for case zero time
348               CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
349             } else {
350               //schedule sent issue (future)
351               eventAdd(d,
352                   cstRemoteReader->objectEntryOID->objectEntryAID,
353                   &cstRemoteReader->delayResponceTimer,
354                   2,   
355                   "CSTWriterSendBestEffortTimer",
356                   CSTWriterSendBestEffortTimer,
357                   &cstRemoteReader->cstWriter->lock,
358                   cstRemoteReader,
359                   &nextIssueDelay);
360             }
361           } else {
362             //!Best_Effort & !Strict_Reliable
363             CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
364               ORTE_TRUE);
365             debug(51,5) ("CSTWriterAddCSChange: destryed\n");
366               
367           }
368         }
369       }
370     }
371     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
372   }
373   debug(51,5) ("CSTWriterAddCSChange: finished\n");
374 }
375
376 /*****************************************************************************/
377 void 
378 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
379     CSChangeForReader   *csChangeForReader,Boolean destroyCSChange) {
380   CSChange *csChange;
381   if (!csChangeForReader) return;
382   csChange=csChangeForReader->csChange;
383   csChange->remoteReaderCount--;  
384   cstRemoteReader->csChangesCounter--;
385   if (!cstRemoteReader->csChangesCounter) {
386     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
387   }
388   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
389     ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
390     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
391         csChange->remoteReaderStrict--;
392     } else {
393       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
394         csChange->remoteReaderBest--;
395       }
396     }
397   }  
398   eventDetach(cstRemoteReader->cstWriter->domain,
399       cstRemoteReader->objectEntryOID->objectEntryAID,
400       &csChangeForReader->waitWhileDataUnderwayTimer,
401       0);
402   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
403   FREE(csChangeForReader);
404   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
405     if (!csChange->remoteReaderCount) {
406       if (destroyCSChange) {
407         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
408             cstRemoteReader->cstWriter,csChange);
409       }
410       pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
411       cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
412       pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
413       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
414       debug(51,5) ("Publication: new queue level (%d)\n",
415                   cstRemoteReader->cstWriter->csChangesCounter);
416     }
417   }
418 }
419
420 /*****************************************************************************/
421 void 
422 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
423   CSTRemoteReader     *cstRemoteReader;
424   CSChangeForReader   *csChangeForReader;
425   CSChange            *csChangeFSN;
426
427   if (!csChange) return;
428   cstWriter->csChangesCounter--;
429   CSTWriterCSChange_delete(cstWriter,csChange);
430   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
431     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
432     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
433         csChangeForReader,ORTE_FALSE);
434   }
435   if (csChange->cdrStream.buffer)
436     FREE(csChange->cdrStream.buffer);
437   parameterDelete(csChange);
438   FREE(csChange);
439   //update first SN
440   csChangeFSN=CSTWriterCSChange_first(cstWriter);
441   if (csChangeFSN)
442     cstWriter->firstSN=csChangeFSN->sn;
443   else
444     cstWriter->firstSN=cstWriter->lastSN;
445 }
446
447 /*****************************************************************************/
448 Boolean
449 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
450   CSChange *csChange;
451   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
452     if (!csChange->remoteReaderStrict) {
453       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
454       return ORTE_TRUE;
455     }
456   }
457   return ORTE_FALSE;
458 }
459
460 /*****************************************************************************/
461 void
462 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
463   CSChangeForReader   *csChangeForReader;
464   int32_t             timerQueue=1;
465   
466   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
467     timerQueue=2; //userdata timer queue
468   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
469     //refresh only VAR's
470     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
471       csChangeForReader->commStateChFReader=TOSEND;
472       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
473         cstRemoteReader->commStateSend=MUSTSENDDATA;
474         eventDetach(d,
475             cstRemoteReader->objectEntryOID->objectEntryAID,
476             &cstRemoteReader->delayResponceTimer,
477             timerQueue);
478         eventAdd(d,
479             cstRemoteReader->objectEntryOID->objectEntryAID,
480             &cstRemoteReader->delayResponceTimer,
481             timerQueue,  
482             "CSTWriterSendTimer",
483             CSTWriterSendTimer,
484             &cstRemoteReader->cstWriter->lock,
485             cstRemoteReader,
486             &cstRemoteReader->cstWriter->params.delayResponceTime);               
487       }
488     }
489   }
490 }