2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
/*
 * Three instantiations of the GAVL_CUST_NODE_INT_IMP macro, one per
 * container type handled in this file. Each invocation names a key field
 * (guid, guid, csChange->sn) and a comparison function (gavl_cmp_guid or
 * gavl_cmp_sn).
 * NOTE(review): the CSTRemoteReader_* and CSChangeForReader_* helpers called
 * further down (insert, delete, first, find, init_root_field) are presumably
 * generated by these macros -- confirm against the GAVL header.
 */
34 GAVL_CUST_NODE_INT_IMP(CSTWriter,
35 CSTPublications, CSTWriter, GUID_RTPS,
36 cstWriter, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
38 CSTWriter, CSTRemoteReader, GUID_RTPS,
39 cstRemoteReader, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
41 CSTRemoteReader, CSChangeForReader, SequenceNumber,
42 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
44 /*****************************************************************************/
/*
 * CSTWriterInit - set up a CSTWriter structure before first use.
 *
 *   d            domain handle, forwarded to the timer start-up calls
 *   cstWriter    writer being initialised
 *   object       object entry; its host id and application id form the GUID
 *   oid          object id stored as the third GUID component
 *   params       writer parameters, copied by value into cstWriter->params
 *   typeRegister stored in cstWriter->typeRegister
 *
 * NOTE(review): this listing omits some short source lines (return type,
 * closing braces, possibly short assignments), so the comments describe the
 * visible statements only.
 */
46 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
47 ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
49 debug(51,10) ("CSTWriterInit: start\n");
50 //init values of cstwriter
51 cstWriter->guid.hid=object->objectEntryHID->hid;
52 cstWriter->guid.aid=object->objectEntryAID->aid;
53 cstWriter->guid.oid=oid;
54 cstWriter->objectEntryOID=object;
55 memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
// NOTE(review): this zero is overwritten a few lines below with
// params.registrationRetries, so the assignment looks redundant.
56 cstWriter->registrationCounter=0;
57 ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
58 cstWriter->strictReliableCounter=0;
59 cstWriter->bestEffortsCounter=0;
60 cstWriter->csChangesCounter=0;
61 cstWriter->cstRemoteReaderCounter=0;
62 cstWriter->registrationCounter=cstWriter->params.registrationRetries;
// no change has been queued yet: both ends of the sequence-number window
// are reset through SEQUENCE_NUMBER_NONE
63 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
64 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
65 CSTWriterCSChange_init_head(cstWriter);
66 CSTRemoteReader_init_root_field(cstWriter);
67 pthread_rwlock_init(&cstWriter->lock,NULL);
68 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
70 cstWriter->typeRegister=typeRegister;
// the condition variable and its mutex are created only when the low three
// bits of the oid equal OID_PUBLICATION
71 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
72 pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
73 pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
74 cstWriter->condValueCSChangeDestroyed=0;
76 //add event for refresh
// the refresh timer handler is invoked directly unless refreshPeriod equals
// iNtpTime (presumably the "infinite" constant -- confirm)
77 if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
78 CSTWriterRefreshTimer(d,(void*)cstWriter);
80 //add event for registration
// likewise for registration, unless registrationPeriod equals zNtpTime
// (presumably zero -- confirm)
81 if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
82 CSTWriterRegistrationTimer(d,(void*)cstWriter);
84 debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
85 GUID_PRINTF(cstWriter->guid));
86 debug(51,10) ("CSTWriterInit: finished\n");
89 /*****************************************************************************/
/*
 * CSTWriterDelete - tear down a CSTWriter.
 *
 *   d          domain handle, forwarded to CSTWriterDestroyRemoteReader
 *   cstWriter  writer whose readers, changes and synchronisation objects
 *              are released
 *
 * Visible steps: destroy every remote reader, cut every csChange off the
 * writer's list and delete its parameters, then destroy the publication-only
 * condition variable and mutex and finally the rwlock.
 * NOTE(review): this listing omits short lines (e.g. the declaration of
 * csChange, closing braces and the heads of the two timer calls).
 */
