2 * $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 31 Functions working over publication
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 GAVL_CUST_NODE_INT_IMP(PublicationList,
35 PSEntry, ObjectEntryOID, GUID_RTPS,
36 publications, psNode, guid, gavl_cmp_guid);
38 /*****************************************************************************/
40 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
41 void *instance,NtpTime *persistence,int strength,
42 ORTESendCallBack sendCallBack,void *sendCallBackParam,
43 NtpTime *sendCallBackDelay) {
46 CSTWriterParams cstWriterParams;
48 ObjectEntryOID *objectEntryOID;
52 debug(31,10) ("ORTEPublicationCreate: start\n");
53 cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
54 if (!cstWriter) return NULL;
55 debug(31,10) ("ORTEPublicationCreate: memory OK\n");
56 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
57 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
58 pthread_rwlock_rdlock(&d->typeEntry.lock);
59 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
60 pthread_rwlock_unlock(&d->typeEntry.lock);
61 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
62 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
63 printf("before call ORTEPublicationCreate is necessary to register \n\
64 ser./deser. function for a given typeName!!!\n");
68 pthread_rwlock_wrlock(&d->publications.lock);
69 //generate new guid of publisher
70 d->publications.counter++;
71 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
72 guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
73 pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
74 memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
75 strcpy((char *)pp->topic,topic);
76 strcpy((char *)pp->typeName,typeName);
77 pp->persistence=*persistence;
78 pp->strength=strength;
79 pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS |
80 PID_VALUE_RELIABILITY_STRICT;
81 //insert object to structure objectEntry
82 objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
83 objectEntryOID->privateCreated=ORTE_TRUE;
84 objectEntryOID->instance=instance;
85 objectEntryOID->sendCallBack=sendCallBack;
86 objectEntryOID->callBackParam=sendCallBackParam;
87 if (objectEntryOID->sendCallBack!=NULL) {
88 if (sendCallBackDelay!=NULL) {
89 objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
91 objectEntryOID->objectEntryAID,
92 &objectEntryOID->sendCallBackDelayTimer,
94 "PublicationCallBackTimer",
95 PublicationCallBackTimer,
98 &objectEntryOID->sendCallBackDelay);
101 //create writerPublication
102 cstWriterParams.registrationRetries=0;
103 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
104 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
105 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
106 cstWriterParams.refreshPeriod=iNtpTime; //can't refresh csChange(s)
107 cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
108 cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
109 cstWriterParams.fullAcknowledge=ORTE_TRUE;
110 CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
111 &typeNode->typeRegister);
112 //insert cstWriter to list of publications
113 CSTWriter_insert(&d->publications,cstWriter);
114 //generate csChange for writerPublisher
115 pthread_rwlock_wrlock(&d->writerPublications.lock);
116 csChange=(CSChange*)MALLOC(sizeof(CSChange));
117 parameterUpdateCSChangeFromPublication(csChange,pp);
119 csChange->alive=ORTE_TRUE;
120 csChange->cdrCodec.buffer=NULL;
121 debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
122 CSTWriterAddCSChange(d,&d->writerPublications,csChange);
123 pthread_rwlock_unlock(&d->writerPublications.lock);
124 pthread_rwlock_unlock(&d->publications.lock);
125 pthread_rwlock_unlock(&d->typeEntry.lock);
126 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
127 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
128 debug(31,10) ("ORTEPublicationCreate: finished\n");
132 /*****************************************************************************/
134 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
137 if (!cstWriter) return ORTE_BAD_HANDLE;
138 //generate csChange for writerPublisher
139 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
140 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
141 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
142 if (cstWriter->objectEntryOID->sendCallBack!=NULL) {
143 eventDetach(cstWriter->domain,
144 cstWriter->objectEntryOID->objectEntryAID,
145 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
148 csChange=(CSChange*)MALLOC(sizeof(CSChange));
149 CSChangeAttributes_init_head(csChange);
150 csChange->cdrCodec.buffer=NULL;
151 csChange->guid=cstWriter->guid;
152 csChange->alive=ORTE_FALSE;
153 CSTWriterAddCSChange(cstWriter->domain,
154 &cstWriter->domain->writerPublications,
156 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
157 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
158 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
163 /*****************************************************************************/
165 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
166 if (!cstWriter) return ORTE_BAD_HANDLE;
167 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
168 pthread_rwlock_rdlock(&cstWriter->lock);
169 *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
170 pthread_rwlock_unlock(&cstWriter->lock);
171 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
175 /*****************************************************************************/
177 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
180 if (!cstWriter) return ORTE_BAD_HANDLE;
181 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
182 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
183 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
184 pthread_rwlock_rdlock(&cstWriter->lock);
185 csChange=(CSChange*)MALLOC(sizeof(CSChange));
186 parameterUpdateCSChangeFromPublication(csChange,pp);
187 csChange->guid=cstWriter->guid;
188 csChange->alive=ORTE_TRUE;
189 csChange->cdrCodec.buffer=NULL;
190 CSTWriterAddCSChange(cstWriter->domain,
191 &cstWriter->domain->writerPublications,csChange);
192 pthread_rwlock_unlock(&cstWriter->lock);
193 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
194 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
195 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
199 /*****************************************************************************/
201 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
202 unsigned int retries,unsigned int noSubscriptions) {
203 unsigned int rSubscriptions;
206 if (!cstWriter) return ORTE_BAD_HANDLE;
207 NtpTimeDisAssembToMs(sec,ms,wait);
209 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
210 pthread_rwlock_rdlock(&cstWriter->lock);
211 rSubscriptions=cstWriter->cstRemoteReaderCounter;
212 pthread_rwlock_unlock(&cstWriter->lock);
213 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
214 if (rSubscriptions>=noSubscriptions)
216 ORTESleepMs(sec*1000+ms);
221 /*****************************************************************************/
223 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
226 if (!cstWriter) return ORTE_BAD_HANDLE;
227 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
228 pthread_rwlock_rdlock(&cstWriter->lock);
229 status->strict=cstWriter->strictReliableCounter;
230 status->bestEffort=cstWriter->bestEffortsCounter;
232 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
234 pthread_rwlock_unlock(&cstWriter->lock);
235 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
239 /*****************************************************************************/
241 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
244 if (!cstWriter) return ORTE_BAD_HANDLE;
245 pthread_rwlock_wrlock(&cstWriter->lock);
246 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
247 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
248 if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
249 NtpTime expire,atime=getActualNtpTime();
250 struct timespec wtime;
251 //count max block time
252 NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
253 NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
254 wtime.tv_nsec*=1000; //conver to nano seconds
255 while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
256 pthread_rwlock_unlock(&cstWriter->lock);
257 //wait till a message will be processed
258 pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
259 if (cstWriter->condValueCSChangeDestroyed==0) {
260 if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
261 &cstWriter->mutexCSChangeDestroyed,
262 &wtime)==ETIMEDOUT) {
263 debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
264 cstWriter->csChangesCounter);
265 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
266 pthread_rwlock_unlock(&cstWriter->lock);
267 return ORTE_QUEUE_FULL;
270 cstWriter->condValueCSChangeDestroyed=0;
271 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
272 pthread_rwlock_wrlock(&cstWriter->lock);
276 pthread_rwlock_unlock(&cstWriter->lock);
280 /*****************************************************************************/
282 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
283 ORTEPublicationSendParam *psp) {
285 SequenceNumber snNext;
288 if (!cstWriter) return ORTE_BAD_HANDLE;
289 pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
290 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
291 if (!CSTRemoteReader_is_empty(cstWriter)) {
292 ORTEGetMaxSizeParam gms;
294 csChange=(CSChange*)MALLOC(sizeof(CSChange));
295 CSChangeAttributes_init_head(csChange);
296 csChange->guid=cstWriter->guid;
297 csChange->alive=ORTE_FALSE;
298 CDR_codec_init_static(&csChange->cdrCodec);
299 csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
302 csChange->cdrCodec.data_endian = psp->data_endian;
303 cstWriter->objectEntryOID->instance=psp->instance;
306 /* determine maximal size */
307 gms.host_endian=csChange->cdrCodec.host_endian;
308 gms.data_endian=csChange->cdrCodec.data_endian;
309 gms.data=cstWriter->objectEntryOID->instance;
310 gms.max_size=cstWriter->typeRegister->maxSize;
313 if (cstWriter->typeRegister->getMaxSize)
314 max_size=cstWriter->typeRegister->getMaxSize(&gms,1);
316 max_size=cstWriter->typeRegister->maxSize;
318 /* prepare csChange */
319 CDR_buffer_init(&csChange->cdrCodec,
320 RTPS_HEADER_LENGTH+12+20+max_size); //HEADER+INFO_TS+ISSUE
321 csChange->cdrCodec.wptr_max=
322 cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
324 /* SN for next issue */
325 SeqNumberInc(snNext,cstWriter->lastSN);
328 RTPSHeaderCreate(&csChange->cdrCodec,
329 cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
330 RTPSInfoTSCreate(&csChange->cdrCodec,
332 RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
333 OID_UNKNOWN,cstWriter->guid.oid,snNext);
335 //serialization routine
336 if (cstWriter->typeRegister->serialize) {
337 cstWriter->typeRegister->serialize(
339 cstWriter->objectEntryOID->instance);
341 //no deserialization -> memcpy
342 CDR_buffer_puts(&csChange->cdrCodec,
343 cstWriter->objectEntryOID->instance,max_size);
346 debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
347 max_size,snNext.low);
349 CSTWriterAddCSChange(cstWriter->domain,
353 pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
354 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
358 /*****************************************************************************/
360 ORTEPublicationSendEx(ORTEPublication *cstWriter,
361 ORTEPublicationSendParam *psp) {
364 if (!cstWriter) return ORTE_BAD_HANDLE;
365 //prepare sending queue
366 r=ORTEPublicationPrepareQueue(cstWriter);
370 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
371 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
372 pthread_rwlock_wrlock(&cstWriter->lock);
373 r=ORTEPublicationSendLocked(cstWriter,psp);
374 pthread_rwlock_unlock(&cstWriter->lock);
375 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
376 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
380 /*****************************************************************************/
382 ORTEPublicationSend(ORTEPublication *cstWriter) {
383 return ORTEPublicationSendEx(cstWriter,NULL);
387 /*****************************************************************************/
389 ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
390 return cstWriter->objectEntryOID->instance;