2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
/*
 * Three instantiations of GAVL_CUST_NODE_INT_IMP, one per keyed container
 * used in this file:
 *   - CSTWriter           inside CSTPublications, keyed by GUID_RTPS
 *   - CSTRemoteReader     inside CSTWriter,       keyed by GUID_RTPS
 *   - CSChangeForReader   inside CSTRemoteReader, keyed by csChange->sn
 * NOTE(review): the CSTRemoteReader_* and CSChangeForReader_* helpers called
 * further down (_init_root_field, _insert, _first, _find, _delete) are
 * presumably generated by these macro expansions - confirm against the GAVL
 * header, which is not part of this listing.
 */
24 GAVL_CUST_NODE_INT_IMP(CSTWriter,
25 CSTPublications, CSTWriter, GUID_RTPS,
26 cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
28 CSTWriter, CSTRemoteReader, GUID_RTPS,
29 cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31 CSTRemoteReader, CSChangeForReader, SequenceNumber,
32 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
34 /*****************************************************************************/
/*
 * CSTWriterInit - put a caller-provided CSTWriter into its initial, empty state.
 *
 * d            - domain; only forwarded to CSTWriterRefreshTimer() here
 * cstWriter    - writer to initialise
 * object       - object entry; its HID/AID entries supply guid.hid / guid.aid
 *                and the pointer itself is kept in cstWriter->objectEntryOID
 * oid          - object id stored in guid.oid; (oid & 0x07)==OID_PUBLICATION
 *                selects the extra publication-only setup below
 * params       - copied by value into cstWriter->params
 * typeRegister - pointer stored as-is in cstWriter->typeRegister
 *
 * NOTE(review): this listing omits some original lines of the function
 * (return type, closing braces, the arguments of the debug(51,4) call).
 */
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37 ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
39 debug(51,10) ("CSTWriterInit: start\n");
40 //initialise identity, parameters and counters of the cstWriter
41 cstWriter->guid.hid=object->objectEntryHID->hid;
42 cstWriter->guid.aid=object->objectEntryAID->aid;
43 cstWriter->guid.oid=oid;
44 cstWriter->objectEntryOID=object;
45 memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46 cstWriter->strictReliableCounter=0;
47 cstWriter->bestEffortsCounter=0;
48 cstWriter->csChangesCounter=0;
49 cstWriter->cstRemoteReaderCounter=0;
//no change has been added yet: both ends of the sequence number window are "none"
50 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
//empty csChange list and empty remote reader tree
52 CSTWriterCSChange_init_head(cstWriter);
53 CSTRemoteReader_init_root_field(cstWriter);
54 pthread_rwlock_init(&cstWriter->lock,NULL);
55 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
57 cstWriter->typeRegister=typeRegister;
//publication writers also get the condition variable/mutex pair that
//CSTWriterDestroyCSChangeForReader() signals
58 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59 pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
60 pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
61 cstWriter->condValueCSChangeDestroyed=0;
63 //add event for refresh; skipped when refreshPeriod compares equal to iNtpTime
//NOTE(review): iNtpTime is presumably the "infinite" period constant - confirm
64 if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
65 CSTWriterRefreshTimer(d,(void*)cstWriter);
67 debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
71 debug(51,10) ("CSTWriterInit: finished\n");
74 /*****************************************************************************/
/*
 * CSTWriterDelete - tear down everything that hangs off a CSTWriter.
 *
 * d         - domain, forwarded to CSTWriterDestroyRemoteReader()
 * cstWriter - writer to tear down; the structure itself is not freed by the
 *             lines visible here
 *
 * Visible order: destroy all remote readers, drain the csChange list,
 * touch refreshPeriodTimer (call head not visible), destroy the
 * publication-only cond/mutex, destroy the rwlock.
 *
 * NOTE(review): the declaration of csChange, the call that takes
 * refreshPeriodTimer as an argument and all closing braces are missing
 * from this listing.
 */
76 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
77 CSTRemoteReader *cstRemoteReader;
80 debug(51,10) ("CSTWriterDelete: start\n");
82 debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
86 //Destroy every cstRemoteReader connected to the cstWriter
87 while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
88 CSTWriterDestroyRemoteReader(d,cstRemoteReader);
90 //Destroy every csChange still queued on the cstWriter
//NOTE(review): unlike CSTWriterDestroyCSChange(), the visible lines of this
//loop do not release csChange->cdrStream.buffer - confirm this is not a leak
91 while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
92 parameterDelete(csChange);
96 cstWriter->objectEntryOID->objectEntryAID,
97 &cstWriter->refreshPeriodTimer,
//mirror of the publication-only initialisation in CSTWriterInit()
99 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
100 pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
101 pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
103 pthread_rwlock_destroy(&cstWriter->lock);
104 debug(51,10) ("CSTWriterDelete: finished\n");
107 /*****************************************************************************/
/*
 * CSTWriterAddRemoteReader - allocate a CSTRemoteReader for `object` and
 * attach it to cstWriter.
 *
 * d         - domain (its use is on lines not visible in this listing)
 * cstWriter - writer that receives the new remote reader
 * object    - object entry of the reader; supplies guid.hid / guid.aid and,
 *             further down, the ORTESubsProp read through object->attributes
 * oid       - stored in the reader's guid.oid (the parameter declaration
 *             itself is on a line missing from this listing)
 *
 * For non-publication writers every csChange already queued on the writer
 * gets a CSChangeForReader in state TOSEND, and if at least one was created
 * the "CSTWriterSendTimer" event is set up on delayResponceTimer.
 *
 * NOTE(review): both MALLOC results are dereferenced without a NULL check -
 * confirm MALLOC cannot fail. The rest of the parameter list, the head of the
 * timer call and all closing braces / else keywords are missing here.
 */
109 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
111 CSTRemoteReader *cstRemoteReader;
112 CSChangeForReader *csChangeForReader;
113 CSChange *csChange=NULL;
115 cstWriter->cstRemoteReaderCounter++;
116 cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
117 cstRemoteReader->guid.hid=object->objectEntryHID->hid;
118 cstRemoteReader->guid.aid=object->objectEntryAID->aid;
119 cstRemoteReader->guid.oid=oid;
120 cstRemoteReader->objectEntryOID=object;
121 cstRemoteReader->cstWriter=cstWriter;
122 CSChangeForReader_init_root_field(cstRemoteReader);
//fresh reader: heartbeat allowed, nothing queued, no retries so far
123 cstRemoteReader->commStateHB=MAYSENDHB;
124 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
125 cstRemoteReader->HBRetriesCounter=0;
126 cstRemoteReader->csChangesCounter=0;
127 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
128 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
129 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
130 //insert the remote reader into the writer's tree
131 CSTRemoteReader_insert(cstWriter,cstRemoteReader);
132 //hand every already-queued csChange to the new reader (not for publications)
133 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
134 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
135 csChange->remoteReaderCount++;
136 cstRemoteReader->csChangesCounter++;
137 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
138 csChangeForReader->commStateChFReader=TOSEND;
139 csChangeForReader->csChange=csChange;
140 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
141 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
142 cstRemoteReader->commStateSend=MUSTSENDDATA;
//only true when the loop above queued at least one change
144 if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
146 cstRemoteReader->objectEntryOID->objectEntryAID,
147 &cstRemoteReader->delayResponceTimer,
149 "CSTWriterSendTimer",
151 &cstRemoteReader->cstWriter->lock,
153 &cstRemoteReader->cstWriter->params.delayResponceTime);
//NOTE(review): the lines below presumably form the publication branch (else
//of the test above) - the branch keyword is not visible; they count the
//reader by the reliability it requested
157 ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
158 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
159 cstWriter->strictReliableCounter++;
161 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
162 cstWriter->bestEffortsCounter++;
165 debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
166 cstRemoteReader->guid.hid,
167 cstRemoteReader->guid.aid,
168 cstRemoteReader->guid.oid);
171 /*****************************************************************************/
/*
 * CSTWriterDestroyRemoteReader - detach a remote reader from its writer and
 * free it.
 *
 * d               - domain (its use is on lines not visible in this listing)
 * cstRemoteReader - reader to destroy; NULL is accepted and ignored
 *
 * Undoes CSTWriterAddRemoteReader(): decrements the writer's reader counter
 * and, for publications, the per-reliability counter; destroys every
 * CSChangeForReader of the reader; handles delayResponceTimer and
 * repeatAnnounceTimer; removes the reader from the writer's tree; frees it.
 *
 * NOTE(review): the declaration of sp, the heads of the four calls that take
 * the timers as arguments, and the closing braces are missing here. Each
 * timer appears in two calls, one of which ends in "1); //metatraffic timer".
 */
