2 * $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 31 Functions working over publication
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
/*
 * Instantiate GAVL_CUST_NODE_INT_IMP for the PublicationList container.
 * NOTE(review): going by the identifiers, PSEntry is the root type (member
 * "publications"), ObjectEntryOID the item type (node member "psNode"),
 * GUID_RTPS the key type (member "guid") and gavl_cmp_guid the comparison
 * function -- confirm the argument roles against the macro definition.
 */
34 GAVL_CUST_NODE_INT_IMP(PublicationList,
35 PSEntry, ObjectEntryOID, GUID_RTPS,
36 publications, psNode, guid, gavl_cmp_guid);
38 /*****************************************************************************/
/*
 * ORTEPublicationCreate - set up a new publication inside domain d.
 *
 * d                  domain whose object, type and publication tables are used
 * topic, typeName    copied into the new publication properties; typeName is
 *                    also looked up in d->typeEntry
 * instance           stored as the instance pointer of the new object entry
 * persistence        dereferenced and copied into the properties
 * strength           copied into the properties
 * sendCallBack,
 * sendCallBackParam  stored in the new object entry
 * sendCallBackDelay  optional; copied into the object entry only when both it
 *                    and sendCallBack are non-NULL
 *
 * Returns NULL when the CSTWriter allocation fails.
 * NOTE(review): this listing is missing lines (local declarations, the
 * return statements after the type lookup and at the end, closing braces),
 * so the remaining return values cannot be documented from here.
 */
