]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
multicast problem fixed (joining a multicast group)
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
25                        CSTPublications, CSTWriter, GUID_RTPS,
26                        cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader, 
28                        CSTWriter, CSTRemoteReader, GUID_RTPS,
29                        cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31                        CSTRemoteReader, CSChangeForReader, SequenceNumber,
32                        csChangeForReader, node, csChange->sn, gavl_cmp_sn);
33
34 /*****************************************************************************/
35 void 
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37               ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
38
39   debug(51,10) ("CSTWriterInit: start\n");
40   //init values of cstwriter
41   cstWriter->guid.hid=object->objectEntryHID->hid;
42   cstWriter->guid.aid=object->objectEntryAID->aid;
43   cstWriter->guid.oid=oid;
44   cstWriter->objectEntryOID=object;
45   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46   cstWriter->registrationCounter=0;
47   ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
48   cstWriter->strictReliableCounter=0;
49   cstWriter->bestEffortsCounter=0;
50   cstWriter->csChangesCounter=0;
51   cstWriter->cstRemoteReaderCounter=0;
52   cstWriter->registrationCounter=cstWriter->params.registrationRetries;
53   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
54   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
55   CSTWriterCSChange_init_head(cstWriter);
56   CSTRemoteReader_init_root_field(cstWriter);
57   pthread_rwlock_init(&cstWriter->lock,NULL);
58   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
59   cstWriter->domain=d;
60   cstWriter->typeRegister=typeRegister;
61   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
62     pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
63     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
64     cstWriter->condValueCSChangeDestroyed=0;
65   }
66   //add event for refresh
67   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
68     CSTWriterRefreshTimer(d,(void*)cstWriter);
69   }
70   //add event for registration 
71   if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
72     CSTWriterRegistrationTimer(d,(void*)cstWriter);
73   }
74   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
75                 GUID_PRINTF(cstWriter->guid));
76   debug(51,10) ("CSTWriterInit: finished\n");
77 }
78
79 /*****************************************************************************/
80 void 
81 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
82   CSTRemoteReader       *cstRemoteReader;
83   CSChange              *csChange;
84
85   debug(51,10) ("CSTWriterDelete: start\n");
86   
87   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
88                 GUID_PRINTF(cstWriter->guid));
89   //Destroy all cstRemoteReader connected on cstWriter
90   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
91     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
92   }
93   //Destroy all csChnages connected on cstWriter
94   while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
95     parameterDelete(csChange);
96     FREE(csChange);
97   }
98   eventDetach(d,
99       cstWriter->objectEntryOID->objectEntryAID,
100       &cstWriter->refreshPeriodTimer,
101       0);
102   eventDetach(d,
103       cstWriter->objectEntryOID->objectEntryAID,
104       &cstWriter->registrationTimer,
105       0);
106   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
107     pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
108     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
109   }
110   pthread_rwlock_destroy(&cstWriter->lock);
111   debug(51,10) ("CSTWriterDelete: finished\n");
112 }
113
114 /*****************************************************************************/
115 CSTRemoteReader *
116 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
117                          ObjectId oid,ObjectEntryOID *sobject) {
118   CSTRemoteReader     *cstRemoteReader;
119   CSChangeForReader   *csChangeForReader;
120   CSChange            *csChange=NULL;
121   
122   cstWriter->cstRemoteReaderCounter++;
123   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
124   cstRemoteReader->guid.hid=pobject->guid.hid;
125   cstRemoteReader->guid.aid=pobject->guid.aid;
126   cstRemoteReader->guid.oid=oid;
127   cstRemoteReader->sobject=sobject;
128   cstRemoteReader->pobject=pobject;
129   cstRemoteReader->cstWriter=cstWriter;
130   CSChangeForReader_init_root_field(cstRemoteReader);
131   cstRemoteReader->commStateHB=MAYSENDHB;
132   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
133   cstRemoteReader->HBRetriesCounter=0;
134   cstRemoteReader->csChangesCounter=0;
135   cstRemoteReader->commStateToSentCounter=0;
136   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
137   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
138   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
139   //insert remote reader 
140   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
141   //multicast case
142   if (cstRemoteReader->sobject->multicastPort) {
143     debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
144                   GUID_PRINTF(cstRemoteReader->guid),
145                   GUID_PRINTF(cstRemoteReader->sobject->guid));
146    ObjectEntryMulticast_insert(cstRemoteReader->sobject,
147        cstRemoteReader);
148   }
149   //copy all csChanges (not for publication)
150   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
151     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
152       csChange->remoteReaderCount++;  
153       cstRemoteReader->csChangesCounter++;
154       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
155       csChangeForReader->commStateChFReader=TOSEND;
156       cstRemoteReader->commStateToSentCounter++;
157       csChangeForReader->csChange=csChange;
158       csChangeForReader->cstRemoteReader=cstRemoteReader;
159       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
160       CSChangeParticipant_insert(csChange,csChangeForReader);
161       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
162       cstRemoteReader->commStateSend=MUSTSENDDATA;
163     }
164     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
165       eventAdd(d,
166           cstRemoteReader->sobject->objectEntryAID,
167           &cstRemoteReader->delayResponceTimer,
168           1,   
169           "CSTWriterSendTimer",
170           CSTWriterSendTimer,
171           &cstRemoteReader->cstWriter->lock,
172           cstRemoteReader,
173           &cstRemoteReader->cstWriter->params.delayResponceTime);               
174     }
175   } else {
176     //Publication
177     ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
178     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
179       cstWriter->strictReliableCounter++;
180     else {
181       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
182         cstWriter->bestEffortsCounter++;
183     }
184   }
185   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
186                 GUID_PRINTF(cstRemoteReader->guid));
187   return cstRemoteReader;
188 }
189
190 /*****************************************************************************/
191 void 
192 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
193   CSChangeForReader   *csChangeForReader;
194   
195   if (!cstRemoteReader) return;
196   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
197   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
198                 GUID_PRINTF(cstRemoteReader->guid));
199   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
200     ORTESubsProp *sp;
201     sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
202     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
203       cstRemoteReader->cstWriter->strictReliableCounter--;
204     else {
205       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
206         cstRemoteReader->cstWriter->bestEffortsCounter--;
207     }
208   }  
209   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
210     CSTWriterDestroyCSChangeForReader(
211         csChangeForReader,ORTE_TRUE);
212   }
213   eventDetach(d,
214       cstRemoteReader->sobject->objectEntryAID,
215       &cstRemoteReader->delayResponceTimer,
216       1);   //metatraffic timer
217   eventDetach(d,
218       cstRemoteReader->sobject->objectEntryAID,
219       &cstRemoteReader->delayResponceTimer,
220       2);   //userdata timer
221   eventDetach(d,
222       cstRemoteReader->sobject->objectEntryAID,
223       &cstRemoteReader->repeatAnnounceTimer,
224       1);   //metatraffic timer
225   eventDetach(d,
226       cstRemoteReader->sobject->objectEntryAID,
227       &cstRemoteReader->repeatAnnounceTimer,
228       2);   //userdata timer
229   //multicast case
230   if (cstRemoteReader->sobject->multicastPort) {
231     ObjectEntryOID *object;
232
233     object=cstRemoteReader->sobject;
234
235     ObjectEntryMulticast_delete(object,cstRemoteReader);
236     debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
237                   GUID_PRINTF(cstRemoteReader->guid),
238                   GUID_PRINTF(object->guid));
239
240     if (ObjectEntryMulticast_is_empty(object)) {
241       objectEntryDelete(d,object,ORTE_TRUE);
242     }
243   }
244   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
245   FREE(cstRemoteReader);
246 }
247
248 /*****************************************************************************/
249 void
250 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
251   CSChange            *csChange,*csChange1;
252
253   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
254     if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
255         (!gavl_cmp_guid(&csChange->guid,guid))) {  //equal? (VAR)
256       //VAR->GAP - inc gap_sn_no
257       SeqNumberInc(csChange->gapSN,csChange->gapSN);  
258       parameterDelete(csChange);
259       //is Gap in prior or next position?
260       csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
261       if (csChange1) {
262         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
263           SeqNumberAdd(csChange1->gapSN,
264                        csChange1->gapSN,
265                        csChange->gapSN);
266           CSTWriterDestroyCSChange(d,cstWriter,csChange);
267           csChange=csChange1;
268         }
269       }
270       csChange1=CSTWriterCSChange_next(cstWriter,csChange);
271       if (csChange1) {
272         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
273           SeqNumberAdd(csChange->gapSN,
274                        csChange->gapSN,
275                        csChange1->gapSN);
276           CSTWriterDestroyCSChange(d,cstWriter,csChange1);
277         }
278       }
279       break;
280     }
281   }
282 }
283
284 /*****************************************************************************/
285 void
286 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
287   CSChangeForReader   *csChangeForReader;
288   CSTRemoteReader     *cstRemoteReader;
289   CSChange            *csChangeFSN;
290   
291   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
292                 GUID_PRINTF(cstWriter->guid));
293   cstWriter->csChangesCounter++;
294   //look for old cschange
295   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
296     CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
297   //insert cschange into database changes
298   SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
299   csChange->sn=cstWriter->lastSN;
300   SEQUENCE_NUMBER_NONE(csChange->gapSN);
301   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
302   csChange->remoteReaderBest=0;
303   csChange->remoteReaderStrict=0;
304   CSChangeParticipant_init_head(csChange);
305   CSTWriterCSChange_insert(cstWriter,csChange);
306   debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
307                csChange->sn.low);
308   //update FirstSN
309   csChangeFSN=CSTWriterCSChange_first(cstWriter);
310   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
311     //minimal are 2 SNs (GAP,VAR) ...
312 //    CSTWriterDestroyCSChange(cstWriter,csChange);
313   }
314   csChangeFSN=CSTWriterCSChange_first(cstWriter);
315   cstWriter->firstSN=csChangeFSN->sn;
316   //insert new cschange for each reader
317   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
318     //csChangeForReader
319     debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
320                    GUID_PRINTF(cstRemoteReader->guid));
321     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
322     csChangeForReader->commStateChFReader=TOSEND;
323     cstRemoteReader->commStateToSentCounter++;
324     csChangeForReader->csChange=csChange;
325     csChangeForReader->cstRemoteReader=cstRemoteReader;
326     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
327     CSChangeParticipant_insert(csChange,csChangeForReader);
328     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
329     cstRemoteReader->csChangesCounter++;
330     cstRemoteReader->HBRetriesCounter=0;
331     if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
332       cstRemoteReader->commStateSend=MUSTSENDDATA;
333       if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
334         eventDetach(d,
335             cstRemoteReader->sobject->objectEntryAID,
336             &cstRemoteReader->delayResponceTimer,
337             1);
338         eventAdd(d,
339             cstRemoteReader->sobject->objectEntryAID,
340             &cstRemoteReader->delayResponceTimer,
341             1,   
342             "CSTWriterSendTimer",
343             CSTWriterSendTimer,
344             &cstRemoteReader->cstWriter->lock,
345             cstRemoteReader,
346             NULL);
347       } else {
348         ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
349         
350         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
351           //Strict reliable subscription
352           csChange->remoteReaderStrict++;
353           eventDetach(d,
354               cstRemoteReader->sobject->objectEntryAID,
355               &cstRemoteReader->delayResponceTimer,
356               2);
357           eventAdd(d,
358               cstRemoteReader->sobject->objectEntryAID,
359               &cstRemoteReader->delayResponceTimer,
360               2,   
361               "CSTWriterSendStrictTimer",
362               CSTWriterSendStrictTimer,
363               &cstRemoteReader->cstWriter->lock,
364               cstRemoteReader,
365               NULL);
366         } else {
367           if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
368             //best efforts subscription
369             NtpTime nextIssueTime,nextIssueDelay,actTime;
370
371             actTime=getActualNtpTime();
372             csChange->remoteReaderBest++;
373             NtpTimeAdd(nextIssueTime,
374                       cstRemoteReader->lastSentIssueTime,
375                       sp->minimumSeparation);
376             NtpTimeSub(nextIssueDelay,
377                       nextIssueTime,
378                       actTime);
379             if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
380               NTPTIME_ZERO(nextIssueDelay);
381             eventDetach(d,
382                 cstRemoteReader->sobject->objectEntryAID,
383                 &cstRemoteReader->delayResponceTimer,
384                 2);
385             //schedule sent issue 
386             eventAdd(d,
387                 cstRemoteReader->sobject->objectEntryAID,
388                 &cstRemoteReader->delayResponceTimer,
389                 2,   
390                 "CSTWriterSendBestEffortTimer",
391                 CSTWriterSendBestEffortTimer,
392                 &cstRemoteReader->cstWriter->lock,
393                 cstRemoteReader,
394                 &nextIssueDelay);
395           } else {
396             //!Best_Effort & !Strict_Reliable
397             CSTWriterDestroyCSChangeForReader(csChangeForReader,
398               ORTE_TRUE);
399             debug(51,5) ("CSTWriterAddCSChange: destroyed\n");        
400           }
401         }
402       }
403     }
404     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
405   }
406   debug(51,5) ("CSTWriterAddCSChange: finished\n");
407 }
408
409 /*****************************************************************************/
410 void 
411 CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
412     Boolean destroyCSChange) {
413   CSTRemoteReader *cstRemoteReader;
414   CSChange *csChange;
415   
416   if (!csChangeForReader) return;
417   cstRemoteReader=csChangeForReader->cstRemoteReader;
418   csChange=csChangeForReader->csChange;
419   csChange->remoteReaderCount--;  
420   cstRemoteReader->csChangesCounter--;
421   if (!cstRemoteReader->csChangesCounter) {
422     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
423   }
424   if (csChangeForReader->commStateChFReader==TOSEND) {
425     cstRemoteReader->commStateToSentCounter--;
426   }
427   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
428     ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
429     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
430         csChange->remoteReaderStrict--;
431     } else {
432       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
433         csChange->remoteReaderBest--;
434       }
435     }
436   }  
437   eventDetach(cstRemoteReader->cstWriter->domain,
438       cstRemoteReader->sobject->objectEntryAID,
439       &csChangeForReader->waitWhileDataUnderwayTimer,
440       0);
441   CSChangeParticipant_delete(csChange,csChangeForReader);
442   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
443   FREE(csChangeForReader);
444
445   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
446     if (!csChange->remoteReaderCount) {
447       if (destroyCSChange) {
448         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
449             cstRemoteReader->cstWriter,csChange);
450       }
451       pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
452       cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
453       pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
454       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
455       debug(51,5) ("Publication: new queue level (%d)\n",
456                   cstRemoteReader->cstWriter->csChangesCounter);
457     }
458   }
459 }
460
461 /*****************************************************************************/
462 void 
463 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
464   CSTRemoteReader     *cstRemoteReader;
465   CSChangeForReader   *csChangeForReader;
466   CSChange            *csChangeFSN;
467
468   if (!csChange) return;
469
470   cstWriter->csChangesCounter--;
471   CSTWriterCSChange_delete(cstWriter,csChange);
472   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
473     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
474     CSTWriterDestroyCSChangeForReader(
475         csChangeForReader,ORTE_FALSE);
476   }
477
478   if (csChange->cdrCodec.buffer)
479     FREE(csChange->cdrCodec.buffer);
480   parameterDelete(csChange);
481   FREE(csChange);
482
483   //update first SN
484   csChangeFSN=CSTWriterCSChange_first(cstWriter);
485   if (csChangeFSN)
486     cstWriter->firstSN=csChangeFSN->sn;
487   else
488     cstWriter->firstSN=cstWriter->lastSN;
489 }
490
491 /*****************************************************************************/
492 Boolean
493 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
494   CSChange *csChange;
495
496   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
497
498     if (!csChange->remoteReaderStrict) {
499       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
500       return ORTE_TRUE;
501     }
502   }
503   return ORTE_FALSE;
504 }
505
506 /*****************************************************************************/
507 void
508 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
509   CSChangeForReader   *csChangeForReader;
510   int32_t             timerQueue=1;
511   
512   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
513     timerQueue=2; //userdata timer queue
514
515   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
516
517     //refresh only VAR's
518     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
519       
520       if (csChangeForReader->commStateChFReader!=TOSEND) {
521         csChangeForReader->commStateChFReader=TOSEND;
522         cstRemoteReader->commStateToSentCounter++;
523       }
524
525       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
526         cstRemoteReader->commStateSend=MUSTSENDDATA;
527         eventDetach(d,
528             cstRemoteReader->sobject->objectEntryAID,
529             &cstRemoteReader->delayResponceTimer,
530             timerQueue);
531         eventAdd(d,
532             cstRemoteReader->sobject->objectEntryAID,
533             &cstRemoteReader->delayResponceTimer,
534             timerQueue,  
535             "CSTWriterSendTimer",
536             CSTWriterSendTimer,
537             &cstRemoteReader->cstWriter->lock,
538             cstRemoteReader,
539             &cstRemoteReader->cstWriter->params.delayResponceTime);               
540       }
541     }
542   }
543 }
544
545 /*****************************************************************************/
546 int
547 CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader) 
548 {
549   CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
550
551   //setup new state for csChangeForReader
552   if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
553   cstRemoteReader->commStateToSentCounter--;
554
555   if (!cstRemoteReader->commStateToSentCounter)
556         cstRemoteReader->commStateSend=NOTHNIGTOSEND;
557
558   if (NtpTimeCmp(zNtpTime,
559         cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
560         csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
561   } else {
562     csChangeForReader->commStateChFReader=UNDERWAY;
563     eventDetach(cstRemoteReader->cstWriter->domain,
564                 cstRemoteReader->sobject->objectEntryAID,
565                 &csChangeForReader->waitWhileDataUnderwayTimer,
566                 0);
567     eventAdd(cstRemoteReader->cstWriter->domain,
568              cstRemoteReader->sobject->objectEntryAID,
569              &csChangeForReader->waitWhileDataUnderwayTimer,
570              0,   //common timer
571              "CSChangeForReaderUnderwayTimer",
572              CSChangeForReaderUnderwayTimer,
573              &cstRemoteReader->cstWriter->lock,
574              csChangeForReader,
575              &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
576   }
577   return 0;
578 }
579
580 /*****************************************************************************/
581 void
582 CSTWriterMulticast(CSChangeForReader *csChangeForReader) 
583 {
584     CSTRemoteReader     *cstRemoteReader;
585     ObjectEntryOID      *objectEntryOID;
586     CSChangeForReader   *csChangeForReader1;
587     char                queue=1;
588     
589     cstRemoteReader=csChangeForReader->cstRemoteReader;
590     objectEntryOID=cstRemoteReader->sobject;
591
592     //multicast can do an application with multicast interface
593     if (!objectEntryOID->multicastPort)
594         return;
595
596     ul_list_for_each(CSChangeParticipant,
597                      csChangeForReader->csChange,
598                      csChangeForReader1) {
599         ObjectEntryOID  *objectEntryOID1;
600         CSTRemoteReader *cstRemoteReader1;
601       
602         cstRemoteReader1=csChangeForReader1->cstRemoteReader;
603         objectEntryOID1=cstRemoteReader1->sobject;
604                    
605         /* are RRs from same GROUP */
606         if (objectEntryOID!=objectEntryOID1)
607           continue;
608
609         /* is the csChange in state TOSEND ? If yes, marks like proc. */
610         CSTWriterCSChangeForReaderNewState(csChangeForReader1);
611
612         /* if there are no messages, detach sending timer */
613         if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
614             !(cstRemoteReader->commStateHB==MAYSENDHB))
615             continue;
616
617         if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) 
618           queue=2;
619         eventDetach(cstRemoteReader->cstWriter->domain,
620                     cstRemoteReader->sobject->objectEntryAID,
621                     &cstRemoteReader->delayResponceTimer,
622                     queue);
623     }
624 }