2 * $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
4 * DEBUG: section 33 Functions working over subscription
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList,
25 PSEntry, ObjectEntryOID, GUID_RTPS,
26 subscriptions, psNode, guid, gavl_cmp_guid);
28 /*****************************************************************************/
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31 const char *topic,const char *typeName,void *instance,NtpTime *deadline,
32 NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33 void *recvCallBackParam, IPAddress multicastIPAddress) {
36 CSTReaderParams cstReaderParams;
38 ObjectEntryOID *objectEntryOID;
42 cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43 if (!cstReader) return NULL;
44 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46 pthread_rwlock_rdlock(&d->typeEntry.lock);
47 if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48 pthread_rwlock_unlock(&d->typeEntry.lock);
49 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51 printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
52 ser./deser. function for a given typeName!!!\n");
55 pthread_rwlock_wrlock(&d->subscriptions.lock);
56 // join to multicast group
57 if (IN_MULTICAST(multicastIPAddress)) {
58 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
61 mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
62 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
63 if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
64 IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
65 debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
66 IPAddressToString(multicastIPAddress,sIPAddress));
69 //generate new guid of publisher
70 d->subscriptions.counter++;
71 guid.hid=d->guid.hid;guid.aid=d->guid.aid;
72 guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
73 sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
74 memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
75 strcpy(sp->topic,topic);
76 strcpy(sp->typeName,typeName);
77 sp->deadline=*deadline;
78 sp->minimumSeparation=*minimumSeparation;
79 sp->multicast=multicastIPAddress;
82 sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
85 sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
89 //insert object to structure objectEntry
90 objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
91 objectEntryOID->privateCreated=ORTE_TRUE;
92 objectEntryOID->instance=instance;
93 objectEntryOID->recvCallBack=recvCallBack;
94 objectEntryOID->callBackParam=recvCallBackParam;
95 //create writerSubscription
96 cstReaderParams.delayResponceTimeMin=zNtpTime;
97 cstReaderParams.delayResponceTimeMax=zNtpTime;
98 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
99 cstReaderParams.repeatActiveQueryTime=iNtpTime;
100 cstReaderParams.fullAcknowledge=ORTE_FALSE;
101 CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
102 &typeNode->typeRegister);
103 //insert cstWriter to list of subscriberes
104 CSTReader_insert(&d->subscriptions,cstReader);
105 //generate csChange for writerSubscriberes
106 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
107 csChange=(CSChange*)MALLOC(sizeof(CSChange));
108 parameterUpdateCSChangeFromSubscription(csChange,sp);
110 csChange->alive=ORTE_TRUE;
111 CDR_codec_init_static(&csChange->cdrCodec);
112 CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
113 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
114 pthread_rwlock_unlock(&d->subscriptions.lock);
115 pthread_rwlock_unlock(&d->typeEntry.lock);
116 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
117 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
122 /*****************************************************************************/
124 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
127 if (!cstReader) return ORTE_BAD_HANDLE;
128 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
129 csChange=(CSChange*)MALLOC(sizeof(CSChange));
130 CSChangeAttributes_init_head(csChange);
131 csChange->guid=cstReader->guid;
132 csChange->alive=ORTE_FALSE;
133 csChange->cdrCodec.buffer=NULL;
134 CSTWriterAddCSChange(cstReader->domain,
135 &cstReader->domain->writerSubscriptions,
137 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
141 /*****************************************************************************/
143 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
145 if (!cstReader) return ORTE_BAD_HANDLE;
146 //generate csChange for writerSubscriptions
147 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
148 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
149 pthread_rwlock_wrlock(&cstReader->lock);
150 r=ORTESubscriptionDestroyLocked(cstReader);
151 pthread_rwlock_unlock(&cstReader->lock);
152 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
153 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
158 /*****************************************************************************/
160 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
161 if (!cstReader) return ORTE_BAD_HANDLE;
162 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
163 pthread_rwlock_rdlock(&cstReader->lock);
164 *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
165 pthread_rwlock_unlock(&cstReader->lock);
166 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
170 /*****************************************************************************/
172 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
175 if (!cstReader) return ORTE_BAD_HANDLE;
176 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
177 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
178 pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
179 pthread_rwlock_rdlock(&cstReader->lock);
180 csChange=(CSChange*)MALLOC(sizeof(CSChange));
181 parameterUpdateCSChangeFromSubscription(csChange,sp);
182 csChange->guid=cstReader->guid;
183 csChange->alive=ORTE_TRUE;
184 csChange->cdrCodec.buffer=NULL;
185 CSTWriterAddCSChange(cstReader->domain,
186 &cstReader->domain->writerSubscriptions,csChange);
187 pthread_rwlock_unlock(&cstReader->lock);
188 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
189 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
190 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
194 /*****************************************************************************/
196 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
197 unsigned int retries,unsigned int noPublications) {
198 unsigned int wPublications;
201 if (!cstReader) return ORTE_BAD_HANDLE;
202 NtpTimeDisAssembToMs(sec,ms,wait);
204 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
205 pthread_rwlock_rdlock(&cstReader->lock);
206 wPublications=cstReader->cstRemoteWriterCounter;
207 pthread_rwlock_unlock(&cstReader->lock);
208 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
209 if (wPublications>=noPublications)
211 ORTESleepMs(sec*1000+ms);
216 /*****************************************************************************/
218 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
221 if (!cstReader) return ORTE_BAD_HANDLE;
222 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
223 pthread_rwlock_rdlock(&cstReader->lock);
224 status->strict=cstReader->strictReliableCounter;
225 status->bestEffort=cstReader->bestEffortsCounter;
227 ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
229 pthread_rwlock_unlock(&cstReader->lock);
230 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
234 /*****************************************************************************/
236 ORTESubscriptionPull(ORTESubscription *cstReader) {
241 if (!cstReader) return ORTE_BAD_HANDLE;
242 pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
243 pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
244 pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
245 pthread_rwlock_wrlock(&cstReader->lock);
246 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
247 if ((sp->mode==PULLED) &&
248 (cstReader->objectEntryOID->recvCallBack)) {
251 htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
252 memset(&info,0,sizeof(info));
253 info.status=DEADLINE;
254 info.topic=sp->topic;
255 info.type=sp->typeName;
256 cstReader->objectEntryOID->recvCallBack(&info,
257 cstReader->objectEntryOID->instance,
258 cstReader->objectEntryOID->callBackParam);
260 (getActualNtpTime()),
262 htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
264 CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
266 pthread_rwlock_unlock(&cstReader->lock);
267 pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
268 pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
269 pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
274 /*****************************************************************************/
276 ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
277 return cstReader->objectEntryOID->instance;