2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 subscriptions, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31 const char *topic,const char *typeName,void *instance,NtpTime *deadline,
32 NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33 void *recvCallBackParam) {
36 CSTReaderParams cstReaderParams;
38 ObjectEntryOID *objectEntryOID;
42 cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43 if (!cstReader) return NULL;
44 pthread_rwlock_rdlock(&d->typeEntry.lock);
45 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
46 pthread_rwlock_unlock(&d->typeEntry.lock);
47 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
48 ser./deser. function for a given typeName!!!\n");
51 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
52 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
53 pthread_rwlock_wrlock(&d->subscriptions.lock);
54 //generate new guid of publisher
55 d->subscriptions.counter++;
56 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
57 guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
58 sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
59 memcpy(sp,&d->subsPropDefault,sizeof(ORTESubsProp));
60 strcpy(sp->topic,topic);
61 strcpy(sp->typeName,typeName);
62 sp->deadline=*deadline;
63 sp->minimumSeparation=*minimumSeparation;
66 sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
69 sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
73 //insert object to structure objectEntry
74 objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
75 objectEntryOID->private=ORTE_TRUE;
76 objectEntryOID->instance=instance;
77 objectEntryOID->recvCallBack=recvCallBack;
78 objectEntryOID->callBackParam=recvCallBackParam;
79 //create writerSubscription
80 cstReaderParams.delayResponceTimeMin=zNtpTime;
81 cstReaderParams.delayResponceTimeMax=zNtpTime;
82 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
83 cstReaderParams.repeatActiveQueryTime=iNtpTime;
84 cstReaderParams.fullAcknowledge=ORTE_FALSE;
85 CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
86 &typeNode->typeRegister);
87 //insert cstWriter to list of subscriberes
88 CSTReader_insert(&d->subscriptions,cstReader);
89 //generate csChange for writerSubscriberes
90 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
91 csChange=(CSChange*)MALLOC(sizeof(CSChange));
92 parameterUpdateCSChangeFromSubscription(csChange,sp);
94 csChange->alive=ORTE_TRUE;
95 csChange->cdrStream.buffer=NULL;
96 CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
97 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
98 pthread_rwlock_unlock(&d->subscriptions.lock);
99 pthread_rwlock_unlock(&d->typeEntry.lock);
100 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
101 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
105 /*****************************************************************************/
107 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
110 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
111 csChange=(CSChange*)MALLOC(sizeof(CSChange));
112 CSChangeAttributes_init_head(csChange);
113 csChange->guid=cstReader->guid;
114 csChange->alive=ORTE_FALSE;
115 csChange->cdrStream.buffer=NULL;
116 CSTWriterAddCSChange(cstReader->domain,
117 &cstReader->domain->writerSubscriptions,
119 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
123 /*****************************************************************************/
125 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
127 if (!cstReader) return -1;
128 //generate csChange for writerSubscriptions
129 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
130 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
131 r=ORTESubscriptionDestroyLocked(cstReader);
132 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
133 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
138 /*****************************************************************************/
140 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
141 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
142 pthread_rwlock_rdlock(&cstReader->lock);
143 *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
144 pthread_rwlock_unlock(&cstReader->lock);
145 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
149 /*****************************************************************************/
151 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
154 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
155 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
156 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
157 pthread_rwlock_rdlock(&cstReader->lock);
158 csChange=(CSChange*)MALLOC(sizeof(CSChange));
159 parameterUpdateCSChangeFromSubscription(csChange,sp);
160 csChange->guid=cstReader->guid;
161 csChange->alive=ORTE_TRUE;
162 csChange->cdrStream.buffer=NULL;
163 CSTWriterAddCSChange(cstReader->domain,
164 &cstReader->domain->writerSubscriptions,csChange);
165 pthread_rwlock_unlock(&cstReader->lock);
166 pthread_rwlock_unlock(&cstReader->domain->subscriptions.lock);
167 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
168 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
172 /*****************************************************************************/
174 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
175 u_int32_t retries,u_int32_t noPublications) {
179 /*****************************************************************************/
181 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
185 /*****************************************************************************/
187 ORTESubscriptionPull(ORTESubscription *cstReader) {
192 if (!cstReader) return -1;
193 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
194 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
195 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
196 pthread_rwlock_wrlock(&cstReader->lock);
197 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
198 if (sp->mode==PULLED) {
201 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
202 memset(&info,0,sizeof(info));
203 info.status=DEADLINE;
204 info.topic=sp->topic;
205 info.type=sp->typeName;
206 cstReader->objectEntryOID->recvCallBack(&info,
207 cstReader->objectEntryOID->instance,
208 cstReader->objectEntryOID->callBackParam);
210 (getActualNtpTime()),
212 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
214 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
216 pthread_rwlock_unlock(&cstReader->lock);
217 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
218 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
219 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);