2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
/*
 * Instantiates the GAVL custom-node implementation named SubscriptionList.
 * The macro is given the types PSEntry, ObjectEntryOID and GUID_RTPS, the
 * member names `subscriptions`, `psNode` and `guid`, and the comparator
 * gavl_cmp_guid.
 * NOTE(review): the exact role of each argument is defined by the
 * GAVL_CUST_NODE_INT_IMP macro, which is not shown here - confirm there.
 */
34 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
35 PSEntry, ObjectEntryOID, GUID_RTPS,
36 subscriptions, psNode, guid, gavl_cmp_guid);
38 /*****************************************************************************/
/*
 * ORTESubscriptionCreate - create a subscription inside domain `d`.
 *
 * Steps visible in this view:
 *   1. allocate a CSTReader;
 *   2. lock the object entry (objRootLock, htimRootLock) for writing and the
 *      type entry for reading, then look `typeName` up with ORTEType_find();
 *      when the type is not found the three locks are released and a
 *      message is printed;
 *   3. lock d->subscriptions for writing and, for a multicast address,
 *      request IP_ADD_MEMBERSHIP on the multicast receive socket;
 *   4. build a new GUID from the domain's hid/aid and the subscription
 *      counter, allocate an ORTESubsProp seeded from the domain defaults and
 *      fill it from the arguments;
 *   5. register the properties with objectEntryAdd(), initialise the
 *      CSTReader and insert it into d->subscriptions;
 *   6. queue a CSChange (alive = ORTE_TRUE) on d->writerSubscriptions;
 *   7. release all locks.
 *
 * Parameters (as used by the visible lines):
 *   d                   domain that owns the locks, counters and lists used.
 *   mode, sType         not referenced by any visible line; sType presumably
 *                       selects between the two reliabilityRequested values
 *                       assigned below - TODO confirm.
 *   topic, typeName     copied with strcpy() into sp->topic / sp->typeName;
 *                       typeName is also the key passed to ORTEType_find().
 *   instance            stored in objectEntryOID->instance.
 *   deadline,
 *   minimumSeparation   dereferenced and copied into the properties.
 *   recvCallBack,
 *   recvCallBackParam   stored in objectEntryOID.
 *   multicastIPAddress  stored in sp->multicast; tested with IN_MULTICAST().
 *
 * NOTE(review): local declarations, braces, some conditions and the return
 * statements are not visible in this view, so the return value and the
 * clean-up performed on the failure path cannot be documented from here.
 */
