rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTESubscription.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
/*
 * Instantiates the GAVL_CUST_NODE_INT_IMP template macro with the prefix
 * SubscriptionList, the types PSEntry, ObjectEntryOID and GUID_RTPS, the
 * field names subscriptions, psNode and guid, and the comparator
 * gavl_cmp_guid.
 * NOTE(review): presumably this emits the SubscriptionList_* tree functions
 * (PSEntry as root via its "subscriptions" field, ObjectEntryOID items linked
 * through "psNode" and keyed by "guid") -- confirm against the macro
 * definition, which is not part of this file.
 */
34 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
35                        PSEntry, ObjectEntryOID, GUID_RTPS,
36                        subscriptions, psNode, guid, gavl_cmp_guid);
37
38 /*****************************************************************************/
39 ORTESubscription *
40 ORTESubscriptionCreate(ORTEDomain *d, SubscriptionMode mode, SubscriptionType sType,
41                        const char *topic, const char *typeName, void *instance, NtpTime *deadline,
42                        NtpTime *minimumSeparation, ORTERecvCallBack recvCallBack,
43                        void *recvCallBackParam, IPAddress multicastIPAddress)
44 {
45   GUID_RTPS             guid;
46   CSTReader             *cstReader;
47   CSTReaderParams       cstReaderParams;
48   ORTESubsProp          *sp;
49   ObjectEntryOID        *objectEntryOID;
50   CSChange              *csChange;
51   TypeNode              *typeNode;
52
53   cstReader = (CSTReader *)MALLOC(sizeof(CSTReader));
54   if (!cstReader)
55     return NULL;
56   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
57   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
58   pthread_rwlock_rdlock(&d->typeEntry.lock);
59   if (!(typeNode = ORTEType_find(&d->typeEntry, &typeName))) {
60     pthread_rwlock_unlock(&d->typeEntry.lock);
61     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
62     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
63     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
64             ser./deser. function for a given typeName!!!\n");
65     FREE(cstReader);
66     return NULL;
67   }
68   pthread_rwlock_wrlock(&d->subscriptions.lock);
69   // join to multicast group
70   if (IN_MULTICAST(multicastIPAddress)) {
71     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
72     struct ip_mreq mreq;
73
74     mreq.imr_multiaddr.s_addr = htonl(multicastIPAddress);
75     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
76     if (sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
77                         IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
78       debug(33, 2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
79                     IPAddressToString(multicastIPAddress, sIPAddress));
80     }
81   }
82   //generate new guid of publisher
83   d->subscriptions.counter++;
84   guid.hid = d->guid.hid;
85   guid.aid = d->guid.aid;
86   guid.oid = (d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
87   sp = (ORTESubsProp *)MALLOC(sizeof(ORTESubsProp));
88   memcpy(sp, &d->domainProp.subsPropDefault, sizeof(ORTESubsProp));
89   strcpy((char *)sp->topic, topic);
90   strcpy((char *)sp->typeName, typeName);
91   sp->deadline = *deadline;
92   sp->minimumSeparation = *minimumSeparation;
93   sp->multicast = multicastIPAddress;
94   switch (sType) {
95     case BEST_EFFORTS:
96       sp->reliabilityRequested = PID_VALUE_RELIABILITY_BEST_EFFORTS;
97       break;
98     case STRICT_RELIABLE:
99       sp->reliabilityRequested = PID_VALUE_RELIABILITY_STRICT;
100       break;
101   }
102   sp->mode = mode;
103   //insert object to structure objectEntry
104   objectEntryOID = objectEntryAdd(d, &guid, (void *)sp);
105   objectEntryOID->privateCreated = ORTE_TRUE;
106   objectEntryOID->instance = instance;
107   objectEntryOID->recvCallBack = recvCallBack;
108   objectEntryOID->callBackParam = recvCallBackParam;
109   //create writerSubscription
110   cstReaderParams.delayResponceTimeMin = zNtpTime;
111   cstReaderParams.delayResponceTimeMax = zNtpTime;
112   cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
113   cstReaderParams.repeatActiveQueryTime = iNtpTime;
114   cstReaderParams.fullAcknowledge = ORTE_FALSE;
115   CSTReaderInit(d, cstReader, objectEntryOID, guid.oid, &cstReaderParams,
116                 &typeNode->typeRegister);
117   //insert cstWriter to list of subscriberes
118   CSTReader_insert(&d->subscriptions, cstReader);
119   //generate csChange for writerSubscriberes
120   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
121   csChange = (CSChange *)MALLOC(sizeof(CSChange));
122   parameterUpdateCSChangeFromSubscription(csChange, sp);
123   csChange->guid = guid;
124   csChange->alive = ORTE_TRUE;
125   CDR_codec_init_static(&csChange->cdrCodec);
126   CSTWriterAddCSChange(d, &d->writerSubscriptions, csChange);
127   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
128   pthread_rwlock_unlock(&d->subscriptions.lock);
129   pthread_rwlock_unlock(&d->typeEntry.lock);
130   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
131   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
132
133   return cstReader;
134 }
135
136 /*****************************************************************************/
137 int
138 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader)
139 {
140   CSChange              *csChange;
141
142   if (!cstReader)
143     return ORTE_BAD_HANDLE;
144   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
145   csChange = (CSChange *)MALLOC(sizeof(CSChange));
146   CSChangeAttributes_init_head(csChange);
147   csChange->guid = cstReader->guid;
148   csChange->alive = ORTE_FALSE;
149   csChange->cdrCodec.buffer = NULL;
150   CSTWriterAddCSChange(cstReader->domain,
151                        &cstReader->domain->writerSubscriptions,
152                        csChange);
153   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
154   return ORTE_OK;
155 }
156
157 /*****************************************************************************/
158 int
159 ORTESubscriptionDestroy(ORTESubscription *cstReader)
160 {
161   int r;
162
163   if (!cstReader)
164     return ORTE_BAD_HANDLE;
165   //generate csChange for writerSubscriptions
166   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
167   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
168   pthread_rwlock_wrlock(&cstReader->lock);
169   r = ORTESubscriptionDestroyLocked(cstReader);
170   pthread_rwlock_unlock(&cstReader->lock);
171   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
172   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
173   return r;
174 }
175
176
177 /*****************************************************************************/
178 int
179 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader, ORTESubsProp *sp)
180 {
181   if (!cstReader)
182     return ORTE_BAD_HANDLE;
183   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
184   pthread_rwlock_rdlock(&cstReader->lock);
185   *sp = *(ORTESubsProp *)cstReader->objectEntryOID->attributes;
186   pthread_rwlock_unlock(&cstReader->lock);
187   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
188   return ORTE_OK;
189 }
190
191 /*****************************************************************************/
192 int
193 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader, ORTESubsProp *sp)
194 {
195   CSChange              *csChange;
196
197   if (!cstReader)
198     return ORTE_BAD_HANDLE;
199   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
200   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
201   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
202   pthread_rwlock_rdlock(&cstReader->lock);
203   csChange = (CSChange *)MALLOC(sizeof(CSChange));
204   parameterUpdateCSChangeFromSubscription(csChange, sp);
205   csChange->guid = cstReader->guid;
206   csChange->alive = ORTE_TRUE;
207   csChange->cdrCodec.buffer = NULL;
208   CSTWriterAddCSChange(cstReader->domain,
209                        &cstReader->domain->writerSubscriptions, csChange);
210   pthread_rwlock_unlock(&cstReader->lock);
211   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
212   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
213   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
214   return ORTE_OK;
215 }
216
217 /*****************************************************************************/
218 int
219 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader, NtpTime wait,
220                                     unsigned int retries, unsigned int noPublications)
221 {
222   unsigned int wPublications;
223   uint32_t sec, ms;
224
225   if (!cstReader)
226     return ORTE_BAD_HANDLE;
227   NtpTimeDisAssembToMs(sec, ms, wait);
228   do {
229     pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
230     pthread_rwlock_rdlock(&cstReader->lock);
231     wPublications = cstReader->cstRemoteWriterCounter;
232     pthread_rwlock_unlock(&cstReader->lock);
233     pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
234     if (wPublications >= noPublications)
235       return ORTE_OK;
236     ORTESleepMs(sec*1000+ms);
237   } while (retries--);
238   return ORTE_TIMEOUT;
239 }
240
241 /*****************************************************************************/
242 int
243 ORTESubscriptionGetStatus(ORTESubscription *cstReader, ORTESubsStatus *status)
244 {
245   CSChange *csChange;
246
247   if (!cstReader)
248     return ORTE_BAD_HANDLE;
249   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
250   pthread_rwlock_rdlock(&cstReader->lock);
251   status->strict = cstReader->strictReliableCounter;
252   status->bestEffort = cstReader->bestEffortsCounter;
253   status->issues = 0;
254   ul_list_for_each(CSTReaderCSChange, cstReader, csChange)
255   status->issues++;
256   pthread_rwlock_unlock(&cstReader->lock);
257   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
258   return ORTE_OK;
259 }
260
261 /*****************************************************************************/
262 int
263 ORTESubscriptionPull(ORTESubscription *cstReader)
264 {
265   ORTESubsProp         *sp;
266   ORTERecvInfo         info;
267   NtpTime              timeNext;
268
269   if (!cstReader)
270     return ORTE_BAD_HANDLE;
271   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
272   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
273   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
274   pthread_rwlock_wrlock(&cstReader->lock);
275   sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
276   if ((sp->mode == PULLED) &&
277       (cstReader->objectEntryOID->recvCallBack)) {
278     if (NtpTimeCmp(
279           getActualNtpTime(),
280           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer)) >= 0) {
281       memset(&info, 0, sizeof(info));
282       info.status = DEADLINE;
283       info.topic = (char *)sp->topic;
284       info.type = (char *)sp->typeName;
285       cstReader->objectEntryOID->recvCallBack(&info,
286                                               cstReader->objectEntryOID->instance,
287                                               cstReader->objectEntryOID->callBackParam);
288       NtpTimeAdd(timeNext,
289                  (getActualNtpTime()),
290                  sp->deadline);
291       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer, timeNext);
292     }
293     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed, ORTE_TRUE);
294   }
295   pthread_rwlock_unlock(&cstReader->lock);
296   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
297   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
298   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
299   return ORTE_OK;
300 }
301
302
303 /*****************************************************************************/
304 inline void *
305 ORTESubscriptionGetInstance(ORTESubscription *cstReader)
306 {
307   return cstReader->objectEntryOID->instance;
308 }