rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
Update of ORTE. Configured to compile for Linux out of box.
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
25                        CSTPublications, CSTWriter, GUID_RTPS,
26                        cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader, 
28                        CSTWriter, CSTRemoteReader, GUID_RTPS,
29                        cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31                        CSTRemoteReader, CSChangeForReader, SequenceNumber,
32                        csChangeForReader, node, csChange->sn, gavl_cmp_sn);
33
34 /*****************************************************************************/
35 void 
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37               ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
38
39   debug(51,10) ("CSTWriterInit: start\n");
40   //init values of cstwriter
41   cstWriter->guid.hid=object->objectEntryHID->hid;
42   cstWriter->guid.aid=object->objectEntryAID->aid;
43   cstWriter->guid.oid=oid;
44   cstWriter->objectEntryOID=object;
45   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46   cstWriter->strictReliableCounter=0;
47   cstWriter->bestEffortsCounter=0;
48   cstWriter->csChangesCounter=0;
49   cstWriter->cstRemoteReaderCounter=0;
50   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
52   CSTWriterCSChange_init_head(cstWriter);
53   CSTRemoteReader_init_root_field(cstWriter);
54   pthread_rwlock_init(&cstWriter->lock,NULL);
55   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
56   cstWriter->domain=d;
57   cstWriter->typeRegister=typeRegister;
58   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
60   }
61   //add event for refresh
62   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
63     CSTWriterRefreshTimer(d,(void*)cstWriter);
64   }
65   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
66                 cstWriter->guid.hid,
67                 cstWriter->guid.aid,
68                 cstWriter->guid.oid);
69   debug(51,10) ("CSTWriterInit: finished\n");
70 }
71
72 /*****************************************************************************/
73 void 
74 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
75   CSTRemoteReader       *cstRemoteReader;
76   CSChange              *csChange;
77
78   debug(51,10) ("CSTWriterDelete: start\n");
79   
80   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
81                 cstWriter->guid.hid,
82                 cstWriter->guid.aid,
83                 cstWriter->guid.oid);
84   //Destroy all cstRemoteReader connected on cstWriter
85   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
86     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
87   }
88   //Destroy all csChnages connected on cstWriter
89   while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
90     parameterDelete(csChange);
91     FREE(csChange);
92   }
93   eventDetach(d,
94       cstWriter->objectEntryOID->objectEntryAID,
95       &cstWriter->refreshPeriodTimer,
96       0);
97   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
98     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
99   }
100   pthread_rwlock_destroy(&cstWriter->lock);
101   debug(51,10) ("CSTWriterDelete: finished\n");
102 }
103
104 /*****************************************************************************/
105 void
106 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
107                          ObjectId oid) {
108   CSTRemoteReader     *cstRemoteReader;
109   CSChangeForReader   *csChangeForReader;
110   CSChange            *csChange=NULL;
111   
112   cstWriter->cstRemoteReaderCounter++;
113   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
114   cstRemoteReader->guid.hid=object->objectEntryHID->hid;
115   cstRemoteReader->guid.aid=object->objectEntryAID->aid;
116   cstRemoteReader->guid.oid=oid;
117   cstRemoteReader->objectEntryOID=object;
118   cstRemoteReader->cstWriter=cstWriter;
119   CSChangeForReader_init_root_field(cstRemoteReader);
120   cstRemoteReader->commStateHB=MAYSENDHB;
121   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
122   cstRemoteReader->HBRetriesCounter=0;
123   cstRemoteReader->csChangesCounter=0;
124   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
125   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
126   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
127   //insert remote reader 
128   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
129   //copy all csChanges (not for publication)
130   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
131     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
132       csChange->remoteReaderCount++;  
133       cstRemoteReader->csChangesCounter++;
134       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
135       csChangeForReader->commStateChFReader=TOSEND;
136       csChangeForReader->csChange=csChange;
137       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
138       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
139       cstRemoteReader->commStateSend=MUSTSENDDATA;
140     }
141     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
142       eventAdd(d,
143           cstRemoteReader->objectEntryOID->objectEntryAID,
144           &cstRemoteReader->delayResponceTimer,
145           1,   
146           "CSTWriterSendTimer",
147           CSTWriterSendTimer,
148           &cstRemoteReader->cstWriter->lock,
149           cstRemoteReader,
150           &cstRemoteReader->cstWriter->params.delayResponceTime);               
151     }
152   } else {
153     //Publication
154     ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
155     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
156       cstWriter->strictReliableCounter++;
157     else {
158       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
159         cstWriter->bestEffortsCounter++;
160     }
161   }
162   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
163                 cstRemoteReader->guid.hid,
164                 cstRemoteReader->guid.aid,
165                 cstRemoteReader->guid.oid);
166 }
167
168 /*****************************************************************************/
169 void 
170 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
171   CSChangeForReader   *csChangeForReader;
172   
173   if (!cstRemoteReader) return;
174   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
175   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
176                 cstRemoteReader->guid.hid,
177                 cstRemoteReader->guid.aid,
178                 cstRemoteReader->guid.oid);
179   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
180     ORTESubsProp *sp;
181     sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
182     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
183       cstRemoteReader->cstWriter->strictReliableCounter--;
184     else {
185       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
186         cstRemoteReader->cstWriter->bestEffortsCounter--;
187     }
188   }  
189   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
190     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
191         csChangeForReader,ORTE_TRUE);
192   }
193   eventDetach(d,
194       cstRemoteReader->objectEntryOID->objectEntryAID,
195       &cstRemoteReader->delayResponceTimer,
196       1);   //metatraffic timer
197   eventDetach(d,
198       cstRemoteReader->objectEntryOID->objectEntryAID,
199       &cstRemoteReader->delayResponceTimer,
200       2);   //userdata timer
201   eventDetach(d,
202       cstRemoteReader->objectEntryOID->objectEntryAID,
203       &cstRemoteReader->repeatAnnounceTimer,
204       1);   //metatraffic timer
205   eventDetach(d,
206       cstRemoteReader->objectEntryOID->objectEntryAID,
207       &cstRemoteReader->repeatAnnounceTimer,
208       2);   //userdata timer
209   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
210   FREE(cstRemoteReader);
211 }
212
213 /*****************************************************************************/
214 void
215 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
216   CSChange            *csChange,*csChange1;
217
218   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
219     if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
220         (!gavl_cmp_guid(&csChange->guid,guid))) {  //equal? (VAR)
221       //VAR->GAP - inc gap_sn_no
222       SeqNumberInc(csChange->gapSN,csChange->gapSN);  
223       parameterDelete(csChange);
224       //is Gap in prior or next position?
225       csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
226       if (csChange1) {
227         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
228           SeqNumberAdd(csChange1->gapSN,
229                        csChange1->gapSN,
230                        csChange->gapSN);
231           CSTWriterDestroyCSChange(d,cstWriter,csChange);
232           csChange=csChange1;
233         }
234       }
235       csChange1=CSTWriterCSChange_next(cstWriter,csChange);
236       if (csChange1) {
237         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
238           SeqNumberAdd(csChange->gapSN,
239                        csChange->gapSN,
240                        csChange1->gapSN);
241           CSTWriterDestroyCSChange(d,cstWriter,csChange1);
242         }
243       }
244       break;
245     }
246   }
247 }
248
249 /*****************************************************************************/
250 void
251 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
252   CSChangeForReader   *csChangeForReader;
253   CSTRemoteReader     *cstRemoteReader;
254   CSChange            *csChangeFSN;
255   
256   cstWriter->csChangesCounter++;
257   //look for old cschange
258   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
259     CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
260   //insert cschange into database changes
261   SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
262   csChange->sn=cstWriter->lastSN;
263   SEQUENCE_NUMBER_NONE(csChange->gapSN);
264   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
265   csChange->remoteReaderProcBest=0;
266   csChange->remoteReaderProcStrict=0;
267   CSTWriterCSChange_insert(cstWriter,csChange);
268   //update FirstSN
269   csChangeFSN=CSTWriterCSChange_first(cstWriter);
270   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
271     //minimal are 2 SNs (GAP,VAR) ...
272 //    CSTWriterDestroyCSChange(cstWriter,csChange);
273   }
274   csChangeFSN=CSTWriterCSChange_first(cstWriter);
275   cstWriter->firstSN=csChangeFSN->sn;
276   //insert new cschange for each reader
277   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
278     //csChangeForReader
279     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
280     csChangeForReader->commStateChFReader=TOSEND;
281     csChangeForReader->csChange=csChange;
282     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
283     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
284     cstRemoteReader->csChangesCounter++;
285     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
286       cstRemoteReader->commStateSend=MUSTSENDDATA;
287       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
288         eventDetach(d,
289             cstRemoteReader->objectEntryOID->objectEntryAID,
290             &cstRemoteReader->delayResponceTimer,
291             1);
292         eventAdd(d,
293             cstRemoteReader->objectEntryOID->objectEntryAID,
294             &cstRemoteReader->delayResponceTimer,
295             1,   
296             "CSTWriterSendTimer",
297             CSTWriterSendTimer,
298             &cstRemoteReader->cstWriter->lock,
299             cstRemoteReader,
300             NULL);
301       } else {
302         ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
303         
304         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
305           //Strict reliable subscription
306           eventDetach(d,
307               cstRemoteReader->objectEntryOID->objectEntryAID,
308               &cstRemoteReader->delayResponceTimer,
309               2);
310           eventAdd(d,
311               cstRemoteReader->objectEntryOID->objectEntryAID,
312               &cstRemoteReader->delayResponceTimer,
313               2,   
314               "CSTWriterSendStrictTimer",
315               CSTWriterSendStrictTimer,
316               &cstRemoteReader->cstWriter->lock,
317               cstRemoteReader,
318               NULL);
319         } else {
320           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
321             //best efforts subscription
322             NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
323
324             NtpTimeAdd(nextIssueTime,
325                       cstRemoteReader->lastSentIssueTime,
326                       sp->minimumSeparation);
327             NtpTimeSub(nextIssueDelay,
328                       nextIssueTime,
329                       actTime);
330             if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
331               NTPTIME_ZERO(nextIssueDelay);
332             eventDetach(d,
333                 cstRemoteReader->objectEntryOID->objectEntryAID,
334                 &cstRemoteReader->delayResponceTimer,
335                 2);
336             if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
337               //direct sent issue, for case zero time
338               CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
339             } else {
340               //schedule sent issue (future)
341               eventAdd(d,
342                   cstRemoteReader->objectEntryOID->objectEntryAID,
343                   &cstRemoteReader->delayResponceTimer,
344                   2,   
345                   "CSTWriterSendBestEffortTimer",
346                   CSTWriterSendBestEffortTimer,
347                   &cstRemoteReader->cstWriter->lock,
348                   cstRemoteReader,
349                   &nextIssueDelay);
350             }
351           } else {
352             //!Best_Effort & !Strict_Reliable
353             CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
354               ORTE_TRUE);
355           }
356         }
357       }
358     }
359     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
360   }
361 }
362
363 /*****************************************************************************/
364 void 
365 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
366     CSChangeForReader   *csChangeForReader,Boolean destroyCSChange) {
367   CSChange *csChange;
368   if (!csChangeForReader) return;
369   csChange=csChangeForReader->csChange;
370   csChange->remoteReaderCount--;  
371   cstRemoteReader->csChangesCounter--;
372   eventDetach(cstRemoteReader->cstWriter->domain,
373       cstRemoteReader->objectEntryOID->objectEntryAID,
374       &csChangeForReader->waitWhileDataUnderwayTimer,
375       0);
376   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
377   FREE(csChangeForReader);
378   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
379     if (csChange->remoteReaderCount<=
380         (csChange->remoteReaderProcBest+csChange->remoteReaderProcStrict)) {
381       if (destroyCSChange) {
382         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
383             cstRemoteReader->cstWriter,csChange);
384       }
385       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
386       debug(51,5) ("Publication: new queue level (%d)\n",
387                   cstRemoteReader->cstWriter->csChangesCounter);
388     }
389   }
390 }
391
392 /*****************************************************************************/
393 void 
394 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
395   CSTRemoteReader     *cstRemoteReader;
396   CSChangeForReader   *csChangeForReader;
397
398   if (!csChange) return;
399   cstWriter->csChangesCounter--;
400   CSTWriterCSChange_delete(cstWriter,csChange);
401   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
402     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
403     CSTWriterDestroyCSChangeForReader(cstRemoteReader,
404         csChangeForReader,ORTE_FALSE);
405   }
406   if (csChange->cdrStream.buffer)
407     FREE(csChange->cdrStream.buffer);
408   parameterDelete(csChange);
409   FREE(csChange);
410 }
411
412 /*****************************************************************************/
413 Boolean
414 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
415   CSChange *csChange;
416   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
417     if (!csChange->remoteReaderProcStrict) {
418       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
419       return ORTE_TRUE;
420     }
421   }
422   return ORTE_FALSE;
423 }
424
425 /*****************************************************************************/
426 void
427 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
428   CSChangeForReader   *csChangeForReader;
429   int32_t             timerQueue=1;
430   
431   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
432     timerQueue=2; //userdata timer queue
433   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
434     csChangeForReader->commStateChFReader=TOSEND;
435     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
436       cstRemoteReader->commStateSend=MUSTSENDDATA;
437       eventDetach(d,
438           cstRemoteReader->objectEntryOID->objectEntryAID,
439           &cstRemoteReader->delayResponceTimer,
440           timerQueue);
441       eventAdd(d,
442           cstRemoteReader->objectEntryOID->objectEntryAID,
443           &cstRemoteReader->delayResponceTimer,
444           timerQueue,  
445           "CSTWriterSendTimer",
446           CSTWriterSendTimer,
447           &cstRemoteReader->cstWriter->lock,
448           cstRemoteReader,
449           &cstRemoteReader->cstWriter->params.delayResponceTime);               
450     }
451   }
452 }