2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
/*
 * Three instantiations of GAVL_CUST_NODE_INT_IMP, one per container used
 * in this file:
 *   - CSTWriter entries inside a CSTPublications root, keyed by the
 *     GUID_RTPS field "guid" and compared with gavl_cmp_guid;
 *   - CSTRemoteReader entries inside a CSTWriter root, keyed by "guid"
 *     and compared with gavl_cmp_guid;
 *   - CSChangeForReader entries inside a CSTRemoteReader root, keyed by
 *     the SequenceNumber "csChange->sn" and compared with gavl_cmp_sn.
 * The CSTRemoteReader_* and CSChangeForReader_* helpers called in this
 * file (_init_root_field, _insert, _delete, _first, _find) carry these
 * prefixes and are presumably generated here.
 * NOTE(review): macro argument order is assumed to be (prefix, root type,
 * item type, key type, root field, node field, key field, compare
 * function) - confirm against the GAVL header.
 */
24 GAVL_CUST_NODE_INT_IMP(CSTWriter,
25 CSTPublications, CSTWriter, GUID_RTPS,
26 cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
28 CSTWriter, CSTRemoteReader, GUID_RTPS,
29 cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31 CSTRemoteReader, CSChangeForReader, SequenceNumber,
32 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
34 /*****************************************************************************/
/*
 * CSTWriterInit - prepare a CSTWriter structure for use.
 *
 * d            - domain handle; in the visible lines it is only passed on
 *                to CSTWriterRefreshTimer()
 * cstWriter    - writer structure to initialise
 * object       - object entry; its HID and AID entries supply guid.hid and
 *                guid.aid, and the pointer is kept in objectEntryOID
 * oid          - object id stored in guid.oid
 * params       - writer parameters, copied by value into cstWriter->params
 * typeRegister - stored in cstWriter->typeRegister
 *
 * All counters are reset to zero, firstSN and lastSN are set with
 * SEQUENCE_NUMBER_NONE, the csChange list head and the remote-reader tree
 * root are initialised, and the rwlock and the (detached) refresh timer
 * are set up. params is dereferenced without a NULL check.
 */
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37 ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
39 debug(51,10) ("CSTWriterInit: start\n");
40 //init values of cstwriter
41 cstWriter->guid.hid=object->objectEntryHID->hid;
42 cstWriter->guid.aid=object->objectEntryAID->aid;
43 cstWriter->guid.oid=oid;
44 cstWriter->objectEntryOID=object;
45 memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46 cstWriter->strictReliableCounter=0;
47 cstWriter->bestEffortsCounter=0;
48 cstWriter->csChangesCounter=0;
49 cstWriter->cstRemoteReaderCounter=0;
50 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
52 CSTWriterCSChange_init_head(cstWriter);
53 CSTRemoteReader_init_root_field(cstWriter);
54 pthread_rwlock_init(&cstWriter->lock,NULL);
55 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
57 cstWriter->typeRegister=typeRegister;
// Only writers whose (oid & 0x07) equals OID_PUBLICATION get the
// semaphore; it starts at 0 and is not shared between processes.
58 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59 sem_init(&cstWriter->semCSChangeDestroyed, 0, 0);
61 //add event for refresh
// The refresh handler is invoked directly here, but only when
// refreshPeriod differs from iNtpTime.
// NOTE(review): iNtpTime presumably means "infinite / never" - confirm.
62 if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
63 CSTWriterRefreshTimer(d,(void*)cstWriter);
65 debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
69 debug(51,10) ("CSTWriterInit: finished\n");
72 /*****************************************************************************/
/*
 * CSTWriterDelete - tear down a CSTWriter.
 *
 * d         - domain handle, forwarded to CSTWriterDestroyRemoteReader()
 * cstWriter - writer to dismantle
 *
 * Destroys every remote reader attached to the writer, then removes each
 * csChange from the writer's list and calls parameterDelete() on it.
 * For writers whose (oid & 0x07) equals OID_PUBLICATION the semaphore
 * semCSChangeDestroyed is destroyed; the rwlock is destroyed last.
 * NOTE(review): the call that the objectEntryAID / refreshPeriodTimer
 * arguments belong to is not visible here - presumably the refresh timer
 * is detached; likewise it is not visible whether each csChange itself is
 * freed. Confirm both.
 */
74 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
75 CSTRemoteReader *cstRemoteReader;
78 debug(51,10) ("CSTWriterDelete: start\n");
80 debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
84 //Destroy all cstRemoteReader connected on cstWriter
// CSTWriterDestroyRemoteReader() removes the reader from the tree, so
// fetching the first entry again each round eventually yields NULL.
85 while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
86 CSTWriterDestroyRemoteReader(d,cstRemoteReader);
88 //Destroy all csChanges connected on cstWriter
89 while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
90 parameterDelete(csChange);
94 cstWriter->objectEntryOID->objectEntryAID,
95 &cstWriter->refreshPeriodTimer,
97 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
98 sem_destroy(&cstWriter->semCSChangeDestroyed);
100 pthread_rwlock_destroy(&cstWriter->lock);
101 debug(51,10) ("CSTWriterDelete: finished\n");
104 /*****************************************************************************/
/*
 * CSTWriterAddRemoteReader - attach a new remote reader to a writer.
 *
 * d         - domain handle
 * cstWriter - writer that gains the reader; its cstRemoteReaderCounter is
 *             incremented before anything else happens
 * object    - object entry describing the reader; supplies guid.hid and
 *             guid.aid and is kept in objectEntryOID
 * (the body also uses "oid"; the rest of the parameter list is not visible
 *  in this view)
 *
 * A CSTRemoteReader is allocated with MALLOC, initialised (HB state
 * MAYSENDHB, send state NOTHNIGTOSEND, zeroed counters, zero
 * lastSentIssueTime, two detached timers) and inserted into the writer's
 * reader tree.
 * When the writer's (oid & 0x07) is not OID_PUBLICATION, one
 * CSChangeForReader in state TOSEND is created for every csChange already
 * in the writer's list, and the reader is marked MUSTSENDDATA.
 * Depending on the reader's requested reliability
 * (ORTESubsProp.reliabilityRequested) the writer's strictReliableCounter
 * and/or bestEffortsCounter is incremented.
 * NOTE(review): the result of both MALLOC calls is dereferenced on the
 * following line without a NULL check - confirm MALLOC cannot fail here.
 * NOTE(review): the branch guarding the ORTESubsProp section is not
 * visible; presumably it is the publication case - confirm.
 */
106 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
108 CSTRemoteReader *cstRemoteReader;
109 CSChangeForReader *csChangeForReader;
110 CSChange *csChange=NULL;
112 cstWriter->cstRemoteReaderCounter++;
113 cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
114 cstRemoteReader->guid.hid=object->objectEntryHID->hid;
115 cstRemoteReader->guid.aid=object->objectEntryAID->aid;
116 cstRemoteReader->guid.oid=oid;
117 cstRemoteReader->objectEntryOID=object;
118 cstRemoteReader->cstWriter=cstWriter;
119 CSChangeForReader_init_root_field(cstRemoteReader);
120 cstRemoteReader->commStateHB=MAYSENDHB;
121 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
122 cstRemoteReader->HBRetriesCounter=0;
123 cstRemoteReader->csChangesCounter=0;
124 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
125 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
126 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
127 //insert remote reader
128 CSTRemoteReader_insert(cstWriter,cstRemoteReader);
129 //copy all csChanges (not for publication)
130 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
131 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
// each existing change gains one more reader that still has to receive it
132 csChange->remoteReaderCount++;
133 cstRemoteReader->csChangesCounter++;
134 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
135 csChangeForReader->commStateChFReader=TOSEND;
136 csChangeForReader->csChange=csChange;
137 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
138 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
139 cstRemoteReader->commStateSend=MUSTSENDDATA;
// NOTE(review): the call head for the arguments below is not visible;
// presumably the "CSTWriterSendTimer" event is scheduled on
// delayResponceTimer after params.delayResponceTime - confirm.
141 if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
143 cstRemoteReader->objectEntryOID->objectEntryAID,
144 &cstRemoteReader->delayResponceTimer,
146 "CSTWriterSendTimer",
148 &cstRemoteReader->cstWriter->lock,
150 &cstRemoteReader->cstWriter->params.delayResponceTime);
154 ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
155 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
156 cstWriter->strictReliableCounter++;
158 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
159 cstWriter->bestEffortsCounter++;
162 debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
163 cstRemoteReader->guid.hid,
164 cstRemoteReader->guid.aid,
165 cstRemoteReader->guid.oid);
168 /*****************************************************************************/
/*
 * CSTWriterDestroyRemoteReader - detach and free one remote reader.
 *
 * d               - domain handle
 * cstRemoteReader - reader to destroy; NULL is accepted and ignored
 *
 * Decrements the owning writer's cstRemoteReaderCounter. For writers whose
 * (oid & 0x07) equals OID_PUBLICATION the strictReliableCounter and/or
 * bestEffortsCounter are decremented according to the reader's
 * ORTESubsProp.reliabilityRequested bits. Every CSChangeForReader of the
 * reader is destroyed with destroyCSChange=ORTE_TRUE, the reader is
 * removed from the writer's tree and its memory is released with FREE.
 * NOTE(review): the call heads for the delayResponceTimer and
 * repeatAnnounceTimer argument lists are not visible; presumably both
 * timers are detached from the timer queues (queue 1 is commented as the
 * metatraffic timer) - confirm.
 */