173 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
174 CSChangeForReader *csChangeForReader;
176 if (!cstRemoteReader) return;
177 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
178 debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
179 cstRemoteReader->guid.hid,
180 cstRemoteReader->guid.aid,
181 cstRemoteReader->guid.oid);
//publication writers keep per-reliability reader counts; undo this reader's share
182 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
184 sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
185 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
186 cstRemoteReader->cstWriter->strictReliableCounter--;
188 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
189 cstRemoteReader->cstWriter->bestEffortsCounter--;
//drop every pending change of this reader; ORTE_TRUE lets the callee also
//destroy a csChange that no reader references any more
192 while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
193 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
194 csChangeForReader,ORTE_TRUE);
197 cstRemoteReader->objectEntryOID->objectEntryAID,
198 &cstRemoteReader->delayResponceTimer,
199 1); //metatraffic timer
201 cstRemoteReader->objectEntryOID->objectEntryAID,
202 &cstRemoteReader->delayResponceTimer,
205 cstRemoteReader->objectEntryOID->objectEntryAID,
206 &cstRemoteReader->repeatAnnounceTimer,
207 1); //metatraffic timer
209 cstRemoteReader->objectEntryOID->objectEntryAID,
210 &cstRemoteReader->repeatAnnounceTimer,
//unlink from the writer's tree before releasing the memory
212 CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
213 FREE(cstRemoteReader);
216 /*****************************************************************************/
/*
 * CSTWriterMakeGAP - turn the queued change that belongs to `guid` into a GAP
 * entry and merge it with a neighbouring GAP entry.
 *
 * d         - domain, forwarded to CSTWriterDestroyCSChange()
 * cstWriter - writer whose csChange list is scanned
 * guid      - GUID compared against csChange->guid with gavl_cmp_guid()
 *
 * A change counts as a non-GAP entry ("VAR" in the comments below) when its
 * gapSN compares equal to noneSN, and as a GAP entry otherwise.
 *
 * NOTE(review): the third argument of both SeqNumberAdd() calls, whatever
 * guards csChange1 against NULL, any re-assignment of csChange after it has
 * been destroyed, and the loop exit are on lines missing from this listing.
 */