40 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
41 void *instance,NtpTime *persistence,int strength,
42 ORTESendCallBack sendCallBack,void *sendCallBackParam,
43 NtpTime *sendCallBackDelay) {
46 CSTWriterParams cstWriterParams;
48 ObjectEntryOID *objectEntryOID;
52 debug(31,10) ("ORTEPublicationCreate: start\n");
53 cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
54 if (!cstWriter) return NULL;
55 debug(31,10) ("ORTEPublicationCreate: memory OK\n");
/* Take the object-tree, timer-tree and type-registry locks before the
 * type lookup; they are released again at the end of the function. */
56 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
57 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
58 pthread_rwlock_rdlock(&d->typeEntry.lock);
/* typeName must already be known to d->typeEntry; otherwise the three locks
 * are dropped and a hint is printed. */
59 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
60 pthread_rwlock_unlock(&d->typeEntry.lock);
61 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
62 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
63 printf("before call ORTEPublicationCreate is necessary to register \n\
64 ser./deser. function for a given typeName!!!\n");
/* NOTE(review): nothing visible on this failure path releases the cstWriter
 * allocated above -- looks like a leak; confirm and free it before leaving. */
67 pthread_rwlock_wrlock(&d->publications.lock);
68 //generate new guid of publisher
69 d->publications.counter++;
70 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
71 guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
/* Start from the domain's default publication properties, then override the
 * caller-supplied fields.
 * NOTE(review): the MALLOC result is used without a NULL check. */
72 pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
73 memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
/* NOTE(review): unbounded strcpy -- assumes topic and typeName fit into
 * pp->topic / pp->typeName; confirm the field sizes or bound the copies. */
74 strcpy((char *)pp->topic,topic);
75 strcpy((char *)pp->typeName,typeName);
/* NOTE(review): persistence is dereferenced without a NULL check. */
76 pp->persistence=*persistence;
77 pp->strength=strength;
78 pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS |
79 PID_VALUE_RELIABILITY_STRICT;
80 //insert object to structure objectEntry
81 objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
82 objectEntryOID->privateCreated=ORTE_TRUE;
83 objectEntryOID->instance=instance;
84 objectEntryOID->sendCallBack=sendCallBack;
85 objectEntryOID->callBackParam=sendCallBackParam;
86 if (objectEntryOID->sendCallBack!=NULL) {
87 if (sendCallBackDelay!=NULL) {
88 objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
/* NOTE(review): the line that opens the following call is absent from this
 * listing; the visible arguments name PublicationCallBackTimer and the stored
 * delay, so it presumably schedules the send-callback timer -- confirm. */
90 objectEntryOID->objectEntryAID,
91 &objectEntryOID->sendCallBackDelayTimer,
93 "PublicationCallBackTimer",
94 PublicationCallBackTimer,
97 &objectEntryOID->sendCallBackDelay);
100 //create writerPublication
101 cstWriterParams.registrationRetries=0;
102 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
103 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
104 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
105 cstWriterParams.refreshPeriod=iNtpTime; //can't refresh csChange(s)
106 cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
107 cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
108 cstWriterParams.fullAcknowledge=ORTE_TRUE;
109 CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
110 &typeNode->typeRegister);
111 //insert cstWriter to list of publications
112 CSTWriter_insert(&d->publications,cstWriter);
113 //generate csChange for writerPublisher
/* Queue a CSChange built from the new properties, marked alive, on
 * d->writerPublications.
 * NOTE(review): the MALLOC result is used without a NULL check. */
114 pthread_rwlock_wrlock(&d->writerPublications.lock);
115 csChange=(CSChange*)MALLOC(sizeof(CSChange));
116 parameterUpdateCSChangeFromPublication(csChange,pp);
118 csChange->alive=ORTE_TRUE;
119 csChange->cdrCodec.buffer=NULL;
120 debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
121 CSTWriterAddCSChange(d,&d->writerPublications,csChange);
/* Release every lock taken above. */
122 pthread_rwlock_unlock(&d->writerPublications.lock);
123 pthread_rwlock_unlock(&d->publications.lock);
124 pthread_rwlock_unlock(&d->typeEntry.lock);
125 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
126 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
127 debug(31,10) ("ORTEPublicationCreate: finished\n");
131 /*****************************************************************************/
/*
 * ORTEPublicationDestroy - announce that a publication is going away.
 *
 * Under write locks on the domain's object tree, timer tree and
 * writerPublications, this detaches the send-callback timer when a send
 * callback is set, then queues a CSChange carrying the publication's guid
 * with alive=ORTE_FALSE on the domain's writerPublications writer.
 *
 * cstWriter  publication handle; NULL yields ORTE_BAD_HANDLE.
 *
 * NOTE(review): the listing is missing lines (the csChange declaration, the
 * tail of the eventDetach and CSTWriterAddCSChange calls, the final return).
 * Nothing visible here frees cstWriter itself; whether that happens when the
 * queued change is processed cannot be seen from this function.
 */
133 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
136 if (!cstWriter) return ORTE_BAD_HANDLE;
137 //generate csChange for writerPublisher
138 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
139 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
140 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
/* A timer is only detached when a send callback was stored for this entry. */
141 if (cstWriter->objectEntryOID->sendCallBack!=NULL) {
142 eventDetach(cstWriter->domain,
143 cstWriter->objectEntryOID->objectEntryAID,
144 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
/* NOTE(review): the MALLOC result is used without a NULL check. */
147 csChange=(CSChange*)MALLOC(sizeof(CSChange));
148 CSChangeAttributes_init_head(csChange);
149 csChange->cdrCodec.buffer=NULL;
150 csChange->guid=cstWriter->guid;
/* alive=ORTE_FALSE distinguishes this change from the alive=ORTE_TRUE one
 * queued on creation. */
151 csChange->alive=ORTE_FALSE;
152 CSTWriterAddCSChange(cstWriter->domain,
153 &cstWriter->domain->writerPublications,
155 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
156 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
157 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
162 /*****************************************************************************/
/*
 * ORTEPublicationPropertiesGet - copy the publication's properties to *pp.
 *
 * The ORTEPublProp stored as the object entry's attributes is copied by
 * value while holding read locks on the domain's object tree and on the
 * writer itself.
 *
 * cstWriter  publication handle; NULL yields ORTE_BAD_HANDLE.
 * pp         destination structure.
 *
 * NOTE(review): pp is written through without a NULL check. The return
 * statement for the normal path is not visible in this listing.
 */
164 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
165 if (!cstWriter) return ORTE_BAD_HANDLE;
166 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
167 pthread_rwlock_rdlock(&cstWriter->lock);
168 *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
169 pthread_rwlock_unlock(&cstWriter->lock);
170 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
174 /*****************************************************************************/
/*
 * ORTEPublicationPropertiesSet - publish a new set of publication properties.
 *
 * Builds a CSChange from *pp with parameterUpdateCSChangeFromPublication,
 * tags it with the publication's guid and alive=ORTE_TRUE, and queues it on
 * the domain's writerPublications writer. The object tree, timer tree and
 * writerPublications are write-locked for the duration; the writer's own
 * lock is only read-locked.
 *
 * cstWriter  publication handle; NULL yields ORTE_BAD_HANDLE.
 * pp         properties to announce.
 *
 * NOTE(review): the visible code never stores *pp into
 * cstWriter->objectEntryOID->attributes; whether that happens when the
 * queued change is processed cannot be seen here -- confirm. The csChange
 * declaration and the final return are missing from this listing.
 */
176 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
179 if (!cstWriter) return ORTE_BAD_HANDLE;
180 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
181 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
182 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
183 pthread_rwlock_rdlock(&cstWriter->lock);
/* NOTE(review): the MALLOC result is used without a NULL check. */
184 csChange=(CSChange*)MALLOC(sizeof(CSChange));
185 parameterUpdateCSChangeFromPublication(csChange,pp);
186 csChange->guid=cstWriter->guid;
187 csChange->alive=ORTE_TRUE;
188 csChange->cdrCodec.buffer=NULL;
189 CSTWriterAddCSChange(cstWriter->domain,
190 &cstWriter->domain->writerPublications,csChange);
/* Locks are released in the reverse order of acquisition. */
191 pthread_rwlock_unlock(&cstWriter->lock);
192 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
193 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
194 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
198 /*****************************************************************************/
/*
 * ORTEPublicationWaitForSubscriptions - poll for remote subscribers.
 *
 * Samples the writer's cstRemoteReaderCounter under read locks, compares it
 * with noSubscriptions, and sleeps for the interval given by wait (split
 * into seconds and milliseconds, then recombined as sec*1000+ms).
 *
 * cstWriter        publication handle; NULL yields ORTE_BAD_HANDLE.
 * wait             sleep interval between samples.
 * retries          NOTE(review): not used in any visible line; presumably
 *                  bounds the number of polls -- confirm.
 * noSubscriptions  subscriber count compared against the sampled counter.
 *
 * NOTE(review): the declarations of sec/ms, the loop construct, the statement
 * executed when the threshold is reached and the return statements are all
 * missing from this listing.
 */
200 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
201 unsigned int retries,unsigned int noSubscriptions) {
202 unsigned int rSubscriptions;
205 if (!cstWriter) return ORTE_BAD_HANDLE;
206 NtpTimeDisAssembToMs(sec,ms,wait);
/* The counter is copied under the locks so the comparison and the sleep
 * below run without holding them. */
208 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
209 pthread_rwlock_rdlock(&cstWriter->lock);
210 rSubscriptions=cstWriter->cstRemoteReaderCounter;
211 pthread_rwlock_unlock(&cstWriter->lock);
212 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
213 if (rSubscriptions>=noSubscriptions)
215 ORTESleepMs(sec*1000+ms);
220 /*****************************************************************************/
/*
 * ORTEPublicationGetStatus - report counters of a publication.
 *
 * Under read locks on the domain's object tree and on the writer, copies
 * strictReliableCounter to status->strict and bestEffortsCounter to
 * status->bestEffort, then walks the writer's CSChange list.
 *
 * cstWriter  publication handle; NULL yields ORTE_BAD_HANDLE.
 * status     destination structure.
 *
 * NOTE(review): the body of the list walk, the csChange declaration and the
 * final return are missing from this listing; the walk presumably counts
 * queued changes into another status field -- confirm. status is written
 * through without a NULL check.
 */
222 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
225 if (!cstWriter) return ORTE_BAD_HANDLE;
226 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
227 pthread_rwlock_rdlock(&cstWriter->lock);
228 status->strict=cstWriter->strictReliableCounter;
229 status->bestEffort=cstWriter->bestEffortsCounter;
231 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
233 pthread_rwlock_unlock(&cstWriter->lock);
234 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
238 /*****************************************************************************/
240 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
243 if (!cstWriter) return ORTE_BAD_HANDLE;
244 pthread_rwlock_wrlock(&cstWriter->lock);
245 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
246 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
247 if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
248 NtpTime expire,atime=getActualNtpTime();
249 struct timespec wtime;
250 //count max block time
251 NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
252 NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
253 wtime.tv_nsec*=1000; //conver to nano seconds
254 while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
255 pthread_rwlock_unlock(&cstWriter->lock);
256 //wait till a message will be processed
257 pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
258 if (cstWriter->condValueCSChangeDestroyed==0) {
259 if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
260 &cstWriter->mutexCSChangeDestroyed,
261 &wtime)==ETIMEDOUT) {
262 debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
263 cstWriter->csChangesCounter);
264 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
265 pthread_rwlock_unlock(&cstWriter->lock);
266 return ORTE_QUEUE_FULL;
269 cstWriter->condValueCSChangeDestroyed=0;
270 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
271 pthread_rwlock_wrlock(&cstWriter->lock);
275 pthread_rwlock_unlock(&cstWriter->lock);
279 /*****************************************************************************/
/*
 * ORTEPublicationSendLocked - serialize the current instance into a new
 * issue and queue it on the writer.
 *
 * Takes a read lock on the domain's type registry and a write lock on
 * writerPublications. Work is only done when the writer has at least one
 * remote reader. In that case a CSChange is allocated, its CDR codec is
 * sized from the type's maximum size, RTPS header / INFO_TS / ISSUE headers
 * are written, and the instance is appended either through the registered
 * serialize routine or, when there is none, as a raw copy of max_size bytes.
 *
 * cstWriter  publication handle; NULL yields ORTE_BAD_HANDLE.
 * psp        optional send parameters supplying data_endian and instance.
 *
 * NOTE(review): many lines are missing from this listing (declarations, the
 * condition guarding the psp overrides, else branches, several call
 * arguments, closing braces and the return). The "Locked" suffix presumably
 * means the caller already holds the object-tree, timer-tree and writer
 * locks, as ORTEPublicationSendEx does -- confirm for other callers.
 */
281 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
282 ORTEPublicationSendParam *psp) {
284 SequenceNumber snNext;
287 if (!cstWriter) return ORTE_BAD_HANDLE;
288 pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
289 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
/* Nothing is built when no remote reader is attached to this writer. */
290 if (!CSTRemoteReader_is_empty(cstWriter)) {
291 ORTEGetMaxSizeParam gms;
/* NOTE(review): the MALLOC result is used without a NULL check. */
293 csChange=(CSChange*)MALLOC(sizeof(CSChange));
294 CSChangeAttributes_init_head(csChange);
295 csChange->guid=cstWriter->guid;
296 csChange->alive=ORTE_FALSE;
297 CDR_codec_init_static(&csChange->cdrCodec);
/* Default wire endianness; the next two assignments override it and the
 * instance pointer from psp.
 * NOTE(review): the guard around the psp overrides is not visible; since
 * ORTEPublicationSend passes psp==NULL it is presumably "if (psp)". */
298 csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
301 csChange->cdrCodec.data_endian = psp->data_endian;
302 cstWriter->objectEntryOID->instance=psp->instance;
305 /* determine maximal size */
306 gms.host_endian=csChange->cdrCodec.host_endian;
307 gms.data_endian=csChange->cdrCodec.data_endian;
308 gms.data=cstWriter->objectEntryOID->instance;
309 gms.max_size=cstWriter->typeRegister->maxSize;
/* Prefer the type's getMaxSize callback; the static maxSize is the other
 * branch (the "else" line itself is not visible). */
312 if (cstWriter->typeRegister->getMaxSize)
313 max_size=cstWriter->typeRegister->getMaxSize(&gms,1);
315 max_size=cstWriter->typeRegister->maxSize;
317 /* prepare csChange */
318 CDR_buffer_init(&csChange->cdrCodec,
319 RTPS_HEADER_LENGTH+12+20+max_size); //HEADER+INFO_TS+ISSUE
320 csChange->cdrCodec.wptr_max=
321 cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
323 /* SN for next issue */
324 SeqNumberInc(snNext,cstWriter->lastSN);
327 RTPSHeaderCreate(&csChange->cdrCodec,
328 cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
329 RTPSInfoTSCreate(&csChange->cdrCodec,
331 RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
332 OID_UNKNOWN,cstWriter->guid.oid,snNext);
334 //serialization routine
335 if (cstWriter->typeRegister->serialize) {
336 cstWriter->typeRegister->serialize(
338 cstWriter->objectEntryOID->instance);
340 //no deserialization -> memcpy
341 CDR_buffer_puts(&csChange->cdrCodec,
342 cstWriter->objectEntryOID->instance,max_size);
/* NOTE(review): the message below is prefixed "ORTEPublicationCreate"
 * although it is emitted from the send path -- likely a copy/paste slip. */
345 debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
346 max_size,snNext.low);
348 CSTWriterAddCSChange(cstWriter->domain,
352 pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
353 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
357 /*****************************************************************************/
/*
 * ORTEPublicationSendEx - send the publication's instance with optional
 * per-send parameters.
 *
 * First calls ORTEPublicationPrepareQueue to make room in the send queue,
 * then takes write locks on the domain's object tree, timer tree and the
 * writer, and delegates to ORTEPublicationSendLocked.
 *
 * cstWriter  publication handle; NULL yields ORTE_BAD_HANDLE.
 * psp        passed through unchanged to ORTEPublicationSendLocked.
 *
 * NOTE(review): the declaration of r, the lines between the queue
 * preparation and the locking (presumably an early return when r signals an
 * error) and the final return are missing from this listing.
 */
359 ORTEPublicationSendEx(ORTEPublication *cstWriter,
360 ORTEPublicationSendParam *psp) {
363 if (!cstWriter) return ORTE_BAD_HANDLE;
364 //prepare sending queue
365 r=ORTEPublicationPrepareQueue(cstWriter);
369 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
370 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
371 pthread_rwlock_wrlock(&cstWriter->lock);
372 r=ORTEPublicationSendLocked(cstWriter,psp);
/* Locks are released in the reverse order of acquisition. */
373 pthread_rwlock_unlock(&cstWriter->lock);
374 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
375 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
379 /*****************************************************************************/
/*
 * ORTEPublicationSend - send the publication's instance with no per-send
 * parameters.
 *
 * Convenience wrapper: forwards to ORTEPublicationSendEx with psp==NULL and
 * returns its result.
 */
381 ORTEPublicationSend(ORTEPublication *cstWriter) {
382 return ORTEPublicationSendEx(cstWriter,NULL);
386 /*****************************************************************************/
/*
 * ORTEPublicationGetInstance - return the instance pointer stored in the
 * publication's object entry.
 *
 * NOTE(review): unlike the other entry points in this file, cstWriter is
 * dereferenced without a NULL check and no lock is taken -- confirm that
 * callers never pass NULL.
 */
388 ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
389 return cstWriter->objectEntryOID->instance;