2 * $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 31 Functions working over publication
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(PublicationList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 publications, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
31 void *instance,NtpTime *persistence,int strength,
32 ORTESendCallBack sendCallBack,void *sendCallBackParam,
33 NtpTime *sendCallBackDelay) {
36 CSTWriterParams cstWriterParams;
38 ObjectEntryOID *objectEntryOID;
42 debug(31,10) ("ORTEPublicationCreate: start\n");
43 cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
44 if (!cstWriter) return NULL;
45 debug(31,10) ("ORTEPublicationCreate: memory OK\n");
46 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
47 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
48 pthread_rwlock_rdlock(&d->typeEntry.lock);
49 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
50 pthread_rwlock_unlock(&d->typeEntry.lock);
51 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
52 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
53 printf("before call ORTEPublicationCreate is necessary to register \n\
54 ser./deser. function for a given typeName!!!\n");
57 pthread_rwlock_wrlock(&d->publications.lock);
58 //generate new guid of publisher
59 d->publications.counter++;
60 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
61 guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
62 pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
63 memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
64 strcpy(pp->topic,topic);
65 strcpy(pp->typeName,typeName);
66 pp->persistence=*persistence;
67 pp->strength=strength;
68 pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS |
69 PID_VALUE_RELIABILITY_STRICT;
70 //insert object to structure objectEntry
71 objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
72 objectEntryOID->privateCreated=ORTE_TRUE;
73 objectEntryOID->instance=instance;
74 objectEntryOID->sendCallBack=sendCallBack;
75 objectEntryOID->callBackParam=sendCallBackParam;
76 if (objectEntryOID->sendCallBack!=NULL) {
77 if (sendCallBackDelay!=NULL) {
78 objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
80 objectEntryOID->objectEntryAID,
81 &objectEntryOID->sendCallBackDelayTimer,
83 "PublicationCallBackTimer",
84 PublicationCallBackTimer,
87 &objectEntryOID->sendCallBackDelay);
90 //create writerPublication
91 cstWriterParams.registrationRetries=0;
92 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
93 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
94 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
95 cstWriterParams.refreshPeriod=iNtpTime; //can't refresh csChange(s)
96 cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
97 cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
98 cstWriterParams.fullAcknowledge=ORTE_TRUE;
99 CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
100 &typeNode->typeRegister);
101 //insert cstWriter to list of publications
102 CSTWriter_insert(&d->publications,cstWriter);
103 //generate csChange for writerPublisher
104 pthread_rwlock_wrlock(&d->writerPublications.lock);
105 csChange=(CSChange*)MALLOC(sizeof(CSChange));
106 parameterUpdateCSChangeFromPublication(csChange,pp);
108 csChange->alive=ORTE_TRUE;
109 csChange->cdrCodec.buffer=NULL;
110 debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
111 CSTWriterAddCSChange(d,&d->writerPublications,csChange);
112 pthread_rwlock_unlock(&d->writerPublications.lock);
113 pthread_rwlock_unlock(&d->publications.lock);
114 pthread_rwlock_unlock(&d->typeEntry.lock);
115 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
116 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
117 debug(31,10) ("ORTEPublicationCreate: finished\n");
121 /*****************************************************************************/
123 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
126 if (!cstWriter) return ORTE_BAD_HANDLE;
127 //generate csChange for writerPublisher
128 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
129 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
130 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
131 if (cstWriter->objectEntryOID->sendCallBack!=NULL) {
132 eventDetach(cstWriter->domain,
133 cstWriter->objectEntryOID->objectEntryAID,
134 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
137 csChange=(CSChange*)MALLOC(sizeof(CSChange));
138 CSChangeAttributes_init_head(csChange);
139 csChange->cdrCodec.buffer=NULL;
140 csChange->guid=cstWriter->guid;
141 csChange->alive=ORTE_FALSE;
142 CSTWriterAddCSChange(cstWriter->domain,
143 &cstWriter->domain->writerPublications,
145 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
146 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
147 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
152 /*****************************************************************************/
154 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
155 if (!cstWriter) return ORTE_BAD_HANDLE;
156 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
157 pthread_rwlock_rdlock(&cstWriter->lock);
158 *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
159 pthread_rwlock_unlock(&cstWriter->lock);
160 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
164 /*****************************************************************************/
166 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
169 if (!cstWriter) return ORTE_BAD_HANDLE;
170 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
171 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
172 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
173 pthread_rwlock_rdlock(&cstWriter->lock);
174 csChange=(CSChange*)MALLOC(sizeof(CSChange));
175 parameterUpdateCSChangeFromPublication(csChange,pp);
176 csChange->guid=cstWriter->guid;
177 csChange->alive=ORTE_TRUE;
178 csChange->cdrCodec.buffer=NULL;
179 CSTWriterAddCSChange(cstWriter->domain,
180 &cstWriter->domain->writerPublications,csChange);
181 pthread_rwlock_unlock(&cstWriter->lock);
182 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
183 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
184 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
188 /*****************************************************************************/
190 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
191 unsigned int retries,unsigned int noSubscriptions) {
192 unsigned int rSubscriptions;
195 if (!cstWriter) return ORTE_BAD_HANDLE;
196 NtpTimeDisAssembToMs(sec,ms,wait);
198 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
199 pthread_rwlock_rdlock(&cstWriter->lock);
200 rSubscriptions=cstWriter->cstRemoteReaderCounter;
201 pthread_rwlock_unlock(&cstWriter->lock);
202 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
203 if (rSubscriptions>=noSubscriptions)
205 ORTESleepMs(sec*1000+ms);
210 /*****************************************************************************/
212 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
215 if (!cstWriter) return ORTE_BAD_HANDLE;
216 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
217 pthread_rwlock_rdlock(&cstWriter->lock);
218 status->strict=cstWriter->strictReliableCounter;
219 status->bestEffort=cstWriter->bestEffortsCounter;
221 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
223 pthread_rwlock_unlock(&cstWriter->lock);
224 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
228 /*****************************************************************************/
230 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
233 if (!cstWriter) return ORTE_BAD_HANDLE;
234 pthread_rwlock_wrlock(&cstWriter->lock);
235 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
236 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
237 if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
238 NtpTime expire,atime=getActualNtpTime();
239 struct timespec wtime;
240 //count max block time
241 NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
242 NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
243 wtime.tv_nsec*=1000; //conver to nano seconds
244 while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
245 pthread_rwlock_unlock(&cstWriter->lock);
246 //wait till a message will be processed
247 pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
248 if (cstWriter->condValueCSChangeDestroyed==0) {
249 if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
250 &cstWriter->mutexCSChangeDestroyed,
251 &wtime)==ETIMEDOUT) {
252 debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
253 cstWriter->csChangesCounter);
254 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
255 pthread_rwlock_unlock(&cstWriter->lock);
256 return ORTE_QUEUE_FULL;
259 cstWriter->condValueCSChangeDestroyed=0;
260 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
261 pthread_rwlock_wrlock(&cstWriter->lock);
265 pthread_rwlock_unlock(&cstWriter->lock);
269 /*****************************************************************************/
271 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
272 ORTEPublicationSendParam *psp) {
274 SequenceNumber snNext;
277 if (!cstWriter) return ORTE_BAD_HANDLE;
278 pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
279 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
280 if (!CSTRemoteReader_is_empty(cstWriter)) {
281 ORTEGetMaxSizeParam gms;
283 csChange=(CSChange*)MALLOC(sizeof(CSChange));
284 CSChangeAttributes_init_head(csChange);
285 csChange->guid=cstWriter->guid;
286 csChange->alive=ORTE_FALSE;
287 CDR_codec_init_static(&csChange->cdrCodec);
288 csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
291 csChange->cdrCodec.data_endian = psp->data_endian;
292 cstWriter->objectEntryOID->instance=psp->instance;
295 /* determine maximal size */
296 gms.host_endian=csChange->cdrCodec.host_endian;
297 gms.data_endian=csChange->cdrCodec.data_endian;
298 gms.data=cstWriter->objectEntryOID->instance;
299 gms.max_size=cstWriter->typeRegister->maxSize;
302 if (cstWriter->typeRegister->getMaxSize)
303 max_size=cstWriter->typeRegister->getMaxSize(&gms);
305 max_size=cstWriter->typeRegister->maxSize;
307 /* prepare csChange */
308 CDR_buffer_init(&csChange->cdrCodec,
309 RTPS_HEADER_LENGTH+12+20+max_size); //HEADER+INFO_TS+ISSUE
310 csChange->cdrCodec.wptr_max=
311 cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
313 /* SN for next issue */
314 SeqNumberInc(snNext,cstWriter->lastSN);
317 RTPSHeaderCreate(&csChange->cdrCodec,
318 cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
319 RTPSInfoTSCreate(&csChange->cdrCodec,
321 RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
322 OID_UNKNOWN,cstWriter->guid.oid,snNext);
324 //serialization routine
325 if (cstWriter->typeRegister->serialize) {
326 cstWriter->typeRegister->serialize(
328 cstWriter->objectEntryOID->instance);
330 //no deserialization -> memcpy
331 CDR_buffer_puts(&csChange->cdrCodec,
332 cstWriter->objectEntryOID->instance,max_size);
335 debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
336 max_size,snNext.low);
338 CSTWriterAddCSChange(cstWriter->domain,
342 pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
343 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
347 /*****************************************************************************/
349 ORTEPublicationSendEx(ORTEPublication *cstWriter,
350 ORTEPublicationSendParam *psp) {
353 if (!cstWriter) return ORTE_BAD_HANDLE;
354 //prepare sending queue
355 r=ORTEPublicationPrepareQueue(cstWriter);
359 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
360 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
361 pthread_rwlock_wrlock(&cstWriter->lock);
362 r=ORTEPublicationSendLocked(cstWriter,psp);
363 pthread_rwlock_unlock(&cstWriter->lock);
364 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
365 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
369 /*****************************************************************************/
371 ORTEPublicationSend(ORTEPublication *cstWriter) {
372 return ORTEPublicationSendEx(cstWriter,NULL);