rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEPublication.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / ORTEPublication.c
1 /*
2  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
3  *
4  *  DEBUG:  section 31                  Functions working over publication
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
/*
 * Instantiate the GAVL custom-node implementation named PublicationList.
 * NOTE(review): judging by the macro arguments, this presumably generates
 * the tree functions over the `publications` root of PSEntry, holding
 * ObjectEntryOID items linked through `psNode`, keyed by their `guid`
 * (GUID_RTPS) and ordered with gavl_cmp_guid -- confirm against the
 * definition of GAVL_CUST_NODE_INT_IMP.
 */
24 GAVL_CUST_NODE_INT_IMP(PublicationList, 
25                        PSEntry, ObjectEntryOID, GUID_RTPS,
26                        publications, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTEPublication * 
30 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
31     void *instance,NtpTime *persistence,int strength,
32     ORTESendCallBack sendCallBack,void *sendCallBackParam,
33     NtpTime *sendCallBackDelay) {
34   GUID_RTPS             guid;
35   CSTWriter             *cstWriter;
36   CSTWriterParams       cstWriterParams;
37   ORTEPublProp          *pp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   debug(31,10) ("ORTEPublicationCreate: start\n");
43   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
44   if (!cstWriter) return NULL;
45   debug(31,10) ("ORTEPublicationCreate: memory OK\n");
46   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
47   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
48   pthread_rwlock_rdlock(&d->typeEntry.lock);    
49   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
50     pthread_rwlock_unlock(&d->typeEntry.lock);    
51     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
52     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
53     printf("before call ORTEPublicationCreate is necessary to register \n\
54             ser./deser. function for a given typeName!!!\n");
55     return NULL;
56   }  
57   pthread_rwlock_wrlock(&d->publications.lock);
58   //generate new guid of publisher
59   d->publications.counter++;
60   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
61   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
62   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
63   memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
64   strcpy(pp->topic,topic);
65   strcpy(pp->typeName,typeName);
66   pp->persistence=*persistence;
67   pp->strength=strength;
68   pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS | 
69                          PID_VALUE_RELIABILITY_STRICT;
70   //insert object to structure objectEntry
71   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
72   objectEntryOID->privateCreated=ORTE_TRUE;
73   objectEntryOID->instance=instance;
74   objectEntryOID->sendCallBack=sendCallBack;
75   objectEntryOID->callBackParam=sendCallBackParam;
76   if (objectEntryOID->sendCallBack!=NULL) {  
77     if (sendCallBackDelay!=NULL) {
78       objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
79       eventAdd(d,
80           objectEntryOID->objectEntryAID,
81           &objectEntryOID->sendCallBackDelayTimer,
82           0,   
83           "PublicationCallBackTimer",
84           PublicationCallBackTimer,
85           &cstWriter->lock,
86           cstWriter,
87           &objectEntryOID->sendCallBackDelay);               
88     }
89   }
90   //create writerPublication
91   cstWriterParams.registrationRetries=0; 
92   NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
93   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
94   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
95   cstWriterParams.refreshPeriod=iNtpTime;  //can't refresh csChange(s)
96   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
97   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
98   cstWriterParams.fullAcknowledge=ORTE_TRUE;
99   CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
100                 &typeNode->typeRegister);
101   //insert cstWriter to list of publications
102   CSTWriter_insert(&d->publications,cstWriter);
103   //generate csChange for writerPublisher
104   pthread_rwlock_wrlock(&d->writerPublications.lock);
105   csChange=(CSChange*)MALLOC(sizeof(CSChange));
106   parameterUpdateCSChangeFromPublication(csChange,pp);
107   csChange->guid=guid;
108   csChange->alive=ORTE_TRUE;
109   csChange->cdrCodec.buffer=NULL;
110   debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
111   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
112   pthread_rwlock_unlock(&d->writerPublications.lock);
113   pthread_rwlock_unlock(&d->publications.lock);
114   pthread_rwlock_unlock(&d->typeEntry.lock);    
115   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
116   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
117   debug(31,10) ("ORTEPublicationCreate: finished\n");
118   return cstWriter;
119 }
120
121 /*****************************************************************************/
122 int
123 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
124   CSChange              *csChange;
125
126   if (!cstWriter) return ORTE_BAD_HANDLE;
127   //generate csChange for writerPublisher
128   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
129   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
130   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
131   if (cstWriter->objectEntryOID->sendCallBack!=NULL) {  
132     eventDetach(cstWriter->domain,
133         cstWriter->objectEntryOID->objectEntryAID,
134         &cstWriter->objectEntryOID->sendCallBackDelayTimer,
135         0);
136   }
137   csChange=(CSChange*)MALLOC(sizeof(CSChange));
138   CSChangeAttributes_init_head(csChange);
139   csChange->cdrCodec.buffer=NULL;
140   csChange->guid=cstWriter->guid;
141   csChange->alive=ORTE_FALSE;
142   CSTWriterAddCSChange(cstWriter->domain,
143                        &cstWriter->domain->writerPublications,
144                        csChange);
145   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
146   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
147   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
148   return ORTE_OK;
149 }
150
151
152 /*****************************************************************************/
153 int
154 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
155   if (!cstWriter) return ORTE_BAD_HANDLE;
156   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
157   pthread_rwlock_rdlock(&cstWriter->lock);
158   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
159   pthread_rwlock_unlock(&cstWriter->lock);
160   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
161   return ORTE_OK;
162 }
163
164 /*****************************************************************************/
165 int
166 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
167   CSChange              *csChange;
168
169   if (!cstWriter) return ORTE_BAD_HANDLE;
170   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
171   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
172   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
173   pthread_rwlock_rdlock(&cstWriter->lock);
174   csChange=(CSChange*)MALLOC(sizeof(CSChange));
175   parameterUpdateCSChangeFromPublication(csChange,pp);
176   csChange->guid=cstWriter->guid;
177   csChange->alive=ORTE_TRUE;
178   csChange->cdrCodec.buffer=NULL;
179   CSTWriterAddCSChange(cstWriter->domain,
180       &cstWriter->domain->writerPublications,csChange);
181   pthread_rwlock_unlock(&cstWriter->lock);
182   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
183   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
184   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
185   return ORTE_OK;
186 }
187
188 /*****************************************************************************/
189 int
190 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
191     unsigned int retries,unsigned int noSubscriptions) {
192   unsigned int rSubscriptions;
193   uint32_t sec,ms;
194
195   if (!cstWriter) return ORTE_BAD_HANDLE;
196   NtpTimeDisAssembToMs(sec,ms,wait);
197   do {
198     pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
199     pthread_rwlock_rdlock(&cstWriter->lock);
200     rSubscriptions=cstWriter->cstRemoteReaderCounter;
201     pthread_rwlock_unlock(&cstWriter->lock);
202     pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
203     if (rSubscriptions>=noSubscriptions)
204       return ORTE_OK;
205     ORTESleepMs(sec*1000+ms);
206   } while (retries--);
207   return ORTE_TIMEOUT;  
208 }
209
210 /*****************************************************************************/
211 int
212 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
213   CSChange *csChange;
214
215   if (!cstWriter) return ORTE_BAD_HANDLE;
216   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
217   pthread_rwlock_rdlock(&cstWriter->lock);
218   status->strict=cstWriter->strictReliableCounter;
219   status->bestEffort=cstWriter->bestEffortsCounter;
220   status->issues=0;
221   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
222     status->issues++;
223   pthread_rwlock_unlock(&cstWriter->lock);
224   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
225   return ORTE_OK;
226 }
227
228 /*****************************************************************************/
229 int
230 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
231   ORTEPublProp          *pp;
232   
233   if (!cstWriter) return ORTE_BAD_HANDLE;
234   pthread_rwlock_wrlock(&cstWriter->lock);
235   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
236   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
237     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
238       NtpTime             expire,atime=getActualNtpTime();
239       struct timespec     wtime; 
240       //count max block time
241       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
242       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
243       wtime.tv_nsec*=1000;  //conver to nano seconds
244       while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
245         pthread_rwlock_unlock(&cstWriter->lock);    
246         //wait till a message will be processed
247         pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
248         if (cstWriter->condValueCSChangeDestroyed==0) {
249           if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
250                                  &cstWriter->mutexCSChangeDestroyed,
251                                  &wtime)==ETIMEDOUT) {
252             debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
253                           cstWriter->csChangesCounter);
254             pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
255             pthread_rwlock_unlock(&cstWriter->lock);
256             return ORTE_QUEUE_FULL;
257           }
258         }
259         cstWriter->condValueCSChangeDestroyed=0;
260         pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
261         pthread_rwlock_wrlock(&cstWriter->lock);    
262       }
263     }
264   }
265   pthread_rwlock_unlock(&cstWriter->lock);
266   return ORTE_OK;
267 }
268
269 /*****************************************************************************/
270 int
271 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
272     ORTEPublicationSendParam *psp) {
273   CSChange              *csChange;
274   SequenceNumber        snNext;
275   int                   max_size;
276   
277   if (!cstWriter) return ORTE_BAD_HANDLE;
278   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
279   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
280   if (!CSTRemoteReader_is_empty(cstWriter)) {
281     ORTEGetMaxSizeParam gms;
282
283     csChange=(CSChange*)MALLOC(sizeof(CSChange));
284     CSChangeAttributes_init_head(csChange);
285     csChange->guid=cstWriter->guid;
286     csChange->alive=ORTE_FALSE;
287     CDR_codec_init_static(&csChange->cdrCodec);
288     csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
289
290     if (psp) {
291       csChange->cdrCodec.data_endian = psp->data_endian;
292       cstWriter->objectEntryOID->instance=psp->instance;
293     }
294
295     /* determine maximal size */
296     gms.host_endian=csChange->cdrCodec.host_endian;
297     gms.data_endian=csChange->cdrCodec.data_endian;
298     gms.data=cstWriter->objectEntryOID->instance;
299     gms.max_size=cstWriter->typeRegister->maxSize;
300     gms.recv_size=-1;
301     gms.csize=0;
302     if (cstWriter->typeRegister->getMaxSize)
303       max_size=cstWriter->typeRegister->getMaxSize(&gms);
304     else
305       max_size=cstWriter->typeRegister->maxSize;
306     
307     /* prepare csChange */
308     CDR_buffer_init(&csChange->cdrCodec,
309                     RTPS_HEADER_LENGTH+12+20+max_size);     //HEADER+INFO_TS+ISSUE
310     csChange->cdrCodec.wptr_max=
311         cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
312
313     /* SN for next issue */
314     SeqNumberInc(snNext,cstWriter->lastSN);
315
316     /* prepare data */
317     RTPSHeaderCreate(&csChange->cdrCodec,
318                      cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
319     RTPSInfoTSCreate(&csChange->cdrCodec,
320                      getActualNtpTime());
321     RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
322                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
323
324     //serialization routine
325     if (cstWriter->typeRegister->serialize) {
326       cstWriter->typeRegister->serialize(
327           &csChange->cdrCodec,
328           cstWriter->objectEntryOID->instance);
329     } else {
330       //no deserialization -> memcpy
331       CDR_buffer_puts(&csChange->cdrCodec,
332                       cstWriter->objectEntryOID->instance,max_size);
333     }
334
335     debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
336                    max_size,snNext.low);
337                   
338     CSTWriterAddCSChange(cstWriter->domain,
339                         cstWriter,
340                         csChange);
341   }
342   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
343   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
344   return ORTE_OK;
345 }
346
347 /*****************************************************************************/
348 int
349 ORTEPublicationSendEx(ORTEPublication *cstWriter,
350     ORTEPublicationSendParam *psp) {
351   int             r;
352
353   if (!cstWriter) return ORTE_BAD_HANDLE;
354   //prepare sending queue
355   r=ORTEPublicationPrepareQueue(cstWriter);
356   if (r<0) 
357     return r;
358   //send
359   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
360   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
361   pthread_rwlock_wrlock(&cstWriter->lock);
362   r=ORTEPublicationSendLocked(cstWriter,psp);
363   pthread_rwlock_unlock(&cstWriter->lock);
364   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
365   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
366   return r;
367 }
368
369 /*****************************************************************************/
370 inline int
371 ORTEPublicationSend(ORTEPublication *cstWriter) {
372   return ORTEPublicationSendEx(cstWriter,NULL);
373 }
374