170 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
171 CSChangeForReader *csChangeForReader;
173 if (!cstRemoteReader) return;
174 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
175 debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
176 cstRemoteReader->guid.hid,
177 cstRemoteReader->guid.aid,
178 cstRemoteReader->guid.oid);
179 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
181 sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
182 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
183 cstRemoteReader->cstWriter->strictReliableCounter--;
185 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
186 cstRemoteReader->cstWriter->bestEffortsCounter--;
// Each call deletes the entry from the reader's tree, so re-reading the
// first entry terminates once the tree is empty.
189 while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
190 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
191 csChangeForReader,ORTE_TRUE);
194 cstRemoteReader->objectEntryOID->objectEntryAID,
195 &cstRemoteReader->delayResponceTimer,
196 1); //metatraffic timer
198 cstRemoteReader->objectEntryOID->objectEntryAID,
199 &cstRemoteReader->delayResponceTimer,
202 cstRemoteReader->objectEntryOID->objectEntryAID,
203 &cstRemoteReader->repeatAnnounceTimer,
204 1); //metatraffic timer
206 cstRemoteReader->objectEntryOID->objectEntryAID,
207 &cstRemoteReader->repeatAnnounceTimer,
209 CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
210 FREE(cstRemoteReader);
213 /*****************************************************************************/
/*
 * CSTWriterMakeGAP - turn the csChange stored for "guid" into a GAP.
 *
 * d         - domain handle, forwarded to CSTWriterDestroyCSChange()
 * cstWriter - writer whose csChange list is scanned
 * guid      - GUID of the object whose earlier change is being superseded
 *
 * The list is scanned for a csChange whose gapSN equals noneSN (the
 * in-code comment calls this a VAR) and whose guid equals *guid. For such
 * an entry gapSN is incremented and parameterDelete() is called on it.
 * The previous and next list entries are then examined: a neighbour whose
 * gapSN differs from noneSN is merged with this entry via SeqNumberAdd and
 * one of the two entries is destroyed (the current one when merging into
 * the previous entry, the neighbour when merging the next entry).
 * NOTE(review): the SeqNumberAdd operands, the NULL checks on csChange1
 * and how the loop continues after a destroy are not visible in this
 * view - confirm before relying on this description.
 */
