]> rtime.felk.cvut.cz Git - orte/eurobot.git/blob - orte/liborte/ORTEPublication.c
updated email address - petr@smoliku.cz
[orte/eurobot.git] / orte / liborte / ORTEPublication.c
1 /*
2  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
3  *
4  *  DEBUG:  section 31                  Functions working over publication
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
/*
 * Instantiate the GAVL custom-node implementation under the name
 * PublicationList.
 * NOTE(review): argument roles (root type PSEntry, item type ObjectEntryOID,
 * key type GUID_RTPS, root field "publications", node field "psNode",
 * key field "guid", comparator gavl_cmp_guid) are inferred from the names
 * only - confirm against the GAVL header.
 */
GAVL_CUST_NODE_INT_IMP(PublicationList, PSEntry, ObjectEntryOID, GUID_RTPS,
                       publications, psNode, guid, gavl_cmp_guid);
37
38 /*****************************************************************************/
39 ORTEPublication * 
40 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
41     void *instance,NtpTime *persistence,int strength,
42     ORTESendCallBack sendCallBack,void *sendCallBackParam,
43     NtpTime *sendCallBackDelay) {
44   GUID_RTPS             guid;
45   CSTWriter             *cstWriter;
46   CSTWriterParams       cstWriterParams;
47   ORTEPublProp          *pp;
48   ObjectEntryOID        *objectEntryOID;   
49   CSChange              *csChange;
50   TypeNode              *typeNode;
51   
52   debug(31,10) ("ORTEPublicationCreate: start\n");
53   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
54   if (!cstWriter) return NULL;
55   debug(31,10) ("ORTEPublicationCreate: memory OK\n");
56   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
57   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
58   pthread_rwlock_rdlock(&d->typeEntry.lock);    
59   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
60     pthread_rwlock_unlock(&d->typeEntry.lock);    
61     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
62     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
63     printf("before call ORTEPublicationCreate is necessary to register \n\
64             ser./deser. function for a given typeName!!!\n");
65     return NULL;
66   }  
67   pthread_rwlock_wrlock(&d->publications.lock);
68   //generate new guid of publisher
69   d->publications.counter++;
70   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
71   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
72   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
73   memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
74   strcpy((char *)pp->topic,topic);
75   strcpy((char *)pp->typeName,typeName);
76   pp->persistence=*persistence;
77   pp->strength=strength;
78   pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS | 
79                          PID_VALUE_RELIABILITY_STRICT;
80   //insert object to structure objectEntry
81   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
82   objectEntryOID->privateCreated=ORTE_TRUE;
83   objectEntryOID->instance=instance;
84   objectEntryOID->sendCallBack=sendCallBack;
85   objectEntryOID->callBackParam=sendCallBackParam;
86   if (objectEntryOID->sendCallBack!=NULL) {  
87     if (sendCallBackDelay!=NULL) {
88       objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
89       eventAdd(d,
90           objectEntryOID->objectEntryAID,
91           &objectEntryOID->sendCallBackDelayTimer,
92           0,   
93           "PublicationCallBackTimer",
94           PublicationCallBackTimer,
95           &cstWriter->lock,
96           cstWriter,
97           &objectEntryOID->sendCallBackDelay);               
98     }
99   }
100   //create writerPublication
101   cstWriterParams.registrationRetries=0; 
102   NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
103   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
104   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
105   cstWriterParams.refreshPeriod=iNtpTime;  //can't refresh csChange(s)
106   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
107   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
108   cstWriterParams.fullAcknowledge=ORTE_TRUE;
109   CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
110                 &typeNode->typeRegister);
111   //insert cstWriter to list of publications
112   CSTWriter_insert(&d->publications,cstWriter);
113   //generate csChange for writerPublisher
114   pthread_rwlock_wrlock(&d->writerPublications.lock);
115   csChange=(CSChange*)MALLOC(sizeof(CSChange));
116   parameterUpdateCSChangeFromPublication(csChange,pp);
117   csChange->guid=guid;
118   csChange->alive=ORTE_TRUE;
119   csChange->cdrCodec.buffer=NULL;
120   debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
121   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
122   pthread_rwlock_unlock(&d->writerPublications.lock);
123   pthread_rwlock_unlock(&d->publications.lock);
124   pthread_rwlock_unlock(&d->typeEntry.lock);    
125   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
126   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
127   debug(31,10) ("ORTEPublicationCreate: finished\n");
128   return cstWriter;
129 }
130
131 /*****************************************************************************/
132 int
133 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
134   CSChange              *csChange;
135
136   if (!cstWriter) return ORTE_BAD_HANDLE;
137   //generate csChange for writerPublisher
138   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
139   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
140   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
141   if (cstWriter->objectEntryOID->sendCallBack!=NULL) {  
142     eventDetach(cstWriter->domain,
143         cstWriter->objectEntryOID->objectEntryAID,
144         &cstWriter->objectEntryOID->sendCallBackDelayTimer,
145         0);
146   }
147   csChange=(CSChange*)MALLOC(sizeof(CSChange));
148   CSChangeAttributes_init_head(csChange);
149   csChange->cdrCodec.buffer=NULL;
150   csChange->guid=cstWriter->guid;
151   csChange->alive=ORTE_FALSE;
152   CSTWriterAddCSChange(cstWriter->domain,
153                        &cstWriter->domain->writerPublications,
154                        csChange);
155   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
156   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
157   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
158   return ORTE_OK;
159 }
160
161
162 /*****************************************************************************/
163 int
164 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
165   if (!cstWriter) return ORTE_BAD_HANDLE;
166   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
167   pthread_rwlock_rdlock(&cstWriter->lock);
168   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
169   pthread_rwlock_unlock(&cstWriter->lock);
170   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
171   return ORTE_OK;
172 }
173
174 /*****************************************************************************/
175 int
176 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
177   CSChange              *csChange;
178
179   if (!cstWriter) return ORTE_BAD_HANDLE;
180   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
181   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
182   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
183   pthread_rwlock_rdlock(&cstWriter->lock);
184   csChange=(CSChange*)MALLOC(sizeof(CSChange));
185   parameterUpdateCSChangeFromPublication(csChange,pp);
186   csChange->guid=cstWriter->guid;
187   csChange->alive=ORTE_TRUE;
188   csChange->cdrCodec.buffer=NULL;
189   CSTWriterAddCSChange(cstWriter->domain,
190       &cstWriter->domain->writerPublications,csChange);
191   pthread_rwlock_unlock(&cstWriter->lock);
192   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
193   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
194   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
195   return ORTE_OK;
196 }
197
198 /*****************************************************************************/
199 int
200 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
201     unsigned int retries,unsigned int noSubscriptions) {
202   unsigned int rSubscriptions;
203   uint32_t sec,ms;
204
205   if (!cstWriter) return ORTE_BAD_HANDLE;
206   NtpTimeDisAssembToMs(sec,ms,wait);
207   do {
208     pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
209     pthread_rwlock_rdlock(&cstWriter->lock);
210     rSubscriptions=cstWriter->cstRemoteReaderCounter;
211     pthread_rwlock_unlock(&cstWriter->lock);
212     pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
213     if (rSubscriptions>=noSubscriptions)
214       return ORTE_OK;
215     ORTESleepMs(sec*1000+ms);
216   } while (retries--);
217   return ORTE_TIMEOUT;  
218 }
219
220 /*****************************************************************************/
221 int
222 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
223   CSChange *csChange;
224
225   if (!cstWriter) return ORTE_BAD_HANDLE;
226   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
227   pthread_rwlock_rdlock(&cstWriter->lock);
228   status->strict=cstWriter->strictReliableCounter;
229   status->bestEffort=cstWriter->bestEffortsCounter;
230   status->issues=0;
231   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
232     status->issues++;
233   pthread_rwlock_unlock(&cstWriter->lock);
234   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
235   return ORTE_OK;
236 }
237
238 /*****************************************************************************/
239 int
240 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
241   ORTEPublProp          *pp;
242   
243   if (!cstWriter) return ORTE_BAD_HANDLE;
244   pthread_rwlock_wrlock(&cstWriter->lock);
245   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
246   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
247     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
248       NtpTime             expire,atime=getActualNtpTime();
249       struct timespec     wtime; 
250       //count max block time
251       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
252       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
253       wtime.tv_nsec*=1000;  //conver to nano seconds
254       while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
255         pthread_rwlock_unlock(&cstWriter->lock);    
256         //wait till a message will be processed
257         pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
258         if (cstWriter->condValueCSChangeDestroyed==0) {
259           if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
260                                  &cstWriter->mutexCSChangeDestroyed,
261                                  &wtime)==ETIMEDOUT) {
262             debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
263                           cstWriter->csChangesCounter);
264             pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
265             pthread_rwlock_unlock(&cstWriter->lock);
266             return ORTE_QUEUE_FULL;
267           }
268         }
269         cstWriter->condValueCSChangeDestroyed=0;
270         pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
271         pthread_rwlock_wrlock(&cstWriter->lock);    
272       }
273     }
274   }
275   pthread_rwlock_unlock(&cstWriter->lock);
276   return ORTE_OK;
277 }
278
279 /*****************************************************************************/
280 int
281 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
282     ORTEPublicationSendParam *psp) {
283   CSChange              *csChange;
284   SequenceNumber        snNext;
285   int                   max_size;
286   
287   if (!cstWriter) return ORTE_BAD_HANDLE;
288   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
289   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
290   if (!CSTRemoteReader_is_empty(cstWriter)) {
291     ORTEGetMaxSizeParam gms;
292
293     csChange=(CSChange*)MALLOC(sizeof(CSChange));
294     CSChangeAttributes_init_head(csChange);
295     csChange->guid=cstWriter->guid;
296     csChange->alive=ORTE_FALSE;
297     CDR_codec_init_static(&csChange->cdrCodec);
298     csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
299
300     if (psp) {
301       csChange->cdrCodec.data_endian = psp->data_endian;
302       cstWriter->objectEntryOID->instance=psp->instance;
303     }
304
305     /* determine maximal size */
306     gms.host_endian=csChange->cdrCodec.host_endian;
307     gms.data_endian=csChange->cdrCodec.data_endian;
308     gms.data=cstWriter->objectEntryOID->instance;
309     gms.max_size=cstWriter->typeRegister->maxSize;
310     gms.recv_size=-1;
311     gms.csize=0;
312     if (cstWriter->typeRegister->getMaxSize)
313       max_size=cstWriter->typeRegister->getMaxSize(&gms,1);
314     else
315       max_size=cstWriter->typeRegister->maxSize;
316     
317     /* prepare csChange */
318     CDR_buffer_init(&csChange->cdrCodec,
319                     RTPS_HEADER_LENGTH+12+20+max_size);     //HEADER+INFO_TS+ISSUE
320     csChange->cdrCodec.wptr_max=
321         cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
322
323     /* SN for next issue */
324     SeqNumberInc(snNext,cstWriter->lastSN);
325
326     /* prepare data */
327     RTPSHeaderCreate(&csChange->cdrCodec,
328                      cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
329     RTPSInfoTSCreate(&csChange->cdrCodec,
330                      getActualNtpTime());
331     RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
332                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
333
334     //serialization routine
335     if (cstWriter->typeRegister->serialize) {
336       cstWriter->typeRegister->serialize(
337           &csChange->cdrCodec,
338           cstWriter->objectEntryOID->instance);
339     } else {
340       //no deserialization -> memcpy
341       CDR_buffer_puts(&csChange->cdrCodec,
342                       cstWriter->objectEntryOID->instance,max_size);
343     }
344
345     debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
346                    max_size,snNext.low);
347                   
348     CSTWriterAddCSChange(cstWriter->domain,
349                         cstWriter,
350                         csChange);
351   }
352   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
353   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
354   return ORTE_OK;
355 }
356
357 /*****************************************************************************/
358 int
359 ORTEPublicationSendEx(ORTEPublication *cstWriter,
360     ORTEPublicationSendParam *psp) {
361   int             r;
362
363   if (!cstWriter) return ORTE_BAD_HANDLE;
364   //prepare sending queue
365   r=ORTEPublicationPrepareQueue(cstWriter);
366   if (r<0) 
367     return r;
368   //send
369   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
370   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
371   pthread_rwlock_wrlock(&cstWriter->lock);
372   r=ORTEPublicationSendLocked(cstWriter,psp);
373   pthread_rwlock_unlock(&cstWriter->lock);
374   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
375   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
376   return r;
377 }
378
379 /*****************************************************************************/
380 inline int
381 ORTEPublicationSend(ORTEPublication *cstWriter) {
382   return ORTEPublicationSendEx(cstWriter,NULL);
383 }
384
385
386 /*****************************************************************************/
387 inline void *
388 ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
389   return cstWriter->objectEntryOID->instance;
390 }