]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEPublication.c
Add missing FREE()
[orte.git] / orte / liborte / ORTEPublication.c
1 /*
2  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
3  *
4  *  DEBUG:  section 31                  Functions working over publication
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 GAVL_CUST_NODE_INT_IMP(PublicationList, 
35                        PSEntry, ObjectEntryOID, GUID_RTPS,
36                        publications, psNode, guid, gavl_cmp_guid);
37
38 /*****************************************************************************/
39 ORTEPublication * 
40 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
41     void *instance,NtpTime *persistence,int strength,
42     ORTESendCallBack sendCallBack,void *sendCallBackParam,
43     NtpTime *sendCallBackDelay) {
44   GUID_RTPS             guid;
45   CSTWriter             *cstWriter;
46   CSTWriterParams       cstWriterParams;
47   ORTEPublProp          *pp;
48   ObjectEntryOID        *objectEntryOID;   
49   CSChange              *csChange;
50   TypeNode              *typeNode;
51   
52   debug(31,10) ("ORTEPublicationCreate: start\n");
53   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
54   if (!cstWriter) return NULL;
55   debug(31,10) ("ORTEPublicationCreate: memory OK\n");
56   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
57   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
58   pthread_rwlock_rdlock(&d->typeEntry.lock);    
59   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
60     pthread_rwlock_unlock(&d->typeEntry.lock);    
61     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
62     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
63     printf("before call ORTEPublicationCreate is necessary to register \n\
64             ser./deser. function for a given typeName!!!\n");
65     FREE(cstWriter);
66     return NULL;
67   }  
68   pthread_rwlock_wrlock(&d->publications.lock);
69   //generate new guid of publisher
70   d->publications.counter++;
71   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
72   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
73   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
74   memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
75   strcpy((char *)pp->topic,topic);
76   strcpy((char *)pp->typeName,typeName);
77   pp->persistence=*persistence;
78   pp->strength=strength;
79   pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS | 
80                          PID_VALUE_RELIABILITY_STRICT;
81   //insert object to structure objectEntry
82   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
83   objectEntryOID->privateCreated=ORTE_TRUE;
84   objectEntryOID->instance=instance;
85   objectEntryOID->sendCallBack=sendCallBack;
86   objectEntryOID->callBackParam=sendCallBackParam;
87   if (objectEntryOID->sendCallBack!=NULL) {  
88     if (sendCallBackDelay!=NULL) {
89       objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
90       eventAdd(d,
91           objectEntryOID->objectEntryAID,
92           &objectEntryOID->sendCallBackDelayTimer,
93           0,   
94           "PublicationCallBackTimer",
95           PublicationCallBackTimer,
96           &cstWriter->lock,
97           cstWriter,
98           &objectEntryOID->sendCallBackDelay);               
99     }
100   }
101   //create writerPublication
102   cstWriterParams.registrationRetries=0; 
103   NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
104   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
105   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
106   cstWriterParams.refreshPeriod=iNtpTime;  //can't refresh csChange(s)
107   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
108   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
109   cstWriterParams.fullAcknowledge=ORTE_TRUE;
110   CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
111                 &typeNode->typeRegister);
112   //insert cstWriter to list of publications
113   CSTWriter_insert(&d->publications,cstWriter);
114   //generate csChange for writerPublisher
115   pthread_rwlock_wrlock(&d->writerPublications.lock);
116   csChange=(CSChange*)MALLOC(sizeof(CSChange));
117   parameterUpdateCSChangeFromPublication(csChange,pp);
118   csChange->guid=guid;
119   csChange->alive=ORTE_TRUE;
120   csChange->cdrCodec.buffer=NULL;
121   debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
122   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
123   pthread_rwlock_unlock(&d->writerPublications.lock);
124   pthread_rwlock_unlock(&d->publications.lock);
125   pthread_rwlock_unlock(&d->typeEntry.lock);    
126   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
127   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
128   debug(31,10) ("ORTEPublicationCreate: finished\n");
129   return cstWriter;
130 }
131
132 /*****************************************************************************/
133 int
134 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
135   CSChange              *csChange;
136
137   if (!cstWriter) return ORTE_BAD_HANDLE;
138   //generate csChange for writerPublisher
139   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
140   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
141   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
142   if (cstWriter->objectEntryOID->sendCallBack!=NULL) {  
143     eventDetach(cstWriter->domain,
144         cstWriter->objectEntryOID->objectEntryAID,
145         &cstWriter->objectEntryOID->sendCallBackDelayTimer,
146         0);
147   }
148   csChange=(CSChange*)MALLOC(sizeof(CSChange));
149   CSChangeAttributes_init_head(csChange);
150   csChange->cdrCodec.buffer=NULL;
151   csChange->guid=cstWriter->guid;
152   csChange->alive=ORTE_FALSE;
153   CSTWriterAddCSChange(cstWriter->domain,
154                        &cstWriter->domain->writerPublications,
155                        csChange);
156   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
157   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
158   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
159   return ORTE_OK;
160 }
161
162
163 /*****************************************************************************/
164 int
165 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
166   if (!cstWriter) return ORTE_BAD_HANDLE;
167   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
168   pthread_rwlock_rdlock(&cstWriter->lock);
169   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
170   pthread_rwlock_unlock(&cstWriter->lock);
171   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
172   return ORTE_OK;
173 }
174
175 /*****************************************************************************/
176 int
177 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
178   CSChange              *csChange;
179
180   if (!cstWriter) return ORTE_BAD_HANDLE;
181   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
182   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
183   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
184   pthread_rwlock_rdlock(&cstWriter->lock);
185   csChange=(CSChange*)MALLOC(sizeof(CSChange));
186   parameterUpdateCSChangeFromPublication(csChange,pp);
187   csChange->guid=cstWriter->guid;
188   csChange->alive=ORTE_TRUE;
189   csChange->cdrCodec.buffer=NULL;
190   CSTWriterAddCSChange(cstWriter->domain,
191       &cstWriter->domain->writerPublications,csChange);
192   pthread_rwlock_unlock(&cstWriter->lock);
193   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
194   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
195   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
196   return ORTE_OK;
197 }
198
199 /*****************************************************************************/
200 int
201 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
202     unsigned int retries,unsigned int noSubscriptions) {
203   unsigned int rSubscriptions;
204   uint32_t sec,ms;
205
206   if (!cstWriter) return ORTE_BAD_HANDLE;
207   NtpTimeDisAssembToMs(sec,ms,wait);
208   do {
209     pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
210     pthread_rwlock_rdlock(&cstWriter->lock);
211     rSubscriptions=cstWriter->cstRemoteReaderCounter;
212     pthread_rwlock_unlock(&cstWriter->lock);
213     pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
214     if (rSubscriptions>=noSubscriptions)
215       return ORTE_OK;
216     ORTESleepMs(sec*1000+ms);
217   } while (retries--);
218   return ORTE_TIMEOUT;  
219 }
220
221 /*****************************************************************************/
222 int
223 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
224   CSChange *csChange;
225
226   if (!cstWriter) return ORTE_BAD_HANDLE;
227   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
228   pthread_rwlock_rdlock(&cstWriter->lock);
229   status->strict=cstWriter->strictReliableCounter;
230   status->bestEffort=cstWriter->bestEffortsCounter;
231   status->issues=0;
232   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
233     status->issues++;
234   pthread_rwlock_unlock(&cstWriter->lock);
235   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
236   return ORTE_OK;
237 }
238
239 /*****************************************************************************/
240 int
241 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
242   ORTEPublProp          *pp;
243   
244   if (!cstWriter) return ORTE_BAD_HANDLE;
245   pthread_rwlock_wrlock(&cstWriter->lock);
246   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
247   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
248     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
249       NtpTime             expire,atime=getActualNtpTime();
250       struct timespec     wtime; 
251       //count max block time
252       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
253       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
254       wtime.tv_nsec*=1000;  //conver to nano seconds
255       while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
256         pthread_rwlock_unlock(&cstWriter->lock);    
257         //wait till a message will be processed
258         pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
259         if (cstWriter->condValueCSChangeDestroyed==0) {
260           if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
261                                  &cstWriter->mutexCSChangeDestroyed,
262                                  &wtime)==ETIMEDOUT) {
263             debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
264                           cstWriter->csChangesCounter);
265             pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
266             pthread_rwlock_unlock(&cstWriter->lock);
267             return ORTE_QUEUE_FULL;
268           }
269         }
270         cstWriter->condValueCSChangeDestroyed=0;
271         pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
272         pthread_rwlock_wrlock(&cstWriter->lock);    
273       }
274     }
275   }
276   pthread_rwlock_unlock(&cstWriter->lock);
277   return ORTE_OK;
278 }
279
280 /*****************************************************************************/
281 int
282 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
283     ORTEPublicationSendParam *psp) {
284   CSChange              *csChange;
285   SequenceNumber        snNext;
286   int                   max_size;
287   
288   if (!cstWriter) return ORTE_BAD_HANDLE;
289   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
290   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
291   if (!CSTRemoteReader_is_empty(cstWriter)) {
292     ORTEGetMaxSizeParam gms;
293
294     csChange=(CSChange*)MALLOC(sizeof(CSChange));
295     CSChangeAttributes_init_head(csChange);
296     csChange->guid=cstWriter->guid;
297     csChange->alive=ORTE_FALSE;
298     CDR_codec_init_static(&csChange->cdrCodec);
299     csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
300
301     if (psp) {
302       csChange->cdrCodec.data_endian = psp->data_endian;
303       cstWriter->objectEntryOID->instance=psp->instance;
304     }
305
306     /* determine maximal size */
307     gms.host_endian=csChange->cdrCodec.host_endian;
308     gms.data_endian=csChange->cdrCodec.data_endian;
309     gms.data=cstWriter->objectEntryOID->instance;
310     gms.max_size=cstWriter->typeRegister->maxSize;
311     gms.recv_size=-1;
312     gms.csize=0;
313     if (cstWriter->typeRegister->getMaxSize)
314       max_size=cstWriter->typeRegister->getMaxSize(&gms,1);
315     else
316       max_size=cstWriter->typeRegister->maxSize;
317     
318     /* prepare csChange */
319     CDR_buffer_init(&csChange->cdrCodec,
320                     RTPS_HEADER_LENGTH+12+20+max_size);     //HEADER+INFO_TS+ISSUE
321     csChange->cdrCodec.wptr_max=
322         cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
323
324     /* SN for next issue */
325     SeqNumberInc(snNext,cstWriter->lastSN);
326
327     /* prepare data */
328     RTPSHeaderCreate(&csChange->cdrCodec,
329                      cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
330     RTPSInfoTSCreate(&csChange->cdrCodec,
331                      getActualNtpTime());
332     RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
333                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
334
335     //serialization routine
336     if (cstWriter->typeRegister->serialize) {
337       cstWriter->typeRegister->serialize(
338           &csChange->cdrCodec,
339           cstWriter->objectEntryOID->instance);
340     } else {
341       //no deserialization -> memcpy
342       CDR_buffer_puts(&csChange->cdrCodec,
343                       cstWriter->objectEntryOID->instance,max_size);
344     }
345
346     debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
347                    max_size,snNext.low);
348                   
349     CSTWriterAddCSChange(cstWriter->domain,
350                         cstWriter,
351                         csChange);
352   }
353   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
354   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
355   return ORTE_OK;
356 }
357
358 /*****************************************************************************/
359 int
360 ORTEPublicationSendEx(ORTEPublication *cstWriter,
361     ORTEPublicationSendParam *psp) {
362   int             r;
363
364   if (!cstWriter) return ORTE_BAD_HANDLE;
365   //prepare sending queue
366   r=ORTEPublicationPrepareQueue(cstWriter);
367   if (r<0) 
368     return r;
369   //send
370   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
371   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
372   pthread_rwlock_wrlock(&cstWriter->lock);
373   r=ORTEPublicationSendLocked(cstWriter,psp);
374   pthread_rwlock_unlock(&cstWriter->lock);
375   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
376   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
377   return r;
378 }
379
380 /*****************************************************************************/
381 inline int
382 ORTEPublicationSend(ORTEPublication *cstWriter) {
383   return ORTEPublicationSendEx(cstWriter,NULL);
384 }
385
386
387 /*****************************************************************************/
388 inline void *
389 ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
390   return cstWriter->objectEntryOID->instance;
391 }