2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
/*
 * Three instantiations of the GAVL_CUST_NODE_INT_IMP macro, one per
 * container type used in this file:
 *   - prefix CSTWriter:         CSTWriter items under a CSTPublications
 *                               root, keyed by `guid` (GUID_RTPS) and
 *                               compared with gavl_cmp_guid
 *   - prefix CSTRemoteReader:   CSTRemoteReader items under a CSTWriter
 *                               root, keyed by `guid` (GUID_RTPS) and
 *                               compared with gavl_cmp_guid
 *   - prefix CSChangeForReader: CSChangeForReader items under a
 *                               CSTRemoteReader root, keyed by
 *                               `csChange->sn` (SequenceNumber) and
 *                               compared with gavl_cmp_sn
 * NOTE(review): the argument roles above (prefix, root type, item type,
 * key type, root field, node field, key field, compare function) are
 * inferred from the argument order -- confirm against the GAVL header.
 * Presumably these expansions provide the <prefix>_insert, _delete, _find
 * and _first helpers called further down (e.g. CSTRemoteReader_insert,
 * CSChangeForReader_find).
 */
24 GAVL_CUST_NODE_INT_IMP(CSTWriter,
25 CSTPublications, CSTWriter, GUID_RTPS,
26 cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
28 CSTWriter, CSTRemoteReader, GUID_RTPS,
29 cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31 CSTRemoteReader, CSChangeForReader, SequenceNumber,
32 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
34 /*****************************************************************************/
/*
 * CSTWriterInit - fill in a caller-supplied CSTWriter structure.
 *
 * Parameters:
 *   d            - domain handle; in the visible lines it is only forwarded
 *                  to CSTWriterRefreshTimer
 *   cstWriter    - structure being initialised
 *   object       - object entry; object->objectEntryHID->hid and
 *                  object->objectEntryAID->aid become the hid/aid parts of
 *                  the writer GUID, and the pointer itself is stored in
 *                  cstWriter->objectEntryOID
 *   oid          - stored as the oid part of the writer GUID
 *   params       - copied by value (memcpy) into cstWriter->params
 *   typeRegister - stored as-is in cstWriter->typeRegister
 *
 * NOTE(review): this listing carries embedded line numbers and skips some
 * original lines (return type, closing braces, debug arguments), so only
 * the statements visible below are described.
 */
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37 ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
39 debug(51,10) ("CSTWriterInit: start\n");
40 //init values of cstwriter
41 cstWriter->guid.hid=object->objectEntryHID->hid;
42 cstWriter->guid.aid=object->objectEntryAID->aid;
43 cstWriter->guid.oid=oid;
44 cstWriter->objectEntryOID=object;
45 memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
// all counters start at zero; firstSN/lastSN are reset with SEQUENCE_NUMBER_NONE
46 cstWriter->strictReliableCounter=0;
47 cstWriter->bestEffortsCounter=0;
48 cstWriter->csChangesCounter=0;
49 cstWriter->cstRemoteReaderCounter=0;
50 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
// empty csChange list, empty remote-reader tree, rwlock with default
// attributes, and a refresh timer that is not yet queued
52 CSTWriterCSChange_init_head(cstWriter);
53 CSTRemoteReader_init_root_field(cstWriter);
54 pthread_rwlock_init(&cstWriter->lock,NULL);
55 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
57 cstWriter->typeRegister=typeRegister;
// only writers whose oid low three bits equal OID_PUBLICATION get the
// "csChange destroyed" condition variable, its mutex and its flag
58 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59 pthread_cond_init(&cstWriter->condCSChangeDestroyed,NULL);
60 pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
61 cstWriter->condValueCSChangeDestroyed=0;
63 //add event for refresh
// NOTE(review): iNtpTime presumably means "infinite / never" -- confirm;
// when refreshPeriod differs from it the refresh handler is called directly
64 if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
65 CSTWriterRefreshTimer(d,(void*)cstWriter);
67 debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
71 debug(51,10) ("CSTWriterInit: finished\n");
74 /*****************************************************************************/
/*
 * CSTWriterDelete - release everything attached to a CSTWriter.
 *
 * Parameters:
 *   d         - domain handle, forwarded to CSTWriterDestroyRemoteReader
 *   cstWriter - writer being torn down
 *
 * Visible steps, in order: destroy every remote reader, cut every csChange
 * off the writer's list and call parameterDelete on it, then (for writers
 * whose oid low three bits equal OID_PUBLICATION) destroy the condition
 * variable and mutex, and finally destroy the rwlock.
 *
 * NOTE(review): several original lines are missing from this listing
 * (the `csChange` declaration, the rest of the csChange loop body, and the
 * first line of the call that takes refreshPeriodTimer -- presumably an
 * eventDetach). Confirm against the full source.
 */
76 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
77 CSTRemoteReader *cstRemoteReader;
80 debug(51,10) ("CSTWriterDelete: start\n");
82 debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
86 //Destroy all cstRemoteReader connected on cstWriter
87 while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
88 CSTWriterDestroyRemoteReader(d,cstRemoteReader);
90 //Destroy all csChanges connected on cstWriter
91 while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
92 parameterDelete(csChange);
96 cstWriter->objectEntryOID->objectEntryAID,
97 &cstWriter->refreshPeriodTimer,
99 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
100 pthread_cond_destroy(&cstWriter->condCSChangeDestroyed);
101 pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
103 pthread_rwlock_destroy(&cstWriter->lock);
104 debug(51,10) ("CSTWriterDelete: finished\n");
107 /*****************************************************************************/
/*
 * CSTWriterAddRemoteReader - allocate a CSTRemoteReader and attach it to
 * a writer.
 *
 * Parameters (visible part of the signature):
 *   d         - domain handle
 *   cstWriter - writer that gains the reader; its cstRemoteReaderCounter is
 *               incremented
 *   object    - object entry of the reader; supplies the hid/aid parts of
 *               the reader GUID and is stored in the reader
 *   (`oid` is used below for the GUID; its declaration is on a signature
 *   line that is missing from this listing.)
 *
 * For writers whose oid low three bits differ from OID_PUBLICATION, every
 * csChange already on the writer gets a CSChangeForReader in state TOSEND
 * for the new reader, and the reader is switched to MUSTSENDDATA. The
 * visible lines after that pass delayResponceTimer, "CSTWriterSendTimer",
 * the writer lock and params.delayResponceTime to a call whose first line
 * is missing (presumably eventAdd).
 *
 * The lines that read `sp->reliabilityRequested` bump
 * strictReliableCounter / bestEffortsCounter on the writer.
 * NOTE(review): those lines presumably belong to the `else` (publication)
 * branch, but the line that opens the branch is missing -- confirm.
 *
 * NOTE(review): both MALLOC results are dereferenced on the very next line
 * without a NULL check.
 */
109 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
111 CSTRemoteReader *cstRemoteReader;
112 CSChangeForReader *csChangeForReader;
113 CSChange *csChange=NULL;
115 cstWriter->cstRemoteReaderCounter++;
116 cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
117 cstRemoteReader->guid.hid=object->objectEntryHID->hid;
118 cstRemoteReader->guid.aid=object->objectEntryAID->aid;
119 cstRemoteReader->guid.oid=oid;
120 cstRemoteReader->objectEntryOID=object;
121 cstRemoteReader->cstWriter=cstWriter;
122 CSChangeForReader_init_root_field(cstRemoteReader);
// initial communication state: heartbeat may be sent, no data pending
123 cstRemoteReader->commStateHB=MAYSENDHB;
124 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
125 cstRemoteReader->HBRetriesCounter=0;
126 cstRemoteReader->csChangesCounter=0;
127 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
128 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
129 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
130 //insert remote reader
131 CSTRemoteReader_insert(cstWriter,cstRemoteReader);
132 //copy all csChanges (not for publication)
133 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
134 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
135 csChange->remoteReaderCount++;
136 cstRemoteReader->csChangesCounter++;
137 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
138 csChangeForReader->commStateChFReader=TOSEND;
139 csChangeForReader->csChange=csChange;
140 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
141 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
142 cstRemoteReader->commStateSend=MUSTSENDDATA;
144 if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
146 cstRemoteReader->objectEntryOID->objectEntryAID,
147 &cstRemoteReader->delayResponceTimer,
149 "CSTWriterSendTimer",
151 &cstRemoteReader->cstWriter->lock,
153 &cstRemoteReader->cstWriter->params.delayResponceTime);
157 ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
158 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
159 cstWriter->strictReliableCounter++;
161 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
162 cstWriter->bestEffortsCounter++;
165 debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
166 cstRemoteReader->guid.hid,
167 cstRemoteReader->guid.aid,
168 cstRemoteReader->guid.oid);
171 /*****************************************************************************/
/*
 * CSTWriterDestroyRemoteReader - detach a remote reader from its writer
 * and free it.
 *
 * Parameters:
 *   d               - domain handle
 *   cstRemoteReader - reader to destroy; a NULL pointer makes the call a
 *                     no-op
 *
 * Visible steps: decrement the writer's cstRemoteReaderCounter; for
 * writers whose oid low three bits equal OID_PUBLICATION, undo the
 * strictReliableCounter / bestEffortsCounter increments according to the
 * subscription's reliabilityRequested bits; destroy every
 * CSChangeForReader of this reader (passing ORTE_TRUE as destroyCSChange);
 * remove the reader from the writer's tree and FREE it.
 *
 * NOTE(review): the argument lines mentioning delayResponceTimer and
 * repeatAnnounceTimer belong to four calls whose first lines are missing
 * from this listing. Presumably each is an eventDetach, two of them on
 * queue 1 (marked "metatraffic timer") -- confirm. The `sp` declaration is
 * also on a missing line.
 */
