]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTESubscription.c
3cbc8596a20b7cea3902b2940291f4110e5e18ee
[orte.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
35                        PSEntry, ObjectEntryOID, GUID_RTPS,
36                        subscriptions, psNode, guid, gavl_cmp_guid);
37
38 /*****************************************************************************/
39 ORTESubscription * 
40 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
41     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
42     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
43     void *recvCallBackParam, IPAddress multicastIPAddress) {
44   GUID_RTPS             guid;
45   CSTReader             *cstReader;
46   CSTReaderParams       cstReaderParams;
47   ORTESubsProp          *sp;
48   ObjectEntryOID        *objectEntryOID;   
49   CSChange              *csChange;
50   TypeNode              *typeNode;
51   
52   cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
53   if (!cstReader) return NULL;
54   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
55   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
56   pthread_rwlock_rdlock(&d->typeEntry.lock);    
57   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
58     pthread_rwlock_unlock(&d->typeEntry.lock);    
59     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
60     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
61     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
62             ser./deser. function for a given typeName!!!\n");
63     FREE(cstReader);
64     return NULL;
65   }  
66   pthread_rwlock_wrlock(&d->subscriptions.lock);
67   // join to multicast group
68   if (IN_MULTICAST(multicastIPAddress)) {
69     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
70     struct ip_mreq mreq;
71
72     mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
73     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
74     if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
75           IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
76         debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
77                       IPAddressToString(multicastIPAddress,sIPAddress));
78     }
79   }
80   //generate new guid of publisher
81   d->subscriptions.counter++;
82   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
83   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
84   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
85   memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
86   strcpy((char *)sp->topic,topic);
87   strcpy((char *)sp->typeName,typeName);
88   sp->deadline=*deadline;
89   sp->minimumSeparation=*minimumSeparation;
90   sp->multicast=multicastIPAddress;
91   switch (sType) {
92     case BEST_EFFORTS:
93       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
94       break;
95     case STRICT_RELIABLE:
96       sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
97       break;
98   }
99   sp->mode=mode;
100   //insert object to structure objectEntry
101   objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
102   objectEntryOID->privateCreated=ORTE_TRUE;
103   objectEntryOID->instance=instance;
104   objectEntryOID->recvCallBack=recvCallBack;
105   objectEntryOID->callBackParam=recvCallBackParam;
106   //create writerSubscription
107   cstReaderParams.delayResponceTimeMin=zNtpTime;
108   cstReaderParams.delayResponceTimeMax=zNtpTime;
109   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
110   cstReaderParams.repeatActiveQueryTime=iNtpTime;
111   cstReaderParams.fullAcknowledge=ORTE_FALSE;      
112   CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
113                 &typeNode->typeRegister);
114   //insert cstWriter to list of subscriberes
115   CSTReader_insert(&d->subscriptions,cstReader);
116   //generate csChange for writerSubscriberes
117   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
118   csChange=(CSChange*)MALLOC(sizeof(CSChange));
119   parameterUpdateCSChangeFromSubscription(csChange,sp);
120   csChange->guid=guid;
121   csChange->alive=ORTE_TRUE;
122   CDR_codec_init_static(&csChange->cdrCodec);
123   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
124   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
125   pthread_rwlock_unlock(&d->subscriptions.lock);
126   pthread_rwlock_unlock(&d->typeEntry.lock);    
127   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
128   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
129
130   return cstReader;
131 }
132
133 /*****************************************************************************/
134 int
135 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
136   CSChange              *csChange;
137   
138   if (!cstReader) return ORTE_BAD_HANDLE;
139   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
140   csChange=(CSChange*)MALLOC(sizeof(CSChange));
141   CSChangeAttributes_init_head(csChange);
142   csChange->guid=cstReader->guid;
143   csChange->alive=ORTE_FALSE;
144   csChange->cdrCodec.buffer=NULL;
145   CSTWriterAddCSChange(cstReader->domain,
146                        &cstReader->domain->writerSubscriptions,
147                        csChange);
148   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
149   return ORTE_OK;
150 }
151
152 /*****************************************************************************/
153 int
154 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
155   int r;
156   if (!cstReader) return ORTE_BAD_HANDLE;
157   //generate csChange for writerSubscriptions
158   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
159   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
160   pthread_rwlock_wrlock(&cstReader->lock);
161   r=ORTESubscriptionDestroyLocked(cstReader);
162   pthread_rwlock_unlock(&cstReader->lock);
163   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
164   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
165   return r;
166 }
167
168
169 /*****************************************************************************/
170 int
171 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
172   if (!cstReader) return ORTE_BAD_HANDLE;
173   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
174   pthread_rwlock_rdlock(&cstReader->lock);
175   *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
176   pthread_rwlock_unlock(&cstReader->lock);
177   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
178   return ORTE_OK;
179 }
180
181 /*****************************************************************************/
182 int
183 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
184   CSChange              *csChange;
185
186   if (!cstReader) return ORTE_BAD_HANDLE;
187   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
188   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
189   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
190   pthread_rwlock_rdlock(&cstReader->lock);
191   csChange=(CSChange*)MALLOC(sizeof(CSChange));
192   parameterUpdateCSChangeFromSubscription(csChange,sp);
193   csChange->guid=cstReader->guid;
194   csChange->alive=ORTE_TRUE;
195   csChange->cdrCodec.buffer=NULL;
196   CSTWriterAddCSChange(cstReader->domain,
197       &cstReader->domain->writerSubscriptions,csChange);
198   pthread_rwlock_unlock(&cstReader->lock);
199   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
200   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
201   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
202   return ORTE_OK;
203 }
204
205 /*****************************************************************************/
206 int
207 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
208     unsigned int retries,unsigned int noPublications) {
209   unsigned int wPublications;
210   uint32_t sec,ms;
211
212   if (!cstReader) return ORTE_BAD_HANDLE;
213   NtpTimeDisAssembToMs(sec,ms,wait);
214   do {
215     pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
216     pthread_rwlock_rdlock(&cstReader->lock);
217     wPublications=cstReader->cstRemoteWriterCounter;
218     pthread_rwlock_unlock(&cstReader->lock);
219     pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
220     if (wPublications>=noPublications)
221       return ORTE_OK;
222     ORTESleepMs(sec*1000+ms);
223   } while (retries--);
224   return ORTE_TIMEOUT;  
225 }
226
227 /*****************************************************************************/
228 int
229 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
230   CSChange *csChange;
231
232   if (!cstReader) return ORTE_BAD_HANDLE;
233   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
234   pthread_rwlock_rdlock(&cstReader->lock);
235   status->strict=cstReader->strictReliableCounter;
236   status->bestEffort=cstReader->bestEffortsCounter;
237   status->issues=0;
238   ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
239     status->issues++;
240   pthread_rwlock_unlock(&cstReader->lock);
241   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
242   return ORTE_OK;
243 }
244
245 /*****************************************************************************/
246 int
247 ORTESubscriptionPull(ORTESubscription *cstReader) {
248   ORTESubsProp         *sp;
249   ORTERecvInfo         info;
250   NtpTime              timeNext;
251   
252   if (!cstReader) return ORTE_BAD_HANDLE;
253   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
254   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
255   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
256   pthread_rwlock_wrlock(&cstReader->lock);
257   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
258   if ((sp->mode==PULLED) && 
259       (cstReader->objectEntryOID->recvCallBack)) {
260     if (NtpTimeCmp(
261           getActualNtpTime(),
262           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
263       memset(&info,0,sizeof(info));
264       info.status=DEADLINE;
265       info.topic=(char*)sp->topic;
266       info.type=(char*)sp->typeName;
267       cstReader->objectEntryOID->recvCallBack(&info,
268           cstReader->objectEntryOID->instance,
269           cstReader->objectEntryOID->callBackParam);
270       NtpTimeAdd(timeNext,
271                 (getActualNtpTime()),
272                 sp->deadline);
273       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
274     }
275     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
276   }
277   pthread_rwlock_unlock(&cstReader->lock);
278   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
279   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
280   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
281   return ORTE_OK;
282 }
283
284
285 /*****************************************************************************/
286 inline void *
287 ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
288   return cstReader->objectEntryOID->instance;
289 }
290