218 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
219 CSChange *csChange,*csChange1;
221 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
222 if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
223 (!gavl_cmp_guid(&csChange->guid,guid))) { //equal? (VAR)
224 //VAR->GAP - increment gapSN and drop the parameters of the change
225 SeqNumberInc(csChange->gapSN,csChange->gapSN);
226 parameterDelete(csChange);
227 //is there a GAP in the previous or the next position?
228 csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
//previous entry is a GAP: accumulate into it and destroy the current entry
230 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
231 SeqNumberAdd(csChange1->gapSN,
234 CSTWriterDestroyCSChange(d,cstWriter,csChange);
//next entry is a GAP: accumulate into csChange and destroy the next entry
238 csChange1=CSTWriterCSChange_next(cstWriter,csChange);
240 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
241 SeqNumberAdd(csChange->gapSN,
244 CSTWriterDestroyCSChange(d,cstWriter,csChange1);
252 /*****************************************************************************/
/*
 * CSTWriterAddCSChange - queue a new csChange on the writer and create a
 * CSChangeForReader for it on every remote reader.
 *
 * d         - domain, forwarded to CSTWriterMakeGAP() and to
 *             CSTWriterSendBestEffortTimer()
 * cstWriter - writer that receives the change
 * csChange  - change to queue; its sn, gapSN and remoteReader* fields are
 *             overwritten here
 *
 * Per reader the change starts in state TOSEND. If the reader had nothing to
 * send it is switched to MUSTSENDDATA and a send is arranged:
 *   - non-publication writer: "CSTWriterSendTimer" on delayResponceTimer
 *   - strict reliable subscription: "CSTWriterSendStrictTimer"
 *   - best effort subscription: CSTWriterSendBestEffortTimer(), called
 *     directly when the computed delay is zero, scheduled otherwise
 *   - neither flag set: the CSChangeForReader is destroyed again
 *
 * NOTE(review): many lines are missing from this listing (heads and trailing
 * arguments of the timer calls, else keywords, closing braces, the arguments
 * of NtpTimeSub and of two debug calls); the branch structure described above
 * is inferred from the visible conditions and existing comments - confirm
 * against the complete file.
 * NOTE(review): the MALLOC result is used without a NULL check.
 */
254 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
255 CSChangeForReader *csChangeForReader;
256 CSTRemoteReader *cstRemoteReader;
257 CSChange *csChangeFSN;
259 debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
260 cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
261 cstWriter->csChangesCounter++;
262 //look for an older csChange with the same guid (not for publications)
263 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
264 CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
265 //insert the csChange into the writer's list under the next sequence number
266 SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
267 csChange->sn=cstWriter->lastSN;
268 SEQUENCE_NUMBER_NONE(csChange->gapSN);
//every currently attached reader is going to reference this change
269 csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
270 csChange->remoteReaderBest=0;
271 csChange->remoteReaderStrict=0;
272 CSTWriterCSChange_insert(cstWriter,csChange);
273 debug(51,5) ("CSTWriterAddCSChange: sn:0x%x\n",
276 csChangeFSN=CSTWriterCSChange_first(cstWriter);
277 if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
278 //minimal are 2 SNs (GAP,VAR) ...
279 // CSTWriterDestroyCSChange(cstWriter,csChange);
//firstSN follows the sequence number of the first change in the list
281 csChangeFSN=CSTWriterCSChange_first(cstWriter);
282 cstWriter->firstSN=csChangeFSN->sn;
283 //insert the new csChange for each remote reader
284 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
286 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
287 csChangeForReader->commStateChFReader=TOSEND;
288 csChangeForReader->csChange=csChange;
289 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
290 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
291 cstRemoteReader->csChangesCounter++;
292 cstRemoteReader->HBRetriesCounter=0;
//a send is arranged only when the reader was idle before this change
293 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
294 cstRemoteReader->commStateSend=MUSTSENDDATA;
295 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
297 cstRemoteReader->objectEntryOID->objectEntryAID,
298 &cstRemoteReader->delayResponceTimer,
301 cstRemoteReader->objectEntryOID->objectEntryAID,
302 &cstRemoteReader->delayResponceTimer,
304 "CSTWriterSendTimer",
306 &cstRemoteReader->cstWriter->lock,
//NOTE(review): from here on presumably the publication branch - the branch
//keyword is not visible; behaviour depends on the requested reliability
310 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
312 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
313 //Strict reliable subscription
314 csChange->remoteReaderStrict++;
316 cstRemoteReader->objectEntryOID->objectEntryAID,
317 &cstRemoteReader->delayResponceTimer,
320 cstRemoteReader->objectEntryOID->objectEntryAID,
321 &cstRemoteReader->delayResponceTimer,
323 "CSTWriterSendStrictTimer",
324 CSTWriterSendStrictTimer,
325 &cstRemoteReader->cstWriter->lock,
329 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
330 //best efforts subscription
331 NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
333 csChange->remoteReaderBest++;
//earliest allowed send time = last sent issue + minimumSeparation;
//the delay is forced to zero once that time has already been reached
334 NtpTimeAdd(nextIssueTime,
335 cstRemoteReader->lastSentIssueTime,
336 sp->minimumSeparation);
337 NtpTimeSub(nextIssueDelay,
340 if (NtpTimeCmp(actTime,nextIssueTime)>=0)
341 NTPTIME_ZERO(nextIssueDelay);
343 cstRemoteReader->objectEntryOID->objectEntryAID,
344 &cstRemoteReader->delayResponceTimer,
346 if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
347 //send the issue directly when the delay is zero
348 CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
350 //otherwise schedule the send for later
352 cstRemoteReader->objectEntryOID->objectEntryAID,
353 &cstRemoteReader->delayResponceTimer,
355 "CSTWriterSendBestEffortTimer",
356 CSTWriterSendBestEffortTimer,
357 &cstRemoteReader->cstWriter->lock,
362 //neither Best_Effort nor Strict_Reliable: nothing to deliver to this reader
363 CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
365 debug(51,5) ("CSTWriterAddCSChange: destryed\n");
371 debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
373 debug(51,5) ("CSTWriterAddCSChange: finished\n");
376 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChangeForReader - remove one pending change from a remote
 * reader and free its CSChangeForReader record.
 *
 * cstRemoteReader   - reader that owns csChangeForReader
 * csChangeForReader - record to destroy; NULL is accepted and ignored
 * destroyCSChange   - when true, and the writer is a publication, and no
 *                     reader references the underlying csChange any more, the
 *                     csChange itself is destroyed as well
 *
 * For publication writers with no remaining references the function also sets
 * condValueCSChangeDestroyed to 1 and signals condCSChangeDestroyed under
 * mutexCSChangeDestroyed.
 * NOTE(review): whether that signalling sits inside or after the
 * destroyCSChange test cannot be told from this listing (closing braces and
 * the declaration of csChange are missing).
 */
378 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
379 CSChangeForReader *csChangeForReader,Boolean destroyCSChange) {
381 if (!csChangeForReader) return;
//remember the csChange now; csChangeForReader is freed further down
382 csChange=csChangeForReader->csChange;
383 csChange->remoteReaderCount--;
384 cstRemoteReader->csChangesCounter--;
//last pending change gone: the reader becomes idle again
385 if (!cstRemoteReader->csChangesCounter) {
386 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
//undo the per-reliability bookkeeping done in CSTWriterAddCSChange()
388 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
389 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
390 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
391 csChange->remoteReaderStrict--;
393 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
394 csChange->remoteReaderBest--;
//make sure the per-change timer can no longer fire for the freed record
398 eventDetach(cstRemoteReader->cstWriter->domain,
399 cstRemoteReader->objectEntryOID->objectEntryAID,
400 &csChangeForReader->waitWhileDataUnderwayTimer,
402 CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
403 FREE(csChangeForReader);
404 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
405 if (!csChange->remoteReaderCount) {
406 if (destroyCSChange) {
407 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
408 cstRemoteReader->cstWriter,csChange);
//tell whoever waits on condCSChangeDestroyed that a change has been released
410 pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
411 cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
412 pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
413 pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
414 debug(51,5) ("Publication: new queue level (%d)\n",
415 cstRemoteReader->cstWriter->csChangesCounter);
420 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChange - remove a csChange from the writer, from every
 * remote reader that still references it, and release its resources.
 *
 * d         - domain (not used on the lines visible in this listing)
 * cstWriter - writer that owns the change
 * csChange  - change to destroy; NULL is accepted and ignored
 *
 * Afterwards firstSN is refreshed: it becomes the sn of the first remaining
 * change, or lastSN.
 * NOTE(review): the condition choosing between those two assignments, the
 * release of csChange itself and the closing braces are on lines missing
 * from this listing.
 */
422 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
423 CSTRemoteReader *cstRemoteReader;
424 CSChangeForReader *csChangeForReader;
425 CSChange *csChangeFSN;
427 if (!csChange) return;
428 cstWriter->csChangesCounter--;
429 CSTWriterCSChange_delete(cstWriter,csChange);
//drop the per-reader record of this change on every reader; a reader without
//one yields NULL from _find, which the callee ignores. ORTE_FALSE stops the
//callee from calling back into this function for the same csChange
430 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
431 csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
432 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
433 csChangeForReader,ORTE_FALSE);
435 if (csChange->cdrStream.buffer)
436 FREE(csChange->cdrStream.buffer);
437 parameterDelete(csChange);
//recompute the low end of the writer's sequence number window
440 csChangeFSN=CSTWriterCSChange_first(cstWriter);
442 cstWriter->firstSN=csChangeFSN->sn;
444 cstWriter->firstSN=cstWriter->lastSN;
447 /*****************************************************************************/
/*
 * CSTWriterTryDestroyBestEffortIssue - walk the writer's csChange list and
 * destroy a change that no strict-reliable reader references
 * (remoteReaderStrict == 0).
 *
 * cstWriter - writer whose list is scanned; its domain pointer is passed on
 *             to CSTWriterDestroyCSChange()
 *
 * NOTE(review): the return type, the declaration of csChange, the return
 * statements and the closing braces are missing from this listing.
 * Presumably the function returns right after the first destroy, because
 * continuing ul_list_for_each from a destroyed node would be unsafe -
 * confirm against the complete file.
 */
449 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
451 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
452 if (!csChange->remoteReaderStrict) {
453 CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
460 /*****************************************************************************/
462 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
463 CSChangeForReader *csChangeForReader;
464 int32_t timerQueue=1;
466 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
467 timerQueue=2; //userdata timer queue
468 gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
470 if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
471 csChangeForReader->commStateChFReader=TOSEND;
472 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
473 cstRemoteReader->commStateSend=MUSTSENDDATA;
475 cstRemoteReader->objectEntryOID->objectEntryAID,
476 &cstRemoteReader->delayResponceTimer,
479 cstRemoteReader->objectEntryOID->objectEntryAID,
480 &cstRemoteReader->delayResponceTimer,
482 "CSTWriterSendTimer",
484 &cstRemoteReader->cstWriter->lock,
486 &cstRemoteReader->cstWriter->params.delayResponceTime);