173 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
174 CSChangeForReader *csChangeForReader;
176 if (!cstRemoteReader) return;
177 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
178 debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
179 cstRemoteReader->guid.hid,
180 cstRemoteReader->guid.aid,
181 cstRemoteReader->guid.oid);
182 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
184 sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
185 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
186 cstRemoteReader->cstWriter->strictReliableCounter--;
188 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
189 cstRemoteReader->cstWriter->bestEffortsCounter--;
// drain the reader's tree: each pass removes the current first element
192 while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
193 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
194 csChangeForReader,ORTE_TRUE);
197 cstRemoteReader->objectEntryOID->objectEntryAID,
198 &cstRemoteReader->delayResponceTimer,
199 1); //metatraffic timer
201 cstRemoteReader->objectEntryOID->objectEntryAID,
202 &cstRemoteReader->delayResponceTimer,
205 cstRemoteReader->objectEntryOID->objectEntryAID,
206 &cstRemoteReader->repeatAnnounceTimer,
207 1); //metatraffic timer
209 cstRemoteReader->objectEntryOID->objectEntryAID,
210 &cstRemoteReader->repeatAnnounceTimer,
212 CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
213 FREE(cstRemoteReader);
216 /*****************************************************************************/
/*
 * CSTWriterMakeGAP - turn the stored csChange for `guid` into a GAP entry.
 *
 * Parameters:
 *   d         - domain handle, forwarded to CSTWriterDestroyCSChange
 *   cstWriter - writer whose csChange list is scanned
 *   guid      - GUID of the object whose existing csChange is replaced
 *
 * The loop looks for a csChange whose gapSN compares equal to noneSN
 * (a VAR entry, per the inline comments) and whose guid matches. That
 * entry gets its gapSN incremented and its parameters deleted. The
 * previous and next list neighbours are then inspected: a neighbour with
 * a gapSN different from noneSN takes part in a SeqNumberAdd, and one of
 * the two entries is destroyed.
 *
 * NOTE(review): the second and third SeqNumberAdd arguments, the NULL
 * checks on csChange1 and whatever ends the loop after a match are on
 * lines missing from this listing. Presumably adjacent GAP entries are
 * merged into one -- confirm against the full source.
 */
