]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
2450755116d3a30eab397bfe44cdc981c7ad0373
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 GAVL_CUST_NODE_INT_IMP(CSTWriter, 
35                        CSTPublications, CSTWriter, GUID_RTPS,
36                        cstWriter, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader, 
38                        CSTWriter, CSTRemoteReader, GUID_RTPS,
39                        cstRemoteReader, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
41                        CSTRemoteReader, CSChangeForReader, SequenceNumber,
42                        csChangeForReader, node, csChange->sn, gavl_cmp_sn);
43
44 /*****************************************************************************/
45 void 
46 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
47               ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
48
49   debug(51,10) ("CSTWriterInit: start\n");
50   //init values of cstwriter
51   cstWriter->guid.hid=object->objectEntryHID->hid;
52   cstWriter->guid.aid=object->objectEntryAID->aid;
53   cstWriter->guid.oid=oid;
54   cstWriter->objectEntryOID=object;
55   memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
56   cstWriter->registrationCounter=0;
57   ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
58   cstWriter->strictReliableCounter=0;
59   cstWriter->bestEffortsCounter=0;
60   cstWriter->csChangesCounter=0;
61   cstWriter->cstRemoteReaderCounter=0;
62   cstWriter->registrationCounter=cstWriter->params.registrationRetries;
63   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
64   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
65   CSTWriterCSChange_init_head(cstWriter);
66   CSTRemoteReader_init_root_field(cstWriter);
67   pthread_rwlock_init(&cstWriter->lock,NULL);
68   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
69   cstWriter->domain=d;
70   cstWriter->typeRegister=typeRegister;
71   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
72     pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
73     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
74     cstWriter->condValueCSChangeDestroyed=0;
75   }
76   //add event for refresh
77   if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
78     CSTWriterRefreshTimer(d,(void*)cstWriter);
79   }
80   //add event for registration 
81   if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
82     CSTWriterRegistrationTimer(d,(void*)cstWriter);
83   }
84   debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
85                 GUID_PRINTF(cstWriter->guid));
86   debug(51,10) ("CSTWriterInit: finished\n");
87 }
88
89 /*****************************************************************************/
90 void 
91 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
92   CSTRemoteReader       *cstRemoteReader;
93   CSChange              *csChange;
94
95   debug(51,10) ("CSTWriterDelete: start\n");
96   
97   debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
98                 GUID_PRINTF(cstWriter->guid));
99   //Destroy all cstRemoteReader connected on cstWriter
100   while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
101     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
102   }
103   //Destroy all csChnages connected on cstWriter
104   while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
105     parameterDelete(csChange);
106     FREE(csChange);
107   }
108   eventDetach(d,
109       cstWriter->objectEntryOID->objectEntryAID,
110       &cstWriter->refreshPeriodTimer,
111       0);
112   eventDetach(d,
113       cstWriter->objectEntryOID->objectEntryAID,
114       &cstWriter->registrationTimer,
115       0);
116   if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
117     pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
118     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
119   }
120   pthread_rwlock_destroy(&cstWriter->lock);
121   debug(51,10) ("CSTWriterDelete: finished\n");
122 }
123
124 /*****************************************************************************/
125 CSTRemoteReader *
126 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
127                          ObjectId oid,ObjectEntryOID *sobject) {
128   CSTRemoteReader     *cstRemoteReader;
129   CSChangeForReader   *csChangeForReader;
130   CSChange            *csChange=NULL;
131   
132   cstWriter->cstRemoteReaderCounter++;
133   cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
134   cstRemoteReader->guid.hid=pobject->guid.hid;
135   cstRemoteReader->guid.aid=pobject->guid.aid;
136   cstRemoteReader->guid.oid=oid;
137   cstRemoteReader->sobject=sobject;
138   cstRemoteReader->pobject=pobject;
139   cstRemoteReader->cstWriter=cstWriter;
140   CSChangeForReader_init_root_field(cstRemoteReader);
141   cstRemoteReader->commStateHB=MAYSENDHB;
142   cstRemoteReader->commStateSend=NOTHNIGTOSEND;
143   cstRemoteReader->HBRetriesCounter=0;
144   cstRemoteReader->csChangesCounter=0;
145   cstRemoteReader->commStateToSentCounter=0;
146   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
147   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
148   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
149   //insert remote reader 
150   CSTRemoteReader_insert(cstWriter,cstRemoteReader);
151   //multicast case
152   if (cstRemoteReader->sobject->multicastPort) {
153     debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
154                   GUID_PRINTF(cstRemoteReader->guid),
155                   GUID_PRINTF(cstRemoteReader->sobject->guid));
156    ObjectEntryMulticast_insert(cstRemoteReader->sobject,
157        cstRemoteReader);
158   }
159   //copy all csChanges (not for publication)
160   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
161     ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
162       csChange->remoteReaderCount++;  
163       cstRemoteReader->csChangesCounter++;
164       csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
165       csChangeForReader->commStateChFReader=TOSEND;
166       cstRemoteReader->commStateToSentCounter++;
167       csChangeForReader->csChange=csChange;
168       csChangeForReader->cstRemoteReader=cstRemoteReader;
169       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
170       CSChangeParticipant_insert(csChange,csChangeForReader);
171       CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
172       cstRemoteReader->commStateSend=MUSTSENDDATA;
173     }
174     if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
175       eventAdd(d,
176           cstRemoteReader->sobject->objectEntryAID,
177           &cstRemoteReader->delayResponceTimer,
178           1,   
179           "CSTWriterSendTimer",
180           CSTWriterSendTimer,
181           &cstRemoteReader->cstWriter->lock,
182           cstRemoteReader,
183           &cstRemoteReader->cstWriter->params.delayResponceTime);               
184     }
185   } else {
186     //Publication
187     ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
188     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
189       cstWriter->strictReliableCounter++;
190     else {
191       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
192         cstWriter->bestEffortsCounter++;
193     }
194   }
195   debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
196                 GUID_PRINTF(cstRemoteReader->guid));
197   return cstRemoteReader;
198 }
199
200 /*****************************************************************************/
201 void 
202 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
203   CSChangeForReader   *csChangeForReader;
204   
205   if (!cstRemoteReader) return;
206   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
207   debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
208                 GUID_PRINTF(cstRemoteReader->guid));
209   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
210     ORTESubsProp *sp;
211     sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
212     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
213       cstRemoteReader->cstWriter->strictReliableCounter--;
214     else {
215       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
216         cstRemoteReader->cstWriter->bestEffortsCounter--;
217     }
218   }  
219   while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
220     CSTWriterDestroyCSChangeForReader(
221         csChangeForReader,ORTE_TRUE);
222   }
223   eventDetach(d,
224       cstRemoteReader->sobject->objectEntryAID,
225       &cstRemoteReader->delayResponceTimer,
226       1);   //metatraffic timer
227   eventDetach(d,
228       cstRemoteReader->sobject->objectEntryAID,
229       &cstRemoteReader->delayResponceTimer,
230       2);   //userdata timer
231   eventDetach(d,
232       cstRemoteReader->sobject->objectEntryAID,
233       &cstRemoteReader->repeatAnnounceTimer,
234       1);   //metatraffic timer
235   eventDetach(d,
236       cstRemoteReader->sobject->objectEntryAID,
237       &cstRemoteReader->repeatAnnounceTimer,
238       2);   //userdata timer
239   //multicast case
240   if (cstRemoteReader->sobject->multicastPort) {
241     ObjectEntryOID *object;
242
243     object=cstRemoteReader->sobject;
244
245     ObjectEntryMulticast_delete(object,cstRemoteReader);
246     debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
247                   GUID_PRINTF(cstRemoteReader->guid),
248                   GUID_PRINTF(object->guid));
249
250     if (ObjectEntryMulticast_is_empty(object)) {
251       objectEntryDelete(d,object,ORTE_TRUE);
252     }
253   }
254   CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
255   FREE(cstRemoteReader);
256 }
257
258 /*****************************************************************************/
259 void
260 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
261   CSChange            *csChange,*csChange1;
262
263   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
264     if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
265         (!gavl_cmp_guid(&csChange->guid,guid))) {  //equal? (VAR)
266       //VAR->GAP - inc gap_sn_no
267       SeqNumberInc(csChange->gapSN,csChange->gapSN);  
268       parameterDelete(csChange);
269       //is Gap in prior or next position?
270       csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
271       if (csChange1) {
272         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
273           SeqNumberAdd(csChange1->gapSN,
274                        csChange1->gapSN,
275                        csChange->gapSN);
276           CSTWriterDestroyCSChange(d,cstWriter,csChange);
277           csChange=csChange1;
278         }
279       }
280       csChange1=CSTWriterCSChange_next(cstWriter,csChange);
281       if (csChange1) {
282         if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
283           SeqNumberAdd(csChange->gapSN,
284                        csChange->gapSN,
285                        csChange1->gapSN);
286           CSTWriterDestroyCSChange(d,cstWriter,csChange1);
287         }
288       }
289       break;
290     }
291   }
292 }
293
294 /*****************************************************************************/
295 void
296 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
297   CSChangeForReader   *csChangeForReader;
298   CSTRemoteReader     *cstRemoteReader;
299   CSChange            *csChangeFSN;
300   
301   debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
302                 GUID_PRINTF(cstWriter->guid));
303   cstWriter->csChangesCounter++;
304   //look for old cschange
305   if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
306     CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
307   //insert cschange into database changes
308   SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
309   csChange->sn=cstWriter->lastSN;
310   SEQUENCE_NUMBER_NONE(csChange->gapSN);
311   csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;  
312   csChange->remoteReaderBest=0;
313   csChange->remoteReaderStrict=0;
314   CSChangeParticipant_init_head(csChange);
315   CSTWriterCSChange_insert(cstWriter,csChange);
316   debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
317                csChange->sn.low);
318   //update FirstSN
319   csChangeFSN=CSTWriterCSChange_first(cstWriter);
320   if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
321     //minimal are 2 SNs (GAP,VAR) ...
322 //    CSTWriterDestroyCSChange(cstWriter,csChange);
323   }
324   csChangeFSN=CSTWriterCSChange_first(cstWriter);
325   cstWriter->firstSN=csChangeFSN->sn;
326   //insert new cschange for each reader
327   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
328     //csChangeForReader
329     debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
330                    GUID_PRINTF(cstRemoteReader->guid));
331     csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
332     csChangeForReader->commStateChFReader=TOSEND;
333     cstRemoteReader->commStateToSentCounter++;
334     csChangeForReader->csChange=csChange;
335     csChangeForReader->cstRemoteReader=cstRemoteReader;
336     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
337     CSChangeParticipant_insert(csChange,csChangeForReader);
338     CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
339     cstRemoteReader->csChangesCounter++;
340     cstRemoteReader->HBRetriesCounter=0;
341     cstRemoteReader->commStateSend=MUSTSENDDATA;
342     if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
343       eventDetach(d,
344           cstRemoteReader->sobject->objectEntryAID,
345           &cstRemoteReader->delayResponceTimer,
346           1);
347       eventAdd(d,
348           cstRemoteReader->sobject->objectEntryAID,
349           &cstRemoteReader->delayResponceTimer,
350           1,   
351           "CSTWriterSendTimer",
352           CSTWriterSendTimer,
353           &cstRemoteReader->cstWriter->lock,
354           cstRemoteReader,
355           NULL);
356     } else {
357       ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
358
359       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
360         //Strict reliable subscription
361         csChange->remoteReaderStrict++;
362         eventDetach(d,
363             cstRemoteReader->sobject->objectEntryAID,
364             &cstRemoteReader->delayResponceTimer,
365             2);
366         eventAdd(d,
367             cstRemoteReader->sobject->objectEntryAID,
368             &cstRemoteReader->delayResponceTimer,
369             2,   
370             "CSTWriterSendStrictTimer",
371             CSTWriterSendStrictTimer,
372             &cstRemoteReader->cstWriter->lock,
373             cstRemoteReader,
374             NULL);
375       } else {
376         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
377           //best efforts subscription
378           NtpTime nextIssueTime,nextIssueDelay,actTime;
379
380           actTime=getActualNtpTime();
381           csChange->remoteReaderBest++;
382           NtpTimeAdd(nextIssueTime,
383                     cstRemoteReader->lastSentIssueTime,
384                     sp->minimumSeparation);
385           NtpTimeSub(nextIssueDelay,
386                     nextIssueTime,
387                     actTime);
388           if (NtpTimeCmp(actTime,nextIssueTime)>=0) 
389             NTPTIME_ZERO(nextIssueDelay);
390           eventDetach(d,
391               cstRemoteReader->sobject->objectEntryAID,
392               &cstRemoteReader->delayResponceTimer,
393               2);
394           //schedule sent issue 
395           eventAdd(d,
396               cstRemoteReader->sobject->objectEntryAID,
397               &cstRemoteReader->delayResponceTimer,
398               2,   
399               "CSTWriterSendBestEffortTimer",
400               CSTWriterSendBestEffortTimer,
401               &cstRemoteReader->cstWriter->lock,
402               cstRemoteReader,
403               &nextIssueDelay);
404         } else {
405           //!Best_Effort & !Strict_Reliable
406           CSTWriterDestroyCSChangeForReader(csChangeForReader,
407             ORTE_TRUE);
408           debug(51,5) ("CSTWriterAddCSChange: destroyed\n");          
409         }
410       }
411     }
412     debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
413   }
414   debug(51,5) ("CSTWriterAddCSChange: finished\n");
415 }
416
417 /*****************************************************************************/
418 void 
419 CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
420     Boolean destroyCSChange) {
421   CSTRemoteReader *cstRemoteReader;
422   CSChange *csChange;
423   
424   if (!csChangeForReader) return;
425   cstRemoteReader=csChangeForReader->cstRemoteReader;
426   csChange=csChangeForReader->csChange;
427   csChange->remoteReaderCount--;  
428   cstRemoteReader->csChangesCounter--;
429   if (!cstRemoteReader->csChangesCounter) {
430     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
431   }
432   if (csChangeForReader->commStateChFReader==TOSEND) {
433     cstRemoteReader->commStateToSentCounter--;
434   }
435   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
436     ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
437     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
438         csChange->remoteReaderStrict--;
439     } else {
440       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
441         csChange->remoteReaderBest--;
442       }
443     }
444   }  
445   eventDetach(cstRemoteReader->cstWriter->domain,
446       cstRemoteReader->sobject->objectEntryAID,
447       &csChangeForReader->waitWhileDataUnderwayTimer,
448       0);
449   CSChangeParticipant_delete(csChange,csChangeForReader);
450   CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
451   FREE(csChangeForReader);
452
453   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
454     if (!csChange->remoteReaderCount) {
455       if (destroyCSChange) {
456         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
457             cstRemoteReader->cstWriter,csChange);
458       }
459       pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
460       cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
461       pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
462       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
463       debug(51,5) ("Publication: new queue level (%d)\n",
464                   cstRemoteReader->cstWriter->csChangesCounter);
465     }
466   }
467 }
468
469 /*****************************************************************************/
470 void 
471 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
472   CSTRemoteReader     *cstRemoteReader;
473   CSChangeForReader   *csChangeForReader;
474   CSChange            *csChangeFSN;
475
476   if (!csChange) return;
477
478   cstWriter->csChangesCounter--;
479   CSTWriterCSChange_delete(cstWriter,csChange);
480   gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
481     csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
482     CSTWriterDestroyCSChangeForReader(
483         csChangeForReader,ORTE_FALSE);
484   }
485
486   if (csChange->cdrCodec.buffer)
487     FREE(csChange->cdrCodec.buffer);
488   parameterDelete(csChange);
489   FREE(csChange);
490
491   //update first SN
492   csChangeFSN=CSTWriterCSChange_first(cstWriter);
493   if (csChangeFSN)
494     cstWriter->firstSN=csChangeFSN->sn;
495   else
496     cstWriter->firstSN=cstWriter->lastSN;
497 }
498
499 /*****************************************************************************/
500 Boolean
501 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
502   CSChange *csChange;
503
504   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
505
506     if (!csChange->remoteReaderStrict) {
507       CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
508       return ORTE_TRUE;
509     }
510   }
511   return ORTE_FALSE;
512 }
513
514 /*****************************************************************************/
515 void
516 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
517   CSChangeForReader   *csChangeForReader;
518   int32_t             timerQueue=1;
519   
520   if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
521     timerQueue=2; //userdata timer queue
522
523   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
524
525     //refresh only VAR's
526     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) { 
527       
528       if (csChangeForReader->commStateChFReader!=TOSEND) {
529         csChangeForReader->commStateChFReader=TOSEND;
530         cstRemoteReader->commStateToSentCounter++;
531       }
532
533       if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
534         cstRemoteReader->commStateSend=MUSTSENDDATA;
535         eventDetach(d,
536             cstRemoteReader->sobject->objectEntryAID,
537             &cstRemoteReader->delayResponceTimer,
538             timerQueue);
539         eventAdd(d,
540             cstRemoteReader->sobject->objectEntryAID,
541             &cstRemoteReader->delayResponceTimer,
542             timerQueue,  
543             "CSTWriterSendTimer",
544             CSTWriterSendTimer,
545             &cstRemoteReader->cstWriter->lock,
546             cstRemoteReader,
547             &cstRemoteReader->cstWriter->params.delayResponceTime);               
548       }
549     }
550   }
551 }
552
553 /*****************************************************************************/
554 int
555 CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader) 
556 {
557   CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
558
559   //setup new state for csChangeForReader
560   if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
561   cstRemoteReader->commStateToSentCounter--;
562
563   if (!cstRemoteReader->commStateToSentCounter)
564         cstRemoteReader->commStateSend=NOTHNIGTOSEND;
565
566   if (NtpTimeCmp(zNtpTime,
567         cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
568         csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
569   } else {
570     csChangeForReader->commStateChFReader=UNDERWAY;
571     eventDetach(cstRemoteReader->cstWriter->domain,
572                 cstRemoteReader->sobject->objectEntryAID,
573                 &csChangeForReader->waitWhileDataUnderwayTimer,
574                 0);
575     eventAdd(cstRemoteReader->cstWriter->domain,
576              cstRemoteReader->sobject->objectEntryAID,
577              &csChangeForReader->waitWhileDataUnderwayTimer,
578              0,   //common timer
579              "CSChangeForReaderUnderwayTimer",
580              CSChangeForReaderUnderwayTimer,
581              &cstRemoteReader->cstWriter->lock,
582              csChangeForReader,
583              &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
584   }
585   return 0;
586 }
587
588 /*****************************************************************************/
589 void
590 CSTWriterMulticast(CSChangeForReader *csChangeForReader) 
591 {
592     CSTRemoteReader     *cstRemoteReader;
593     ObjectEntryOID      *objectEntryOID;
594     CSChangeForReader   *csChangeForReader1;
595     char                queue=1;
596     
597     cstRemoteReader=csChangeForReader->cstRemoteReader;
598     objectEntryOID=cstRemoteReader->sobject;
599
600     //multicast can do an application with multicast interface
601     if (!objectEntryOID->multicastPort)
602         return;
603
604     ul_list_for_each(CSChangeParticipant,
605                      csChangeForReader->csChange,
606                      csChangeForReader1) {
607         ObjectEntryOID  *objectEntryOID1;
608         CSTRemoteReader *cstRemoteReader1;
609       
610         cstRemoteReader1=csChangeForReader1->cstRemoteReader;
611         objectEntryOID1=cstRemoteReader1->sobject;
612                    
613         /* are RRs from same GROUP */
614         if (objectEntryOID!=objectEntryOID1)
615           continue;
616
617         /* is the csChange in state TOSEND ? If yes, marks like proc. */
618         CSTWriterCSChangeForReaderNewState(csChangeForReader1);
619
620         /* if there are no messages, detach sending timer */
621         if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
622             !(cstRemoteReader->commStateHB==MAYSENDHB))
623             continue;
624
625         if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) 
626           queue=2;
627         eventDetach(cstRemoteReader->cstWriter->domain,
628                     cstRemoteReader->sobject->objectEntryAID,
629                     &cstRemoteReader->delayResponceTimer,
630                     queue);
631     }
632 }