2 * $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 31 Functions working over publication
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 GAVL_CUST_NODE_INT_IMP(PublicationList,
35 PSEntry, ObjectEntryOID, GUID_RTPS,
36 publications, psNode, guid, gavl_cmp_guid);
38 /*****************************************************************************/
40 ORTEPublicationCreate(ORTEDomain *d, const char *topic, const char *typeName,
41 void *instance, NtpTime *persistence, int strength,
42 ORTESendCallBack sendCallBack, void *sendCallBackParam,
43 NtpTime *sendCallBackDelay)
47 CSTWriterParams cstWriterParams;
49 ObjectEntryOID *objectEntryOID;
53 debug(31, 10) ("ORTEPublicationCreate: start\n");
54 cstWriter = (CSTWriter *)MALLOC(sizeof(CSTWriter));
57 debug(31, 10) ("ORTEPublicationCreate: memory OK\n");
58 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
59 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
60 pthread_rwlock_rdlock(&d->typeEntry.lock);
61 if (!(typeNode = ORTEType_find(&d->typeEntry, &typeName))) {
62 pthread_rwlock_unlock(&d->typeEntry.lock);
63 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
64 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
65 printf("before call ORTEPublicationCreate is necessary to register \n\
66 ser./deser. function for a given typeName!!!\n");
70 pthread_rwlock_wrlock(&d->publications.lock);
71 //generate new guid of publisher
72 d->publications.counter++;
73 guid.hid = d->guid.hid;
74 guid.aid = d->guid.aid;
75 guid.oid = (d->publications.counter<<8)|OID_PUBLICATION;
76 pp = (ORTEPublProp *)MALLOC(sizeof(ORTEPublProp));
77 memcpy(pp, &d->domainProp.publPropDefault, sizeof(ORTEPublProp));
78 strcpy((char *)pp->topic, topic);
79 strcpy((char *)pp->typeName, typeName);
80 pp->persistence = *persistence;
81 pp->strength = strength;
82 pp->reliabilityOffered = PID_VALUE_RELIABILITY_BEST_EFFORTS |
83 PID_VALUE_RELIABILITY_STRICT;
84 //insert object to structure objectEntry
85 objectEntryOID = objectEntryAdd(d, &guid, (void *)pp);
86 objectEntryOID->privateCreated = ORTE_TRUE;
87 objectEntryOID->instance = instance;
88 objectEntryOID->sendCallBack = sendCallBack;
89 objectEntryOID->callBackParam = sendCallBackParam;
90 if (objectEntryOID->sendCallBack != NULL) {
91 if (sendCallBackDelay != NULL) {
92 objectEntryOID->sendCallBackDelay = *sendCallBackDelay;
94 objectEntryOID->objectEntryAID,
95 &objectEntryOID->sendCallBackDelayTimer,
97 "PublicationCallBackTimer",
98 PublicationCallBackTimer,
101 &objectEntryOID->sendCallBackDelay);
104 //create writerPublication
105 cstWriterParams.registrationRetries = 0;
106 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
107 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
108 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
109 cstWriterParams.refreshPeriod = iNtpTime; //can't refresh csChange(s)
110 cstWriterParams.repeatAnnounceTime = pp->HBNornalRate;
111 cstWriterParams.HBMaxRetries = pp->HBMaxRetries;
112 cstWriterParams.fullAcknowledge = ORTE_TRUE;
113 CSTWriterInit(d, cstWriter, objectEntryOID, guid.oid, &cstWriterParams,
114 &typeNode->typeRegister);
115 //insert cstWriter to list of publications
116 CSTWriter_insert(&d->publications, cstWriter);
117 //generate csChange for writerPublisher
118 pthread_rwlock_wrlock(&d->writerPublications.lock);
119 csChange = (CSChange *)MALLOC(sizeof(CSChange));
120 parameterUpdateCSChangeFromPublication(csChange, pp);
121 csChange->guid = guid;
122 csChange->alive = ORTE_TRUE;
123 csChange->cdrCodec.buffer = NULL;
124 debug(31, 10) ("ORTEPublicationCreate: add CSChange\n");
125 CSTWriterAddCSChange(d, &d->writerPublications, csChange);
126 pthread_rwlock_unlock(&d->writerPublications.lock);
127 pthread_rwlock_unlock(&d->publications.lock);
128 pthread_rwlock_unlock(&d->typeEntry.lock);
129 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
130 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
131 debug(31, 10) ("ORTEPublicationCreate: finished\n");
135 /*****************************************************************************/
137 ORTEPublicationDestroy(ORTEPublication *cstWriter)
142 return ORTE_BAD_HANDLE;
143 //generate csChange for writerPublisher
144 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
145 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
146 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
147 if (cstWriter->objectEntryOID->sendCallBack != NULL) {
148 eventDetach(cstWriter->domain,
149 cstWriter->objectEntryOID->objectEntryAID,
150 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
153 csChange = (CSChange *)MALLOC(sizeof(CSChange));
154 CSChangeAttributes_init_head(csChange);
155 csChange->cdrCodec.buffer = NULL;
156 csChange->guid = cstWriter->guid;
157 csChange->alive = ORTE_FALSE;
158 CSTWriterAddCSChange(cstWriter->domain,
159 &cstWriter->domain->writerPublications,
161 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
162 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
163 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
168 /*****************************************************************************/
170 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter, ORTEPublProp *pp)
173 return ORTE_BAD_HANDLE;
174 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
175 pthread_rwlock_rdlock(&cstWriter->lock);
176 *pp = *(ORTEPublProp *)cstWriter->objectEntryOID->attributes;
177 pthread_rwlock_unlock(&cstWriter->lock);
178 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
182 /*****************************************************************************/
184 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter, ORTEPublProp *pp)
189 return ORTE_BAD_HANDLE;
190 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
191 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
192 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
193 pthread_rwlock_rdlock(&cstWriter->lock);
194 csChange = (CSChange *)MALLOC(sizeof(CSChange));
195 parameterUpdateCSChangeFromPublication(csChange, pp);
196 csChange->guid = cstWriter->guid;
197 csChange->alive = ORTE_TRUE;
198 csChange->cdrCodec.buffer = NULL;
199 CSTWriterAddCSChange(cstWriter->domain,
200 &cstWriter->domain->writerPublications, csChange);
201 pthread_rwlock_unlock(&cstWriter->lock);
202 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
203 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
204 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
208 /*****************************************************************************/
210 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter, NtpTime wait,
211 unsigned int retries, unsigned int noSubscriptions)
213 unsigned int rSubscriptions;
217 return ORTE_BAD_HANDLE;
218 NtpTimeDisAssembToMs(sec, ms, wait);
220 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
221 pthread_rwlock_rdlock(&cstWriter->lock);
222 rSubscriptions = cstWriter->cstRemoteReaderCounter;
223 pthread_rwlock_unlock(&cstWriter->lock);
224 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
225 if (rSubscriptions >= noSubscriptions)
227 ORTESleepMs(sec*1000+ms);
232 /*****************************************************************************/
234 ORTEPublicationGetStatus(ORTEPublication *cstWriter, ORTEPublStatus *status)
239 return ORTE_BAD_HANDLE;
240 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
241 pthread_rwlock_rdlock(&cstWriter->lock);
242 status->strict = cstWriter->strictReliableCounter;
243 status->bestEffort = cstWriter->bestEffortsCounter;
245 ul_list_for_each(CSTWriterCSChange, cstWriter, csChange)
247 pthread_rwlock_unlock(&cstWriter->lock);
248 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
252 /*****************************************************************************/
254 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter)
259 return ORTE_BAD_HANDLE;
260 pthread_rwlock_wrlock(&cstWriter->lock);
261 pp = (ORTEPublProp *)cstWriter->objectEntryOID->attributes;
262 if (cstWriter->csChangesCounter >= pp->sendQueueSize) {
263 if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
264 NtpTime expire, atime = getActualNtpTime();
265 struct timespec wtime;
266 //count max block time
267 NtpTimeAdd(expire, atime, cstWriter->domain->domainProp.baseProp.maxBlockTime);
268 NtpTimeDisAssembToUs(wtime.tv_sec, wtime.tv_nsec, expire);
269 wtime.tv_nsec *= 1000; //conver to nano seconds
270 while (cstWriter->csChangesCounter >= pp->sendQueueSize) {
271 pthread_rwlock_unlock(&cstWriter->lock);
272 //wait till a message will be processed
273 pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
274 if (cstWriter->condValueCSChangeDestroyed == 0) {
275 if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
276 &cstWriter->mutexCSChangeDestroyed,
277 &wtime) == ETIMEDOUT) {
278 debug(31, 5) ("Publication: queue level (%d), queue full!!!\n",
279 cstWriter->csChangesCounter);
280 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
281 pthread_rwlock_unlock(&cstWriter->lock);
282 return ORTE_QUEUE_FULL;
285 cstWriter->condValueCSChangeDestroyed = 0;
286 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
287 pthread_rwlock_wrlock(&cstWriter->lock);
291 pthread_rwlock_unlock(&cstWriter->lock);
295 /*****************************************************************************/
297 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
298 ORTEPublicationSendParam *psp)
301 SequenceNumber snNext;
305 return ORTE_BAD_HANDLE;
306 pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
307 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
308 if (!CSTRemoteReader_is_empty(cstWriter)) {
309 ORTEGetMaxSizeParam gms;
311 csChange = (CSChange *)MALLOC(sizeof(CSChange));
312 CSChangeAttributes_init_head(csChange);
313 csChange->guid = cstWriter->guid;
314 csChange->alive = ORTE_FALSE;
315 CDR_codec_init_static(&csChange->cdrCodec);
316 csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
319 csChange->cdrCodec.data_endian = psp->data_endian;
320 cstWriter->objectEntryOID->instance = psp->instance;
323 /* determine maximal size */
324 gms.host_endian = csChange->cdrCodec.host_endian;
325 gms.data_endian = csChange->cdrCodec.data_endian;
326 gms.data = cstWriter->objectEntryOID->instance;
327 gms.max_size = cstWriter->typeRegister->maxSize;
330 if (cstWriter->typeRegister->getMaxSize)
331 max_size = cstWriter->typeRegister->getMaxSize(&gms, 1);
333 max_size = cstWriter->typeRegister->maxSize;
335 /* prepare csChange */
336 CDR_buffer_init(&csChange->cdrCodec,
337 RTPS_HEADER_LENGTH+12+20+max_size); //HEADER+INFO_TS+ISSUE
338 csChange->cdrCodec.wptr_max =
339 cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
341 /* SN for next issue */
342 SeqNumberInc(snNext, cstWriter->lastSN);
345 RTPSHeaderCreate(&csChange->cdrCodec,
346 cstWriter->domain->guid.hid, cstWriter->domain->guid.aid);
347 RTPSInfoTSCreate(&csChange->cdrCodec,
349 RTPSIssueCreateHeader(&csChange->cdrCodec, 16+max_size,
350 OID_UNKNOWN, cstWriter->guid.oid, snNext);
352 //serialization routine
353 if (cstWriter->typeRegister->serialize) {
354 cstWriter->typeRegister->serialize(
356 cstWriter->objectEntryOID->instance);
358 //no deserialization -> memcpy
359 CDR_buffer_puts(&csChange->cdrCodec,
360 cstWriter->objectEntryOID->instance, max_size);
363 debug(31, 10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
364 max_size, snNext.low);
366 CSTWriterAddCSChange(cstWriter->domain,
370 pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
371 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
375 /*****************************************************************************/
377 ORTEPublicationSendEx(ORTEPublication *cstWriter,
378 ORTEPublicationSendParam *psp)
383 return ORTE_BAD_HANDLE;
384 //prepare sending queue
385 r = ORTEPublicationPrepareQueue(cstWriter);
389 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
390 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
391 pthread_rwlock_wrlock(&cstWriter->lock);
392 r = ORTEPublicationSendLocked(cstWriter, psp);
393 pthread_rwlock_unlock(&cstWriter->lock);
394 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
395 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
399 /*****************************************************************************/
401 ORTEPublicationSend(ORTEPublication *cstWriter)
403 return ORTEPublicationSendEx(cstWriter, NULL);
407 /*****************************************************************************/
409 ORTEPublicationGetInstance(ORTEPublication *cstWriter)
411 return cstWriter->objectEntryOID->instance;