2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 GAVL_CUST_NODE_INT_IMP(CSTWriter,
35 CSTPublications, CSTWriter, GUID_RTPS,
36 cstWriter, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
38 CSTWriter, CSTRemoteReader, GUID_RTPS,
39 cstRemoteReader, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
41 CSTRemoteReader, CSChangeForReader, SequenceNumber,
42 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
44 /*****************************************************************************/
46 CSTWriterInit(ORTEDomain *d, CSTWriter *cstWriter, ObjectEntryOID *object,
47 ObjectId oid, CSTWriterParams *params, ORTETypeRegister *typeRegister)
50 debug(51, 10) ("CSTWriterInit: start\n");
51 //init values of cstwriter
52 cstWriter->guid.hid = object->objectEntryHID->hid;
53 cstWriter->guid.aid = object->objectEntryAID->aid;
54 cstWriter->guid.oid = oid;
55 cstWriter->objectEntryOID = object;
56 memcpy(&cstWriter->params, params, sizeof(CSTWriterParams));
57 cstWriter->registrationCounter = 0;
58 ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
59 cstWriter->strictReliableCounter = 0;
60 cstWriter->bestEffortsCounter = 0;
61 cstWriter->csChangesCounter = 0;
62 cstWriter->cstRemoteReaderCounter = 0;
63 cstWriter->registrationCounter = cstWriter->params.registrationRetries;
64 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
65 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
66 CSTWriterCSChange_init_head(cstWriter);
67 CSTRemoteReader_init_root_field(cstWriter);
68 pthread_rwlock_init(&cstWriter->lock, NULL);
69 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
70 cstWriter->domain = d;
71 cstWriter->typeRegister = typeRegister;
72 if ((cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
73 pthread_cond_init(&cstWriter->condCSChangeDestroyed, NULL);
74 pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed, NULL);
75 cstWriter->condValueCSChangeDestroyed = 0;
77 //add event for refresh
78 if (NtpTimeCmp(cstWriter->params.refreshPeriod, iNtpTime) != 0) {
79 CSTWriterRefreshTimer(d, (void *)cstWriter);
81 //add event for registration
82 if (NtpTimeCmp(cstWriter->params.registrationPeriod, zNtpTime) != 0) {
83 CSTWriterRegistrationTimer(d, (void *)cstWriter);
85 debug(51, 4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
86 GUID_PRINTF(cstWriter->guid));
87 debug(51, 10) ("CSTWriterInit: finished\n");
90 /*****************************************************************************/
92 CSTWriterDelete(ORTEDomain *d, CSTWriter *cstWriter)
94 CSTRemoteReader *cstRemoteReader;
97 debug(51, 10) ("CSTWriterDelete: start\n");
99 debug(51, 4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
100 GUID_PRINTF(cstWriter->guid));
101 //Destroy all cstRemoteReader connected on cstWriter
102 while ((cstRemoteReader = CSTRemoteReader_first(cstWriter)))
103 CSTWriterDestroyRemoteReader(d, cstRemoteReader);
104 //Destroy all csChnages connected on cstWriter
105 while ((csChange = CSTWriterCSChange_cut_first(cstWriter))) {
106 parameterDelete(csChange);
110 cstWriter->objectEntryOID->objectEntryAID,
111 &cstWriter->refreshPeriodTimer,
114 cstWriter->objectEntryOID->objectEntryAID,
115 &cstWriter->registrationTimer,
117 if ((cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
118 pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
119 pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
121 pthread_rwlock_destroy(&cstWriter->lock);
122 debug(51, 10) ("CSTWriterDelete: finished\n");
125 /*****************************************************************************/
127 CSTWriterAddRemoteReader(ORTEDomain *d, CSTWriter *cstWriter, ObjectEntryOID *pobject,
128 ObjectId oid, ObjectEntryOID *sobject)
130 CSTRemoteReader *cstRemoteReader;
131 CSChangeForReader *csChangeForReader;
132 CSChange *csChange = NULL;
134 cstWriter->cstRemoteReaderCounter++;
135 cstRemoteReader = (CSTRemoteReader *)MALLOC(sizeof(CSTRemoteReader));
136 cstRemoteReader->guid.hid = pobject->guid.hid;
137 cstRemoteReader->guid.aid = pobject->guid.aid;
138 cstRemoteReader->guid.oid = oid;
139 cstRemoteReader->sobject = sobject;
140 cstRemoteReader->pobject = pobject;
141 cstRemoteReader->cstWriter = cstWriter;
142 CSChangeForReader_init_root_field(cstRemoteReader);
143 cstRemoteReader->commStateHB = MAYSENDHB;
144 cstRemoteReader->commStateSend = NOTHNIGTOSEND;
145 cstRemoteReader->HBRetriesCounter = 0;
146 cstRemoteReader->csChangesCounter = 0;
147 cstRemoteReader->commStateToSentCounter = 0;
148 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
149 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
150 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
151 //insert remote reader
152 CSTRemoteReader_insert(cstWriter, cstRemoteReader);
154 if (cstRemoteReader->sobject->multicastPort) {
155 debug(51, 9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
156 GUID_PRINTF(cstRemoteReader->guid),
157 GUID_PRINTF(cstRemoteReader->sobject->guid));
158 ObjectEntryMulticast_insert(cstRemoteReader->sobject,
161 //copy all csChanges (not for publication)
162 if ((cstWriter->guid.oid & 0x07) != OID_PUBLICATION) {
163 ul_list_for_each(CSTWriterCSChange, cstWriter, csChange) {
164 csChange->remoteReaderCount++;
165 cstRemoteReader->csChangesCounter++;
166 csChangeForReader = (CSChangeForReader *)MALLOC(sizeof(CSChangeForReader));
167 csChangeForReader->commStateChFReader = TOSEND;
168 cstRemoteReader->commStateToSentCounter++;
169 csChangeForReader->csChange = csChange;
170 csChangeForReader->cstRemoteReader = cstRemoteReader;
171 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
172 CSChangeParticipant_insert(csChange, csChangeForReader);
173 CSChangeForReader_insert(cstRemoteReader, csChangeForReader);
174 cstRemoteReader->commStateSend = MUSTSENDDATA;
176 if (cstRemoteReader->commStateSend == MUSTSENDDATA) {
178 cstRemoteReader->sobject->objectEntryAID,
179 &cstRemoteReader->delayResponceTimer,
181 "CSTWriterSendTimer",
183 &cstRemoteReader->cstWriter->lock,
185 &cstRemoteReader->cstWriter->params.delayResponceTime);
189 ORTESubsProp *sp = (ORTESubsProp *)pobject->attributes;
190 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0)
191 cstWriter->strictReliableCounter++;
193 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
194 cstWriter->bestEffortsCounter++;
197 debug(51, 4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
198 GUID_PRINTF(cstRemoteReader->guid));
199 return cstRemoteReader;
202 /*****************************************************************************/
204 CSTWriterDestroyRemoteReader(ORTEDomain *d, CSTRemoteReader *cstRemoteReader)
206 CSChangeForReader *csChangeForReader;
208 if (!cstRemoteReader)
210 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
211 debug(51, 4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
212 GUID_PRINTF(cstRemoteReader->guid));
213 if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
215 sp = (ORTESubsProp *)cstRemoteReader->pobject->attributes;
216 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0)
217 cstRemoteReader->cstWriter->strictReliableCounter--;
219 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
220 cstRemoteReader->cstWriter->bestEffortsCounter--;
223 while ((csChangeForReader = CSChangeForReader_first(cstRemoteReader)))
224 CSTWriterDestroyCSChangeForReader(
225 csChangeForReader, ORTE_TRUE);
227 cstRemoteReader->sobject->objectEntryAID,
228 &cstRemoteReader->delayResponceTimer,
229 1); //metatraffic timer
231 cstRemoteReader->sobject->objectEntryAID,
232 &cstRemoteReader->delayResponceTimer,
235 cstRemoteReader->sobject->objectEntryAID,
236 &cstRemoteReader->repeatAnnounceTimer,
237 1); //metatraffic timer
239 cstRemoteReader->sobject->objectEntryAID,
240 &cstRemoteReader->repeatAnnounceTimer,
243 if (cstRemoteReader->sobject->multicastPort) {
244 ObjectEntryOID *object;
246 object = cstRemoteReader->sobject;
248 ObjectEntryMulticast_delete(object, cstRemoteReader);
249 debug(51, 9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
250 GUID_PRINTF(cstRemoteReader->guid),
251 GUID_PRINTF(object->guid));
253 if (ObjectEntryMulticast_is_empty(object)) {
254 objectEntryDelete(d, object, ORTE_TRUE);
257 CSTRemoteReader_delete(cstRemoteReader->cstWriter, cstRemoteReader);
258 FREE(cstRemoteReader);
261 /*****************************************************************************/
263 CSTWriterMakeGAP(ORTEDomain *d, CSTWriter *cstWriter, GUID_RTPS *guid)
265 CSChange *csChange, *csChange1;
267 ul_list_for_each(CSTWriterCSChange, cstWriter, csChange) {
268 if ((!SeqNumberCmp(csChange->gapSN, noneSN)) &&
269 (!gavl_cmp_guid(&csChange->guid, guid))) { //equal? (VAR)
270 //VAR->GAP - inc gap_sn_no
271 SeqNumberInc(csChange->gapSN, csChange->gapSN);
272 parameterDelete(csChange);
273 //is Gap in prior or next position?
274 csChange1 = CSTWriterCSChange_prev(cstWriter, csChange);
276 if (SeqNumberCmp(csChange1->gapSN, noneSN)) {
277 SeqNumberAdd(csChange1->gapSN,
280 CSTWriterDestroyCSChange(d, cstWriter, csChange);
281 csChange = csChange1;
284 csChange1 = CSTWriterCSChange_next(cstWriter, csChange);
286 if (SeqNumberCmp(csChange1->gapSN, noneSN)) {
287 SeqNumberAdd(csChange->gapSN,
290 CSTWriterDestroyCSChange(d, cstWriter, csChange1);
298 /*****************************************************************************/
300 CSTWriterAddCSChange(ORTEDomain *d, CSTWriter *cstWriter, CSChange *csChange)
302 CSChangeForReader *csChangeForReader;
303 CSTRemoteReader *cstRemoteReader;
304 CSChange *csChangeFSN;
306 debug(51, 5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
307 GUID_PRINTF(cstWriter->guid));
308 cstWriter->csChangesCounter++;
309 //look for old cschange
310 if ((cstWriter->guid.oid & 0x07) != OID_PUBLICATION)
311 CSTWriterMakeGAP(d, cstWriter, &csChange->guid);
312 //insert cschange into database changes
313 SeqNumberInc(cstWriter->lastSN, cstWriter->lastSN);
314 csChange->sn = cstWriter->lastSN;
315 SEQUENCE_NUMBER_NONE(csChange->gapSN);
316 csChange->remoteReaderCount = cstWriter->cstRemoteReaderCounter;
317 csChange->remoteReaderBest = 0;
318 csChange->remoteReaderStrict = 0;
319 CSChangeParticipant_init_head(csChange);
320 CSTWriterCSChange_insert(cstWriter, csChange);
321 debug(51, 5) ("CSTWriterAddCSChange: sn:0x%x\n",
324 csChangeFSN = CSTWriterCSChange_first(cstWriter);
325 if (SeqNumberCmp(csChangeFSN->gapSN, noneSN) > 0) {
326 //minimal are 2 SNs (GAP,VAR) ...
327 // CSTWriterDestroyCSChange(cstWriter,csChange);
329 csChangeFSN = CSTWriterCSChange_first(cstWriter);
330 cstWriter->firstSN = csChangeFSN->sn;
331 //insert new cschange for each reader
332 gavl_cust_for_each(CSTRemoteReader, cstWriter, cstRemoteReader) {
334 debug(51, 10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
335 GUID_PRINTF(cstRemoteReader->guid));
336 csChangeForReader = (CSChangeForReader *)MALLOC(sizeof(CSChangeForReader));
337 csChangeForReader->commStateChFReader = TOSEND;
338 cstRemoteReader->commStateToSentCounter++;
339 csChangeForReader->csChange = csChange;
340 csChangeForReader->cstRemoteReader = cstRemoteReader;
341 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
342 CSChangeParticipant_insert(csChange, csChangeForReader);
343 CSChangeForReader_insert(cstRemoteReader, csChangeForReader);
344 cstRemoteReader->csChangesCounter++;
345 cstRemoteReader->HBRetriesCounter = 0;
346 cstRemoteReader->commStateSend = MUSTSENDDATA;
347 if ((cstWriter->guid.oid & 0x07) != OID_PUBLICATION) {
349 cstRemoteReader->sobject->objectEntryAID,
350 &cstRemoteReader->delayResponceTimer,
353 cstRemoteReader->sobject->objectEntryAID,
354 &cstRemoteReader->delayResponceTimer,
356 "CSTWriterSendTimer",
358 &cstRemoteReader->cstWriter->lock,
362 ORTESubsProp *sp = (ORTESubsProp *)cstRemoteReader->pobject->attributes;
364 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0) {
365 //Strict reliable subscription
366 csChange->remoteReaderStrict++;
368 cstRemoteReader->sobject->objectEntryAID,
369 &cstRemoteReader->delayResponceTimer,
372 cstRemoteReader->sobject->objectEntryAID,
373 &cstRemoteReader->delayResponceTimer,
375 "CSTWriterSendStrictTimer",
376 CSTWriterSendStrictTimer,
377 &cstRemoteReader->cstWriter->lock,
381 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0) {
382 //best efforts subscription
383 NtpTime nextIssueTime, nextIssueDelay, actTime;
385 actTime = getActualNtpTime();
386 csChange->remoteReaderBest++;
387 NtpTimeAdd(nextIssueTime,
388 cstRemoteReader->lastSentIssueTime,
389 sp->minimumSeparation);
390 NtpTimeSub(nextIssueDelay,
393 if (NtpTimeCmp(actTime, nextIssueTime) >= 0)
394 NTPTIME_ZERO(nextIssueDelay);
396 cstRemoteReader->sobject->objectEntryAID,
397 &cstRemoteReader->delayResponceTimer,
399 //schedule sent issue
401 cstRemoteReader->sobject->objectEntryAID,
402 &cstRemoteReader->delayResponceTimer,
404 "CSTWriterSendBestEffortTimer",
405 CSTWriterSendBestEffortTimer,
406 &cstRemoteReader->cstWriter->lock,
410 //!Best_Effort & !Strict_Reliable
411 CSTWriterDestroyCSChangeForReader(csChangeForReader,
413 debug(51, 5) ("CSTWriterAddCSChange: destroyed\n");
417 debug(51, 5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
419 debug(51, 5) ("CSTWriterAddCSChange: finished\n");
422 /*****************************************************************************/
424 CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
425 Boolean destroyCSChange)
427 CSTRemoteReader *cstRemoteReader;
430 if (!csChangeForReader)
432 cstRemoteReader = csChangeForReader->cstRemoteReader;
433 csChange = csChangeForReader->csChange;
434 csChange->remoteReaderCount--;
435 cstRemoteReader->csChangesCounter--;
436 if (!cstRemoteReader->csChangesCounter) {
437 cstRemoteReader->commStateSend = NOTHNIGTOSEND;
439 if (csChangeForReader->commStateChFReader == TOSEND) {
440 cstRemoteReader->commStateToSentCounter--;
442 if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
443 ORTESubsProp *sp = (ORTESubsProp *)cstRemoteReader->pobject->attributes;
444 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0) {
445 csChange->remoteReaderStrict--;
447 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0) {
448 csChange->remoteReaderBest--;
452 eventDetach(cstRemoteReader->cstWriter->domain,
453 cstRemoteReader->sobject->objectEntryAID,
454 &csChangeForReader->waitWhileDataUnderwayTimer,
456 CSChangeParticipant_delete(csChange, csChangeForReader);
457 CSChangeForReader_delete(cstRemoteReader, csChangeForReader);
458 FREE(csChangeForReader);
460 if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION) {
461 if (!csChange->remoteReaderCount) {
462 if (destroyCSChange) {
463 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
464 cstRemoteReader->cstWriter, csChange);
466 pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
467 cstRemoteReader->cstWriter->condValueCSChangeDestroyed = 1;
468 pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
469 pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
470 debug(51, 5) ("Publication: new queue level (%d)\n",
471 cstRemoteReader->cstWriter->csChangesCounter);
476 /*****************************************************************************/
478 CSTWriterDestroyCSChange(ORTEDomain *d, CSTWriter *cstWriter, CSChange *csChange)
480 CSTRemoteReader *cstRemoteReader;
481 CSChangeForReader *csChangeForReader;
482 CSChange *csChangeFSN;
487 cstWriter->csChangesCounter--;
488 CSTWriterCSChange_delete(cstWriter, csChange);
489 gavl_cust_for_each(CSTRemoteReader, cstWriter, cstRemoteReader) {
490 csChangeForReader = CSChangeForReader_find(cstRemoteReader, &csChange->sn);
491 CSTWriterDestroyCSChangeForReader(
492 csChangeForReader, ORTE_FALSE);
495 if (csChange->cdrCodec.buffer)
496 FREE(csChange->cdrCodec.buffer);
497 parameterDelete(csChange);
501 csChangeFSN = CSTWriterCSChange_first(cstWriter);
503 cstWriter->firstSN = csChangeFSN->sn;
505 cstWriter->firstSN = cstWriter->lastSN;
508 /*****************************************************************************/
510 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter)
514 ul_list_for_each(CSTWriterCSChange, cstWriter, csChange) {
516 if (!csChange->remoteReaderStrict) {
517 CSTWriterDestroyCSChange(cstWriter->domain, cstWriter, csChange);
524 /*****************************************************************************/
526 CSTWriterRefreshAllCSChanges(ORTEDomain *d, CSTRemoteReader *cstRemoteReader)
528 CSChangeForReader *csChangeForReader;
529 int32_t timerQueue = 1;
531 if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION)
532 timerQueue = 2; //userdata timer queue
534 gavl_cust_for_each(CSChangeForReader, cstRemoteReader, csChangeForReader) {
537 if (SeqNumberCmp(csChangeForReader->csChange->gapSN, noneSN) == 0) {
539 if (csChangeForReader->commStateChFReader != TOSEND) {
540 csChangeForReader->commStateChFReader = TOSEND;
541 cstRemoteReader->commStateToSentCounter++;
544 if (cstRemoteReader->commStateSend == NOTHNIGTOSEND) {
545 cstRemoteReader->commStateSend = MUSTSENDDATA;
547 cstRemoteReader->sobject->objectEntryAID,
548 &cstRemoteReader->delayResponceTimer,
551 cstRemoteReader->sobject->objectEntryAID,
552 &cstRemoteReader->delayResponceTimer,
554 "CSTWriterSendTimer",
556 &cstRemoteReader->cstWriter->lock,
558 &cstRemoteReader->cstWriter->params.delayResponceTime);
564 /*****************************************************************************/
566 CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader)
568 CSTRemoteReader *cstRemoteReader = csChangeForReader->cstRemoteReader;
570 //setup new state for csChangeForReader
571 if (csChangeForReader->commStateChFReader != TOSEND)
573 cstRemoteReader->commStateToSentCounter--;
575 if (!cstRemoteReader->commStateToSentCounter)
576 cstRemoteReader->commStateSend = NOTHNIGTOSEND;
578 if (NtpTimeCmp(zNtpTime,
579 cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime) == 0) {
580 csChangeForReader->commStateChFReader = UNACKNOWLEDGED;
582 csChangeForReader->commStateChFReader = UNDERWAY;
583 eventDetach(cstRemoteReader->cstWriter->domain,
584 cstRemoteReader->sobject->objectEntryAID,
585 &csChangeForReader->waitWhileDataUnderwayTimer,
587 eventAdd(cstRemoteReader->cstWriter->domain,
588 cstRemoteReader->sobject->objectEntryAID,
589 &csChangeForReader->waitWhileDataUnderwayTimer,
591 "CSChangeForReaderUnderwayTimer",
592 CSChangeForReaderUnderwayTimer,
593 &cstRemoteReader->cstWriter->lock,
595 &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
600 /*****************************************************************************/
602 CSTWriterMulticast(CSChangeForReader *csChangeForReader)
604 CSTRemoteReader *cstRemoteReader;
605 ObjectEntryOID *objectEntryOID;
606 CSChangeForReader *csChangeForReader1;
609 cstRemoteReader = csChangeForReader->cstRemoteReader;
610 objectEntryOID = cstRemoteReader->sobject;
612 //multicast can do an application with multicast interface
613 if (!objectEntryOID->multicastPort)
616 ul_list_for_each(CSChangeParticipant,
617 csChangeForReader->csChange,
618 csChangeForReader1) {
619 ObjectEntryOID *objectEntryOID1;
620 CSTRemoteReader *cstRemoteReader1;
622 cstRemoteReader1 = csChangeForReader1->cstRemoteReader;
623 objectEntryOID1 = cstRemoteReader1->sobject;
625 /* are RRs from same GROUP */
626 if (objectEntryOID != objectEntryOID1)
629 /* is the csChange in state TOSEND ? If yes, marks like proc. */
630 CSTWriterCSChangeForReaderNewState(csChangeForReader1);
632 /* if there are no messages, detach sending timer */
633 if (!(cstRemoteReader->commStateSend == NOTHNIGTOSEND) &&
634 !(cstRemoteReader->commStateHB == MAYSENDHB))
637 if ((cstRemoteReader->cstWriter->guid.oid & 0x07) == OID_PUBLICATION)
639 eventDetach(cstRemoteReader->cstWriter->domain,
640 cstRemoteReader->sobject->objectEntryAID,
641 &cstRemoteReader->delayResponceTimer,