2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 subscriptions, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31 char *topic,char *typeName,void *instance,NtpTime *deadline,
32 NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33 void *recvCallBackParam) {
36 CSTReaderParams cstReaderParams;
38 ObjectEntryOID *objectEntryOID;
42 cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43 if (!cstReader) return NULL;
44 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46 pthread_rwlock_rdlock(&d->typeEntry.lock);
47 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48 pthread_rwlock_unlock(&d->typeEntry.lock);
49 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
52 ser./deser. function for a given typeName!!!\n");
55 pthread_rwlock_wrlock(&d->subscriptions.lock);
56 //generate new guid of publisher
57 d->subscriptions.counter++;
58 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
59 guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
60 sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
61 memcpy(sp,&d->subsPropDefault,sizeof(ORTESubsProp));
62 strcpy(sp->topic,topic);
63 strcpy(sp->typeName,typeName);
64 sp->deadline=*deadline;
65 sp->minimumSeparation=*minimumSeparation;
68 sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
71 sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
75 //insert object to structure objectEntry
76 objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
77 objectEntryOID->private=ORTE_TRUE;
78 objectEntryOID->instance=instance;
79 objectEntryOID->recvCallBack=recvCallBack;
80 objectEntryOID->callBackParam=recvCallBackParam;
81 //create writerSubscription
82 cstReaderParams.delayResponceTimeMin=zNtpTime;
83 cstReaderParams.delayResponceTimeMax=zNtpTime;
84 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
85 cstReaderParams.repeatActiveQueryTime=iNtpTime;
86 cstReaderParams.fullAcknowledge=ORTE_FALSE;
87 CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
88 &typeNode->typeRegister);
89 //insert cstWriter to list of subscriberes
90 CSTReader_insert(&d->subscriptions,cstReader);
91 //generate csChange for writerSubscriberes
92 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
93 csChange=(CSChange*)MALLOC(sizeof(CSChange));
94 parameterUpdateCSChangeFromSubscription(csChange,sp);
96 csChange->alive=ORTE_TRUE;
97 csChange->cdrStream.buffer=NULL;
98 CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
99 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
100 pthread_rwlock_unlock(&d->subscriptions.lock);
101 pthread_rwlock_unlock(&d->typeEntry.lock);
102 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
103 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
107 /*****************************************************************************/
109 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
112 if (!cstReader) return -1;
113 //generate csChange for writerSubscriptions
114 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
115 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
116 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
117 csChange=(CSChange*)MALLOC(sizeof(CSChange));
118 CSChangeAttributes_init_head(csChange);
119 csChange->guid=cstReader->guid;
120 csChange->alive=ORTE_FALSE;
121 csChange->cdrStream.buffer=NULL;
122 CSTWriterAddCSChange(cstReader->domain,
123 &cstReader->domain->writerSubscriptions,
125 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
126 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
127 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
132 /*****************************************************************************/
134 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
135 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
136 pthread_rwlock_rdlock(&cstReader->lock);
137 *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
138 pthread_rwlock_unlock(&cstReader->lock);
139 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
143 /*****************************************************************************/
145 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
148 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
149 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
150 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
151 pthread_rwlock_rdlock(&cstReader->lock);
152 csChange=(CSChange*)MALLOC(sizeof(CSChange));
153 parameterUpdateCSChangeFromSubscription(csChange,sp);
154 csChange->guid=cstReader->guid;
155 csChange->alive=ORTE_TRUE;
156 csChange->cdrStream.buffer=NULL;
157 CSTWriterAddCSChange(cstReader->domain,
158 &cstReader->domain->writerSubscriptions,csChange);
159 pthread_rwlock_unlock(&cstReader->lock);
160 pthread_rwlock_unlock(&cstReader->domain->subscriptions.lock);
161 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
162 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
166 /*****************************************************************************/
168 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
169 u_int32_t retries,u_int32_t noPublications) {
173 /*****************************************************************************/
175 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
179 /*****************************************************************************/
181 ORTESubscriptionPull(ORTESubscription *cstReader) {
186 if (!cstReader) return -1;
187 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
188 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
189 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
190 pthread_rwlock_wrlock(&cstReader->lock);
191 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
192 if (sp->mode==PULLED) {
195 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
196 memset(&info,0,sizeof(info));
197 info.status=DEADLINE;
198 info.topic=sp->topic;
199 info.type=sp->typeName;
200 cstReader->objectEntryOID->recvCallBack(&info,
201 cstReader->objectEntryOID->instance,
202 cstReader->objectEntryOID->callBackParam);
204 (getActualNtpTime()),
206 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
208 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
210 pthread_rwlock_unlock(&cstReader->lock);
211 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
212 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
213 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);