2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
/*
 * Instantiate the GAVL custom-node implementation named SubscriptionList.
 * NOTE(review): the macro definition is not visible in this file; the roles
 * of the remaining arguments (PSEntry, ObjectEntryOID, GUID_RTPS,
 * subscriptions, psNode, guid, gavl_cmp_guid) are presumably root type,
 * item type, key type, root field, node field, key field and comparator --
 * confirm against the GAVL header.
 */
34 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
35 PSEntry, ObjectEntryOID, GUID_RTPS,
36 subscriptions, psNode, guid, gavl_cmp_guid);
38 /*****************************************************************************/
/*
 * ORTESubscriptionCreate - create a subscription (CSTReader) in domain d.
 *
 * Visible steps:
 *   1. allocate the CSTReader (returns NULL when the allocation fails),
 *   2. look up the type registered under typeName in d->typeEntry,
 *   3. if multicastIPAddress is a multicast address, ask the multicast
 *      receive socket to join that group,
 *   4. build an ORTESubsProp from the domain defaults plus the arguments,
 *   5. register it with objectEntryAdd() under a freshly generated GUID,
 *   6. initialise the reader and insert it into d->subscriptions,
 *   7. queue a CSChange describing the subscription on
 *      d->writerSubscriptions.
 *
 * deadline and minimumSeparation are dereferenced unconditionally in the
 * visible lines, so callers are expected to pass valid pointers.
 *
 * NOTE(review): this listing has gaps (return type, local declarations,
 * braces, the statement selecting the reliability value, and the return
 * statements are not visible); the comments below describe only the
 * statements that are shown.
 */
40 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
41 const char *topic,const char *typeName,void *instance,NtpTime *deadline,
42 NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
43 void *recvCallBackParam, IPAddress multicastIPAddress) {
46 CSTReaderParams cstReaderParams;
48 ObjectEntryOID *objectEntryOID;
// allocate the reader first; nothing is locked yet, so a failure can return directly
52 cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
53 if (!cstReader) return NULL;
// take both object-entry locks for writing and the type registry for reading
54 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
55 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
56 pthread_rwlock_rdlock(&d->typeEntry.lock);
// the type must already be registered; otherwise drop the three locks and report it
57 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
58 pthread_rwlock_unlock(&d->typeEntry.lock);
59 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
60 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
61 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
62 ser./deser. function for a given typeName!!!\n");
// NOTE(review): the rest of this error path (freeing cstReader, returning) is not visible here -- confirm
66 pthread_rwlock_wrlock(&d->subscriptions.lock);
67 // join the multicast group when multicastIPAddress is a multicast address
68 if (IN_MULTICAST(multicastIPAddress)) {
69 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
// group address and INADDR_ANY interface are converted with htonl() before use
72 mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
73 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
// a successful join is only logged; handling of a failed join is not visible here
74 if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
75 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
76 debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
77 IPAddressToString(multicastIPAddress,sIPAddress));
80 // generate a new GUID for this subscription: hid/aid come from the domain,
// the oid is the incremented subscription counter shifted left by 8 and OR-ed with OID_SUBSCRIPTION
81 d->subscriptions.counter++;
82 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
83 guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
// start from the domain's default subscription properties, then override with the arguments
// NOTE(review): no check of this MALLOC result is visible before sp is written -- confirm
84 sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
85 memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
// NOTE(review): strcpy has no length bound; assumes topic and typeName fit the destination fields -- TODO confirm
86 strcpy((char *)sp->topic,topic);
87 strcpy((char *)sp->typeName,typeName);
88 sp->deadline=*deadline;
89 sp->minimumSeparation=*minimumSeparation;
90 sp->multicast=multicastIPAddress;
// the two assignments below are presumably alternatives selected by sType; the selecting statement is not visible here
93 sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
96 sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
100 //insert object to structure objectEntry
101 objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
102 objectEntryOID->privateCreated=ORTE_TRUE;
103 objectEntryOID->instance=instance;
104 objectEntryOID->recvCallBack=recvCallBack;
105 objectEntryOID->callBackParam=recvCallBackParam;
106 // fill in the reader parameters and initialise the CSTReader for this subscription
107 cstReaderParams.delayResponceTimeMin=zNtpTime;
108 cstReaderParams.delayResponceTimeMax=zNtpTime;
109 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
110 cstReaderParams.repeatActiveQueryTime=iNtpTime;
111 cstReaderParams.fullAcknowledge=ORTE_FALSE;
112 CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
113 &typeNode->typeRegister);
114 // insert the cstReader into the domain's subscriptions
115 CSTReader_insert(&d->subscriptions,cstReader);
116 // generate a csChange for writerSubscriptions (held write-locked while it is queued)
117 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
// NOTE(review): no check of this MALLOC result is visible -- confirm
118 csChange=(CSChange*)MALLOC(sizeof(CSChange));
119 parameterUpdateCSChangeFromSubscription(csChange,sp);
121 csChange->alive=ORTE_TRUE;
122 CDR_codec_init_static(&csChange->cdrCodec);
123 CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
// release every lock still held before leaving
124 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
125 pthread_rwlock_unlock(&d->subscriptions.lock);
126 pthread_rwlock_unlock(&d->typeEntry.lock);
127 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
128 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
133 /*****************************************************************************/
/*
 * ORTESubscriptionDestroyLocked - announce removal of a subscription.
 *
 * Returns ORTE_BAD_HANDLE for a NULL cstReader. Otherwise it write-locks the
 * domain's writerSubscriptions, builds a CSChange that carries the reader's
 * guid with alive set to ORTE_FALSE and no CDR buffer, and hands it to
 * CSTWriterAddCSChange().
 *
 * ORTESubscriptionDestroy() in this file calls it while holding objRootLock,
 * htimRootLock and cstReader->lock; this function takes only the
 * writerSubscriptions lock itself.
 *
 * NOTE(review): the listing has gaps here (return type, csChange
 * declaration, the last argument of the CSTWriterAddCSChange call and the
 * success return are not visible).
 */