40 ORTESubscriptionCreate(ORTEDomain *d, SubscriptionMode mode, SubscriptionType sType,
41 const char *topic, const char *typeName, void *instance, NtpTime *deadline,
42 NtpTime *minimumSeparation, ORTERecvCallBack recvCallBack,
43 void *recvCallBackParam, IPAddress multicastIPAddress)
47 CSTReaderParams cstReaderParams;
49 ObjectEntryOID *objectEntryOID;
// NOTE(review): no check of the MALLOC result is visible in this view - confirm.
53 cstReader = (CSTReader *)MALLOC(sizeof(CSTReader));
56 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
57 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
58 pthread_rwlock_rdlock(&d->typeEntry.lock);
// the type must already be present in d->typeEntry; otherwise undo the locks and report
59 if (!(typeNode = ORTEType_find(&d->typeEntry, &typeName))) {
60 pthread_rwlock_unlock(&d->typeEntry.lock);
61 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
62 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
63 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
64 ser./deser. function for a given typeName!!!\n");
68 pthread_rwlock_wrlock(&d->subscriptions.lock);
69 // join to multicast group
70 if (IN_MULTICAST(multicastIPAddress)) {
71 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
// the address is converted with htonl() before being placed in the request
74 mreq.imr_multiaddr.s_addr = htonl(multicastIPAddress);
75 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
// only the success case is logged in the visible lines
76 if (sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
77 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
78 debug(33, 2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
79 IPAddressToString(multicastIPAddress, sIPAddress));
82 //generate new guid of the subscription (oid carries OID_SUBSCRIPTION)
83 d->subscriptions.counter++;
84 guid.hid = d->guid.hid;
85 guid.aid = d->guid.aid;
86 guid.oid = (d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
// properties start as a copy of the domain defaults, then are overridden
87 sp = (ORTESubsProp *)MALLOC(sizeof(ORTESubsProp));
88 memcpy(sp, &d->domainProp.subsPropDefault, sizeof(ORTESubsProp));
// NOTE(review): strcpy() is unbounded; the capacity of sp->topic and
// sp->typeName is not visible here - confirm callers cannot overflow them.
89 strcpy((char *)sp->topic, topic);
90 strcpy((char *)sp->typeName, typeName);
// NOTE(review): deadline and minimumSeparation are dereferenced without a
// visible NULL check - confirm callers always pass valid pointers.
91 sp->deadline = *deadline;
92 sp->minimumSeparation = *minimumSeparation;
93 sp->multicast = multicastIPAddress;
// the conditions choosing between the next two assignments are not visible in this view
96 sp->reliabilityRequested = PID_VALUE_RELIABILITY_BEST_EFFORTS;
99 sp->reliabilityRequested = PID_VALUE_RELIABILITY_STRICT;
103 //insert object to structure objectEntry
104 objectEntryOID = objectEntryAdd(d, &guid, (void *)sp);
105 objectEntryOID->privateCreated = ORTE_TRUE;
106 objectEntryOID->instance = instance;
107 objectEntryOID->recvCallBack = recvCallBack;
108 objectEntryOID->callBackParam = recvCallBackParam;
109 //fill the reader parameters and initialise the cstReader
110 cstReaderParams.delayResponceTimeMin = zNtpTime;
111 cstReaderParams.delayResponceTimeMax = zNtpTime;
112 cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
113 cstReaderParams.repeatActiveQueryTime = iNtpTime;
114 cstReaderParams.fullAcknowledge = ORTE_FALSE;
115 CSTReaderInit(d, cstReader, objectEntryOID, guid.oid, &cstReaderParams,
116 &typeNode->typeRegister);
117 //insert cstReader into the domain's subscriptions
118 CSTReader_insert(&d->subscriptions, cstReader);
119 //generate csChange for writerSubscriptions
120 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
121 csChange = (CSChange *)MALLOC(sizeof(CSChange));
122 parameterUpdateCSChangeFromSubscription(csChange, sp);
123 csChange->guid = guid;
124 csChange->alive = ORTE_TRUE;
125 CDR_codec_init_static(&csChange->cdrCodec);
126 CSTWriterAddCSChange(d, &d->writerSubscriptions, csChange);
127 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
// release the remaining locks taken above
128 pthread_rwlock_unlock(&d->subscriptions.lock);
129 pthread_rwlock_unlock(&d->typeEntry.lock);
130 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
131 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
136 /*****************************************************************************/
/*
 * ORTESubscriptionDestroyLocked - queue a "not alive" change for a subscription.
 *
 * Under the domain's writerSubscriptions write lock, allocates a CSChange,
 * initialises its attribute list head, copies the reader's GUID into it,
 * marks it alive = ORTE_FALSE with no codec buffer, and hands it to
 * CSTWriterAddCSChange() for the domain's writerSubscriptions.
 *
 * The only lock taken here is writerSubscriptions.lock; ORTESubscriptionDestroy()
 * calls this function while already holding objRootLock, htimRootLock and
 * cstReader->lock.
 *
 * Returns ORTE_BAD_HANDLE on a guard whose condition is not visible in this
 * view (presumably a NULL cstReader - TODO confirm); the success return is
 * likewise not visible.
 */
138 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader)
143 return ORTE_BAD_HANDLE;
144 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
// NOTE(review): no check of the MALLOC result is visible in this view.
145 csChange = (CSChange *)MALLOC(sizeof(CSChange));
146 CSChangeAttributes_init_head(csChange);
147 csChange->guid = cstReader->guid;
// alive = ORTE_FALSE distinguishes this change from the one queued on creation
148 csChange->alive = ORTE_FALSE;
149 csChange->cdrCodec.buffer = NULL;
150 CSTWriterAddCSChange(cstReader->domain,
151 &cstReader->domain->writerSubscriptions,
153 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
157 /*****************************************************************************/
/*
 * ORTESubscriptionDestroy - locking wrapper around ORTESubscriptionDestroyLocked().
 *
 * Takes objRootLock (read), htimRootLock (write) and the reader's own lock
 * (write), calls ORTESubscriptionDestroyLocked() and stores its result in
 * `r`, then releases the locks in the reverse order of acquisition.
 *
 * Returns ORTE_BAD_HANDLE on a guard whose condition is not visible in this
 * view (presumably a NULL cstReader - TODO confirm). The final return is not
 * visible either; presumably `r` is returned.
 */
159 ORTESubscriptionDestroy(ORTESubscription *cstReader)
164 return ORTE_BAD_HANDLE;
165 //generate csChange for writerSubscriptions
166 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
167 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
168 pthread_rwlock_wrlock(&cstReader->lock);
169 r = ORTESubscriptionDestroyLocked(cstReader);
// unlock in reverse order of acquisition
170 pthread_rwlock_unlock(&cstReader->lock);
171 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
172 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
177 /*****************************************************************************/
/*
 * ORTESubscriptionPropertiesGet - copy a subscription's properties to the caller.
 *
 * Under read locks on objRootLock and on the reader itself, copies the
 * ORTESubsProp stored in cstReader->objectEntryOID->attributes into *sp by
 * structure assignment.
 *
 * sp must point to writable storage; no NULL check on sp is visible here.
 * Returns ORTE_BAD_HANDLE on a guard whose condition is not visible in this
 * view (presumably a NULL cstReader - TODO confirm); the success return is
 * not visible.
 */
179 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader, ORTESubsProp *sp)
182 return ORTE_BAD_HANDLE;
183 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
184 pthread_rwlock_rdlock(&cstReader->lock);
// attributes is cast to ORTESubsProp * and copied as a whole structure
185 *sp = *(ORTESubsProp *)cstReader->objectEntryOID->attributes;
186 pthread_rwlock_unlock(&cstReader->lock);
187 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
191 /*****************************************************************************/
/*
 * ORTESubscriptionPropertiesSet - queue a change built from new properties.
 *
 * Takes objRootLock (read), htimRootLock (write), writerSubscriptions.lock
 * (write) and the reader's lock (read). It then allocates a CSChange, fills
 * it from *sp with parameterUpdateCSChangeFromSubscription(), stamps it with
 * the reader's GUID, marks it alive = ORTE_TRUE with no codec buffer, and
 * passes it to CSTWriterAddCSChange() for the domain's writerSubscriptions.
 * Locks are released in the reverse order of acquisition.
 *
 * NOTE(review): no visible line stores *sp into
 * cstReader->objectEntryOID->attributes; whether the local copy of the
 * properties is updated elsewhere cannot be determined from this view.
 *
 * Returns ORTE_BAD_HANDLE on a guard whose condition is not visible in this
 * view (presumably a NULL cstReader - TODO confirm); the success return is
 * not visible.
 */
193 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader, ORTESubsProp *sp)
198 return ORTE_BAD_HANDLE;
199 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
200 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
201 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
202 pthread_rwlock_rdlock(&cstReader->lock);
// NOTE(review): no check of the MALLOC result is visible in this view.
203 csChange = (CSChange *)MALLOC(sizeof(CSChange));
204 parameterUpdateCSChangeFromSubscription(csChange, sp);
205 csChange->guid = cstReader->guid;
206 csChange->alive = ORTE_TRUE;
207 csChange->cdrCodec.buffer = NULL;
208 CSTWriterAddCSChange(cstReader->domain,
209 &cstReader->domain->writerSubscriptions, csChange);
// unlock in reverse order of acquisition
210 pthread_rwlock_unlock(&cstReader->lock);
211 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
212 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
213 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
217 /*****************************************************************************/
/*
 * ORTESubscriptionWaitForPublications - wait until enough remote writers exist.
 *
 * Reads cstReader->cstRemoteWriterCounter under read locks on objRootLock
 * and the reader, compares it with noPublications, and sleeps for
 * sec*1000+ms milliseconds via ORTESleepMs(). sec and ms are produced from
 * `wait` by NtpTimeDisAssembToMs().
 *
 * Parameters:
 *   cstReader       subscription to inspect.
 *   wait            interval handed to NtpTimeDisAssembToMs().
 *   retries         not referenced by any visible line; presumably bounds
 *                   the number of check/sleep rounds - TODO confirm.
 *   noPublications  threshold compared against the remote writer counter.
 *
 * NOTE(review): the loop construct, the action taken when the threshold is
 * reached and the return statements are not visible in this view. Only the
 * ORTE_BAD_HANDLE return (guard condition not shown) can be seen.
 */
219 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader, NtpTime wait,
220 unsigned int retries, unsigned int noPublications)
222 unsigned int wPublications;
226 return ORTE_BAD_HANDLE;
227 NtpTimeDisAssembToMs(sec, ms, wait);
// take a snapshot of the counter under the locks; the sleep below runs after they are released
229 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
230 pthread_rwlock_rdlock(&cstReader->lock);
231 wPublications = cstReader->cstRemoteWriterCounter;
232 pthread_rwlock_unlock(&cstReader->lock);
233 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
234 if (wPublications >= noPublications)
236 ORTESleepMs(sec*1000+ms);
241 /*****************************************************************************/
/*
 * ORTESubscriptionGetStatus - report counters of a subscription.
 *
 * Under read locks on objRootLock and the reader, copies
 * cstReader->strictReliableCounter into status->strict and
 * cstReader->bestEffortsCounter into status->bestEffort, then walks the
 * reader's CSTReaderCSChange list with ul_list_for_each().
 *
 * NOTE(review): the body of the list walk is not visible in this view;
 * presumably it counts queued changes into another status field - TODO
 * confirm. No NULL check on `status` is visible.
 *
 * Returns ORTE_BAD_HANDLE on a guard whose condition is not visible in this
 * view (presumably a NULL cstReader - TODO confirm); the success return is
 * not visible.
 */
243 ORTESubscriptionGetStatus(ORTESubscription *cstReader, ORTESubsStatus *status)
248 return ORTE_BAD_HANDLE;
249 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
250 pthread_rwlock_rdlock(&cstReader->lock);
251 status->strict = cstReader->strictReliableCounter;
252 status->bestEffort = cstReader->bestEffortsCounter;
254 ul_list_for_each(CSTReaderCSChange, cstReader, csChange)
256 pthread_rwlock_unlock(&cstReader->lock);
257 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
261 /*****************************************************************************/
/*
 * ORTESubscriptionPull - deliver pending data for a subscription in PULLED mode.
 *
 * Takes objRootLock (read), htimRootLock (write), writerSubscriptions.lock
 * (read) and the reader's lock (write). When the subscription's mode is
 * PULLED and a receive callback is registered, the visible lines:
 *   - compare a value (not shown) against the expiry of
 *     cstReader->deadlineTimer; when the result is >= 0 the callback is
 *     invoked with an info record whose status is DEADLINE and whose
 *     topic/type point into the subscription properties;
 *   - compute a new expiry (timeNext, derived from getActualNtpTime(); the
 *     full expression is not shown) and store it in the deadline timer;
 *   - call CSTReaderProcCSChangesIssue() on
 *     cstReader->cstRemoteWriterSubscribed with ORTE_TRUE.
 * Locks are released in the reverse order of acquisition.
 *
 * The callback runs while all four locks are held.
 *
 * NOTE(review): several conditions, braces and the return statements are
 * missing from this view, so the exact nesting of the steps above cannot be
 * confirmed. Only the ORTE_BAD_HANDLE return (guard condition not shown) is
 * visible.
 */
263 ORTESubscriptionPull(ORTESubscription *cstReader)
270 return ORTE_BAD_HANDLE;
271 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
272 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
273 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
274 pthread_rwlock_wrlock(&cstReader->lock);
275 sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
// nothing is delivered unless the mode is PULLED and a callback exists
276 if ((sp->mode == PULLED) &&
277 (cstReader->objectEntryOID->recvCallBack)) {
280 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer)) >= 0) {
// report the deadline condition to the application through its callback
281 memset(&info, 0, sizeof(info));
282 info.status = DEADLINE;
283 info.topic = (char *)sp->topic;
284 info.type = (char *)sp->typeName;
285 cstReader->objectEntryOID->recvCallBack(&info,
286 cstReader->objectEntryOID->instance,
287 cstReader->objectEntryOID->callBackParam);
289 (getActualNtpTime()),
291 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer, timeNext);
293 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed, ORTE_TRUE);
// unlock in reverse order of acquisition
295 pthread_rwlock_unlock(&cstReader->lock);
296 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
297 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
298 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
303 /*****************************************************************************/
/*
 * ORTESubscriptionGetInstance - return the instance pointer of a subscription.
 *
 * Returns cstReader->objectEntryOID->instance, i.e. the `instance` value
 * that ORTESubscriptionCreate() stored in the object entry.
 *
 * NOTE(review): in the visible lines there is no NULL check on cstReader and
 * no lock is taken, unlike the other accessors in this file - confirm this
 * is intentional.
 */
305 ORTESubscriptionGetInstance(ORTESubscription *cstReader)
307 return cstReader->objectEntryOID->instance;