215 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
216 CSChange *csChange,*csChange1;
218 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
219 if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
220 (!gavl_cmp_guid(&csChange->guid,guid))) { //equal? (VAR)
221 //VAR->GAP - inc gap_sn_no
222 SeqNumberInc(csChange->gapSN,csChange->gapSN);
223 parameterDelete(csChange);
224 //is Gap in prior or next position?
225 csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
// non-zero compare result: the previous entry's gapSN is not noneSN
227 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
228 SeqNumberAdd(csChange1->gapSN,
231 CSTWriterDestroyCSChange(d,cstWriter,csChange);
235 csChange1=CSTWriterCSChange_next(cstWriter,csChange);
// non-zero compare result: the next entry's gapSN is not noneSN
237 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
238 SeqNumberAdd(csChange->gapSN,
241 CSTWriterDestroyCSChange(d,cstWriter,csChange1);
249 /*****************************************************************************/
/*
 * CSTWriterAddCSChange - append a new csChange to a writer and queue it
 * for every attached remote reader.
 *
 * d         - domain handle
 * cstWriter - writer receiving the change
 * csChange  - change to add; its sn, gapSN and remoteReader* fields are
 *             overwritten here
 *
 * Steps visible in this function:
 *  1. csChangesCounter is incremented; for writers whose (oid & 0x07) is
 *     not OID_PUBLICATION, CSTWriterMakeGAP() is run for csChange->guid.
 *  2. lastSN is incremented and assigned to csChange->sn, gapSN is reset
 *     with SEQUENCE_NUMBER_NONE, remoteReaderCount is set to the writer's
 *     current cstRemoteReaderCounter, remoteReaderBest/Strict are zeroed,
 *     and the change is inserted into the writer's list.
 *  3. firstSN is refreshed from the first list entry.
 *  4. For every remote reader a CSChangeForReader in state TOSEND is
 *     allocated and inserted, the reader's csChangesCounter is bumped and
 *     HBRetriesCounter is reset. If the reader was NOTHNIGTOSEND it
 *     becomes MUSTSENDDATA and a send is arranged:
 *       - non-publication writers: "CSTWriterSendTimer";
 *       - strict-reliable subscriptions: remoteReaderStrict++ and
 *         "CSTWriterSendStrictTimer";
 *       - best-effort subscriptions: remoteReaderBest++, then
 *         CSTWriterSendBestEffortTimer is called directly when the
 *         computed delay is zero, otherwise it is scheduled;
 *       - neither flag set: the CSChangeForReader is destroyed again.
 * NOTE(review): the heads of the timer calls (detach/add), several of
 * their arguments and the if/else nesting between these cases are not
 * visible in this view - the ordering above follows the visible lines
 * only.
 * NOTE(review): the MALLOC result is dereferenced on the following line
 * without a NULL check - confirm MALLOC cannot fail here.
 */
251 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
252 CSChangeForReader *csChangeForReader;
253 CSTRemoteReader *cstRemoteReader;
254 CSChange *csChangeFSN;
256 debug(51,5) ("CSTWriterAddCSChange: cstWriter:0x%x-0x%x-0x%x\n",
257 cstWriter->guid.hid,cstWriter->guid.aid,cstWriter->guid.oid);
258 cstWriter->csChangesCounter++;
259 //look for old cschange
260 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
261 CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
262 //insert cschange into database changes
263 SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
264 csChange->sn=cstWriter->lastSN;
265 SEQUENCE_NUMBER_NONE(csChange->gapSN);
266 csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
267 csChange->remoteReaderBest=0;
268 csChange->remoteReaderStrict=0;
269 CSTWriterCSChange_insert(cstWriter,csChange);
271 csChangeFSN=CSTWriterCSChange_first(cstWriter);
272 if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
273 //minimal are 2 SNs (GAP,VAR) ...
274 // CSTWriterDestroyCSChange(cstWriter,csChange);
276 csChangeFSN=CSTWriterCSChange_first(cstWriter);
277 cstWriter->firstSN=csChangeFSN->sn;
278 //insert new cschange for each reader
279 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
281 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
282 csChangeForReader->commStateChFReader=TOSEND;
283 csChangeForReader->csChange=csChange;
284 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
285 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
286 cstRemoteReader->csChangesCounter++;
287 cstRemoteReader->HBRetriesCounter=0;
// a send is only arranged when the reader was idle before this change
288 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
289 cstRemoteReader->commStateSend=MUSTSENDDATA;
290 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
292 cstRemoteReader->objectEntryOID->objectEntryAID,
293 &cstRemoteReader->delayResponceTimer,
296 cstRemoteReader->objectEntryOID->objectEntryAID,
297 &cstRemoteReader->delayResponceTimer,
299 "CSTWriterSendTimer",
301 &cstRemoteReader->cstWriter->lock,
305 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
307 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
308 //Strict reliable subscription
309 csChange->remoteReaderStrict++;
311 cstRemoteReader->objectEntryOID->objectEntryAID,
312 &cstRemoteReader->delayResponceTimer,
315 cstRemoteReader->objectEntryOID->objectEntryAID,
316 &cstRemoteReader->delayResponceTimer,
318 "CSTWriterSendStrictTimer",
319 CSTWriterSendStrictTimer,
320 &cstRemoteReader->cstWriter->lock,
324 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
325 //best efforts subscription
326 NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
328 csChange->remoteReaderBest++;
// NOTE(review): presumably nextIssueTime = lastSentIssueTime +
// minimumSeparation and nextIssueDelay = nextIssueTime - actTime;
// the NtpTimeSub operands are not visible - confirm.
329 NtpTimeAdd(nextIssueTime,
330 cstRemoteReader->lastSentIssueTime,
331 sp->minimumSeparation);
332 NtpTimeSub(nextIssueDelay,
// the delay is forced to zero when actTime compares >= nextIssueTime
335 if (NtpTimeCmp(actTime,nextIssueTime)>=0)
336 NTPTIME_ZERO(nextIssueDelay);
338 cstRemoteReader->objectEntryOID->objectEntryAID,
339 &cstRemoteReader->delayResponceTimer,
341 if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
342 //direct sent issue, for case zero time
343 CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
345 //schedule sent issue (future)
347 cstRemoteReader->objectEntryOID->objectEntryAID,
348 &cstRemoteReader->delayResponceTimer,
350 "CSTWriterSendBestEffortTimer",
351 CSTWriterSendBestEffortTimer,
352 &cstRemoteReader->cstWriter->lock,
357 //!Best_Effort & !Strict_Reliable
358 CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
360 debug(51,5) ("CSTWriterAddCSChange: destryed\n");
366 debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
368 debug(51,5) ("CSTWriterAddCSChange: finished\n");
371 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChangeForReader - drop one reader's reference to a
 * csChange and free the per-reader record.
 *
 * cstRemoteReader   - reader that owns the record
 * csChangeForReader - record to destroy; NULL is accepted and ignored
 * destroyCSChange   - when true and this was the last reader of a
 *                     publication csChange, the csChange itself is
 *                     destroyed via CSTWriterDestroyCSChange()
 *
 * Decrements csChange->remoteReaderCount and the reader's
 * csChangesCounter; when the latter reaches zero the reader's send state
 * becomes NOTHNIGTOSEND. For writers whose (oid & 0x07) equals
 * OID_PUBLICATION the csChange's remoteReaderStrict / remoteReaderBest
 * counters are decremented according to the subscription's reliability
 * bits. The waitWhileDataUnderwayTimer is detached, the record is removed
 * from the reader's tree and released with FREE.
 * NOTE(review): the exact condition under which sem_post() on
 * semCSChangeDestroyed and the "new queue level" debug message run is not
 * visible in this view - confirm.
 */
373 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
374 CSChangeForReader *csChangeForReader,Boolean destroyCSChange) {
376 if (!csChangeForReader) return;
377 csChange=csChangeForReader->csChange;
378 csChange->remoteReaderCount--;
379 cstRemoteReader->csChangesCounter--;
380 if (!cstRemoteReader->csChangesCounter) {
381 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
383 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
384 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
385 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
386 csChange->remoteReaderStrict--;
388 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
389 csChange->remoteReaderBest--;
393 eventDetach(cstRemoteReader->cstWriter->domain,
394 cstRemoteReader->objectEntryOID->objectEntryAID,
395 &csChangeForReader->waitWhileDataUnderwayTimer,
397 CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
398 FREE(csChangeForReader);
// csChange was saved before the record was freed, so it is still valid here
399 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
400 if (!csChange->remoteReaderCount) {
401 if (destroyCSChange) {
402 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
403 cstRemoteReader->cstWriter,csChange);
405 sem_post(&cstRemoteReader->cstWriter->semCSChangeDestroyed);
406 debug(51,5) ("Publication: new queue level (%d)\n",
407 cstRemoteReader->cstWriter->csChangesCounter);
412 /*****************************************************************************/
/*
 * CSTWriterDestroyCSChange - remove a csChange from a writer.
 *
 * d         - domain handle (not used in the visible lines)
 * cstWriter - writer that owns the change
 * csChange  - change to remove; NULL is accepted and ignored
 *
 * Decrements csChangesCounter and deletes the change from the writer's
 * list. For every remote reader the matching CSChangeForReader is looked
 * up by csChange->sn and destroyed with destroyCSChange=ORTE_FALSE (a
 * reader without a matching record yields NULL, which that function
 * ignores). The change's cdrStream.buffer is freed if set and
 * parameterDelete() is called. Finally firstSN is updated.
 * NOTE(review): the condition choosing between "first entry's sn" and
 * "lastSN" is not visible; presumably lastSN is used when the list is
 * empty. It is also not visible whether csChange itself is freed -
 * confirm both.
 */
414 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
415 CSTRemoteReader *cstRemoteReader;
416 CSChangeForReader *csChangeForReader;
417 CSChange *csChangeFSN;
419 if (!csChange) return;
420 cstWriter->csChangesCounter--;
421 CSTWriterCSChange_delete(cstWriter,csChange);
422 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
423 csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
424 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
425 csChangeForReader,ORTE_FALSE);
427 if (csChange->cdrStream.buffer)
428 FREE(csChange->cdrStream.buffer);
429 parameterDelete(csChange);
432 csChangeFSN=CSTWriterCSChange_first(cstWriter);
434 cstWriter->firstSN=csChangeFSN->sn;
436 cstWriter->firstSN=cstWriter->lastSN;
439 /*****************************************************************************/
/*
 * CSTWriterTryDestroyBestEffortIssue - try to drop a csChange that no
 * strict-reliable reader still needs.
 *
 * cstWriter - writer whose csChange list is scanned
 *
 * Walks the writer's csChange list and destroys a change whose
 * remoteReaderStrict counter is zero, using cstWriter->domain as the
 * domain handle.
 * NOTE(review): what happens after the destroy (presumably an immediate
 * return, since the current entry has just been removed from the list
 * being iterated) and the return value are not visible in this view -
 * confirm.
 */
441 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
443 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
444 if (!csChange->remoteReaderStrict) {
445 CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
452 /*****************************************************************************/
/*
 * CSTWriterRefreshAllCSChanges - mark a reader's changes for resending.
 *
 * d               - domain handle
 * cstRemoteReader - reader whose CSChangeForReader records are refreshed
 *
 * timerQueue defaults to 1 and is set to 2 (commented as the userdata
 * timer queue) when the writer's (oid & 0x07) equals OID_PUBLICATION.
 * Every CSChangeForReader whose csChange has gapSN equal to noneSN is put
 * back into state TOSEND. If the reader was NOTHNIGTOSEND it becomes
 * MUSTSENDDATA and a "CSTWriterSendTimer" event is arranged on
 * delayResponceTimer using params.delayResponceTime.
 * NOTE(review): the heads of the timer calls and the end of this function
 * are not visible in this view; this description covers the visible
 * lines only.
 */
454 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
455 CSChangeForReader *csChangeForReader;
456 int32_t timerQueue=1;
458 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
459 timerQueue=2; //userdata timer queue
460 gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
462 if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)==0) {
463 csChangeForReader->commStateChFReader=TOSEND;
464 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
465 cstRemoteReader->commStateSend=MUSTSENDDATA;
467 cstRemoteReader->objectEntryOID->objectEntryAID,
468 &cstRemoteReader->delayResponceTimer,
471 cstRemoteReader->objectEntryOID->objectEntryAID,
472 &cstRemoteReader->delayResponceTimer,
474 "CSTWriterSendTimer",
476 &cstRemoteReader->cstWriter->lock,
478 &cstRemoteReader->cstWriter->params.delayResponceTime);