]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
OCERA SF CVS tree of ORTE framework updated to
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
25                        CSTPublications, CSTWriter, GUID_RTPS,
26                        cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader, 
28                        CSTWriter, CSTRemoteReader, GUID_RTPS,
29                        cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31                        CSTRemoteReader, CSChangeForReader, SequenceNumber,
32                        csChangeForReader, node, csChange->sn, gavl_cmp_sn);
33
34 /*****************************************************************************/
35 void 
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37               ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
38
39   debug(51,10) ("CSTWriterInit: start\n");
40   //init values of cstwriter
41   cstWriter->guid.hid=object->objectEntryHID->hid;
42   cstWriter->guid.aid=object->objectEntryAID->aid;
43   cstWriter->guid.oid=oid;
44   cstWriter->objectEntryOID=object;
45   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46   cstWriter->strictReliableCounter=0;
47   cstWriter->bestEffortsCounter=0;
48   cstWriter->csChangesCounter=0;
49   cstWriter->cstRemoteReaderCounter=0;
50   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
52   CSTWriterCSChange_init_head(cstWriter);
53   CSTRemoteReader_init_root_field(cstWriter);
54   pthread_rwlock_init(&cstWriter->lock,NULL);
55   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
56   cstWriter->domain=d;
57   cstWriter->typeRegister=typeRegister;
58   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59     sem_init(&cstWriter->semCSChangeDestroyed, 0, 0);
60   }
61   //add event for refresh
62   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
63     CSTWriterRefreshTimer(d,(void*)cstWriter);
64   }
65   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
66                 cstWriter->guid.hid,
67                 cstWriter->guid.aid,
68                 cstWriter->guid.oid);
69   debug(51,10) ("CSTWriterInit: finished\n");
70 }
71
72 /*****************************************************************************/
73 void 
74 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
75   CSTRemoteReader       *cstRemoteReader;
76   CSChange              *csChange;
77
78   debug(51,10) ("CSTWriterDelete: start\n");
79   
80   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
81                 cstWriter->guid.hid,
82                 cstWriter->guid.aid,
83                 cstWriter->guid.oid);
84   //Destroy all cstRemoteReader connected on cstWriter
85   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
86     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
87   }
88   //Destroy all csChnages connected on cstWriter
89   while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
90     parameterDelete(csChange);
91     FREE(csChange);
92   }
93   eventDetach(d,
94       cstWriter->objectEntryOID->objectEntryAID,
95       &cstWriter->refreshPeriodTimer,
96       0);
97   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
98     sem_destroy(&cstWriter->semCSChangeDestroyed);
99   }
100   pthread_rwlock_destroy(&cstWriter->lock);
101   debug(51,10) ("CSTWriterDelete: finished\n");
102 }
103
104 /*****************************************************************************/
105 void
106 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
107                          ObjectId oid) {
108   CSTRemoteReader     *cstRemoteReader;
109   CSChangeForReader   *csChangeForReader;
110   CSChange            *csChange=NULL;
111   
112   cstWriter->cstRemoteReaderCounter++;
113   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
114   cstRemoteReader->guid.hid=object->objectEntryHID->hid;
115   cstRemoteReader->guid.aid=object->objectEntryAID->aid;
116   cstRemoteReader->guid.oid=oid;
117   cstRemoteReader->objectEntryOID=object;
118   cstRemoteReader->cstWriter=cstWriter;
119   CSChangeForReader_init_root_field(cstRemoteReader);
120   cstRemoteReader->commStateHB=MAYSENDHB;
121   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
122   cstRemoteReader->HBRetriesCounter=0;
123   cstRemoteReader->csChangesCounter=0;
124   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
125   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
126   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
127   //insert remote reader 
128   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
129   //copy all csChanges (not for publication)
130   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
131     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
132       csChange->remoteReaderCount++;  
133       cstRemoteReader->csChangesCounter++;
134       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
135       csChangeForReader->commStateChFReader=TOSEND;
136       csChangeForReader->csChange=csChange;
137       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
138       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
139       cstRemoteReader->commStateSend=MUSTSENDDATA;
140     }
141     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
142       eventAdd(d,
143           cstRemoteReader->objectEntryOID->objectEntryAID,
144           &cstRemoteReader->delayResponceTimer,
145           1,   
146           "CSTWriterSendTimer",
147           CSTWriterSendTimer,
148           &cstRemoteReader->cstWriter->lock,
149           cstRemoteReader,
150           &cstRemoteReader->cstWriter->params.delayResponceTime);               
151     }
152   } else {
153     //Publication
154     ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
155     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
156       cstWriter->strictReliableCounter++;
157     else {
158       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
159         cstWriter->bestEffortsCounter++;
160     }
161   }
162   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
163                 cstRemoteReader->guid.hid,
164                 cstRemoteReader->guid.aid,
165                 cstRemoteReader->guid.oid);
166 }
167
168 /*****************************************************************************/
169 void 
170 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
171   CSChangeForReader   *csChangeForReader;
172   
173   if (!cstRemoteReader) return;
174   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
175   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
176                 cstRemoteReader->guid.hid,
177                 cstRemoteReader->guid.aid,
178                 cstRemoteReader->guid.oid);
179   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
180     ORTESubsProp *sp;
181     sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
182     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
183       cstRemoteReader->cstWriter->strictReliableCounter--;
184     else {
185       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
186         cstRemoteReader->cstWriter->bestEffortsCounter--;
187     }
188   }  
189   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
190     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
191         csChangeForReader,ORTE_TRUE);
192   }
193   eventDetach(d,
194       cstRemoteReader->objectEntryOID->objectEntryAID,
195       &cstRemoteReader->delayResponceTimer,
196       1);   //metatraffic timer
197   eventDetach(d,
198       cstRemoteReader->objectEntryOID->objectEntryAID,
199       &cstRemoteReader->delayResponceTimer,
200       2);   //userdata timer
201   eventDetach(d,
202       cstRemoteReader->objectEntryOID->objectEntryAID,
203       &cstRemoteReader->repeatAnnounceTimer,
204       1);   //metatraffic timer
205   eventDetach(d,
206       cstRemoteReader->objectEntryOID->objectEntryAID,
207       &cstRemoteReader->repeatAnnounceTimer,
208       2);   //userdata timer
209   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
210   FREE(cstRemoteReader);
211 }
212
213 /*****************************************************************************/
214 void
215 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
216   CSChange            *csChange,*csChange1;
217
218   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
219     if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
220         (!gavl_cmp_guid(&csChange->guid,guid))) {  //equal? (VAR)
221       //VAR->GAP - inc gap_sn_no
222       SeqNumberInc(csChange->gapSN,csChange->gapSN);  
223       parameterDelete(csChange);
224       //is Gap in prior or next position?
225       csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
226       if (csChange1) {
227         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
228           SeqNumberAdd(csChange1->gapSN,
229                        csChange1->gapSN,
230                        csChange->gapSN);
231           CSTWriterDestroyCSChange(d,cstWriter,csChange);
232           csChange=csChange1;
233         }
234       }
235       csChange1=CSTWriterCSChange_next(cstWriter,csChange);
236       if (csChange1) {
237         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
238           SeqNumberAdd(csChange->gapSN,
239                        csChange->gapSN,
240                        csChange1->gapSN);
241           CSTWriterDestroyCSChange(d,cstWriter,csChange1);
242         }
243       }
244       break;
245     }
246   }
247 }
248
249 /*****************************************************************************/
250 void
251 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
252   CSChangeForReader   *csChangeForReader;
253   CSTRemoteReader     *cstRemoteReader;
254   CSChange            *csChangeFSN;
255   
256   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
257                cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
258   cstWriter->csChangesCounter++;
259   //look for old cschange
260   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
261     CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
262   //insert cschange into database changes
263   SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
264   csChange->sn=cstWriter->lastSN;
265   SEQUENCE_NUMBER_NONE(csChange->gapSN);
266   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
267   csChange->remoteReaderBest=0;
268   csChange->remoteReaderStrict=0;
269   CSTWriterCSChange_insert(cstWriter,csChange);
270   //update FirstSN
271   csChangeFSN=CSTWriterCSChange_first(cstWriter);
272   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
273     //minimal are 2 SNs (GAP,VAR) ...
274 //    CSTWriterDestroyCSChange(cstWriter,csChange);
275   }
276   csChangeFSN=CSTWriterCSChange_first(cstWriter);
277   cstWriter->firstSN=csChangeFSN->sn;
278   //insert new cschange for each reader
279   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
280     //csChangeForReader
281     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
282     csChangeForReader->commStateChFReader=TOSEND;
283     csChangeForReader->csChange=csChange;
284     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
285     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
286     cstRemoteReader->csChangesCounter++;
287     cstRemoteReader->HBRetriesCounter=0;
288     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
289       cstRemoteReader->commStateSend=MUSTSENDDATA;
290       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
291         eventDetach(d,
292             cstRemoteReader->objectEntryOID->objectEntryAID,
293             &cstRemoteReader->delayResponceTimer,
294             1);
295         eventAdd(d,
296             cstRemoteReader->objectEntryOID->objectEntryAID,
297             &cstRemoteReader->delayResponceTimer,
298             1,   
299             "CSTWriterSendTimer",
300             CSTWriterSendTimer,
301             &cstRemoteReader->cstWriter->lock,
302             cstRemoteReader,
303             NULL);
304       } else {
305         ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
306         
307         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
308           //Strict reliable subscription
309           csChange->remoteReaderStrict++;
310           eventDetach(d,
311               cstRemoteReader->objectEntryOID->objectEntryAID,
312               &cstRemoteReader->delayResponceTimer,
313               2);
314           eventAdd(d,
315               cstRemoteReader->objectEntryOID->objectEntryAID,
316               &cstRemoteReader->delayResponceTimer,
317               2,   
318               "CSTWriterSendStrictTimer",
319               CSTWriterSendStrictTimer,
320               &cstRemoteReader->cstWriter->lock,
321               cstRemoteReader,
322               NULL);
323         } else {
324           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
325             //best efforts subscription
326             NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
327             
328             csChange->remoteReaderBest++;
329             NtpTimeAdd(nextIssueTime,
330                       cstRemoteReader->lastSentIssueTime,
331                       sp->minimumSeparation);
332             NtpTimeSub(nextIssueDelay,
333                       nextIssueTime,
334                       actTime);
335             if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
336               NTPTIME_ZERO(nextIssueDelay);
337             eventDetach(d,
338                 cstRemoteReader->objectEntryOID->objectEntryAID,
339                 &cstRemoteReader->delayResponceTimer,
340                 2);
341             if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
342               //direct sent issue, for case zero time
343               CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
344             } else {
345               //schedule sent issue (future)
346               eventAdd(d,
347                   cstRemoteReader->objectEntryOID->objectEntryAID,
348                   &cstRemoteReader->delayResponceTimer,
349                   2,   
350                   "CSTWriterSendBestEffortTimer",
351                   CSTWriterSendBestEffortTimer,
352                   &cstRemoteReader->cstWriter->lock,
353                   cstRemoteReader,
354                   &nextIssueDelay);
355             }
356           } else {
357             //!Best_Effort & !Strict_Reliable
358             CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
359               ORTE_TRUE);
360             debug(51,5) ("CSTWriterAddCSChange: destryed\n");
361               
362           }
363         }
364       }
365     }
366     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
367   }
368   debug(51,5) ("CSTWriterAddCSChange: finished\n");
369 }
370
371 /*****************************************************************************/
372 void 
373 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
374     CSChangeForReader   *csChangeForReader,Boolean destroyCSChange) {
375   CSChange *csChange;
376   if (!csChangeForReader) return;
377   csChange=csChangeForReader->csChange;
378   csChange->remoteReaderCount--;  
379   cstRemoteReader->csChangesCounter--;
380   if (!cstRemoteReader->csChangesCounter) {
381     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
382   }
383   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
384     ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
385     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
386         csChange->remoteReaderStrict--;
387     } else {
388       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
389         csChange->remoteReaderBest--;
390       }
391     }
392   }  
393   eventDetach(cstRemoteReader->cstWriter->domain,
394       cstRemoteReader->objectEntryOID->objectEntryAID,
395       &csChangeForReader->waitWhileDataUnderwayTimer,
396       0);
397   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
398   FREE(csChangeForReader);
399   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
400     if (!csChange->remoteReaderCount) {
401       if (destroyCSChange) {
402         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
403             cstRemoteReader->cstWriter,csChange);
404       }
405       sem_post(&cstRemoteReader->cstWriter->semCSChangeDestroyed);
406       debug(51,5) ("Publication: new queue level (%d)\n",
407                   cstRemoteReader->cstWriter->csChangesCounter);
408     }
409   }
410 }
411
412 /*****************************************************************************/
413 void 
414 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
415   CSTRemoteReader     *cstRemoteReader;
416   CSChangeForReader   *csChangeForReader;
417   CSChange            *csChangeFSN;
418
419   if (!csChange) return;
420   cstWriter->csChangesCounter--;
421   CSTWriterCSChange_delete(cstWriter,csChange);
422   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
423     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
424     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
425         csChangeForReader,ORTE_FALSE);
426   }
427   if (csChange->cdrStream.buffer)
428     FREE(csChange->cdrStream.buffer);
429   parameterDelete(csChange);
430   FREE(csChange);
431   //update first SN
432   csChangeFSN=CSTWriterCSChange_first(cstWriter);
433   if (csChangeFSN)
434     cstWriter->firstSN=csChangeFSN->sn;
435   else
436     cstWriter->firstSN=cstWriter->lastSN;
437 }
438
439 /*****************************************************************************/
440 Boolean
441 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
442   CSChange *csChange;
443   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
444     if (!csChange->remoteReaderStrict) {
445       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
446       return ORTE_TRUE;
447     }
448   }
449   return ORTE_FALSE;
450 }
451
452 /*****************************************************************************/
453 void
454 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
455   CSChangeForReader   *csChangeForReader;
456   int32_t             timerQueue=1;
457   
458   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
459     timerQueue=2; //userdata timer queue
460   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
461     //refresh only VAR's
462     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
463       csChangeForReader->commStateChFReader=TOSEND;
464       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
465         cstRemoteReader->commStateSend=MUSTSENDDATA;
466         eventDetach(d,
467             cstRemoteReader->objectEntryOID->objectEntryAID,
468             &cstRemoteReader->delayResponceTimer,
469             timerQueue);
470         eventAdd(d,
471             cstRemoteReader->objectEntryOID->objectEntryAID,
472             &cstRemoteReader->delayResponceTimer,
473             timerQueue,  
474             "CSTWriterSendTimer",
475             CSTWriterSendTimer,
476             &cstRemoteReader->cstWriter->lock,
477             cstRemoteReader,
478             &cstRemoteReader->cstWriter->params.delayResponceTime);               
479       }
480     }
481   }
482 }