]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEPublication.c
Add shell.nix
[orte.git] / orte / liborte / ORTEPublication.c
1 /*
2  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
3  *
4  *  DEBUG:  section 31                  Functions working over publication
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
/*
 * Instantiate the GAVL custom-node implementation named PublicationList.
 * NOTE(review): presumably this generates the PublicationList_* tree
 * operations for ObjectEntryOID items stored in PSEntry.publications, linked
 * through their psNode member and ordered by guid via gavl_cmp_guid --
 * confirm against the definition of GAVL_CUST_NODE_INT_IMP.
 */
GAVL_CUST_NODE_INT_IMP(PublicationList, PSEntry, ObjectEntryOID, GUID_RTPS,
                       publications, psNode, guid,
                       gavl_cmp_guid);
37
38 /*****************************************************************************/
39 ORTEPublication *
40 ORTEPublicationCreate(ORTEDomain *d, const char *topic, const char *typeName,
41                       void *instance, NtpTime *persistence, int strength,
42                       ORTESendCallBack sendCallBack, void *sendCallBackParam,
43                       NtpTime *sendCallBackDelay)
44 {
45   GUID_RTPS             guid;
46   CSTWriter             *cstWriter;
47   CSTWriterParams       cstWriterParams;
48   ORTEPublProp          *pp;
49   ObjectEntryOID        *objectEntryOID;
50   CSChange              *csChange;
51   TypeNode              *typeNode;
52
53   debug(31, 10) ("ORTEPublicationCreate: start\n");
54   cstWriter = (CSTWriter *)MALLOC(sizeof(CSTWriter));
55   if (!cstWriter)
56     return NULL;
57   debug(31, 10) ("ORTEPublicationCreate: memory OK\n");
58   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
59   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
60   pthread_rwlock_rdlock(&d->typeEntry.lock);
61   if (!(typeNode = ORTEType_find(&d->typeEntry, &typeName))) {
62     pthread_rwlock_unlock(&d->typeEntry.lock);
63     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
64     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
65     printf("before call ORTEPublicationCreate is necessary to register \n\
66             ser./deser. function for a given typeName!!!\n");
67     FREE(cstWriter);
68     return NULL;
69   }
70   pthread_rwlock_wrlock(&d->publications.lock);
71   //generate new guid of publisher
72   d->publications.counter++;
73   guid.hid = d->guid.hid;
74   guid.aid = d->guid.aid;
75   guid.oid = (d->publications.counter<<8)|OID_PUBLICATION;
76   pp = (ORTEPublProp *)MALLOC(sizeof(ORTEPublProp));
77   memcpy(pp, &d->domainProp.publPropDefault, sizeof(ORTEPublProp));
78   strcpy((char *)pp->topic, topic);
79   strcpy((char *)pp->typeName, typeName);
80   pp->persistence = *persistence;
81   pp->strength = strength;
82   pp->reliabilityOffered = PID_VALUE_RELIABILITY_BEST_EFFORTS |
83                            PID_VALUE_RELIABILITY_STRICT;
84   //insert object to structure objectEntry
85   objectEntryOID = objectEntryAdd(d, &guid, (void *)pp);
86   objectEntryOID->privateCreated = ORTE_TRUE;
87   objectEntryOID->instance = instance;
88   objectEntryOID->sendCallBack = sendCallBack;
89   objectEntryOID->callBackParam = sendCallBackParam;
90   if (objectEntryOID->sendCallBack != NULL) {
91     if (sendCallBackDelay != NULL) {
92       objectEntryOID->sendCallBackDelay = *sendCallBackDelay;
93       eventAdd(d,
94                objectEntryOID->objectEntryAID,
95                &objectEntryOID->sendCallBackDelayTimer,
96                0,
97                "PublicationCallBackTimer",
98                PublicationCallBackTimer,
99                &cstWriter->lock,
100                cstWriter,
101                &objectEntryOID->sendCallBackDelay);
102     }
103   }
104   //create writerPublication
105   cstWriterParams.registrationRetries = 0;
106   NTPTIME_ZERO(cstWriterParams.registrationPeriod);
107   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
108   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
109   cstWriterParams.refreshPeriod = iNtpTime;  //can't refresh csChange(s)
110   cstWriterParams.repeatAnnounceTime = pp->HBNornalRate;
111   cstWriterParams.HBMaxRetries = pp->HBMaxRetries;
112   cstWriterParams.fullAcknowledge = ORTE_TRUE;
113   CSTWriterInit(d, cstWriter, objectEntryOID, guid.oid, &cstWriterParams,
114                 &typeNode->typeRegister);
115   //insert cstWriter to list of publications
116   CSTWriter_insert(&d->publications, cstWriter);
117   //generate csChange for writerPublisher
118   pthread_rwlock_wrlock(&d->writerPublications.lock);
119   csChange = (CSChange *)MALLOC(sizeof(CSChange));
120   parameterUpdateCSChangeFromPublication(csChange, pp);
121   csChange->guid = guid;
122   csChange->alive = ORTE_TRUE;
123   csChange->cdrCodec.buffer = NULL;
124   debug(31, 10) ("ORTEPublicationCreate: add CSChange\n");
125   CSTWriterAddCSChange(d, &d->writerPublications, csChange);
126   pthread_rwlock_unlock(&d->writerPublications.lock);
127   pthread_rwlock_unlock(&d->publications.lock);
128   pthread_rwlock_unlock(&d->typeEntry.lock);
129   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
130   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
131   debug(31, 10) ("ORTEPublicationCreate: finished\n");
132   return cstWriter;
133 }
134
135 /*****************************************************************************/
136 int
137 ORTEPublicationDestroy(ORTEPublication *cstWriter)
138 {
139   CSChange              *csChange;
140
141   if (!cstWriter)
142     return ORTE_BAD_HANDLE;
143   //generate csChange for writerPublisher
144   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
145   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
146   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
147   if (cstWriter->objectEntryOID->sendCallBack != NULL) {
148     eventDetach(cstWriter->domain,
149                 cstWriter->objectEntryOID->objectEntryAID,
150                 &cstWriter->objectEntryOID->sendCallBackDelayTimer,
151                 0);
152   }
153   csChange = (CSChange *)MALLOC(sizeof(CSChange));
154   CSChangeAttributes_init_head(csChange);
155   csChange->cdrCodec.buffer = NULL;
156   csChange->guid = cstWriter->guid;
157   csChange->alive = ORTE_FALSE;
158   CSTWriterAddCSChange(cstWriter->domain,
159                        &cstWriter->domain->writerPublications,
160                        csChange);
161   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
162   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
163   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
164   return ORTE_OK;
165 }
166
167
168 /*****************************************************************************/
169 int
170 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter, ORTEPublProp *pp)
171 {
172   if (!cstWriter)
173     return ORTE_BAD_HANDLE;
174   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
175   pthread_rwlock_rdlock(&cstWriter->lock);
176   *pp = *(ORTEPublProp *)cstWriter->objectEntryOID->attributes;
177   pthread_rwlock_unlock(&cstWriter->lock);
178   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
179   return ORTE_OK;
180 }
181
182 /*****************************************************************************/
183 int
184 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter, ORTEPublProp *pp)
185 {
186   CSChange              *csChange;
187
188   if (!cstWriter)
189     return ORTE_BAD_HANDLE;
190   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
191   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
192   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
193   pthread_rwlock_rdlock(&cstWriter->lock);
194   csChange = (CSChange *)MALLOC(sizeof(CSChange));
195   parameterUpdateCSChangeFromPublication(csChange, pp);
196   csChange->guid = cstWriter->guid;
197   csChange->alive = ORTE_TRUE;
198   csChange->cdrCodec.buffer = NULL;
199   CSTWriterAddCSChange(cstWriter->domain,
200                        &cstWriter->domain->writerPublications, csChange);
201   pthread_rwlock_unlock(&cstWriter->lock);
202   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
203   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
204   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
205   return ORTE_OK;
206 }
207
208 /*****************************************************************************/
209 int
210 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter, NtpTime wait,
211                                     unsigned int retries, unsigned int noSubscriptions)
212 {
213   unsigned int rSubscriptions;
214   uint32_t sec, ms;
215
216   if (!cstWriter)
217     return ORTE_BAD_HANDLE;
218   NtpTimeDisAssembToMs(sec, ms, wait);
219   do {
220     pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
221     pthread_rwlock_rdlock(&cstWriter->lock);
222     rSubscriptions = cstWriter->cstRemoteReaderCounter;
223     pthread_rwlock_unlock(&cstWriter->lock);
224     pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
225     if (rSubscriptions >= noSubscriptions)
226       return ORTE_OK;
227     ORTESleepMs(sec*1000+ms);
228   } while (retries--);
229   return ORTE_TIMEOUT;
230 }
231
232 /*****************************************************************************/
233 int
234 ORTEPublicationGetStatus(ORTEPublication *cstWriter, ORTEPublStatus *status)
235 {
236   CSChange *csChange;
237
238   if (!cstWriter)
239     return ORTE_BAD_HANDLE;
240   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
241   pthread_rwlock_rdlock(&cstWriter->lock);
242   status->strict = cstWriter->strictReliableCounter;
243   status->bestEffort = cstWriter->bestEffortsCounter;
244   status->issues = 0;
245   ul_list_for_each(CSTWriterCSChange, cstWriter, csChange)
246   status->issues++;
247   pthread_rwlock_unlock(&cstWriter->lock);
248   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
249   return ORTE_OK;
250 }
251
252 /*****************************************************************************/
253 int
254 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter)
255 {
256   ORTEPublProp          *pp;
257
258   if (!cstWriter)
259     return ORTE_BAD_HANDLE;
260   pthread_rwlock_wrlock(&cstWriter->lock);
261   pp = (ORTEPublProp *)cstWriter->objectEntryOID->attributes;
262   if (cstWriter->csChangesCounter >= pp->sendQueueSize) {
263     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
264       NtpTime             expire, atime = getActualNtpTime();
265       struct timespec     wtime;
266       //count max block time
267       NtpTimeAdd(expire, atime, cstWriter->domain->domainProp.baseProp.maxBlockTime);
268       NtpTimeDisAssembToUs(wtime.tv_sec, wtime.tv_nsec, expire);
269       wtime.tv_nsec *= 1000;  //conver to nano seconds
270       while (cstWriter->csChangesCounter >= pp->sendQueueSize) {
271         pthread_rwlock_unlock(&cstWriter->lock);
272         //wait till a message will be processed
273         pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
274         if (cstWriter->condValueCSChangeDestroyed == 0) {
275           if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
276                                      &cstWriter->mutexCSChangeDestroyed,
277                                      &wtime) == ETIMEDOUT) {
278             debug(31, 5) ("Publication: queue level (%d), queue full!!!\n",
279                           cstWriter->csChangesCounter);
280             pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
281             pthread_rwlock_unlock(&cstWriter->lock);
282             return ORTE_QUEUE_FULL;
283           }
284         }
285         cstWriter->condValueCSChangeDestroyed = 0;
286         pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
287         pthread_rwlock_wrlock(&cstWriter->lock);
288       }
289     }
290   }
291   pthread_rwlock_unlock(&cstWriter->lock);
292   return ORTE_OK;
293 }
294
295 /*****************************************************************************/
296 int
297 ORTEPublicationSendLocked(ORTEPublication *cstWriter,
298                           ORTEPublicationSendParam *psp)
299 {
300   CSChange              *csChange;
301   SequenceNumber        snNext;
302   int                   max_size;
303
304   if (!cstWriter)
305     return ORTE_BAD_HANDLE;
306   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
307   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
308   if (!CSTRemoteReader_is_empty(cstWriter)) {
309     ORTEGetMaxSizeParam gms;
310
311     csChange = (CSChange *)MALLOC(sizeof(CSChange));
312     CSChangeAttributes_init_head(csChange);
313     csChange->guid = cstWriter->guid;
314     csChange->alive = ORTE_FALSE;
315     CDR_codec_init_static(&csChange->cdrCodec);
316     csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
317
318     if (psp) {
319       csChange->cdrCodec.data_endian = psp->data_endian;
320       cstWriter->objectEntryOID->instance = psp->instance;
321     }
322
323     /* determine maximal size */
324     gms.host_endian = csChange->cdrCodec.host_endian;
325     gms.data_endian = csChange->cdrCodec.data_endian;
326     gms.data = cstWriter->objectEntryOID->instance;
327     gms.max_size = cstWriter->typeRegister->maxSize;
328     gms.recv_size = -1;
329     gms.csize = 0;
330     if (cstWriter->typeRegister->getMaxSize)
331       max_size = cstWriter->typeRegister->getMaxSize(&gms, 1);
332     else
333       max_size = cstWriter->typeRegister->maxSize;
334
335     /* prepare csChange */
336     CDR_buffer_init(&csChange->cdrCodec,
337                     RTPS_HEADER_LENGTH+12+20+max_size);     //HEADER+INFO_TS+ISSUE
338     csChange->cdrCodec.wptr_max =
339       cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
340
341     /* SN for next issue */
342     SeqNumberInc(snNext, cstWriter->lastSN);
343
344     /* prepare data */
345     RTPSHeaderCreate(&csChange->cdrCodec,
346                      cstWriter->domain->guid.hid, cstWriter->domain->guid.aid);
347     RTPSInfoTSCreate(&csChange->cdrCodec,
348                      getActualNtpTime());
349     RTPSIssueCreateHeader(&csChange->cdrCodec, 16+max_size,
350                           OID_UNKNOWN, cstWriter->guid.oid, snNext);
351
352     //serialization routine
353     if (cstWriter->typeRegister->serialize) {
354       cstWriter->typeRegister->serialize(
355         &csChange->cdrCodec,
356         cstWriter->objectEntryOID->instance);
357     } else {
358       //no deserialization -> memcpy
359       CDR_buffer_puts(&csChange->cdrCodec,
360                       cstWriter->objectEntryOID->instance, max_size);
361     }
362
363     debug(31, 10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
364                    max_size, snNext.low);
365
366     CSTWriterAddCSChange(cstWriter->domain,
367                          cstWriter,
368                          csChange);
369   }
370   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
371   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
372   return ORTE_OK;
373 }
374
375 /*****************************************************************************/
376 int
377 ORTEPublicationSendEx(ORTEPublication *cstWriter,
378                       ORTEPublicationSendParam *psp)
379 {
380   int             r;
381
382   if (!cstWriter)
383     return ORTE_BAD_HANDLE;
384   //prepare sending queue
385   r = ORTEPublicationPrepareQueue(cstWriter);
386   if (r < 0)
387     return r;
388   //send
389   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
390   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
391   pthread_rwlock_wrlock(&cstWriter->lock);
392   r = ORTEPublicationSendLocked(cstWriter, psp);
393   pthread_rwlock_unlock(&cstWriter->lock);
394   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
395   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
396   return r;
397 }
398
399 /*****************************************************************************/
400 inline int
401 ORTEPublicationSend(ORTEPublication *cstWriter)
402 {
403   return ORTEPublicationSendEx(cstWriter, NULL);
404 }
405
406
407 /*****************************************************************************/
408 inline void *
409 ORTEPublicationGetInstance(ORTEPublication *cstWriter)
410 {
411   return cstWriter->objectEntryOID->instance;
412 }