2 * $Id: RTPSCSTWriter.c,v 0.0.0.1 2003/09/13
4 * DEBUG: section 51 CSTWriter
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(CSTWriter,
25 CSTPublications, CSTWriter, GUID_RTPS,
26 cstWriter, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteReader,
28 CSTWriter, CSTRemoteReader, GUID_RTPS,
29 cstRemoteReader, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeForReader,
31 CSTRemoteReader, CSChangeForReader, SequenceNumber,
32 csChangeForReader, node, csChange->sn, gavl_cmp_sn);
34 /*****************************************************************************/
36 CSTWriterInit(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
37 ObjectId oid,CSTWriterParams *params,ORTETypeRegister *typeRegister) {
39 debug(51,10) ("CSTWriterInit: start\n");
40 //init values of cstwriter
41 cstWriter->guid.hid=object->objectEntryHID->hid;
42 cstWriter->guid.aid=object->objectEntryAID->aid;
43 cstWriter->guid.oid=oid;
44 cstWriter->objectEntryOID=object;
45 memcpy(&cstWriter->params,params,sizeof(CSTWriterParams));
46 cstWriter->strictReliableCounter=0;
47 cstWriter->bestEffortsCounter=0;
48 cstWriter->csChangesCounter=0;
49 cstWriter->cstRemoteReaderCounter=0;
50 SEQUENCE_NUMBER_NONE(cstWriter->firstSN);
51 SEQUENCE_NUMBER_NONE(cstWriter->lastSN);
52 CSTWriterCSChange_init_head(cstWriter);
53 CSTRemoteReader_init_root_field(cstWriter);
54 pthread_rwlock_init(&cstWriter->lock,NULL);
55 ul_htim_queue_init_detached(&cstWriter->refreshPeriodTimer.htim);
57 cstWriter->typeRegister=typeRegister;
58 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
59 pthread_mutex_init(&cstWriter->mutexCSChangeDestroyed,NULL);
61 //add event for refresh
62 if (NtpTimeCmp(cstWriter->params.refreshPeriod,iNtpTime)!=0) {
63 CSTWriterRefreshTimer(d,(void*)cstWriter);
65 debug(51,4) ("CSTWriterInit: 0x%x-0x%x-0x%x\n",
69 debug(51,10) ("CSTWriterInit: finished\n");
72 /*****************************************************************************/
74 CSTWriterDelete(ORTEDomain *d,CSTWriter *cstWriter) {
75 CSTRemoteReader *cstRemoteReader;
78 debug(51,10) ("CSTWriterDelete: start\n");
80 debug(51,4) ("CSTWriterDelete: 0x%x-0x%x-0x%x\n",
84 //Destroy all cstRemoteReader connected on cstWriter
85 while((cstRemoteReader=CSTRemoteReader_first(cstWriter))) {
86 CSTWriterDestroyRemoteReader(d,cstRemoteReader);
88 //Destroy all csChnages connected on cstWriter
89 while((csChange=CSTWriterCSChange_cut_first(cstWriter))) {
90 parameterDelete(csChange);
94 cstWriter->objectEntryOID->objectEntryAID,
95 &cstWriter->refreshPeriodTimer,
97 if ((cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
98 pthread_mutex_destroy(&cstWriter->mutexCSChangeDestroyed);
100 pthread_rwlock_destroy(&cstWriter->lock);
101 debug(51,10) ("CSTWriterDelete: finished\n");
104 /*****************************************************************************/
106 CSTWriterAddRemoteReader(ORTEDomain *d,CSTWriter *cstWriter,ObjectEntryOID *object,
108 CSTRemoteReader *cstRemoteReader;
109 CSChangeForReader *csChangeForReader;
110 CSChange *csChange=NULL;
112 cstWriter->cstRemoteReaderCounter++;
113 cstRemoteReader=(CSTRemoteReader*)MALLOC(sizeof(CSTRemoteReader));
114 cstRemoteReader->guid.hid=object->objectEntryHID->hid;
115 cstRemoteReader->guid.aid=object->objectEntryAID->aid;
116 cstRemoteReader->guid.oid=oid;
117 cstRemoteReader->objectEntryOID=object;
118 cstRemoteReader->cstWriter=cstWriter;
119 CSChangeForReader_init_root_field(cstRemoteReader);
120 cstRemoteReader->commStateHB=MAYSENDHB;
121 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
122 cstRemoteReader->HBRetriesCounter=0;
123 cstRemoteReader->csChangesCounter=0;
124 NTPTIME_ZERO(cstRemoteReader->lastSentIssueTime);
125 ul_htim_queue_init_detached(&cstRemoteReader->delayResponceTimer.htim);
126 ul_htim_queue_init_detached(&cstRemoteReader->repeatAnnounceTimer.htim);
127 //insert remote reader
128 CSTRemoteReader_insert(cstWriter,cstRemoteReader);
129 //copy all csChanges (not for publication)
130 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
131 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
132 csChange->remoteReaderCount++;
133 cstRemoteReader->csChangesCounter++;
134 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
135 csChangeForReader->commStateChFReader=TOSEND;
136 csChangeForReader->csChange=csChange;
137 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
138 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
139 cstRemoteReader->commStateSend=MUSTSENDDATA;
141 if (cstRemoteReader->commStateSend==MUSTSENDDATA) {
143 cstRemoteReader->objectEntryOID->objectEntryAID,
144 &cstRemoteReader->delayResponceTimer,
146 "CSTWriterSendTimer",
148 &cstRemoteReader->cstWriter->lock,
150 &cstRemoteReader->cstWriter->params.delayResponceTime);
154 ORTESubsProp *sp=(ORTESubsProp*)object->attributes;
155 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
156 cstWriter->strictReliableCounter++;
158 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
159 cstWriter->bestEffortsCounter++;
162 debug(51,4) ("CSTWriterAddRemoteReader: 0x%x-0x%x-0x%x\n",
163 cstRemoteReader->guid.hid,
164 cstRemoteReader->guid.aid,
165 cstRemoteReader->guid.oid);
168 /*****************************************************************************/
170 CSTWriterDestroyRemoteReader(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
171 CSChangeForReader *csChangeForReader;
173 if (!cstRemoteReader) return;
174 cstRemoteReader->cstWriter->cstRemoteReaderCounter--;
175 debug(51,4) ("CSTWriterDestroyRemoteReader: 0x%x-0x%x-0x%x\n",
176 cstRemoteReader->guid.hid,
177 cstRemoteReader->guid.aid,
178 cstRemoteReader->guid.oid);
179 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
181 sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
182 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0)
183 cstRemoteReader->cstWriter->strictReliableCounter--;
185 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
186 cstRemoteReader->cstWriter->bestEffortsCounter--;
189 while((csChangeForReader=CSChangeForReader_first(cstRemoteReader))) {
190 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
191 csChangeForReader,ORTE_TRUE);
194 cstRemoteReader->objectEntryOID->objectEntryAID,
195 &cstRemoteReader->delayResponceTimer,
196 1); //metatraffic timer
198 cstRemoteReader->objectEntryOID->objectEntryAID,
199 &cstRemoteReader->delayResponceTimer,
202 cstRemoteReader->objectEntryOID->objectEntryAID,
203 &cstRemoteReader->repeatAnnounceTimer,
204 1); //metatraffic timer
206 cstRemoteReader->objectEntryOID->objectEntryAID,
207 &cstRemoteReader->repeatAnnounceTimer,
209 CSTRemoteReader_delete(cstRemoteReader->cstWriter,cstRemoteReader);
210 FREE(cstRemoteReader);
213 /*****************************************************************************/
215 CSTWriterMakeGAP(ORTEDomain *d,CSTWriter *cstWriter,GUID_RTPS *guid) {
216 CSChange *csChange,*csChange1;
218 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
219 if ((!SeqNumberCmp(csChange->gapSN,noneSN)) &&
220 (!gavl_cmp_guid(&csChange->guid,guid))) { //equal? (VAR)
221 //VAR->GAP - inc gap_sn_no
222 SeqNumberInc(csChange->gapSN,csChange->gapSN);
223 parameterDelete(csChange);
224 //is Gap in prior or next position?
225 csChange1=CSTWriterCSChange_prev(cstWriter,csChange);
227 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
228 SeqNumberAdd(csChange1->gapSN,
231 CSTWriterDestroyCSChange(d,cstWriter,csChange);
235 csChange1=CSTWriterCSChange_next(cstWriter,csChange);
237 if (SeqNumberCmp(csChange1->gapSN,noneSN)) {
238 SeqNumberAdd(csChange->gapSN,
241 CSTWriterDestroyCSChange(d,cstWriter,csChange1);
249 /*****************************************************************************/
251 CSTWriterAddCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
252 CSChangeForReader *csChangeForReader;
253 CSTRemoteReader *cstRemoteReader;
254 CSChange *csChangeFSN;
256 cstWriter->csChangesCounter++;
257 //look for old cschange
258 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION)
259 CSTWriterMakeGAP(d,cstWriter,&csChange->guid);
260 //insert cschange into database changes
261 SeqNumberInc(cstWriter->lastSN,cstWriter->lastSN);
262 csChange->sn=cstWriter->lastSN;
263 SEQUENCE_NUMBER_NONE(csChange->gapSN);
264 csChange->remoteReaderCount=cstWriter->cstRemoteReaderCounter;
265 csChange->remoteReaderProcBest=0;
266 csChange->remoteReaderProcStrict=0;
267 CSTWriterCSChange_insert(cstWriter,csChange);
269 csChangeFSN=CSTWriterCSChange_first(cstWriter);
270 if (SeqNumberCmp(csChangeFSN->gapSN,noneSN)>0) {
271 //minimal are 2 SNs (GAP,VAR) ...
272 // CSTWriterDestroyCSChange(cstWriter,csChange);
274 csChangeFSN=CSTWriterCSChange_first(cstWriter);
275 cstWriter->firstSN=csChangeFSN->sn;
276 //insert new cschange for each reader
277 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
279 csChangeForReader=(CSChangeForReader*)MALLOC(sizeof(CSChangeForReader));
280 csChangeForReader->commStateChFReader=TOSEND;
281 csChangeForReader->csChange=csChange;
282 ul_htim_queue_init_detached(&csChangeForReader->waitWhileDataUnderwayTimer.htim);
283 CSChangeForReader_insert(cstRemoteReader,csChangeForReader);
284 cstRemoteReader->csChangesCounter++;
285 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
286 cstRemoteReader->commStateSend=MUSTSENDDATA;
287 if ((cstWriter->guid.oid & 0x07)!=OID_PUBLICATION) {
289 cstRemoteReader->objectEntryOID->objectEntryAID,
290 &cstRemoteReader->delayResponceTimer,
293 cstRemoteReader->objectEntryOID->objectEntryAID,
294 &cstRemoteReader->delayResponceTimer,
296 "CSTWriterSendTimer",
298 &cstRemoteReader->cstWriter->lock,
302 ORTESubsProp *sp=(ORTESubsProp*)cstRemoteReader->objectEntryOID->attributes;
304 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
305 //Strict reliable subscription
307 cstRemoteReader->objectEntryOID->objectEntryAID,
308 &cstRemoteReader->delayResponceTimer,
311 cstRemoteReader->objectEntryOID->objectEntryAID,
312 &cstRemoteReader->delayResponceTimer,
314 "CSTWriterSendStrictTimer",
315 CSTWriterSendStrictTimer,
316 &cstRemoteReader->cstWriter->lock,
320 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
321 //best efforts subscription
322 NtpTime nextIssueTime,nextIssueDelay,actTime=getActualNtpTime();
324 NtpTimeAdd(nextIssueTime,
325 cstRemoteReader->lastSentIssueTime,
326 sp->minimumSeparation);
327 NtpTimeSub(nextIssueDelay,
330 if (NtpTimeCmp(actTime,nextIssueTime)>=0)
331 NTPTIME_ZERO(nextIssueDelay);
333 cstRemoteReader->objectEntryOID->objectEntryAID,
334 &cstRemoteReader->delayResponceTimer,
336 if (NtpTimeCmp(nextIssueDelay,zNtpTime)==0) {
337 //direct sent issue, for case zero time
338 CSTWriterSendBestEffortTimer(d,(void*)cstRemoteReader);
340 //schedule sent issue (future)
342 cstRemoteReader->objectEntryOID->objectEntryAID,
343 &cstRemoteReader->delayResponceTimer,
345 "CSTWriterSendBestEffortTimer",
346 CSTWriterSendBestEffortTimer,
347 &cstRemoteReader->cstWriter->lock,
352 //!Best_Effort & !Strict_Reliable
353 CSTWriterDestroyCSChangeForReader(cstRemoteReader,csChangeForReader,
359 debug(51,5) ("CSTWriterAddCSChange: scheduled Var | Gap | Issue | HB \n");
363 /*****************************************************************************/
365 CSTWriterDestroyCSChangeForReader(CSTRemoteReader *cstRemoteReader,
366 CSChangeForReader *csChangeForReader,Boolean destroyCSChange) {
368 if (!csChangeForReader) return;
369 csChange=csChangeForReader->csChange;
370 csChange->remoteReaderCount--;
371 cstRemoteReader->csChangesCounter--;
372 eventDetach(cstRemoteReader->cstWriter->domain,
373 cstRemoteReader->objectEntryOID->objectEntryAID,
374 &csChangeForReader->waitWhileDataUnderwayTimer,
376 CSChangeForReader_delete(cstRemoteReader,csChangeForReader);
377 FREE(csChangeForReader);
378 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION) {
379 if (csChange->remoteReaderCount<=
380 (csChange->remoteReaderProcBest+csChange->remoteReaderProcStrict)) {
381 if (destroyCSChange) {
382 CSTWriterDestroyCSChange(cstRemoteReader->cstWriter->domain,
383 cstRemoteReader->cstWriter,csChange);
385 pthread_mutex_unlock(&cstRemoteReader->cstWriter->mutexCSChangeDestroyed);
386 debug(51,5) ("Publication: new queue level (%d)\n",
387 cstRemoteReader->cstWriter->csChangesCounter);
392 /*****************************************************************************/
394 CSTWriterDestroyCSChange(ORTEDomain *d,CSTWriter *cstWriter,CSChange *csChange) {
395 CSTRemoteReader *cstRemoteReader;
396 CSChangeForReader *csChangeForReader;
398 if (!csChange) return;
399 cstWriter->csChangesCounter--;
400 CSTWriterCSChange_delete(cstWriter,csChange);
401 gavl_cust_for_each(CSTRemoteReader,cstWriter,cstRemoteReader) {
402 csChangeForReader=CSChangeForReader_find(cstRemoteReader,&csChange->sn);
403 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
404 csChangeForReader,ORTE_FALSE);
406 if (csChange->cdrStream.buffer)
407 FREE(csChange->cdrStream.buffer);
408 parameterDelete(csChange);
412 /*****************************************************************************/
414 CSTWriterTryDestroyBestEffortIssue(CSTWriter *cstWriter) {
416 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange) {
417 if (!csChange->remoteReaderProcStrict) {
418 CSTWriterDestroyCSChange(cstWriter->domain,cstWriter,csChange);
425 /*****************************************************************************/
427 CSTWriterRefreshAllCSChanges(ORTEDomain *d,CSTRemoteReader *cstRemoteReader) {
428 CSChangeForReader *csChangeForReader;
429 int32_t timerQueue=1;
431 if ((cstRemoteReader->cstWriter->guid.oid & 0x07)==OID_PUBLICATION)
432 timerQueue=2; //userdata timer queue
433 gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
434 csChangeForReader->commStateChFReader=TOSEND;
435 if (cstRemoteReader->commStateSend==NOTHNIGTOSEND) {
436 cstRemoteReader->commStateSend=MUSTSENDDATA;
438 cstRemoteReader->objectEntryOID->objectEntryAID,
439 &cstRemoteReader->delayResponceTimer,
442 cstRemoteReader->objectEntryOID->objectEntryAID,
443 &cstRemoteReader->delayResponceTimer,
445 "CSTWriterSendTimer",
447 &cstRemoteReader->cstWriter->lock,
449 &cstRemoteReader->cstWriter->params.delayResponceTime);