91 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
92 CSTRemoteReader *cstRemoteReader;
95 debug(51,10) ("CSTWriterDelete: start\n");
97 debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
98 GUID_PRINTF(cstWriter->guid));
99 //Destroy all cstRemoteReaders connected to the cstWriter
// the loop ends once CSTRemoteReader_first() returns NULL; each destroy call
// removes the reader from the writer's tree
100 while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
101 CSTWriterDestroyRemoteReader(d,cstRemoteReader);
103 //Destroy all csChanges connected to the cstWriter
104 while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
105 parameterDelete(csChange);
/* NOTE(review): the heads of the next two calls are not visible in this
 * listing; from their arguments they presumably detach refreshPeriodTimer
 * and registrationTimer from the event queue -- confirm. */
109 cstWriter->objectEntryOID->objectEntryAID,
110 &cstWriter->refreshPeriodTimer,
113 cstWriter->objectEntryOID->objectEntryAID,
114 &cstWriter->registrationTimer,
// mirrors the publication-only initialisation of these two objects
116 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
117 pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
118 pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
120 pthread_rwlock_destroy(&cstWriter->lock);
121 debug(51,10) ("CSTWriterDelete: finished\n");
124 /*****************************************************************************/
/*
 * CSTWriterAddRemoteReader - create a CSTRemoteReader and attach it to a
 * writer.
 *
 *   d          domain handle (its use is on lines not visible in this listing)
 *   cstWriter  writer that gains the reader
 *   pobject    object entry supplying the reader's host id, application id
 *              and (for publications) its ORTESubsProp attributes
 *   oid        object id stored in the reader GUID
 *   sobject    object entry used for timers and the multicast list
 *
 * Returns the newly allocated reader.
 * NOTE(review): short source lines (return type, braces, else branches, call
 * heads) are missing from this listing; branch structure is described only
 * as far as it is visible.
 */
126 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
127 ObjectId oid,ObjectEntryOID *sobject) {
128 CSTRemoteReader *cstRemoteReader;
129 CSChangeForReader *csChangeForReader;
130 CSChange *csChange=NULL;
132 cstWriter->cstRemoteReaderCounter++;
// NOTE(review): no NULL check of the MALLOC result is visible before the
// dereferences that follow.
133 cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
134 cstRemoteReader->guid.hid=pobject->guid.hid;
135 cstRemoteReader->guid.aid=pobject->guid.aid;
136 cstRemoteReader->guid.oid=oid;
137 cstRemoteReader->sobject=sobject;
138 cstRemoteReader->pobject=pobject;
139 cstRemoteReader->cstWriter=cstWriter;
140 CSChangeForReader_init_root_field(cstRemoteReader);
// initial communication state: heartbeats allowed, no data pending
141 cstRemoteReader->commStateHB=MAYSENDHB;
142 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
143 cstRemoteReader->HBRetriesCounter=0;
144 cstRemoteReader->csChangesCounter=0;
145 cstRemoteReader->commStateToSentCounter=0;
146 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
147 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
148 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
149 //insert remote reader
150 CSTRemoteReader_insert(cstWriter,cstRemoteReader);
// readers whose sobject has a multicast port are also put on that object's
// multicast list
152 if (cstRemoteReader->sobject->multicastPort) {
153 debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
154 GUID_PRINTF(cstRemoteReader->guid),
155 GUID_PRINTF(cstRemoteReader->sobject->guid));
156 ObjectEntryMulticast_insert(cstRemoteReader->sobject,
159 //copy all csChanges (not for publication)
// for a non-publication writer every csChange already queued gets a
// CSChangeForReader in state TOSEND linked to both the change and the reader
160 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
161 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
162 csChange->remoteReaderCount++;
163 cstRemoteReader->csChangesCounter++;
// NOTE(review): MALLOC result is again used without a visible NULL check.
164 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
165 csChangeForReader->commStateChFReader=TOSEND;
166 cstRemoteReader->commStateToSentCounter++;
167 csChangeForReader->csChange=csChange;
168 csChangeForReader->cstRemoteReader=cstRemoteReader;
169 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
170 CSChangeParticipant_insert(csChange,csChangeForReader);
171 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
172 cstRemoteReader->commStateSend=MUSTSENDDATA;
// if at least one change was copied, a send is scheduled
/* NOTE(review): the head of the scheduling call is not visible; its arguments
 * name delayResponceTimer, "CSTWriterSendTimer", the writer lock and
 * params.delayResponceTime -- presumably an eventAdd, confirm. */
174 if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
176 cstRemoteReader->sobject->objectEntryAID,
177 &cstRemoteReader->delayResponceTimer,
179 "CSTWriterSendTimer",
181 &cstRemoteReader->cstWriter->lock,
183 &cstRemoteReader->cstWriter->params.delayResponceTime);
// the writer's per-reliability reader counters are bumped according to the
// subscription properties attached to pobject
// NOTE(review): presumably this is the publication (else) branch -- the
// branch head is not visible here.
187 ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
188 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
189 cstWriter->strictReliableCounter++;
191 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
192 cstWriter->bestEffortsCounter++;
195 debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
196 GUID_PRINTF(cstRemoteReader->guid));
197 return cstRemoteReader;
200 /*****************************************************************************/
/*
 * CSTWriterDestroyRemoteReader - detach a remote reader from its writer and
 * free it.
 *
 *   d                domain handle, passed to objectEntryDelete
 *   cstRemoteReader  reader to destroy; NULL is accepted and ignored
 *
 * Visible steps: decrement the writer's reader counters, destroy all of the
 * reader's CSChangeForReader records, remove the reader from the multicast
 * list when applicable, remove it from the writer's tree and FREE it.
 * NOTE(review): short source lines (declaration of sp, call heads, braces,
 * else branches) are missing from this listing.
 */