218 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
219 CSChange *csChange,*csChange1;
221 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
222 if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
223 (!gavl_cmp_guid(&csChange->guid,guid))) { //equal? (VAR)
224 //VAR->GAP - inc gap_sn_no
225 SeqNumberInc(csChange->gapSN,csChange->gapSN);
226 parameterDelete(csChange);
227 //is Gap in prior or next position?
228 csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
230 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
231 SeqNumberAdd(csChange1->gapSN,
234 CSTWriterDestroyCSChange(d,cstWriter,csChange);
238 csChange1=CSTWriterCSChange_next(cstWriter,csChange);
240 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
241 SeqNumberAdd(csChange->gapSN,
244 CSTWriterDestroyCSChange(d,cstWriter,csChange1);
252 /*****************************************************************************/
/*
 * CSTWriterAddCSChange - append a csChange to a writer and queue it for
 * every remote reader.
 *
 * Parameters:
 *   d         - domain handle
 *   cstWriter - writer receiving the change; csChangesCounter and lastSN
 *               are both incremented
 *   csChange  - change to insert; its sn, gapSN, remoteReaderCount,
 *               remoteReaderBest and remoteReaderStrict fields are
 *               overwritten here
 *
 * Visible flow:
 *   1. For writers whose oid low three bits differ from OID_PUBLICATION,
 *      CSTWriterMakeGAP is called first with the change's guid.
 *   2. The change gets sn = incremented lastSN, gapSN = none, and is
 *      inserted; firstSN is refreshed from the first list element.
 *   3. For each remote reader a CSChangeForReader in state TOSEND is
 *      allocated and inserted, and the reader's HBRetriesCounter is reset.
 *   4. If the reader had NOTHNIGTOSEND it becomes MUSTSENDDATA and a send
 *      is arranged. Non-publication writers use "CSTWriterSendTimer".
 *      Otherwise the subscription's reliabilityRequested bits select
 *      "CSTWriterSendStrictTimer" or the best-effort path. The best-effort
 *      path calls CSTWriterSendBestEffortTimer directly when the computed
 *      delay equals zNtpTime and schedules it otherwise. With neither bit
 *      set, the freshly created CSChangeForReader is destroyed again.
 *
 * NOTE(review): the first lines of the timer calls (presumably
 * eventDetach / eventAdd), the `else` lines and the closing braces are
 * missing from this listing, so the exact nesting of step 4 must be
 * confirmed against the full source.
 * NOTE(review): the MALLOC result is dereferenced on the next line without
 * a NULL check.
 */
254 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
255 CSChangeForReader *csChangeForReader;
256 CSTRemoteReader *cstRemoteReader;
257 CSChange *csChangeFSN;
259 debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
260 cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
261 cstWriter->csChangesCounter++;
262 //look for old cschange
263 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
264 CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
265 //insert cschange into database changes
266 SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
267 csChange->sn=cstWriter->lastSN;
268 SEQUENCE_NUMBER_NONE(csChange->gapSN);
269 csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
270 csChange->remoteReaderBest=0;
271 csChange->remoteReaderStrict=0;
272 CSTWriterCSChange_insert(cstWriter,csChange);
// the only statement visible in this branch is commented out, so a leading
// GAP entry appears to be left in place -- TODO confirm with full source
274 csChangeFSN=CSTWriterCSChange_first(cstWriter);
275 if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
276 //minimal are 2 SNs (GAP,VAR) ...
277 // CSTWriterDestroyCSChange(cstWriter,csChange);
279 csChangeFSN=CSTWriterCSChange_first(cstWriter);
280 cstWriter->firstSN=csChangeFSN->sn;
281 //insert new cschange for each reader
282 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
284 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
285 csChangeForReader->commStateChFReader=TOSEND;
286 csChangeForReader->csChange=csChange;
287 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
288 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
289 cstRemoteReader->csChangesCounter++;
290 cstRemoteReader->HBRetriesCounter=0;
291 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
292 cstRemoteReader->commStateSend=MUSTSENDDATA;
293 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
295 cstRemoteReader->objectEntryOID->objectEntryAID,
296 &cstRemoteReader->delayResponceTimer,
299 cstRemoteReader->objectEntryOID->objectEntryAID,
300 &cstRemoteReader->delayResponceTimer,
302 "CSTWriterSendTimer",
304 &cstRemoteReader->cstWriter->lock,
308 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
310 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
311 //Strict reliable subscription
312 csChange->remoteReaderStrict++;
314 cstRemoteReader->objectEntryOID->objectEntryAID,
315 &cstRemoteReader->delayResponceTimer,
318 cstRemoteReader->objectEntryOID->objectEntryAID,
319 &cstRemoteReader->delayResponceTimer,
321 "CSTWriterSendStrictTimer",
322 CSTWriterSendStrictTimer,
323 &cstRemoteReader->cstWriter->lock,
327 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
328 //best efforts subscription
// nextIssueTime = lastSentIssueTime + minimumSeparation; the delay is
// forced to zero once the current time has reached nextIssueTime
329 NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
331 csChange->remoteReaderBest++;
332 NtpTimeAdd(nextIssueTime,
333 cstRemoteReader->lastSentIssueTime,
334 sp->minimumSeparation);
335 NtpTimeSub(nextIssueDelay,
338 if (NtpTimeCmp(actTime,nextIssueTime)>=0)
339 NTPTIME_ZERO(nextIssueDelay);
341 cstRemoteReader->objectEntryOID->objectEntryAID,
342 &cstRemoteReader->delayResponceTimer,
344 if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
345 //direct sent issue, for case zero time
346 CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
348 //schedule sent issue (future)
350 cstRemoteReader->objectEntryOID->objectEntryAID,
351 &cstRemoteReader->delayResponceTimer,
353 "CSTWriterSendBestEffortTimer",
354 CSTWriterSendBestEffortTimer,
355 &cstRemoteReader->cstWriter->lock,
360 //!Best_Effort & !Strict_Reliable
361 CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
363 debug(51,5) ("CSTWriterAddCSChange: destryed\n");
369 debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
371 debug(51,5) ("CSTWriterAddCSChange: finished\n");
374 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChangeForReader - remove one csChange reference from
 * a remote reader and free the CSChangeForReader record.
 *
 * Parameters:
 *   cstRemoteReader   - reader that owns the record
 *   csChangeForReader - record to destroy; NULL makes the call a no-op
 *   destroyCSChange   - when true, and the writer's oid low three bits
 *                       equal OID_PUBLICATION, and no reader references
 *                       the csChange any more, the csChange itself is
 *                       destroyed through CSTWriterDestroyCSChange
 *
 * Bookkeeping done on every call: csChange->remoteReaderCount and the
 * reader's csChangesCounter are decremented, and the reader falls back to
 * NOTHNIGTOSEND when its counter reaches zero. For publication writers the
 * csChange's remoteReaderStrict / remoteReaderBest counters are decremented
 * according to the subscription's reliabilityRequested bits. The record's
 * waitWhileDataUnderwayTimer is detached before the record is deleted from
 * the reader's tree and freed.
 *
 * When the last reference of a publication csChange disappears,
 * condValueCSChangeDestroyed is set to 1 and condCSChangeDestroyed is
 * signalled while holding mutexCSChangeDestroyed.
 * NOTE(review): the closing brace of `if (destroyCSChange)` is on a missing
 * line; presumably the signalling happens regardless of destroyCSChange --
 * confirm. The `csChange` declaration and the last eventDetach argument
 * are also missing from this listing.
 */
376 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
377 CSChangeForReader *csChangeForReader,Boolean destroyCSChange) {
379 if (!csChangeForReader) return;
// keep the csChange pointer: the record that holds it is freed below
380 csChange=csChangeForReader->csChange;
381 csChange->remoteReaderCount--;
382 cstRemoteReader->csChangesCounter--;
383 if (!cstRemoteReader->csChangesCounter) {
384 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
386 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
387 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
388 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
389 csChange->remoteReaderStrict--;
391 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
392 csChange->remoteReaderBest--;
396 eventDetach(cstRemoteReader->cstWriter->domain,
397 cstRemoteReader->objectEntryOID->objectEntryAID,
398 &csChangeForReader->waitWhileDataUnderwayTimer,
400 CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
401 FREE(csChangeForReader);
402 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
403 if (!csChange->remoteReaderCount) {
404 if (destroyCSChange) {
405 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
406 cstRemoteReader->cstWriter,csChange);
408 pthread_mutex_lock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
409 cstRemoteReader->cstWriter->condValueCSChangeDestroyed=1;
410 pthread_cond_signal(&cstRemoteReader->cstWriter->condCSChangeDestroyed);
411 pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
412 debug(51,5) ("Publication: new queue level (%d)\n",
413 cstRemoteReader->cstWriter->csChangesCounter);
418 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChange - remove a csChange from a writer and from
 * every remote reader that still references it.
 *
 * Parameters:
 *   d         - domain handle (not used in the visible lines)
 *   cstWriter - writer that owns the change; csChangesCounter is
 *               decremented and firstSN is recomputed
 *   csChange  - change to destroy; NULL makes the call a no-op
 *
 * For each remote reader the matching CSChangeForReader is looked up by
 * sequence number and destroyed with destroyCSChange = ORTE_FALSE, so that
 * CSTWriterDestroyCSChangeForReader does not call back into this function.
 * A failed lookup is harmless because that function returns early on NULL.
 * The cdrStream buffer (if any) is freed and parameterDelete is called.
 *
 * firstSN is then taken from the first remaining list element, or set to
 * lastSN on the other visible assignment.
 * NOTE(review): the condition choosing between the two firstSN assignments
 * (presumably "list empty") and any FREE of csChange itself are on lines
 * missing from this listing -- confirm against the full source.
 */
420 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
421 CSTRemoteReader *cstRemoteReader;
422 CSChangeForReader *csChangeForReader;
423 CSChange *csChangeFSN;
425 if (!csChange) return;
426 cstWriter->csChangesCounter--;
427 CSTWriterCSChange_delete(cstWriter,csChange);
428 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
429 csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
430 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
431 csChangeForReader,ORTE_FALSE);
433 if (csChange->cdrStream.buffer)
434 FREE(csChange->cdrStream.buffer);
435 parameterDelete(csChange);
438 csChangeFSN=CSTWriterCSChange_first(cstWriter);
440 cstWriter->firstSN=csChangeFSN->sn;
442 cstWriter->firstSN=cstWriter->lastSN;
445 /*****************************************************************************/
/*
 * CSTWriterTryDestroyBestEffortIssue - destroy a csChange that no strict
 * reliable reader is waiting for.
 *
 * Parameters:
 *   cstWriter - writer whose csChange list is scanned; its `domain` field
 *               is passed to CSTWriterDestroyCSChange
 *
 * The visible loop walks the writer's csChange list and destroys a change
 * whose remoteReaderStrict counter is zero.
 * NOTE(review): the return type, the `csChange` declaration and the lines
 * after the destroy call are missing from this listing. Presumably the
 * function returns right after the first destroy; otherwise the list
 * would be modified while ul_list_for_each is still iterating over it.
 * Confirm against the full source.
 */
447 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
449 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
450 if (!csChange->remoteReaderStrict) {
451 CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
458 /*****************************************************************************/
460 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
461 CSChangeForReader *csChangeForReader;
462 int32_t timerQueue=1;
464 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
465 timerQueue=2; //userdata timer queue
466 gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
468 if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
469 csChangeForReader->commStateChFReader=TOSEND;
470 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
471 cstRemoteReader->commStateSend=MUSTSENDDATA;
473 cstRemoteReader->objectEntryOID->objectEntryAID,
474 &cstRemoteReader->delayResponceTimer,
477 cstRemoteReader->objectEntryOID->objectEntryAID,
478 &cstRemoteReader->delayResponceTimer,
480 "CSTWriterSendTimer",
482 &cstRemoteReader->cstWriter->lock,
484 &cstRemoteReader->cstWriter->params.delayResponceTime);