2 * $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 31 Functions working over publication
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(PublicationList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 publications, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
31 void *instance,NtpTime *persistence,int strength,
32 ORTESendCallBack sendCallBack,void *sendCallBackParam,
33 NtpTime *sendCallBackDelay) {
36 CSTWriterParams cstWriterParams;
38 ObjectEntryOID *objectEntryOID;
42 debug(31,10) ("ORTEPublicationCreate: start\n");
43 cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
44 if (!cstWriter) return NULL;
45 debug(31,10) ("ORTEPublicationCreate: memory OK\n");
46 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
47 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
48 pthread_rwlock_rdlock(&d->typeEntry.lock);
49 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
50 pthread_rwlock_unlock(&d->typeEntry.lock);
51 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
52 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
53 printf("before call ORTEPublicationCreate is necessary to register \n\
54 ser./deser. function for a given typeName!!!\n");
57 pthread_rwlock_wrlock(&d->publications.lock);
58 //generate new guid of publisher
59 d->publications.counter++;
60 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
61 guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
62 pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
63 memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
64 strcpy(pp->topic,topic);
65 strcpy(pp->typeName,typeName);
66 pp->persistence=*persistence;
67 pp->strength=strength;
68 pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS |
69 PID_VALUE_RELIABILITY_STRICT;
70 //insert object to structure objectEntry
71 objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
72 objectEntryOID->privateCreated=ORTE_TRUE;
73 objectEntryOID->instance=instance;
74 objectEntryOID->sendCallBack=sendCallBack;
75 objectEntryOID->callBackParam=sendCallBackParam;
76 if (objectEntryOID->sendCallBack!=NULL) {
77 if (sendCallBackDelay!=NULL) {
78 objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
80 objectEntryOID->objectEntryAID,
81 &objectEntryOID->sendCallBackDelayTimer,
83 "PublicationCallBackTimer",
84 PublicationCallBackTimer,
87 &objectEntryOID->sendCallBackDelay);
90 //create writerPublication
91 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
92 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
93 cstWriterParams.refreshPeriod=iNtpTime; //cann't refresh csChange(s)
94 cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
95 cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
96 cstWriterParams.fullAcknowledge=ORTE_TRUE;
97 CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
98 &typeNode->typeRegister);
99 //insert cstWriter to list of publications
100 CSTWriter_insert(&d->publications,cstWriter);
101 //generate csChange for writerPublisher
102 pthread_rwlock_wrlock(&d->writerPublications.lock);
103 csChange=(CSChange*)MALLOC(sizeof(CSChange));
104 parameterUpdateCSChangeFromPublication(csChange,pp);
106 csChange->alive=ORTE_TRUE;
107 csChange->cdrStream.buffer=NULL;
108 debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
109 CSTWriterAddCSChange(d,&d->writerPublications,csChange);
110 pthread_rwlock_unlock(&d->writerPublications.lock);
111 pthread_rwlock_unlock(&d->publications.lock);
112 pthread_rwlock_unlock(&d->typeEntry.lock);
113 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
114 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
115 debug(31,10) ("ORTEPublicationCreate: finished\n");
119 /*****************************************************************************/
121 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
124 if (!cstWriter) return ORTE_BAD_HANDLE;
125 //generate csChange for writerPublisher
126 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
127 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
128 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
129 if (cstWriter->objectEntryOID->sendCallBack!=NULL) {
130 eventDetach(cstWriter->domain,
131 cstWriter->objectEntryOID->objectEntryAID,
132 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
135 csChange=(CSChange*)MALLOC(sizeof(CSChange));
136 CSChangeAttributes_init_head(csChange);
137 csChange->cdrStream.buffer=NULL;
138 csChange->guid=cstWriter->guid;
139 csChange->alive=ORTE_FALSE;
140 CSTWriterAddCSChange(cstWriter->domain,
141 &cstWriter->domain->writerPublications,
143 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
144 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
145 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
150 /*****************************************************************************/
152 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
153 if (!cstWriter) return ORTE_BAD_HANDLE;
154 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
155 pthread_rwlock_rdlock(&cstWriter->lock);
156 *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
157 pthread_rwlock_unlock(&cstWriter->lock);
158 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
162 /*****************************************************************************/
164 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
167 if (!cstWriter) return ORTE_BAD_HANDLE;
168 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
169 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
170 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
171 pthread_rwlock_rdlock(&cstWriter->lock);
172 csChange=(CSChange*)MALLOC(sizeof(CSChange));
173 parameterUpdateCSChangeFromPublication(csChange,pp);
174 csChange->guid=cstWriter->guid;
175 csChange->alive=ORTE_TRUE;
176 csChange->cdrStream.buffer=NULL;
177 CSTWriterAddCSChange(cstWriter->domain,
178 &cstWriter->domain->writerPublications,csChange);
179 pthread_rwlock_unlock(&cstWriter->lock);
180 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
181 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
182 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
186 /*****************************************************************************/
188 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
189 unsigned int retries,unsigned int noSubscriptions) {
190 unsigned int rSubscriptions;
193 if (!cstWriter) return ORTE_BAD_HANDLE;
194 NtpTimeDisAssembToMs(sec,ms,wait);
196 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
197 pthread_rwlock_rdlock(&cstWriter->lock);
198 rSubscriptions=cstWriter->cstRemoteReaderCounter;
199 pthread_rwlock_unlock(&cstWriter->lock);
200 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
201 if (rSubscriptions>=noSubscriptions)
203 ORTESleepMs(sec*1000+ms);
208 /*****************************************************************************/
210 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
213 if (!cstWriter) return ORTE_BAD_HANDLE;
214 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
215 pthread_rwlock_rdlock(&cstWriter->lock);
216 status->strict=cstWriter->strictReliableCounter;
217 status->bestEffort=cstWriter->bestEffortsCounter;
219 ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
221 pthread_rwlock_unlock(&cstWriter->lock);
222 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
226 /*****************************************************************************/
228 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
231 if (!cstWriter) return ORTE_BAD_HANDLE;
232 pthread_rwlock_wrlock(&cstWriter->lock);
233 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
234 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
235 if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
236 NtpTime expire,atime=getActualNtpTime();
237 struct timespec wtime;
238 //count max block time
239 NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
240 NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
241 wtime.tv_nsec*=1000; //conver to nano seconds
242 pthread_rwlock_unlock(&cstWriter->lock);
243 //wait till a message will be processed
244 pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
245 if (cstWriter->condValueCSChangeDestroyed==0) {
246 pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
247 &cstWriter->mutexCSChangeDestroyed,
250 cstWriter->condValueCSChangeDestroyed=0;
251 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
253 pthread_rwlock_wrlock(&cstWriter->lock);
254 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
255 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
256 debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
257 cstWriter->csChangesCounter);
258 pthread_rwlock_unlock(&cstWriter->lock);
259 return ORTE_QUEUE_FULL;
263 pthread_rwlock_unlock(&cstWriter->lock);
267 /*****************************************************************************/
269 ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
271 SequenceNumber snNext;
273 if (!cstWriter) return ORTE_BAD_HANDLE;
274 pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
275 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
276 if (!CSTRemoteReader_is_empty(cstWriter)) {
277 csChange=(CSChange*)MALLOC(sizeof(CSChange));
278 CSChangeAttributes_init_head(csChange);
279 csChange->guid=cstWriter->guid;
280 csChange->alive=ORTE_FALSE;
281 csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+ //HEADER+INFO_TS+ISSUE
282 +20+cstWriter->typeRegister->getMaxSize;
283 csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
284 csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
285 SeqNumberInc(snNext,cstWriter->lastSN);
286 RTPSHeaderCreate(csChange->cdrStream.buffer,
287 cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
288 RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
289 12,getActualNtpTime());
290 RTPSIssueCreateHeader(csChange->cdrStream.buffer+
291 RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
292 OID_UNKNOWN,cstWriter->guid.oid,snNext);
293 //serialization routine
294 if (cstWriter->typeRegister->serialize) {
295 cstWriter->typeRegister->serialize(
296 &csChange->cdrStream,
297 cstWriter->objectEntryOID->instance);
299 //no deserialization -> memcpy
300 memcpy(csChange->cdrStream.bufferPtr,
301 cstWriter->objectEntryOID->instance,
302 cstWriter->typeRegister->getMaxSize);
303 csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
305 csChange->cdrStream.needByteSwap=ORTE_FALSE;
306 debug(31,10) ("ORTEPublicationCreate: message length:%d\n",
307 cstWriter->typeRegister->getMaxSize);
308 CSTWriterAddCSChange(cstWriter->domain,
312 pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
313 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
317 /*****************************************************************************/
319 ORTEPublicationSend(ORTEPublication *cstWriter) {
322 if (!cstWriter) return ORTE_BAD_HANDLE;
323 //prepare sending queue
324 if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
326 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
327 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
328 pthread_rwlock_wrlock(&cstWriter->lock);
329 r=ORTEPublicationSendLocked(cstWriter);
330 pthread_rwlock_unlock(&cstWriter->lock);
331 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
332 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);