202 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
203 CSChangeForReader *csChangeForReader;
205 if (!cstRemoteReader) return;
206 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
207 debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
208 GUID_PRINTF(cstRemoteReader->guid));
// publication writers also track how many strict / best-effort readers exist
209 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
211 sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
212 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
213 cstRemoteReader->cstWriter->strictReliableCounter--;
215 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
216 cstRemoteReader->cstWriter->bestEffortsCounter--;
// drop every per-reader change record; ORTE_TRUE lets the callee destroy a
// csChange once no reader references it any more
219 while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
220 CSTWriterDestroyCSChangeForReader(
221 csChangeForReader,ORTE_TRUE);
/* NOTE(review): the heads and some arguments of the next four calls are not
 * visible; from the arguments they presumably detach delayResponceTimer and
 * repeatAnnounceTimer from more than one timer queue -- confirm. */
224 cstRemoteReader->sobject->objectEntryAID,
225 &cstRemoteReader->delayResponceTimer,
226 1); //metatraffic timer
228 cstRemoteReader->sobject->objectEntryAID,
229 &cstRemoteReader->delayResponceTimer,
232 cstRemoteReader->sobject->objectEntryAID,
233 &cstRemoteReader->repeatAnnounceTimer,
234 1); //metatraffic timer
236 cstRemoteReader->sobject->objectEntryAID,
237 &cstRemoteReader->repeatAnnounceTimer,
// multicast readers are removed from sobject's multicast list; when that list
// becomes empty the object entry itself is deleted
240 if (cstRemoteReader->sobject->multicastPort) {
241 ObjectEntryOID *object;
243 object=cstRemoteReader->sobject;
245 ObjectEntryMulticast_delete(object,cstRemoteReader);
246 debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
247 GUID_PRINTF(cstRemoteReader->guid),
248 GUID_PRINTF(object->guid));
250 if (ObjectEntryMulticast_is_empty(object)) {
251 objectEntryDelete(d,object,ORTE_TRUE);
254 CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
255 FREE(cstRemoteReader);
258 /*****************************************************************************/
/*
 * CSTWriterMakeGAP - turn the queued change that carries `guid` into a GAP.
 *
 *   d          domain handle, forwarded to CSTWriterDestroyCSChange
 *   cstWriter  writer whose change list is scanned
 *   guid       GUID of the object whose existing change is to be replaced
 *
 * NOTE(review): many short source lines (NULL tests on csChange1, the second
 * SeqNumberAdd operands, braces, any early return) are missing from this
 * listing, so the merge logic below is described only as far as it is visible.
 */
260 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
261 CSChange *csChange,*csChange1;
263 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
// match: the entry's gapSN compares equal to noneSN and its GUID equals
// `guid` (both comparison helpers return 0 on equality)
264 if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
265 (!gavl_cmp_guid(&csChange->guid,guid))) { //equal? (VAR)
266 //VAR->GAP - inc gap_sn_no
267 SeqNumberInc(csChange->gapSN,csChange->gapSN);
268 parameterDelete(csChange);
269 //is Gap in prior or next position?
// a neighbouring entry whose gapSN differs from noneSN has its gap combined
// with this one via SeqNumberAdd, and one of the two entries is destroyed
270 csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
272 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
273 SeqNumberAdd(csChange1->gapSN,
// previous entry absorbs the gap: the current entry is destroyed
276 CSTWriterDestroyCSChange(d,cstWriter,csChange);
280 csChange1=CSTWriterCSChange_next(cstWriter,csChange);
282 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
283 SeqNumberAdd(csChange->gapSN,
// current entry absorbs the gap: the following entry is destroyed
286 CSTWriterDestroyCSChange(d,cstWriter,csChange1);
294 /*****************************************************************************/
/*
 * CSTWriterAddCSChange - queue a new csChange on a writer and schedule its
 * delivery to every attached remote reader.
 *
 *   d          domain handle, forwarded to CSTWriterMakeGAP
 *   cstWriter  writer that receives the change
 *   csChange   change to queue; its sn, gapSN and reader counters are
 *              overwritten here
 *
 * NOTE(review): short source lines (return type, braces, else branches, call
 * heads and several call arguments) are missing from this listing; the
 * branch structure is described only as far as it is visible.
 */
296 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
297 CSChangeForReader *csChangeForReader;
298 CSTRemoteReader *cstRemoteReader;
299 CSChange *csChangeFSN;
301 debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
302 GUID_PRINTF(cstWriter->guid));
303 cstWriter->csChangesCounter++;
304 //look for old cschange
// non-publication writers first convert an older change with the same GUID
// into a GAP
305 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
306 CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
307 //insert cschange into database changes
// the change takes the next sequence number after the writer's lastSN
308 SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
309 csChange->sn=cstWriter->lastSN;
310 SEQUENCE_NUMBER_NONE(csChange->gapSN);
311 csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
312 csChange->remoteReaderBest=0;
313 csChange->remoteReaderStrict=0;
314 CSChangeParticipant_init_head(csChange);
315 CSTWriterCSChange_insert(cstWriter,csChange);
316 debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
319 csChangeFSN=CSTWriterCSChange_first(cstWriter);
// the only visible body of this test is commented out, so it currently has no
// visible effect
320 if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
321 //minimal are 2 SNs (GAP,VAR) ...
322 // CSTWriterDestroyCSChange(cstWriter,csChange);
// firstSN follows the sequence number of the first queued change
324 csChangeFSN=CSTWriterCSChange_first(cstWriter);
325 cstWriter->firstSN=csChangeFSN->sn;
326 //insert new cschange for each reader
327 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
329 debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
330 GUID_PRINTF(cstRemoteReader->guid));
// NOTE(review): MALLOC result is used without a visible NULL check.
331 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
332 csChangeForReader->commStateChFReader=TOSEND;
333 cstRemoteReader->commStateToSentCounter++;
334 csChangeForReader->csChange=csChange;
335 csChangeForReader->cstRemoteReader=cstRemoteReader;
336 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
337 CSChangeParticipant_insert(csChange,csChangeForReader);
338 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
339 cstRemoteReader->csChangesCounter++;
340 cstRemoteReader->HBRetriesCounter=0;
341 cstRemoteReader->commStateSend=MUSTSENDDATA;
// non-publication writer: the reader's delayResponceTimer is (re)armed with
// the "CSTWriterSendTimer" event
/* NOTE(review): call heads not visible; presumably eventDetach followed by
 * eventAdd -- confirm. */
342 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
344 cstRemoteReader->sobject->objectEntryAID,
345 &cstRemoteReader->delayResponceTimer,
348 cstRemoteReader->sobject->objectEntryAID,
349 &cstRemoteReader->delayResponceTimer,
351 "CSTWriterSendTimer",
353 &cstRemoteReader->cstWriter->lock,
// scheduling below depends on the reliability requested in the reader's
// subscription properties
// NOTE(review): presumably the publication (else) branch -- the branch head
// is not visible here.
357 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
359 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
360 //Strict reliable subscription
361 csChange->remoteReaderStrict++;
363 cstRemoteReader->sobject->objectEntryAID,
364 &cstRemoteReader->delayResponceTimer,
367 cstRemoteReader->sobject->objectEntryAID,
368 &cstRemoteReader->delayResponceTimer,
370 "CSTWriterSendStrictTimer",
371 CSTWriterSendStrictTimer,
372 &cstRemoteReader->cstWriter->lock,
376 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
377 //best efforts subscription
378 NtpTime nextIssueTime,nextIssueDelay,actTime;
380 actTime=getActualNtpTime();
381 csChange->remoteReaderBest++;
// earliest allowed send time = last sent issue time + minimumSeparation
382 NtpTimeAdd(nextIssueTime,
383 cstRemoteReader->lastSentIssueTime,
384 sp->minimumSeparation);
385 NtpTimeSub(nextIssueDelay,
// if that time has already been reached, the delay is set to zero
388 if (NtpTimeCmp(actTime,nextIssueTime)>=0)
389 NTPTIME_ZERO(nextIssueDelay);
391 cstRemoteReader->sobject->objectEntryAID,
392 &cstRemoteReader->delayResponceTimer,
394 //schedule sent issue
396 cstRemoteReader->sobject->objectEntryAID,
397 &cstRemoteReader->delayResponceTimer,
399 "CSTWriterSendBestEffortTimer",
400 CSTWriterSendBestEffortTimer,
401 &cstRemoteReader->cstWriter->lock,
405 //!Best_Effort & !Strict_Reliable
// neither reliability flag set: the record just created is destroyed again
406 CSTWriterDestroyCSChangeForReader(csChangeForReader,
408 debug(51,5) ("CSTWriterAddCSChange: destroyed\n");
412 debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
414 debug(51,5) ("CSTWriterAddCSChange: finished\n");
417 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChangeForReader - unlink and free one per-reader change
 * record.
 *
 *   csChangeForReader  record to destroy; NULL is accepted and ignored
 *   destroyCSChange    when true and the writer is a publication, the
 *                      referenced csChange is destroyed as soon as no reader
 *                      references it any more
 *
 * NOTE(review): short source lines (declaration of csChange, braces, some
 * call arguments) are missing from this listing, so the exact nesting of the
 * final publication branch cannot be confirmed here.
 */
419 CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
420 Boolean destroyCSChange) {
421 CSTRemoteReader *cstRemoteReader;
424 if (!csChangeForReader) return;
425 cstRemoteReader=csChangeForReader->cstRemoteReader;
426 csChange=csChangeForReader->csChange;
427 csChange->remoteReaderCount--;
428 cstRemoteReader->csChangesCounter--;
// a reader left without any change has nothing to send
429 if (!cstRemoteReader->csChangesCounter) {
430 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
// keep the reader's count of TOSEND records consistent
432 if (csChangeForReader->commStateChFReader==TOSEND) {
433 cstRemoteReader->commStateToSentCounter--;
// publication writers also track strict / best-effort reader counts per change
435 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
436 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
437 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
438 csChange->remoteReaderStrict--;
440 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
441 csChange->remoteReaderBest--;
// the underway timer is detached before the record that owns it is freed
445 eventDetach(cstRemoteReader->cstWriter->domain,
446 cstRemoteReader->sobject->objectEntryAID,
447 &csChangeForReader->waitWhileDataUnderwayTimer,
449 CSChangeParticipant_delete(csChange,csChangeForReader);
450 CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
451 FREE(csChangeForReader);
// publication only: once the change has no readers left it may be destroyed,
// and the writer's condCSChangeDestroyed condition is signalled under its
// mutex after condValueCSChangeDestroyed has been set to 1
// NOTE(review): whether the signalling sits inside the destroyCSChange branch
// is not visible in this listing.
453 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
454 if (!csChange->remoteReaderCount) {
455 if (destroyCSChange) {
456 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
457 cstRemoteReader->cstWriter,csChange);
459 pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
460 cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
461 pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
462 pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
463 debug(51,5) ("Publication: new queue level (%d)\n",
464 cstRemoteReader->cstWriter->csChangesCounter);
469 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChange - remove a csChange from a writer and release the
 * resources attached to it.
 *
 *   d          domain handle (not used on the lines visible in this listing)
 *   cstWriter  writer that owns the change
 *   csChange   change to destroy; NULL is accepted and ignored
 *
 * NOTE(review): short source lines (braces, the condition selecting between
 * the two firstSN assignments, possibly the release of csChange itself) are
 * missing from this listing.
 */
471 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
472 CSTRemoteReader *cstRemoteReader;
473 CSChangeForReader *csChangeForReader;
474 CSChange *csChangeFSN;
476 if (!csChange) return;
478 cstWriter->csChangesCounter--;
479 CSTWriterCSChange_delete(cstWriter,csChange);
// drop the per-reader record for this sequence number from every reader;
// a reader without such a record yields NULL, which the callee ignores.
// ORTE_FALSE keeps the callee from destroying the csChange a second time.
480 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
481 csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
482 CSTWriterDestroyCSChangeForReader(
483 csChangeForReader,ORTE_FALSE);
486 if (csChange->cdrCodec.buffer)
487 FREE(csChange->cdrCodec.buffer);
488 parameterDelete(csChange);
// firstSN is refreshed from the new first change, or set to lastSN
// NOTE(review): presumably the latter applies when the list is empty -- the
// selecting condition is not visible here.
492 csChangeFSN=CSTWriterCSChange_first(cstWriter);
494 cstWriter->firstSN=csChangeFSN->sn;
496 cstWriter->firstSN=cstWriter->lastSN;
499 /*****************************************************************************/
/*
 * CSTWriterTryDestroyBestEffortIssue - scan the writer's queued changes and
 * destroy one that no strict-reliable reader references
 * (remoteReaderStrict == 0).
 *
 *   cstWriter  writer whose change list is scanned
 *
 * NOTE(review): the return type, the declaration of csChange, the return
 * statements and the closing braces are missing from this listing, so the
 * return value and whether the scan stops after the first destroyed change
 * cannot be confirmed here.
 */
501 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
504 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
506 if (!csChange->remoteReaderStrict) {
507 CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
514 /*****************************************************************************/
/*
 * CSTWriterRefreshAllCSChanges - mark the reader's change records for
 * sending again and make sure a send is scheduled.
 *
 *   d                domain handle (its use is on lines not visible here)
 *   cstRemoteReader  reader whose CSChangeForReader records are refreshed
 *
 * NOTE(review): short source lines (return type, braces, call heads and some
 * call arguments) are missing from this listing, so the exact nesting of the
 * statements inside the loop cannot be confirmed here.
 */
516 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
517 CSChangeForReader *csChangeForReader;
// timer queue 1 by default, 2 for publication writers
518 int32_t timerQueue=1;
520 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
521 timerQueue=2; //userdata timer queue
523 gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
// only records whose csChange has gapSN equal to noneSN are considered
526 if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
// a record not already in TOSEND is put back into TOSEND and counted
528 if (csChangeForReader->commStateChFReader!=TOSEND) {
529 csChangeForReader->commStateChFReader=TOSEND;
530 cstRemoteReader->commStateToSentCounter++;
// an idle reader is switched to MUSTSENDDATA and its delayResponceTimer is
// (re)armed with the "CSTWriterSendTimer" event
/* NOTE(review): call heads not visible; presumably eventDetach followed by
 * eventAdd using timerQueue -- confirm. */
533 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
534 cstRemoteReader->commStateSend=MUSTSENDDATA;
536 cstRemoteReader->sobject->objectEntryAID,
537 &cstRemoteReader->delayResponceTimer,
540 cstRemoteReader->sobject->objectEntryAID,
541 &cstRemoteReader->delayResponceTimer,
543 "CSTWriterSendTimer",
545 &cstRemoteReader->cstWriter->lock,
547 &cstRemoteReader->cstWriter->params.delayResponceTime);
553 /*****************************************************************************/
/*
 * CSTWriterCSChangeForReaderNewState - move a per-reader change record out
 * of the TOSEND state after it has been handled.
 *
 *   csChangeForReader  record to update
 *
 * Returns -1 when the record is not in state TOSEND.
 * NOTE(review): the return type, opening brace, else keyword, some call
 * arguments and the final return are missing from this listing; the success
 * return value is therefore not visible here.
 */
555 CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader)
557 CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
559 //setup new state for csChangeForReader
560 if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
561 cstRemoteReader->commStateToSentCounter--;
// when no TOSEND record remains the reader has nothing more to send
563 if (!cstRemoteReader->commStateToSentCounter)
564 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
// waitWhileDataUnderwayTime equal to zNtpTime: go straight to UNACKNOWLEDGED
566 if (NtpTimeCmp(zNtpTime,
567 cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
568 csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
// NOTE(review): presumably the else branch -- the record becomes UNDERWAY and
// its waitWhileDataUnderwayTimer is re-armed with
// CSChangeForReaderUnderwayTimer after waitWhileDataUnderwayTime.
570 csChangeForReader->commStateChFReader=UNDERWAY;
571 eventDetach(cstRemoteReader->cstWriter->domain,
572 cstRemoteReader->sobject->objectEntryAID,
573 &csChangeForReader->waitWhileDataUnderwayTimer,
575 eventAdd(cstRemoteReader->cstWriter->domain,
576 cstRemoteReader->sobject->objectEntryAID,
577 &csChangeForReader->waitWhileDataUnderwayTimer,
579 "CSChangeForReaderUnderwayTimer",
580 CSChangeForReaderUnderwayTimer,
581 &cstRemoteReader->cstWriter->lock,
583 &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
588 /*****************************************************************************/
590 CSTWriterMulticast(CSChangeForReader *csChangeForReader)
592 CSTRemoteReader *cstRemoteReader;
593 ObjectEntryOID *objectEntryOID;
594 CSChangeForReader *csChangeForReader1;
597 cstRemoteReader=csChangeForReader->cstRemoteReader;
598 objectEntryOID=cstRemoteReader->sobject;
600 //multicast can do an application with multicast interface
601 if (!objectEntryOID->multicastPort)
604 ul_list_for_each(CSChangeParticipant,
605 csChangeForReader->csChange,
606 csChangeForReader1) {
607 ObjectEntryOID *objectEntryOID1;
608 CSTRemoteReader *cstRemoteReader1;
610 cstRemoteReader1=csChangeForReader1->cstRemoteReader;
611 objectEntryOID1=cstRemoteReader1->sobject;
613 /* are RRs from same GROUP */
614 if (objectEntryOID!=objectEntryOID1)
617 /* is the csChange in state TOSEND ? If yes, marks like proc. */
618 CSTWriterCSChangeForReaderNewState(csChangeForReader1);
620 /* if there are no messages, detach sending timer */
621 if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
622 !(cstRemoteReader->commStateHB==MAYSENDHB))
625 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
627 eventDetach(cstRemoteReader->cstWriter->domain,
628 cstRemoteReader->sobject->objectEntryAID,
629 &cstRemoteReader->delayResponceTimer,