2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(CSTWriter,
25 CSTPublications, CSTWriter, GUID_RTPS,
26 cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
28 CSTWriter, CSTRemoteReader, GUID_RTPS,
29 cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31 CSTRemoteReader, CSChangeForReader, SequenceNumber,
32 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
34 /*****************************************************************************/
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37 ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
39 debug(51,10) ("CSTWriterInit: start\n");
40 //init values of cstwriter
41 cstWriter->guid.hid=object->objectEntryHID->hid;
42 cstWriter->guid.aid=object->objectEntryAID->aid;
43 cstWriter->guid.oid=oid;
44 cstWriter->objectEntryOID=object;
45 memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46 cstWriter->registrationCounter=0;
47 ul_htim_queue_init_detached(&cstWriter->registrationTimer.htim);
48 cstWriter->strictReliableCounter=0;
49 cstWriter->bestEffortsCounter=0;
50 cstWriter->csChangesCounter=0;
51 cstWriter->cstRemoteReaderCounter=0;
52 cstWriter->registrationCounter=cstWriter->params.registrationRetries;
53 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
54 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
55 CSTWriterCSChange_init_head(cstWriter);
56 CSTRemoteReader_init_root_field(cstWriter);
57 pthread_rwlock_init(&cstWriter->lock,NULL);
58 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
60 cstWriter->typeRegister=typeRegister;
61 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
62 pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
63 pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
64 cstWriter->condValueCSChangeDestroyed=0;
66 //add event for refresh
67 if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
68 CSTWriterRefreshTimer(d,(void*)cstWriter);
70 //add event for registration
71 if (NtpTimeCmp(cstWriter->params.registrationPeriod,zNtpTime)!=0) {
72 CSTWriterRegistrationTimer(d,(void*)cstWriter);
74 debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
75 GUID_PRINTF(cstWriter->guid));
76 debug(51,10) ("CSTWriterInit: finished\n");
79 /*****************************************************************************/
81 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
82 CSTRemoteReader *cstRemoteReader;
85 debug(51,10) ("CSTWriterDelete: start\n");
87 debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
88 GUID_PRINTF(cstWriter->guid));
89 //Destroy all cstRemoteReader connected on cstWriter
90 while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
91 CSTWriterDestroyRemoteReader(d,cstRemoteReader);
93 //Destroy all csChnages connected on cstWriter
94 while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
95 parameterDelete(csChange);
99 cstWriter->objectEntryOID->objectEntryAID,
100 &cstWriter->refreshPeriodTimer,
103 cstWriter->objectEntryOID->objectEntryAID,
104 &cstWriter->registrationTimer,
106 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
107 pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
108 pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
110 pthread_rwlock_destroy(&cstWriter->lock);
111 debug(51,10) ("CSTWriterDelete: finished\n");
114 /*****************************************************************************/
116 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *pobject,
117 ObjectId oid,ObjectEntryOID *sobject) {
118 CSTRemoteReader *cstRemoteReader;
119 CSChangeForReader *csChangeForReader;
120 CSChange *csChange=NULL;
122 cstWriter->cstRemoteReaderCounter++;
123 cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
124 cstRemoteReader->guid.hid=pobject->guid.hid;
125 cstRemoteReader->guid.aid=pobject->guid.aid;
126 cstRemoteReader->guid.oid=oid;
127 cstRemoteReader->sobject=sobject;
128 cstRemoteReader->pobject=pobject;
129 cstRemoteReader->cstWriter=cstWriter;
130 CSChangeForReader_init_root_field(cstRemoteReader);
131 cstRemoteReader->commStateHB=MAYSENDHB;
132 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
133 cstRemoteReader->HBRetriesCounter=0;
134 cstRemoteReader->csChangesCounter=0;
135 cstRemoteReader->commStateToSentCounter=0;
136 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
137 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
138 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
139 //insert remote reader
140 CSTRemoteReader_insert(cstWriter,cstRemoteReader);
142 if (cstRemoteReader->sobject->multicastPort) {
143 debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x added to multicast list on object 0x%x-0x%x-0x%x\n",
144 GUID_PRINTF(cstRemoteReader->guid),
145 GUID_PRINTF(cstRemoteReader->sobject->guid));
146 ObjectEntryMulticast_insert(cstRemoteReader->sobject,
149 //copy all csChanges (not for publication)
150 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
151 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
152 csChange->remoteReaderCount++;
153 cstRemoteReader->csChangesCounter++;
154 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
155 csChangeForReader->commStateChFReader=TOSEND;
156 cstRemoteReader->commStateToSentCounter++;
157 csChangeForReader->csChange=csChange;
158 csChangeForReader->cstRemoteReader=cstRemoteReader;
159 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
160 CSChangeParticipant_insert(csChange,csChangeForReader);
161 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
162 cstRemoteReader->commStateSend=MUSTSENDDATA;
164 if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
166 cstRemoteReader->sobject->objectEntryAID,
167 &cstRemoteReader->delayResponceTimer,
169 "CSTWriterSendTimer",
171 &cstRemoteReader->cstWriter->lock,
173 &cstRemoteReader->cstWriter->params.delayResponceTime);
177 ORTESubsProp *sp=(ORTESubsProp*)pobject->attributes;
178 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
179 cstWriter->strictReliableCounter++;
181 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
182 cstWriter->bestEffortsCounter++;
185 debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
186 GUID_PRINTF(cstRemoteReader->guid));
187 return cstRemoteReader;
190 /*****************************************************************************/
192 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
193 CSChangeForReader *csChangeForReader;
195 if (!cstRemoteReader) return;
196 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
197 debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
198 GUID_PRINTF(cstRemoteReader->guid));
199 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
201 sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
202 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
203 cstRemoteReader->cstWriter->strictReliableCounter--;
205 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
206 cstRemoteReader->cstWriter->bestEffortsCounter--;
209 while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
210 CSTWriterDestroyCSChangeForReader(
211 csChangeForReader,ORTE_TRUE);
214 cstRemoteReader->sobject->objectEntryAID,
215 &cstRemoteReader->delayResponceTimer,
216 1); //metatraffic timer
218 cstRemoteReader->sobject->objectEntryAID,
219 &cstRemoteReader->delayResponceTimer,
222 cstRemoteReader->sobject->objectEntryAID,
223 &cstRemoteReader->repeatAnnounceTimer,
224 1); //metatraffic timer
226 cstRemoteReader->sobject->objectEntryAID,
227 &cstRemoteReader->repeatAnnounceTimer,
230 if (cstRemoteReader->sobject->multicastPort) {
231 ObjectEntryOID *object;
233 object=cstRemoteReader->sobject;
235 ObjectEntryMulticast_delete(object,cstRemoteReader);
236 debug(51,9) ("cstRemoteReader 0x%x-0x%x-0x%x deleted from multicast list on object 0x%x-0x%x-0x%x\n",
237 GUID_PRINTF(cstRemoteReader->guid),
238 GUID_PRINTF(object->guid));
240 if (ObjectEntryMulticast_is_empty(object)) {
241 objectEntryDelete(d,object,ORTE_TRUE);
244 CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
245 FREE(cstRemoteReader);
248 /*****************************************************************************/
250 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
251 CSChange *csChange,*csChange1;
253 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
254 if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
255 (!gavl_cmp_guid(&csChange->guid,guid))) { //equal? (VAR)
256 //VAR->GAP - inc gap_sn_no
257 SeqNumberInc(csChange->gapSN,csChange->gapSN);
258 parameterDelete(csChange);
259 //is Gap in prior or next position?
260 csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
262 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
263 SeqNumberAdd(csChange1->gapSN,
266 CSTWriterDestroyCSChange(d,cstWriter,csChange);
270 csChange1=CSTWriterCSChange_next(cstWriter,csChange);
272 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
273 SeqNumberAdd(csChange->gapSN,
276 CSTWriterDestroyCSChange(d,cstWriter,csChange1);
284 /*****************************************************************************/
286 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
287 CSChangeForReader *csChangeForReader;
288 CSTRemoteReader *cstRemoteReader;
289 CSChange *csChangeFSN;
291 debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
292 GUID_PRINTF(cstWriter->guid));
293 cstWriter->csChangesCounter++;
294 //look for old cschange
295 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
296 CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
297 //insert cschange into database changes
298 SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
299 csChange->sn=cstWriter->lastSN;
300 SEQUENCE_NUMBER_NONE(csChange->gapSN);
301 csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
302 csChange->remoteReaderBest=0;
303 csChange->remoteReaderStrict=0;
304 CSChangeParticipant_init_head(csChange);
305 CSTWriterCSChange_insert(cstWriter,csChange);
306 debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
309 csChangeFSN=CSTWriterCSChange_first(cstWriter);
310 if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
311 //minimal are 2 SNs (GAP,VAR) ...
312 // CSTWriterDestroyCSChange(cstWriter,csChange);
314 csChangeFSN=CSTWriterCSChange_first(cstWriter);
315 cstWriter->firstSN=csChangeFSN->sn;
316 //insert new cschange for each reader
317 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
319 debug(51,10) ("CSTWriterAddCSChange: sending to cstRemoteReader 0x%x-0x%x-0x%x\n",
320 GUID_PRINTF(cstRemoteReader->guid));
321 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
322 csChangeForReader->commStateChFReader=TOSEND;
323 cstRemoteReader->commStateToSentCounter++;
324 csChangeForReader->csChange=csChange;
325 csChangeForReader->cstRemoteReader=cstRemoteReader;
326 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
327 CSChangeParticipant_insert(csChange,csChangeForReader);
328 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
329 cstRemoteReader->csChangesCounter++;
330 cstRemoteReader->HBRetriesCounter=0;
331 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
332 cstRemoteReader->commStateSend=MUSTSENDDATA;
333 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
335 cstRemoteReader->sobject->objectEntryAID,
336 &cstRemoteReader->delayResponceTimer,
339 cstRemoteReader->sobject->objectEntryAID,
340 &cstRemoteReader->delayResponceTimer,
342 "CSTWriterSendTimer",
344 &cstRemoteReader->cstWriter->lock,
348 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
350 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
351 //Strict reliable subscription
352 csChange->remoteReaderStrict++;
354 cstRemoteReader->sobject->objectEntryAID,
355 &cstRemoteReader->delayResponceTimer,
358 cstRemoteReader->sobject->objectEntryAID,
359 &cstRemoteReader->delayResponceTimer,
361 "CSTWriterSendStrictTimer",
362 CSTWriterSendStrictTimer,
363 &cstRemoteReader->cstWriter->lock,
367 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
368 //best efforts subscription
369 NtpTime nextIssueTime,nextIssueDelay,actTime;
371 actTime=getActualNtpTime();
372 csChange->remoteReaderBest++;
373 NtpTimeAdd(nextIssueTime,
374 cstRemoteReader->lastSentIssueTime,
375 sp->minimumSeparation);
376 NtpTimeSub(nextIssueDelay,
379 if (NtpTimeCmp(actTime,nextIssueTime)>=0)
380 NTPTIME_ZERO(nextIssueDelay);
382 cstRemoteReader->sobject->objectEntryAID,
383 &cstRemoteReader->delayResponceTimer,
385 //schedule sent issue
387 cstRemoteReader->sobject->objectEntryAID,
388 &cstRemoteReader->delayResponceTimer,
390 "CSTWriterSendBestEffortTimer",
391 CSTWriterSendBestEffortTimer,
392 &cstRemoteReader->cstWriter->lock,
396 //!Best_Effort & !Strict_Reliable
397 CSTWriterDestroyCSChangeForReader(csChangeForReader,
399 debug(51,5) ("CSTWriterAddCSChange: destroyed\n");
404 debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
406 debug(51,5) ("CSTWriterAddCSChange: finished\n");
409 /*****************************************************************************/
411 CSTWriterDestroyCSChangeForReader(CSChangeForReader *csChangeForReader,
412 Boolean destroyCSChange) {
413 CSTRemoteReader *cstRemoteReader;
416 if (!csChangeForReader) return;
417 cstRemoteReader=csChangeForReader->cstRemoteReader;
418 csChange=csChangeForReader->csChange;
419 csChange->remoteReaderCount--;
420 cstRemoteReader->csChangesCounter--;
421 if (!cstRemoteReader->csChangesCounter) {
422 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
424 if (csChangeForReader->commStateChFReader==TOSEND) {
425 cstRemoteReader->commStateToSentCounter--;
427 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
428 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->pobject->attributes;
429 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
430 csChange->remoteReaderStrict--;
432 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
433 csChange->remoteReaderBest--;
437 eventDetach(cstRemoteReader->cstWriter->domain,
438 cstRemoteReader->sobject->objectEntryAID,
439 &csChangeForReader->waitWhileDataUnderwayTimer,
441 CSChangeParticipant_delete(csChange,csChangeForReader);
442 CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
443 FREE(csChangeForReader);
445 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
446 if (!csChange->remoteReaderCount) {
447 if (destroyCSChange) {
448 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
449 cstRemoteReader->cstWriter,csChange);
451 pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
452 cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
453 pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
454 pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
455 debug(51,5) ("Publication: new queue level (%d)\n",
456 cstRemoteReader->cstWriter->csChangesCounter);
461 /*****************************************************************************/
463 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
464 CSTRemoteReader *cstRemoteReader;
465 CSChangeForReader *csChangeForReader;
466 CSChange *csChangeFSN;
468 if (!csChange) return;
470 cstWriter->csChangesCounter--;
471 CSTWriterCSChange_delete(cstWriter,csChange);
472 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
473 csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
474 CSTWriterDestroyCSChangeForReader(
475 csChangeForReader,ORTE_FALSE);
478 if (csChange->cdrCodec.buffer)
479 FREE(csChange->cdrCodec.buffer);
480 parameterDelete(csChange);
484 csChangeFSN=CSTWriterCSChange_first(cstWriter);
486 cstWriter->firstSN=csChangeFSN->sn;
488 cstWriter->firstSN=cstWriter->lastSN;
491 /*****************************************************************************/
493 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
496 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
498 if (!csChange->remoteReaderStrict) {
499 CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
506 /*****************************************************************************/
508 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
509 CSChangeForReader *csChangeForReader;
510 int32_t timerQueue=1;
512 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
513 timerQueue=2; //userdata timer queue
515 gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
518 if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
520 if (csChangeForReader->commStateChFReader!=TOSEND) {
521 csChangeForReader->commStateChFReader=TOSEND;
522 cstRemoteReader->commStateToSentCounter++;
525 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
526 cstRemoteReader->commStateSend=MUSTSENDDATA;
528 cstRemoteReader->sobject->objectEntryAID,
529 &cstRemoteReader->delayResponceTimer,
532 cstRemoteReader->sobject->objectEntryAID,
533 &cstRemoteReader->delayResponceTimer,
535 "CSTWriterSendTimer",
537 &cstRemoteReader->cstWriter->lock,
539 &cstRemoteReader->cstWriter->params.delayResponceTime);
545 /*****************************************************************************/
547 CSTWriterCSChangeForReaderNewState(CSChangeForReader *csChangeForReader)
549 CSTRemoteReader *cstRemoteReader=csChangeForReader->cstRemoteReader;
551 //setup new state for csChangeForReader
552 if (csChangeForReader->commStateChFReader!=TOSEND) return -1;
553 cstRemoteReader->commStateToSentCounter--;
555 if (!cstRemoteReader->commStateToSentCounter)
556 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
558 if (NtpTimeCmp(zNtpTime,
559 cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime)==0) {
560 csChangeForReader->commStateChFReader=UNACKNOWLEDGED;
562 csChangeForReader->commStateChFReader=UNDERWAY;
563 eventDetach(cstRemoteReader->cstWriter->domain,
564 cstRemoteReader->sobject->objectEntryAID,
565 &csChangeForReader->waitWhileDataUnderwayTimer,
567 eventAdd(cstRemoteReader->cstWriter->domain,
568 cstRemoteReader->sobject->objectEntryAID,
569 &csChangeForReader->waitWhileDataUnderwayTimer,
571 "CSChangeForReaderUnderwayTimer",
572 CSChangeForReaderUnderwayTimer,
573 &cstRemoteReader->cstWriter->lock,
575 &cstRemoteReader->cstWriter->params.waitWhileDataUnderwayTime);
580 /*****************************************************************************/
582 CSTWriterMulticast(CSChangeForReader *csChangeForReader)
584 CSTRemoteReader *cstRemoteReader;
585 ObjectEntryOID *objectEntryOID;
586 CSChangeForReader *csChangeForReader1;
589 cstRemoteReader=csChangeForReader->cstRemoteReader;
590 objectEntryOID=cstRemoteReader->sobject;
592 //multicast can do an application with multicast interface
593 if (!objectEntryOID->multicastPort)
596 ul_list_for_each(CSChangeParticipant,
597 csChangeForReader->csChange,
598 csChangeForReader1) {
599 ObjectEntryOID *objectEntryOID1;
600 CSTRemoteReader *cstRemoteReader1;
602 cstRemoteReader1=csChangeForReader1->cstRemoteReader;
603 objectEntryOID1=cstRemoteReader1->sobject;
605 /* are RRs from same GROUP */
606 if (objectEntryOID!=objectEntryOID1)
609 /* is the csChange in state TOSEND ? If yes, marks like proc. */
610 CSTWriterCSChangeForReaderNewState(csChangeForReader1);
612 /* if there are no messages, detach sending timer */
613 if (!(cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
614 !(cstRemoteReader->commStateHB==MAYSENDHB))
617 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
619 eventDetach(cstRemoteReader->cstWriter->domain,
620 cstRemoteReader->sobject->objectEntryAID,
621 &cstRemoteReader->delayResponceTimer,