2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
35 PSEntry, ObjectEntryOID, GUID_RTPS,
36 subscriptions, psNode, guid, gavl_cmp_guid);
38 /*****************************************************************************/
40 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
41 const char *topic,const char *typeName,void *instance,NtpTime *deadline,
42 NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
43 void *recvCallBackParam, IPAddress multicastIPAddress) {
46 CSTReaderParams cstReaderParams;
48 ObjectEntryOID *objectEntryOID;
52 cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
53 if (!cstReader) return NULL;
54 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
55 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
56 pthread_rwlock_rdlock(&d->typeEntry.lock);
57 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
58 pthread_rwlock_unlock(&d->typeEntry.lock);
59 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
60 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
61 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
62 ser./deser. function for a given typeName!!!\n");
65 pthread_rwlock_wrlock(&d->subscriptions.lock);
66 // join to multicast group
67 if (IN_MULTICAST(multicastIPAddress)) {
68 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
71 mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
72 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
73 if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
74 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
75 debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
76 IPAddressToString(multicastIPAddress,sIPAddress));
79 //generate new guid of publisher
80 d->subscriptions.counter++;
81 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
82 guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
83 sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
84 memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
85 strcpy((char *)sp->topic,topic);
86 strcpy((char *)sp->typeName,typeName);
87 sp->deadline=*deadline;
88 sp->minimumSeparation=*minimumSeparation;
89 sp->multicast=multicastIPAddress;
92 sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
95 sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
99 //insert object to structure objectEntry
100 objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
101 objectEntryOID->privateCreated=ORTE_TRUE;
102 objectEntryOID->instance=instance;
103 objectEntryOID->recvCallBack=recvCallBack;
104 objectEntryOID->callBackParam=recvCallBackParam;
105 //create writerSubscription
106 cstReaderParams.delayResponceTimeMin=zNtpTime;
107 cstReaderParams.delayResponceTimeMax=zNtpTime;
108 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
109 cstReaderParams.repeatActiveQueryTime=iNtpTime;
110 cstReaderParams.fullAcknowledge=ORTE_FALSE;
111 CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
112 &typeNode->typeRegister);
113 //insert cstWriter to list of subscriberes
114 CSTReader_insert(&d->subscriptions,cstReader);
115 //generate csChange for writerSubscriberes
116 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
117 csChange=(CSChange*)MALLOC(sizeof(CSChange));
118 parameterUpdateCSChangeFromSubscription(csChange,sp);
120 csChange->alive=ORTE_TRUE;
121 CDR_codec_init_static(&csChange->cdrCodec);
122 CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
123 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
124 pthread_rwlock_unlock(&d->subscriptions.lock);
125 pthread_rwlock_unlock(&d->typeEntry.lock);
126 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
127 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
132 /*****************************************************************************/
134 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
137 if (!cstReader) return ORTE_BAD_HANDLE;
138 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
139 csChange=(CSChange*)MALLOC(sizeof(CSChange));
140 CSChangeAttributes_init_head(csChange);
141 csChange->guid=cstReader->guid;
142 csChange->alive=ORTE_FALSE;
143 csChange->cdrCodec.buffer=NULL;
144 CSTWriterAddCSChange(cstReader->domain,
145 &cstReader->domain->writerSubscriptions,
147 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
151 /*****************************************************************************/
153 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
155 if (!cstReader) return ORTE_BAD_HANDLE;
156 //generate csChange for writerSubscriptions
157 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
158 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
159 pthread_rwlock_wrlock(&cstReader->lock);
160 r=ORTESubscriptionDestroyLocked(cstReader);
161 pthread_rwlock_unlock(&cstReader->lock);
162 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
163 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
168 /*****************************************************************************/
170 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
171 if (!cstReader) return ORTE_BAD_HANDLE;
172 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
173 pthread_rwlock_rdlock(&cstReader->lock);
174 *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
175 pthread_rwlock_unlock(&cstReader->lock);
176 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
180 /*****************************************************************************/
182 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
185 if (!cstReader) return ORTE_BAD_HANDLE;
186 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
187 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
188 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
189 pthread_rwlock_rdlock(&cstReader->lock);
190 csChange=(CSChange*)MALLOC(sizeof(CSChange));
191 parameterUpdateCSChangeFromSubscription(csChange,sp);
192 csChange->guid=cstReader->guid;
193 csChange->alive=ORTE_TRUE;
194 csChange->cdrCodec.buffer=NULL;
195 CSTWriterAddCSChange(cstReader->domain,
196 &cstReader->domain->writerSubscriptions,csChange);
197 pthread_rwlock_unlock(&cstReader->lock);
198 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
199 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
200 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
204 /*****************************************************************************/
206 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
207 unsigned int retries,unsigned int noPublications) {
208 unsigned int wPublications;
211 if (!cstReader) return ORTE_BAD_HANDLE;
212 NtpTimeDisAssembToMs(sec,ms,wait);
214 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
215 pthread_rwlock_rdlock(&cstReader->lock);
216 wPublications=cstReader->cstRemoteWriterCounter;
217 pthread_rwlock_unlock(&cstReader->lock);
218 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
219 if (wPublications>=noPublications)
221 ORTESleepMs(sec*1000+ms);
226 /*****************************************************************************/
228 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
231 if (!cstReader) return ORTE_BAD_HANDLE;
232 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
233 pthread_rwlock_rdlock(&cstReader->lock);
234 status->strict=cstReader->strictReliableCounter;
235 status->bestEffort=cstReader->bestEffortsCounter;
237 ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
239 pthread_rwlock_unlock(&cstReader->lock);
240 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
244 /*****************************************************************************/
246 ORTESubscriptionPull(ORTESubscription *cstReader) {
251 if (!cstReader) return ORTE_BAD_HANDLE;
252 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
253 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
254 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
255 pthread_rwlock_wrlock(&cstReader->lock);
256 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
257 if ((sp->mode==PULLED) &&
258 (cstReader->objectEntryOID->recvCallBack)) {
261 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
262 memset(&info,0,sizeof(info));
263 info.status=DEADLINE;
264 info.topic=(char*)sp->topic;
265 info.type=(char*)sp->typeName;
266 cstReader->objectEntryOID->recvCallBack(&info,
267 cstReader->objectEntryOID->instance,
268 cstReader->objectEntryOID->callBackParam);
270 (getActualNtpTime()),
272 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
274 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
276 pthread_rwlock_unlock(&cstReader->lock);
277 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
278 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
279 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
284 /*****************************************************************************/
286 ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
287 return cstReader->objectEntryOID->instance;