135 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
138 if (!cstReader) return ORTE_BAD_HANDLE;
139 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
// NOTE(review): no check of this MALLOC result is visible -- confirm
140 csChange=(CSChange*)MALLOC(sizeof(CSChange));
141 CSChangeAttributes_init_head(csChange);
142 csChange->guid=cstReader->guid;
// alive == ORTE_FALSE distinguishes this change from the ORTE_TRUE ones queued by Create/PropertiesSet
143 csChange->alive=ORTE_FALSE;
144 csChange->cdrCodec.buffer=NULL;
145 CSTWriterAddCSChange(cstReader->domain,
146 &cstReader->domain->writerSubscriptions,
148 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
152 /*****************************************************************************/
/*
 * ORTESubscriptionDestroy - locking wrapper around
 * ORTESubscriptionDestroyLocked().
 *
 * Returns ORTE_BAD_HANDLE for a NULL cstReader. Otherwise it takes
 * objRootLock (read), htimRootLock (write) and the reader's own lock
 * (write), calls ORTESubscriptionDestroyLocked() and releases the locks in
 * reverse order. The callee's result is stored in r.
 *
 * NOTE(review): the declaration of r and the final return are not visible
 * in this listing; presumably r is returned -- confirm.
 */
154 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
156 if (!cstReader) return ORTE_BAD_HANDLE;
157 //generate csChange for writerSubscriptions
158 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
159 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
160 pthread_rwlock_wrlock(&cstReader->lock);
161 r=ORTESubscriptionDestroyLocked(cstReader);
// unlock in the reverse order of acquisition
162 pthread_rwlock_unlock(&cstReader->lock);
163 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
164 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
169 /*****************************************************************************/
/*
 * ORTESubscriptionPropertiesGet - copy the subscription's properties to *sp.
 *
 * Returns ORTE_BAD_HANDLE for a NULL cstReader. Otherwise it read-locks
 * objRootLock and the reader's lock, copies the ORTESubsProp stored in
 * cstReader->objectEntryOID->attributes into *sp by structure assignment,
 * and releases both locks.
 *
 * sp is written unconditionally, so it must point to valid storage.
 *
 * NOTE(review): the return type and the success return are not visible in
 * this listing.
 */
171 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
172 if (!cstReader) return ORTE_BAD_HANDLE;
173 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
174 pthread_rwlock_rdlock(&cstReader->lock);
// attributes is cast to ORTESubsProp*; the whole structure is copied out
175 *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
176 pthread_rwlock_unlock(&cstReader->lock);
177 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
181 /*****************************************************************************/
/*
 * ORTESubscriptionPropertiesSet - publish new subscription properties.
 *
 * Returns ORTE_BAD_HANDLE for a NULL cstReader. Otherwise it takes
 * objRootLock (read), htimRootLock (write), writerSubscriptions (write) and
 * the reader's lock (read), builds a CSChange from sp with
 * parameterUpdateCSChangeFromSubscription(), tags it with the reader's guid
 * and alive=ORTE_TRUE, and queues it with CSTWriterAddCSChange(). The locks
 * are then released in reverse order.
 *
 * NOTE(review): within the visible lines sp is only used to build the
 * CSChange; it is not copied into cstReader->objectEntryOID->attributes --
 * confirm whether that is intended. The return type, the csChange
 * declaration and the success return are not visible in this listing.
 */
183 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
186 if (!cstReader) return ORTE_BAD_HANDLE;
187 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
188 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
189 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
190 pthread_rwlock_rdlock(&cstReader->lock);
// NOTE(review): no check of this MALLOC result is visible -- confirm
191 csChange=(CSChange*)MALLOC(sizeof(CSChange));
192 parameterUpdateCSChangeFromSubscription(csChange,sp);
193 csChange->guid=cstReader->guid;
194 csChange->alive=ORTE_TRUE;
195 csChange->cdrCodec.buffer=NULL;
196 CSTWriterAddCSChange(cstReader->domain,
197 &cstReader->domain->writerSubscriptions,csChange);
// unlock in the reverse order of acquisition
198 pthread_rwlock_unlock(&cstReader->lock);
199 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
200 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
201 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
205 /*****************************************************************************/
/*
 * ORTESubscriptionWaitForPublications - wait until the reader knows about
 * at least noPublications remote writers.
 *
 * Visible behaviour: returns ORTE_BAD_HANDLE for a NULL cstReader; splits
 * wait into sec and ms; samples cstReader->cstRemoteWriterCounter under
 * objRootLock and the reader's lock (both read); compares the sample with
 * noPublications; sleeps for sec*1000+ms milliseconds.
 *
 * NOTE(review): the loop that presumably repeats this up to `retries`
 * times, the branch taken when enough publications are present, and the
 * return statements are not visible in this listing -- confirm.
 */
207 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
208 unsigned int retries,unsigned int noPublications) {
209 unsigned int wPublications;
212 if (!cstReader) return ORTE_BAD_HANDLE;
// break the NtpTime interval into seconds and milliseconds for ORTESleepMs below
213 NtpTimeDisAssembToMs(sec,ms,wait);
// read the remote-writer count under read locks, then release them before sleeping
215 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
216 pthread_rwlock_rdlock(&cstReader->lock);
217 wPublications=cstReader->cstRemoteWriterCounter;
218 pthread_rwlock_unlock(&cstReader->lock);
219 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
// the statement executed when the target is reached is not visible in this listing
220 if (wPublications>=noPublications)
222 ORTESleepMs(sec*1000+ms);
227 /*****************************************************************************/
/*
 * ORTESubscriptionGetStatus - fill *status with counters from the reader.
 *
 * Returns ORTE_BAD_HANDLE for a NULL cstReader. Otherwise, under
 * objRootLock and the reader's lock (both read), it copies
 * strictReliableCounter into status->strict and bestEffortsCounter into
 * status->bestEffort, then iterates the reader's CSChange list.
 *
 * status is written unconditionally, so it must point to valid storage.
 *
 * NOTE(review): the body of the list iteration, the csChange declaration
 * and the success return are not visible in this listing; presumably the
 * loop counts queued changes -- confirm.
 */
229 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
232 if (!cstReader) return ORTE_BAD_HANDLE;
233 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
234 pthread_rwlock_rdlock(&cstReader->lock);
235 status->strict=cstReader->strictReliableCounter;
236 status->bestEffort=cstReader->bestEffortsCounter;
// walk every CSChange held by the reader (loop body not visible here)
238 ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
240 pthread_rwlock_unlock(&cstReader->lock);
241 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
245 /*****************************************************************************/
/*
 * ORTESubscriptionPull - process a subscription on demand.
 *
 * Returns ORTE_BAD_HANDLE for a NULL cstReader. Otherwise it takes
 * objRootLock (read), htimRootLock (write), writerSubscriptions (read) and
 * the reader's lock (write). When the subscription's mode is PULLED and a
 * receive callback is installed, the visible code:
 *   - compares a time against the deadline timer's expiry and, when that
 *     test passes, invokes the callback with a zeroed info structure whose
 *     status is DEADLINE and whose topic/type come from the subscription
 *     properties, then sets a new expiry (timeNext) on the deadline timer;
 *   - calls CSTReaderProcCSChangesIssue() on
 *     cstReader->cstRemoteWriterSubscribed with ORTE_TRUE.
 * All four locks are released in reverse order afterwards.
 *
 * NOTE(review): the left-hand side of the deadline comparison, the
 * computation of timeNext, the local declarations, closing braces and the
 * return are not visible in this listing; the nesting described above is
 * inferred from the visible fragments -- confirm.
 */
247 ORTESubscriptionPull(ORTESubscription *cstReader) {
252 if (!cstReader) return ORTE_BAD_HANDLE;
253 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
254 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
255 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
256 pthread_rwlock_wrlock(&cstReader->lock);
257 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
// only pulled subscriptions that have a receive callback are processed here
258 if ((sp->mode==PULLED) &&
259 (cstReader->objectEntryOID->recvCallBack)) {
262 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
// report the deadline to the user callback with an otherwise zeroed info record
263 memset(&info,0,sizeof(info));
264 info.status=DEADLINE;
265 info.topic=(char*)sp->topic;
266 info.type=(char*)sp->typeName;
267 cstReader->objectEntryOID->recvCallBack(&info,
268 cstReader->objectEntryOID->instance,
269 cstReader->objectEntryOID->callBackParam);
271 (getActualNtpTime()),
273 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
275 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
// unlock in the reverse order of acquisition
277 pthread_rwlock_unlock(&cstReader->lock);
278 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
279 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
280 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
285 /*****************************************************************************/
287 ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
288 return cstReader->objectEntryOID->instance;