2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 subscriptions, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31 const char *topic,const char *typeName,void *instance,NtpTime *deadline,
32 NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33 void *recvCallBackParam) {
36 CSTReaderParams cstReaderParams;
38 ObjectEntryOID *objectEntryOID;
42 cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43 if (!cstReader) return NULL;
44 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46 pthread_rwlock_rdlock(&d->typeEntry.lock);
47 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48 pthread_rwlock_unlock(&d->typeEntry.lock);
49 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
52 ser./deser. function for a given typeName!!!\n");
55 pthread_rwlock_wrlock(&d->subscriptions.lock);
56 //generate new guid of publisher
57 d->subscriptions.counter++;
58 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
59 guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
60 sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
61 memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
62 strcpy(sp->topic,topic);
63 strcpy(sp->typeName,typeName);
64 sp->deadline=*deadline;
65 sp->minimumSeparation=*minimumSeparation;
68 sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
71 sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
75 //insert object to structure objectEntry
76 objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
77 objectEntryOID->privateCreated=ORTE_TRUE;
78 objectEntryOID->instance=instance;
79 objectEntryOID->recvCallBack=recvCallBack;
80 objectEntryOID->callBackParam=recvCallBackParam;
81 //create writerSubscription
82 cstReaderParams.delayResponceTimeMin=zNtpTime;
83 cstReaderParams.delayResponceTimeMax=zNtpTime;
84 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
85 cstReaderParams.repeatActiveQueryTime=iNtpTime;
86 cstReaderParams.fullAcknowledge=ORTE_FALSE;
87 CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
88 &typeNode->typeRegister);
89 //insert cstWriter to list of subscriberes
90 CSTReader_insert(&d->subscriptions,cstReader);
91 //generate csChange for writerSubscriberes
92 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
93 csChange=(CSChange*)MALLOC(sizeof(CSChange));
94 parameterUpdateCSChangeFromSubscription(csChange,sp);
96 csChange->alive=ORTE_TRUE;
97 csChange->cdrStream.buffer=NULL;
98 CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
99 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
100 pthread_rwlock_unlock(&d->subscriptions.lock);
101 pthread_rwlock_unlock(&d->typeEntry.lock);
102 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
103 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
107 /*****************************************************************************/
109 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
112 if (!cstReader) return ORTE_BAD_HANDLE;
113 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
114 csChange=(CSChange*)MALLOC(sizeof(CSChange));
115 CSChangeAttributes_init_head(csChange);
116 csChange->guid=cstReader->guid;
117 csChange->alive=ORTE_FALSE;
118 csChange->cdrStream.buffer=NULL;
119 CSTWriterAddCSChange(cstReader->domain,
120 &cstReader->domain->writerSubscriptions,
122 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
126 /*****************************************************************************/
128 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
130 if (!cstReader) return ORTE_BAD_HANDLE;
131 //generate csChange for writerSubscriptions
132 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
133 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
134 pthread_rwlock_wrlock(&cstReader->lock);
135 r=ORTESubscriptionDestroyLocked(cstReader);
136 pthread_rwlock_unlock(&cstReader->lock);
137 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
138 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
143 /*****************************************************************************/
145 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
146 if (!cstReader) return ORTE_BAD_HANDLE;
147 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
148 pthread_rwlock_rdlock(&cstReader->lock);
149 *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
150 pthread_rwlock_unlock(&cstReader->lock);
151 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
155 /*****************************************************************************/
157 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
160 if (!cstReader) return ORTE_BAD_HANDLE;
161 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
162 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
163 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
164 pthread_rwlock_rdlock(&cstReader->lock);
165 csChange=(CSChange*)MALLOC(sizeof(CSChange));
166 parameterUpdateCSChangeFromSubscription(csChange,sp);
167 csChange->guid=cstReader->guid;
168 csChange->alive=ORTE_TRUE;
169 csChange->cdrStream.buffer=NULL;
170 CSTWriterAddCSChange(cstReader->domain,
171 &cstReader->domain->writerSubscriptions,csChange);
172 pthread_rwlock_unlock(&cstReader->lock);
173 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
174 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
175 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
179 /*****************************************************************************/
181 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
182 unsigned int retries,unsigned int noPublications) {
183 unsigned int wPublications;
186 if (!cstReader) return ORTE_BAD_HANDLE;
187 NtpTimeDisAssembToMs(sec,ms,wait);
189 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
190 pthread_rwlock_rdlock(&cstReader->lock);
191 wPublications=cstReader->cstRemoteWriterCounter;
192 pthread_rwlock_unlock(&cstReader->lock);
193 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
194 if (wPublications>=noPublications)
196 ORTESleepMs(sec*1000+ms);
201 /*****************************************************************************/
203 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
206 if (!cstReader) return ORTE_BAD_HANDLE;
207 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
208 pthread_rwlock_rdlock(&cstReader->lock);
209 status->strict=cstReader->strictReliableCounter;
210 status->bestEffort=cstReader->bestEffortsCounter;
212 ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
214 pthread_rwlock_unlock(&cstReader->lock);
215 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
219 /*****************************************************************************/
221 ORTESubscriptionPull(ORTESubscription *cstReader) {
226 if (!cstReader) return ORTE_BAD_HANDLE;
227 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
228 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
229 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
230 pthread_rwlock_wrlock(&cstReader->lock);
231 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
232 if ((sp->mode==PULLED) &&
233 (cstReader->objectEntryOID->recvCallBack)) {
236 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
237 memset(&info,0,sizeof(info));
238 info.status=DEADLINE;
239 info.topic=sp->topic;
240 info.type=sp->typeName;
241 cstReader->objectEntryOID->recvCallBack(&info,
242 cstReader->objectEntryOID->instance,
243 cstReader->objectEntryOID->callBackParam);
245 (getActualNtpTime()),
247 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
249 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
251 pthread_rwlock_unlock(&cstReader->lock);
252 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
253 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
254 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);