]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTWriter.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / RTPSCSTWriter.c
1 /*
2  *  $Id: RTPSCSTWriter.c,v 0.0.0.1      2003/09/13
3  *
4  *  DEBUG:  section 51                  CSTWriter
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 GAVL_CUST_NODE_INT_IMP(CSTWriter,
35                        CSTPublications, CSTWriter, GUID_RTPS,
36                        cstWriter, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
38                        CSTWriter, CSTRemoteReader, GUID_RTPS,
39                        cstRemoteReader, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
41                        CSTRemoteReader, CSChangeForReader, SequenceNumber,
42                        csChangeForReader, node, csChange->sn, gavl_cmp_sn);
43
44 /*****************************************************************************/
45 void
46 CSTWriterInit(ORTEDomain *d, CSTWriter *cstWriter, ObjectEntryOID *object,
47               ObjectId oid, CSTWriterParams *params, ORTETypeRegister *typeRegister)
48 {
49
50   debug(51, 10) ("CSTWriterInit: start\n");
51   //init values of cstwriter
52   cstWriter->guid.hid = object->objectEntryHID->hid;
53   cstWriter->guid.aid = object->objectEntryAID->aid;
54   cstWriter->guid.oid = oid;
55   cstWriter->objectEntryOID = object;
56   memcpy(&cstWriter->params, params, sizeof(CSTWriterParams));
57   cstWriter->registrationCounter = 0;
58   ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
59   cstWriter->strictReliableCounter = 0;
60   cstWriter->bestEffortsCounter = 0;
61   cstWriter->csChangesCounter = 0;
62   cstWriter->cstRemoteReaderCounter = 0;
63   cstWriter->registrationCounter = cstWriter->params.registrationRetries;
64   SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
65   SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
66   CSTWriterCSChange_init_head(cstWriter);
67   CSTRemoteReader_init_root_field(cstWriter);
68   pthread_rwlock_init(&cstWriter->lock, NULL);
69   ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
70   cstWriter->domain = d;
71   cstWriter->typeRegister = typeRegister;
72   if ((cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
73     pthread_cond_init(&cstWriter->condCSChangeDestroyed, NULL);
74     pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed, NULL);
75     cstWriter->condValueCSChangeDestroyed = 0;
76   }
77   //add event for refresh
78   if (NtpTimeCmp(cstWriter->params.refreshPeriod, iNtpTime) != 0) {
79     CSTWriterRefreshTimer(d, (void *)cstWriter);
80   }
81   //add event for registration
82   if (NtpTimeCmp(cstWriter->params.registrationPeriod, zNtpTime) != 0) {
83     CSTWriterRegistrationTimer(d, (void *)cstWriter);
84   }
85   debug(51, 4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
86                 GUID_PRINTF(cstWriter->guid));
87   debug(51, 10) ("CSTWriterInit: finished\n");
88 }
89
90 /*****************************************************************************/
91 void
92 CSTWriterDelete(ORTEDomain *d, CSTWriter *cstWriter)
93 {
94   CSTRemoteReader       *cstRemoteReader;
95   CSChange              *csChange;
96
97   debug(51, 10) ("CSTWriterDelete: start\n");
98
99   debug(51, 4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
100                 GUID_PRINTF(cstWriter->guid));
101   //Destroy all cstRemoteReader connected on cstWriter
102   while ((cstRemoteReader = CSTRemoteReader_first(cstWriter)))
103     CSTWriterDestroyRemoteReader(d, cstRemoteReader);
104   //Destroy all csChnages connected on cstWriter
105   while ((csChange = CSTWriterCSChange_cut_first(cstWriter))) {
106     parameterDelete(csChange);
107     FREE(csChange);
108   }
109   eventDetach(d,
110               cstWriter->objectEntryOID->objectEntryAID,
111               &cstWriter->refreshPeriodTimer,
112               0);
113   eventDetach(d,
114               cstWriter->objectEntryOID->objectEntryAID,
115               &cstWriter->registrationTimer,
116               0);
117   if ((cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
118     pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
119     pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
120   }
121   pthread_rwlock_destroy(&cstWriter->lock);
122   debug(51, 10) ("CSTWriterDelete: finished\n");
123 }
124
125 /*****************************************************************************/
126 CSTRemoteReader *
127 CSTWriterAddRemoteReader(ORTEDomain *d, CSTWriter *cstWriter, ObjectEntryOID *pobject,
128                          ObjectId oid, ObjectEntryOID *sobject)
129 {
130   CSTRemoteReader     *cstRemoteReader;
131   CSChangeForReader   *csChangeForReader;
132   CSChange            *csChange = NULL;
133
134   cstWriter->cstRemoteReaderCounter++;
135   cstRemoteReader = (CSTRemoteReader *)MALLOC(sizeof(CSTRemoteReader));
136   cstRemoteReader->guid.hid = pobject->guid.hid;
137   cstRemoteReader->guid.aid = pobject->guid.aid;
138   cstRemoteReader->guid.oid = oid;
139   cstRemoteReader->sobject = sobject;
140   cstRemoteReader->pobject = pobject;
141   cstRemoteReader->cstWriter = cstWriter;
142   CSChangeForReader_init_root_field(cstRemoteReader);
143   cstRemoteReader->commStateHB = MAYSENDHB;
144   cstRemoteReader->commStateSend = NOTHNIGTOSEND;
145   cstRemoteReader->HBRetriesCounter = 0;
146   cstRemoteReader->csChangesCounter = 0;
147   cstRemoteReader->commStateToSentCounter = 0;
148   NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
149   ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
150   ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
151   //insert remote reader
152   CSTRemoteReader_insert(cstWriter, cstRemoteReader);
153   //multicast case
154   if (cstRemoteReader->sobject->multicastPort) {
155     debug(51, 9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
156                   GUID_PRINTF(cstRemoteReader->guid),
157                   GUID_PRINTF(cstRemoteReader->sobject->guid));
158     ObjectEntryMulticast_insert(cstRemoteReader->sobject,
159                                 cstRemoteReader);
160   }
161   //copy all csChanges (not for publication)
162   if ((cstWriter->guid.oid & 0x07) != OID_PUBLICATION) {
163     ul_list_for_each(CSTWriterCSChange, cstWriter, csChange) {
164       csChange->remoteReaderCount++;
165       cstRemoteReader->csChangesCounter++;
166       csChangeForReader = (CSChangeForReader *)MALLOC(sizeof(CSChangeForReader));
167       csChangeForReader->commStateChFReader = TOSEND;
168       cstRemoteReader->commStateToSentCounter++;
169       csChangeForReader->csChange = csChange;
170       csChangeForReader->cstRemoteReader = cstRemoteReader;
171       ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
172       CSChangeParticipant_insert(csChange, csChangeForReader);
173       CSChangeForReader_insert(cstRemoteReader, csChangeForReader);
174       cstRemoteReader->commStateSend = MUSTSENDDATA;
175     }
176     if (cstRemoteReader->commStateSend == MUSTSENDDATA) {
177       eventAdd(d,
178                cstRemoteReader->sobject->objectEntryAID,
179                &cstRemoteReader->delayResponceTimer,
180                1,
181                "CSTWriterSendTimer",
182                CSTWriterSendTimer,
183                &cstRemoteReader->cstWriter->lock,
184                cstRemoteReader,
185                &cstRemoteReader->cstWriter->params.delayResponceTime);
186     }
187   } else {
188     //Publication
189     ORTESubsProp *sp = (ORTESubsProp *)pobject->attributes;
190     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0)
191       cstWriter->strictReliableCounter++;
192     else {
193       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
194         cstWriter->bestEffortsCounter++;
195     }
196   }
197   debug(51, 4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
198                 GUID_PRINTF(cstRemoteReader->guid));
199   return cstRemoteReader;
200 }
201
202 /*****************************************************************************/
203 void
204 CSTWriterDestroyRemoteReader(ORTEDomain *d, CSTRemoteReader *cstRemoteReader)
205 {
206   CSChangeForReader   *csChangeForReader;
207
208   if (!cstRemoteReader)
209     return;
210   cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
211   debug(51, 4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
212                 GUID_PRINTF(cstRemoteReader->guid));
213   if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
214     ORTESubsProp *sp;
215     sp = (ORTESubsProp *)cstRemoteReader->pobject->attributes;
216     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0)
217       cstRemoteReader->cstWriter->strictReliableCounter--;
218     else {
219       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
220         cstRemoteReader->cstWriter->bestEffortsCounter--;
221     }
222   }
223   while ((csChangeForReader = CSChangeForReader_first(cstRemoteReader)))
224     CSTWriterDestroyCSChangeForReader(
225       csChangeForReader, ORTE_TRUE);
226   eventDetach(d,
227               cstRemoteReader->sobject->objectEntryAID,
228               &cstRemoteReader->delayResponceTimer,
229               1); //metatraffic timer
230   eventDetach(d,
231               cstRemoteReader->sobject->objectEntryAID,
232               &cstRemoteReader->delayResponceTimer,
233               2); //userdata timer
234   eventDetach(d,
235               cstRemoteReader->sobject->objectEntryAID,
236               &cstRemoteReader->repeatAnnounceTimer,
237               1); //metatraffic timer
238   eventDetach(d,
239               cstRemoteReader->sobject->objectEntryAID,
240               &cstRemoteReader->repeatAnnounceTimer,
241               2); //userdata timer
242   //multicast case
243   if (cstRemoteReader->sobject->multicastPort) {
244     ObjectEntryOID *object;
245
246     object = cstRemoteReader->sobject;
247
248     ObjectEntryMulticast_delete(object, cstRemoteReader);
249     debug(51, 9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
250                   GUID_PRINTF(cstRemoteReader->guid),
251                   GUID_PRINTF(object->guid));
252
253     if (ObjectEntryMulticast_is_empty(object)) {
254       objectEntryDelete(d, object, ORTE_TRUE);
255     }
256   }
257   CSTRemoteReader_delete(cstRemoteReader->cstWriter, cstRemoteReader);
258   FREE(cstRemoteReader);
259 }
260
261 /*****************************************************************************/
262 void
263 CSTWriterMakeGAP(ORTEDomain *d, CSTWriter *cstWriter, GUID_RTPS *guid)
264 {
265   CSChange            *csChange, *csChange1;
266
267   ul_list_for_each(CSTWriterCSChange, cstWriter, csChange) {
268     if ((!SeqNumberCmp(csChange->gapSN, noneSN)) &&
269         (!gavl_cmp_guid(&csChange->guid, guid))) {  //equal? (VAR)
270       //VAR->GAP - inc gap_sn_no
271       SeqNumberInc(csChange->gapSN, csChange->gapSN);
272       parameterDelete(csChange);
273       //is Gap in prior or next position?
274       csChange1 = CSTWriterCSChange_prev(cstWriter, csChange);
275       if (csChange1) {
276         if (SeqNumberCmp(csChange1->gapSN, noneSN)) {
277           SeqNumberAdd(csChange1->gapSN,
278                        csChange1->gapSN,
279                        csChange->gapSN);
280           CSTWriterDestroyCSChange(d, cstWriter, csChange);
281           csChange = csChange1;
282         }
283       }
284       csChange1 = CSTWriterCSChange_next(cstWriter, csChange);
285       if (csChange1) {
286         if (SeqNumberCmp(csChange1->gapSN, noneSN)) {
287           SeqNumberAdd(csChange->gapSN,
288                        csChange->gapSN,
289                        csChange1->gapSN);
290           CSTWriterDestroyCSChange(d, cstWriter, csChange1);
291         }
292       }
293       break;
294     }
295   }
296 }
297
298 /*****************************************************************************/
299 void
300 CSTWriterAddCSChange(ORTEDomain *d, CSTWriter *cstWriter, CSChange *csChange)
301 {
302   CSChangeForReader   *csChangeForReader;
303   CSTRemoteReader     *cstRemoteReader;
304   CSChange            *csChangeFSN;
305
306   debug(51, 5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
307                 GUID_PRINTF(cstWriter->guid));
308   cstWriter->csChangesCounter++;
309   //look for old cschange
310   if ((cstWriter->guid.oid & 0x07) != OID_PUBLICATION)
311     CSTWriterMakeGAP(d, cstWriter, &csChange->guid);
312   //insert cschange into database changes
313   SeqNumberInc(cstWriter->lastSN, cstWriter->lastSN);
314   csChange->sn = cstWriter->lastSN;
315   SEQUENCE_NUMBER_NONE(csChange->gapSN);
316   csChange->remoteReaderCount = cstWriter->cstRemoteReaderCounter;
317   csChange->remoteReaderBest = 0;
318   csChange->remoteReaderStrict = 0;
319   CSChangeParticipant_init_head(csChange);
320   CSTWriterCSChange_insert(cstWriter, csChange);
321   debug(51, 5) ("CSTWriterAddCSChange: sn:0x%x\n",
322                 csChange->sn.low);
323   //update FirstSN
324   csChangeFSN = CSTWriterCSChange_first(cstWriter);
325   if (SeqNumberCmp(csChangeFSN->gapSN, noneSN) > 0) {
326     //minimal are 2 SNs (GAP,VAR) ...
327 //    CSTWriterDestroyCSChange(cstWriter,csChange);
328   }
329   csChangeFSN = CSTWriterCSChange_first(cstWriter);
330   cstWriter->firstSN = csChangeFSN->sn;
331   //insert new cschange for each reader
332   gavl_cust_for_each(CSTRemoteReader, cstWriter, cstRemoteReader) {
333     //csChangeForReader
334     debug(51, 10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
335                    GUID_PRINTF(cstRemoteReader->guid));
336     csChangeForReader = (CSChangeForReader *)MALLOC(sizeof(CSChangeForReader));
337     csChangeForReader->commStateChFReader = TOSEND;
338     cstRemoteReader->commStateToSentCounter++;
339     csChangeForReader->csChange = csChange;
340     csChangeForReader->cstRemoteReader = cstRemoteReader;
341     ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
342     CSChangeParticipant_insert(csChange, csChangeForReader);
343     CSChangeForReader_insert(cstRemoteReader, csChangeForReader);
344     cstRemoteReader->csChangesCounter++;
345     cstRemoteReader->HBRetriesCounter = 0;
346     cstRemoteReader->commStateSend = MUSTSENDDATA;
347     if ((cstWriter->guid.oid & 0x07) != OID_PUBLICATION) {
348       eventDetach(d,
349                   cstRemoteReader->sobject->objectEntryAID,
350                   &cstRemoteReader->delayResponceTimer,
351                   1);
352       eventAdd(d,
353                cstRemoteReader->sobject->objectEntryAID,
354                &cstRemoteReader->delayResponceTimer,
355                1,
356                "CSTWriterSendTimer",
357                CSTWriterSendTimer,
358                &cstRemoteReader->cstWriter->lock,
359                cstRemoteReader,
360                NULL);
361     } else {
362       ORTESubsProp *sp = (ORTESubsProp *)cstRemoteReader->pobject->attributes;
363
364       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0) {
365         //Strict reliable subscription
366         csChange->remoteReaderStrict++;
367         eventDetach(d,
368                     cstRemoteReader->sobject->objectEntryAID,
369                     &cstRemoteReader->delayResponceTimer,
370                     2);
371         eventAdd(d,
372                  cstRemoteReader->sobject->objectEntryAID,
373                  &cstRemoteReader->delayResponceTimer,
374                  2,
375                  "CSTWriterSendStrictTimer",
376                  CSTWriterSendStrictTimer,
377                  &cstRemoteReader->cstWriter->lock,
378                  cstRemoteReader,
379                  NULL);
380       } else {
381         if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0) {
382           //best efforts subscription
383           NtpTime nextIssueTime, nextIssueDelay, actTime;
384
385           actTime = getActualNtpTime();
386           csChange->remoteReaderBest++;
387           NtpTimeAdd(nextIssueTime,
388                      cstRemoteReader->lastSentIssueTime,
389                      sp->minimumSeparation);
390           NtpTimeSub(nextIssueDelay,
391                      nextIssueTime,
392                      actTime);
393           if (NtpTimeCmp(actTime, nextIssueTime) >= 0)
394             NTPTIME_ZERO(nextIssueDelay);
395           eventDetach(d,
396                       cstRemoteReader->sobject->objectEntryAID,
397                       &cstRemoteReader->delayResponceTimer,
398                       2);
399           //schedule sent issue
400           eventAdd(d,
401                    cstRemoteReader->sobject->objectEntryAID,
402                    &cstRemoteReader->delayResponceTimer,
403                    2,
404                    "CSTWriterSendBestEffortTimer",
405                    CSTWriterSendBestEffortTimer,
406                    &cstRemoteReader->cstWriter->lock,
407                    cstRemoteReader,
408                    &nextIssueDelay);
409         } else {
410           //!Best_Effort & !Strict_Reliable
411           CSTWriterDestroyCSChangeForReader(csChangeForReader,
412                                             ORTE_TRUE);
413           debug(51, 5) ("CSTWriterAddCSChange: destroyed\n");
414         }
415       }
416     }
417     debug(51, 5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
418   }
419   debug(51, 5) ("CSTWriterAddCSChange: finished\n");
420 }
421
422 /*****************************************************************************/
423 void
424 CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
425                                   Boolean destroyCSChange)
426 {
427   CSTRemoteReader *cstRemoteReader;
428   CSChange *csChange;
429
430   if (!csChangeForReader)
431     return;
432   cstRemoteReader = csChangeForReader->cstRemoteReader;
433   csChange = csChangeForReader->csChange;
434   csChange->remoteReaderCount--;
435   cstRemoteReader->csChangesCounter--;
436   if (!cstRemoteReader->csChangesCounter) {
437     cstRemoteReader->commStateSend = NOTHNIGTOSEND;
438   }
439   if (csChangeForReader->commStateChFReader == TOSEND) {
440     cstRemoteReader->commStateToSentCounter--;
441   }
442   if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
443     ORTESubsProp *sp = (ORTESubsProp *)cstRemoteReader->pobject->attributes;
444     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0) {
445       csChange->remoteReaderStrict--;
446     } else {
447       if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0) {
448         csChange->remoteReaderBest--;
449       }
450     }
451   }
452   eventDetach(cstRemoteReader->cstWriter->domain,
453               cstRemoteReader->sobject->objectEntryAID,
454               &csChangeForReader->waitWhileDataUnderwayTimer,
455               0);
456   CSChangeParticipant_delete(csChange, csChangeForReader);
457   CSChangeForReader_delete(cstRemoteReader, csChangeForReader);
458   FREE(csChangeForReader);
459
460   if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
461     if (!csChange->remoteReaderCount) {
462       if (destroyCSChange) {
463         CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
464                                  cstRemoteReader->cstWriter, csChange);
465       }
466       pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
467       cstRemoteReader->cstWriter->condValueCSChangeDestroyed = 1;
468       pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
469       pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
470       debug(51, 5) ("Publication: new queue level (%d)\n",
471                     cstRemoteReader->cstWriter->csChangesCounter);
472     }
473   }
474 }
475
476 /*****************************************************************************/
477 void
478 CSTWriterDestroyCSChange(ORTEDomain *d, CSTWriter *cstWriter, CSChange *csChange)
479 {
480   CSTRemoteReader     *cstRemoteReader;
481   CSChangeForReader   *csChangeForReader;
482   CSChange            *csChangeFSN;
483
484   if (!csChange)
485     return;
486
487   cstWriter->csChangesCounter--;
488   CSTWriterCSChange_delete(cstWriter, csChange);
489   gavl_cust_for_each(CSTRemoteReader, cstWriter, cstRemoteReader) {
490     csChangeForReader = CSChangeForReader_find(cstRemoteReader, &csChange->sn);
491     CSTWriterDestroyCSChangeForReader(
492       csChangeForReader, ORTE_FALSE);
493   }
494
495   if (csChange->cdrCodec.buffer)
496     FREE(csChange->cdrCodec.buffer);
497   parameterDelete(csChange);
498   FREE(csChange);
499
500   //update first SN
501   csChangeFSN = CSTWriterCSChange_first(cstWriter);
502   if (csChangeFSN)
503     cstWriter->firstSN = csChangeFSN->sn;
504   else
505     cstWriter->firstSN = cstWriter->lastSN;
506 }
507
508 /*****************************************************************************/
509 Boolean
510 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter)
511 {
512   CSChange *csChange;
513
514   ul_list_for_each(CSTWriterCSChange, cstWriter, csChange) {
515
516     if (!csChange->remoteReaderStrict) {
517       CSTWriterDestroyCSChange(cstWriter->domain, cstWriter, csChange);
518       return ORTE_TRUE;
519     }
520   }
521   return ORTE_FALSE;
522 }
523
524 /*****************************************************************************/
525 void
526 CSTWriterRefreshAllCSChanges(ORTEDomain *d, CSTRemoteReader *cstRemoteReader)
527 {
528   CSChangeForReader   *csChangeForReader;
529   int32_t             timerQueue = 1;
530
531   if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION)
532     timerQueue = 2;  //userdata timer queue
533
534   gavl_cust_for_each(CSChangeForReader, cstRemoteReader, csChangeForReader) {
535
536     //refresh only VAR's
537     if (SeqNumberCmp(csChangeForReader->csChange->gapSN, noneSN) == 0) {
538
539       if (csChangeForReader->commStateChFReader != TOSEND) {
540         csChangeForReader->commStateChFReader = TOSEND;
541         cstRemoteReader->commStateToSentCounter++;
542       }
543
544       if (cstRemoteReader->commStateSend == NOTHNIGTOSEND) {
545         cstRemoteReader->commStateSend = MUSTSENDDATA;
546         eventDetach(d,
547                     cstRemoteReader->sobject->objectEntryAID,
548                     &cstRemoteReader->delayResponceTimer,
549                     timerQueue);
550         eventAdd(d,
551                  cstRemoteReader->sobject->objectEntryAID,
552                  &cstRemoteReader->delayResponceTimer,
553                  timerQueue,
554                  "CSTWriterSendTimer",
555                  CSTWriterSendTimer,
556                  &cstRemoteReader->cstWriter->lock,
557                  cstRemoteReader,
558                  &cstRemoteReader->cstWriter->params.delayResponceTime);
559       }
560     }
561   }
562 }
563
564 /*****************************************************************************/
565 int
566 CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader)
567 {
568   CSTRemoteReader *cstRemoteReader = csChangeForReader->cstRemoteReader;
569
570   //setup new state for csChangeForReader
571   if (csChangeForReader->commStateChFReader != TOSEND)
572     return -1;
573   cstRemoteReader->commStateToSentCounter--;
574
575   if (!cstRemoteReader->commStateToSentCounter)
576     cstRemoteReader->commStateSend = NOTHNIGTOSEND;
577
578   if (NtpTimeCmp(zNtpTime,
579                  cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime) == 0) {
580     csChangeForReader->commStateChFReader = UNACKNOWLEDGED;
581   } else {
582     csChangeForReader->commStateChFReader = UNDERWAY;
583     eventDetach(cstRemoteReader->cstWriter->domain,
584                 cstRemoteReader->sobject->objectEntryAID,
585                 &csChangeForReader->waitWhileDataUnderwayTimer,
586                 0);
587     eventAdd(cstRemoteReader->cstWriter->domain,
588              cstRemoteReader->sobject->objectEntryAID,
589              &csChangeForReader->waitWhileDataUnderwayTimer,
590              0,   //common timer
591              "CSChangeForReaderUnderwayTimer",
592              CSChangeForReaderUnderwayTimer,
593              &cstRemoteReader->cstWriter->lock,
594              csChangeForReader,
595              &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
596   }
597   return 0;
598 }
599
600 /*****************************************************************************/
601 void
602 CSTWriterMulticast(CSChangeForReader *csChangeForReader)
603 {
604   CSTRemoteReader     *cstRemoteReader;
605   ObjectEntryOID      *objectEntryOID;
606   CSChangeForReader   *csChangeForReader1;
607   char                queue = 1;
608
609   cstRemoteReader = csChangeForReader->cstRemoteReader;
610   objectEntryOID = cstRemoteReader->sobject;
611
612   //multicast can do an application with multicast interface
613   if (!objectEntryOID->multicastPort)
614     return;
615
616   ul_list_for_each(CSChangeParticipant,
617                    csChangeForReader->csChange,
618                    csChangeForReader1) {
619     ObjectEntryOID  *objectEntryOID1;
620     CSTRemoteReader *cstRemoteReader1;
621
622     cstRemoteReader1 = csChangeForReader1->cstRemoteReader;
623     objectEntryOID1 = cstRemoteReader1->sobject;
624
625     /* are RRs from same GROUP */
626     if (objectEntryOID != objectEntryOID1)
627       continue;
628
629     /* is the csChange in state TOSEND ? If yes, marks like proc. */
630     CSTWriterCSChangeForReaderNewState(csChangeForReader1);
631
632     /* if there are no messages, detach sending timer */
633     if (!(cstRemoteReader->commStateSend == NOTHNIGTOSEND) &&
634         !(cstRemoteReader->commStateHB == MAYSENDHB))
635       continue;
636
637     if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION)
638       queue = 2;
639     eventDetach(cstRemoteReader->cstWriter->domain,
640                 cstRemoteReader->sobject->objectEntryAID,
641                 &cstRemoteReader->delayResponceTimer,
642                 queue);
643   }
644 }