2 * $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 31 Functions working over publication
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(PublicationList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 publications, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
31 void *instance,NtpTime *persistence,int strength,
32 ORTESendCallBack sendCallBack,void *sendCallBackParam,
33 NtpTime *sendCallBackDelay) {
36 CSTWriterParams cstWriterParams;
38 ObjectEntryOID *objectEntryOID;
42 cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
43 if (!cstWriter) return NULL;
44 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46 pthread_rwlock_rdlock(&d->typeEntry.lock);
47 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48 pthread_rwlock_unlock(&d->typeEntry.lock);
49 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51 printf("before call ORTEPublicationCreate is necessary to register \n\
52 ser./deser. function for a given typeName!!!\n");
55 pthread_rwlock_wrlock(&d->publications.lock);
56 //generate new guid of publisher
57 d->publications.counter++;
58 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
59 guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
60 pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
61 memcpy(pp,&d->publPropDefault,sizeof(ORTEPublProp));
62 strcpy(pp->topic,topic);
63 strcpy(pp->typeName,typeName);
64 pp->persistence=*persistence;
65 pp->strength=strength;
66 pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS |
67 PID_VALUE_RELIABILITY_STRICT;
68 //insert object to structure objectEntry
69 objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
70 objectEntryOID->private=ORTE_TRUE;
71 objectEntryOID->instance=instance;
72 objectEntryOID->sendCallBack=sendCallBack;
73 objectEntryOID->callBackParam=sendCallBackParam;
74 if (objectEntryOID->sendCallBack!=NULL) {
75 if (sendCallBackDelay!=NULL) {
76 objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
78 objectEntryOID->objectEntryAID,
79 &objectEntryOID->sendCallBackDelayTimer,
81 "PublicationCallBackTimer",
82 PublicationCallBackTimer,
85 &objectEntryOID->sendCallBackDelay);
88 //create writerPublication
89 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
90 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
91 cstWriterParams.refreshPeriod=iNtpTime; //cann't refresh csChange(s)
92 cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
93 cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
94 cstWriterParams.fullAcknowledge=ORTE_TRUE;
95 CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
96 &typeNode->typeRegister);
97 //insert cstWriter to list of publications
98 CSTWriter_insert(&d->publications,cstWriter);
99 //generate csChange for writerPublisher
100 pthread_rwlock_wrlock(&d->writerPublications.lock);
101 csChange=(CSChange*)MALLOC(sizeof(CSChange));
102 parameterUpdateCSChangeFromPublication(csChange,pp);
104 csChange->alive=ORTE_TRUE;
105 csChange->cdrStream.buffer=NULL;
106 CSTWriterAddCSChange(d,&d->writerPublications,csChange);
107 pthread_rwlock_unlock(&d->writerPublications.lock);
108 pthread_rwlock_unlock(&d->publications.lock);
109 pthread_rwlock_unlock(&d->typeEntry.lock);
110 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
111 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
115 /*****************************************************************************/
117 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
120 if (!cstWriter) return -1;
121 //generate csChange for writerPublisher
122 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
123 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
124 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
125 if (cstWriter->objectEntryOID->sendCallBack!=NULL) {
126 eventDetach(cstWriter->domain,
127 cstWriter->objectEntryOID->objectEntryAID,
128 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
131 csChange=(CSChange*)MALLOC(sizeof(CSChange));
132 CSChangeAttributes_init_head(csChange);
133 csChange->cdrStream.buffer=NULL;
134 csChange->guid=cstWriter->guid;
135 csChange->alive=ORTE_FALSE;
136 CSTWriterAddCSChange(cstWriter->domain,
137 &cstWriter->domain->writerPublications,
139 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
140 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
141 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
146 /*****************************************************************************/
148 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
149 pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
150 pthread_rwlock_rdlock(&cstWriter->lock);
151 *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
152 pthread_rwlock_unlock(&cstWriter->lock);
153 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
157 /*****************************************************************************/
159 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
162 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
163 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
164 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
165 pthread_rwlock_rdlock(&cstWriter->lock);
166 csChange=(CSChange*)MALLOC(sizeof(CSChange));
167 parameterUpdateCSChangeFromPublication(csChange,pp);
168 csChange->guid=cstWriter->guid;
169 csChange->alive=ORTE_TRUE;
170 csChange->cdrStream.buffer=NULL;
171 CSTWriterAddCSChange(cstWriter->domain,
172 &cstWriter->domain->writerPublications,csChange);
173 pthread_rwlock_unlock(&cstWriter->lock);
174 pthread_rwlock_unlock(&cstWriter->domain->publications.lock);
175 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
176 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
180 /*****************************************************************************/
182 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
183 unsigned int retries,unsigned int noSubscriptions) {
187 /*****************************************************************************/
189 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
193 /*****************************************************************************/
195 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
198 if (!cstWriter) return -1;
199 pthread_rwlock_wrlock(&cstWriter->lock);
200 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
201 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
202 if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
203 NtpTime expire,atime=getActualNtpTime();
204 struct timespec wtime;
205 //Count max block time
206 NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
207 NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
208 wtime.tv_nsec*=1000; //conver to nano seconds
209 pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
210 pthread_rwlock_unlock(&cstWriter->lock);
211 pthread_mutex_timedlock(
212 &cstWriter->mutexCSChangeDestroyed,
214 pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
215 pthread_rwlock_wrlock(&cstWriter->lock);
216 pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
217 if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
218 debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
219 cstWriter->csChangesCounter);
220 pthread_rwlock_unlock(&cstWriter->lock);
225 pthread_rwlock_unlock(&cstWriter->lock);
229 /*****************************************************************************/
231 ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
233 SequenceNumber snNext;
235 if (!cstWriter) return -1;
236 pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
237 pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
238 if (!CSTRemoteReader_is_empty(cstWriter)) {
239 csChange=(CSChange*)MALLOC(sizeof(CSChange));
240 CSChangeAttributes_init_head(csChange);
241 csChange->guid=cstWriter->guid;
242 csChange->alive=ORTE_FALSE;
243 csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+ //HEADER+INFO_TS+ISSUE
244 +20+cstWriter->typeRegister->getMaxSize;
245 csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
246 csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
247 SeqNumberInc(snNext,cstWriter->lastSN);
248 RTPSHeaderCreate(csChange->cdrStream.buffer,
249 cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
250 RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
251 12,getActualNtpTime());
252 RTPSIssueCreateHeader(csChange->cdrStream.buffer+
253 RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
254 OID_UNKNOWN,cstWriter->guid.oid,snNext);
255 //serialization routine
256 if (cstWriter->typeRegister->serialize) {
257 cstWriter->typeRegister->serialize(
258 &csChange->cdrStream,
259 cstWriter->objectEntryOID->instance);
261 //no deserialization -> memcpy
262 memcpy(csChange->cdrStream.bufferPtr,
263 cstWriter->objectEntryOID->instance,
264 cstWriter->typeRegister->getMaxSize);
265 csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
267 csChange->cdrStream.needByteSwap=ORTE_FALSE;
268 CSTWriterAddCSChange(cstWriter->domain,
272 pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
273 pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
277 /*****************************************************************************/
279 ORTEPublicationSend(ORTEPublication *cstWriter) {
282 if (!cstWriter) return -1;
283 //PrepareSendingQueue
284 if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
286 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
287 pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
288 pthread_rwlock_wrlock(&cstWriter->lock);
289 r=ORTEPublicationSendLocked(cstWriter);
290 pthread_rwlock_unlock(&cstWriter->lock);
291 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
292 pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);