]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEPublication.c
version 0.2.2 (mac, solaris patch)
[orte.git] / orte / liborte / ORTEPublication.c
1 /*
2  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
3  *
4  *  DEBUG:  section 31                  Functions working over publication
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(PublicationList, 
25                        PSEntry, ObjectEntryOID, GUID_RTPS,
26                        publications, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTEPublication * 
30 ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
31     void *instance,NtpTime *persistence,int strength,
32     ORTESendCallBack sendCallBack,void *sendCallBackParam,
33     NtpTime *sendCallBackDelay) {
34   GUID_RTPS             guid;
35   CSTWriter             *cstWriter;
36   CSTWriterParams       cstWriterParams;
37   ORTEPublProp          *pp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   debug(31,10) ("ORTEPublicationCreate: start\n");
43   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
44   if (!cstWriter) return NULL;
45   debug(31,10) ("ORTEPublicationCreate: memory OK\n");
46   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
47   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
48   pthread_rwlock_rdlock(&d->typeEntry.lock);    
49   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
50     pthread_rwlock_unlock(&d->typeEntry.lock);    
51     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
52     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
53     printf("before call ORTEPublicationCreate is necessary to register \n\
54             ser./deser. function for a given typeName!!!\n");
55     return NULL;
56   }  
57   pthread_rwlock_wrlock(&d->publications.lock);
58   //generate new guid of publisher
59   d->publications.counter++;
60   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
61   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
62   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
63   memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
64   strcpy(pp->topic,topic);
65   strcpy(pp->typeName,typeName);
66   pp->persistence=*persistence;
67   pp->strength=strength;
68   pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS | 
69                          PID_VALUE_RELIABILITY_STRICT;
70   //insert object to structure objectEntry
71   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
72   objectEntryOID->privateCreated=ORTE_TRUE;
73   objectEntryOID->instance=instance;
74   objectEntryOID->sendCallBack=sendCallBack;
75   objectEntryOID->callBackParam=sendCallBackParam;
76   if (objectEntryOID->sendCallBack!=NULL) {  
77     if (sendCallBackDelay!=NULL) {
78       objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
79       eventAdd(d,
80           objectEntryOID->objectEntryAID,
81           &objectEntryOID->sendCallBackDelayTimer,
82           0,   
83           "PublicationCallBackTimer",
84           PublicationCallBackTimer,
85           &cstWriter->lock,
86           cstWriter,
87           &objectEntryOID->sendCallBackDelay);               
88     }
89   }
90   //create writerPublication
91   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
92   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
93   cstWriterParams.refreshPeriod=iNtpTime;  //cann't refresh csChange(s)
94   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
95   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
96   cstWriterParams.fullAcknowledge=ORTE_TRUE;
97   CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
98                 &typeNode->typeRegister);
99   //insert cstWriter to list of publications
100   CSTWriter_insert(&d->publications,cstWriter);
101   //generate csChange for writerPublisher
102   pthread_rwlock_wrlock(&d->writerPublications.lock);
103   csChange=(CSChange*)MALLOC(sizeof(CSChange));
104   parameterUpdateCSChangeFromPublication(csChange,pp);
105   csChange->guid=guid;
106   csChange->alive=ORTE_TRUE;
107   csChange->cdrStream.buffer=NULL;
108   debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
109   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
110   pthread_rwlock_unlock(&d->writerPublications.lock);
111   pthread_rwlock_unlock(&d->publications.lock);
112   pthread_rwlock_unlock(&d->typeEntry.lock);    
113   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
114   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
115   debug(31,10) ("ORTEPublicationCreate: finished\n");
116   return cstWriter;
117 }
118
119 /*****************************************************************************/
120 int
121 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
122   CSChange              *csChange;
123
124   if (!cstWriter) return ORTE_BAD_HANDLE;
125   //generate csChange for writerPublisher
126   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
127   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
128   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
129   if (cstWriter->objectEntryOID->sendCallBack!=NULL) {  
130     eventDetach(cstWriter->domain,
131         cstWriter->objectEntryOID->objectEntryAID,
132         &cstWriter->objectEntryOID->sendCallBackDelayTimer,
133         0);
134   }
135   csChange=(CSChange*)MALLOC(sizeof(CSChange));
136   CSChangeAttributes_init_head(csChange);
137   csChange->cdrStream.buffer=NULL;
138   csChange->guid=cstWriter->guid;
139   csChange->alive=ORTE_FALSE;
140   CSTWriterAddCSChange(cstWriter->domain,
141                        &cstWriter->domain->writerPublications,
142                        csChange);
143   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
144   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
145   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
146   return ORTE_OK;
147 }
148
149
150 /*****************************************************************************/
151 int
152 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
153   if (!cstWriter) return ORTE_BAD_HANDLE;
154   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
155   pthread_rwlock_rdlock(&cstWriter->lock);
156   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
157   pthread_rwlock_unlock(&cstWriter->lock);
158   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
159   return ORTE_OK;
160 }
161
162 /*****************************************************************************/
163 int
164 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
165   CSChange              *csChange;
166
167   if (!cstWriter) return ORTE_BAD_HANDLE;
168   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
169   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
170   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
171   pthread_rwlock_rdlock(&cstWriter->lock);
172   csChange=(CSChange*)MALLOC(sizeof(CSChange));
173   parameterUpdateCSChangeFromPublication(csChange,pp);
174   csChange->guid=cstWriter->guid;
175   csChange->alive=ORTE_TRUE;
176   csChange->cdrStream.buffer=NULL;
177   CSTWriterAddCSChange(cstWriter->domain,
178       &cstWriter->domain->writerPublications,csChange);
179   pthread_rwlock_unlock(&cstWriter->lock);
180   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
181   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
182   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
183   return ORTE_OK;
184 }
185
186 /*****************************************************************************/
187 int
188 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
189     unsigned int retries,unsigned int noSubscriptions) {
190   unsigned int rSubscriptions;
191   uint32_t sec,ms;
192
193   if (!cstWriter) return ORTE_BAD_HANDLE;
194   NtpTimeDisAssembToMs(sec,ms,wait);
195   do {
196     pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
197     pthread_rwlock_rdlock(&cstWriter->lock);
198     rSubscriptions=cstWriter->cstRemoteReaderCounter;
199     pthread_rwlock_unlock(&cstWriter->lock);
200     pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
201     if (rSubscriptions>=noSubscriptions)
202       return ORTE_OK;
203     ORTESleepMs(sec*1000+ms);
204   } while (retries--);
205   return ORTE_TIMEOUT;  
206 }
207
208 /*****************************************************************************/
209 int
210 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
211   CSChange *csChange;
212
213   if (!cstWriter) return ORTE_BAD_HANDLE;
214   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
215   pthread_rwlock_rdlock(&cstWriter->lock);
216   status->strict=cstWriter->strictReliableCounter;
217   status->bestEffort=cstWriter->bestEffortsCounter;
218   status->issues=0;
219   ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
220     status->issues++;
221   pthread_rwlock_unlock(&cstWriter->lock);
222   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
223   return ORTE_OK;
224 }
225
226 /*****************************************************************************/
227 int
228 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
229   ORTEPublProp          *pp;
230   
231   if (!cstWriter) return ORTE_BAD_HANDLE;
232   pthread_rwlock_wrlock(&cstWriter->lock);
233   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
234   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
235     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
236       NtpTime             expire,atime=getActualNtpTime();
237       struct timespec     wtime; 
238       //count max block time
239       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
240       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
241       wtime.tv_nsec*=1000;  //conver to nano seconds
242       pthread_rwlock_unlock(&cstWriter->lock);    
243       //wait till a message will be processed
244       pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
245       if (cstWriter->condValueCSChangeDestroyed==0) {
246         pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
247                                &cstWriter->mutexCSChangeDestroyed,
248                                &wtime);
249       }
250       cstWriter->condValueCSChangeDestroyed=0;
251       pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
252
253       pthread_rwlock_wrlock(&cstWriter->lock);    
254       pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
255       if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
256         debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
257                       cstWriter->csChangesCounter);
258         pthread_rwlock_unlock(&cstWriter->lock);
259         return ORTE_QUEUE_FULL;
260       }
261     }
262   }
263   pthread_rwlock_unlock(&cstWriter->lock);
264   return ORTE_OK;
265 }
266
267 /*****************************************************************************/
268 int
269 ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
270   CSChange              *csChange;
271   SequenceNumber        snNext;
272   
273   if (!cstWriter) return ORTE_BAD_HANDLE;
274   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
275   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
276   if (!CSTRemoteReader_is_empty(cstWriter)) {
277     csChange=(CSChange*)MALLOC(sizeof(CSChange));
278     CSChangeAttributes_init_head(csChange);
279     csChange->guid=cstWriter->guid;
280     csChange->alive=ORTE_FALSE;
281     csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+     //HEADER+INFO_TS+ISSUE
282                               +20+cstWriter->typeRegister->getMaxSize;
283     csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
284     csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
285     SeqNumberInc(snNext,cstWriter->lastSN);
286     RTPSHeaderCreate(csChange->cdrStream.buffer,
287                     cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
288     RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
289                     12,getActualNtpTime());
290     RTPSIssueCreateHeader(csChange->cdrStream.buffer+
291                     RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
292                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
293     //serialization routine
294     if (cstWriter->typeRegister->serialize) {
295       cstWriter->typeRegister->serialize(
296           &csChange->cdrStream,
297           cstWriter->objectEntryOID->instance);
298     } else {
299       //no deserialization -> memcpy
300       memcpy(csChange->cdrStream.bufferPtr,
301             cstWriter->objectEntryOID->instance,
302             cstWriter->typeRegister->getMaxSize);
303       csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
304     }
305     csChange->cdrStream.needByteSwap=ORTE_FALSE;
306     debug(31,10) ("ORTEPublicationCreate: message length:%d\n",
307                    cstWriter->typeRegister->getMaxSize);
308     CSTWriterAddCSChange(cstWriter->domain,
309                         cstWriter,
310                         csChange);
311   }
312   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
313   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
314   return ORTE_OK;
315 }
316
317 /*****************************************************************************/
318 int
319 ORTEPublicationSend(ORTEPublication *cstWriter) {
320   int             r;
321
322   if (!cstWriter) return ORTE_BAD_HANDLE;
323   //prepare sending queue
324   if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
325   //send
326   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
327   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
328   pthread_rwlock_wrlock(&cstWriter->lock);
329   r=ORTEPublicationSendLocked(cstWriter);
330   pthread_rwlock_unlock(&cstWriter->lock);
331   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
332   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
333   return r;
334 }