]> rtime.felk.cvut.cz Git - orte/eurobot.git/blob - orte/liborte/ORTESubscription.c
f5c00b343db58e070aa954a40682e6ef77b9f7b5
[orte/eurobot.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
/*
 * Instantiates the customized GAVL tree implementation whose generated
 * functions carry the SubscriptionList prefix.
 * NOTE(review): the macro definition is not visible in this file; the
 * argument roles below are assumed from the uLUt GAVL_CUST_NODE_INT_IMP
 * parameter order - confirm against the uLUt headers:
 *   root type PSEntry, item type ObjectEntryOID, key type GUID_RTPS,
 *   root field subscriptions, item node field psNode, item key field guid,
 *   key comparison function gavl_cmp_guid.
 */
34 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
35                        PSEntry, ObjectEntryOID, GUID_RTPS,
36                        subscriptions, psNode, guid, gavl_cmp_guid);
37
38 /*****************************************************************************/
39 ORTESubscription * 
40 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
41     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
42     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
43     void *recvCallBackParam, IPAddress multicastIPAddress) {
44   GUID_RTPS             guid;
45   CSTReader             *cstReader;
46   CSTReaderParams       cstReaderParams;
47   ORTESubsProp          *sp;
48   ObjectEntryOID        *objectEntryOID;   
49   CSChange              *csChange;
50   TypeNode              *typeNode;
51   
52   cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
53   if (!cstReader) return NULL;
54   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
55   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
56   pthread_rwlock_rdlock(&d->typeEntry.lock);    
57   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
58     pthread_rwlock_unlock(&d->typeEntry.lock);    
59     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
60     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
61     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
62             ser./deser. function for a given typeName!!!\n");
63     return NULL;
64   }  
65   pthread_rwlock_wrlock(&d->subscriptions.lock);
66   // join to multicast group
67   if (IN_MULTICAST(multicastIPAddress)) {
68     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
69     struct ip_mreq mreq;
70
71     mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
72     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
73     if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
74           IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
75         debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
76                       IPAddressToString(multicastIPAddress,sIPAddress));
77     }
78   }
79   //generate new guid of publisher
80   d->subscriptions.counter++;
81   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
82   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
83   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
84   memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
85   strcpy((char *)sp->topic,topic);
86   strcpy((char *)sp->typeName,typeName);
87   sp->deadline=*deadline;
88   sp->minimumSeparation=*minimumSeparation;
89   sp->multicast=multicastIPAddress;
90   switch (sType) {
91     case BEST_EFFORTS:
92       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
93       break;
94     case STRICT_RELIABLE:
95       sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
96       break;
97   }
98   sp->mode=mode;
99   //insert object to structure objectEntry
100   objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
101   objectEntryOID->privateCreated=ORTE_TRUE;
102   objectEntryOID->instance=instance;
103   objectEntryOID->recvCallBack=recvCallBack;
104   objectEntryOID->callBackParam=recvCallBackParam;
105   //create writerSubscription
106   cstReaderParams.delayResponceTimeMin=zNtpTime;
107   cstReaderParams.delayResponceTimeMax=zNtpTime;
108   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
109   cstReaderParams.repeatActiveQueryTime=iNtpTime;
110   cstReaderParams.fullAcknowledge=ORTE_FALSE;      
111   CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
112                 &typeNode->typeRegister);
113   //insert cstWriter to list of subscriberes
114   CSTReader_insert(&d->subscriptions,cstReader);
115   //generate csChange for writerSubscriberes
116   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
117   csChange=(CSChange*)MALLOC(sizeof(CSChange));
118   parameterUpdateCSChangeFromSubscription(csChange,sp);
119   csChange->guid=guid;
120   csChange->alive=ORTE_TRUE;
121   CDR_codec_init_static(&csChange->cdrCodec);
122   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
123   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
124   pthread_rwlock_unlock(&d->subscriptions.lock);
125   pthread_rwlock_unlock(&d->typeEntry.lock);    
126   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
127   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
128
129   return cstReader;
130 }
131
132 /*****************************************************************************/
133 int
134 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
135   CSChange              *csChange;
136   
137   if (!cstReader) return ORTE_BAD_HANDLE;
138   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
139   csChange=(CSChange*)MALLOC(sizeof(CSChange));
140   CSChangeAttributes_init_head(csChange);
141   csChange->guid=cstReader->guid;
142   csChange->alive=ORTE_FALSE;
143   csChange->cdrCodec.buffer=NULL;
144   CSTWriterAddCSChange(cstReader->domain,
145                        &cstReader->domain->writerSubscriptions,
146                        csChange);
147   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
148   return ORTE_OK;
149 }
150
151 /*****************************************************************************/
152 int
153 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
154   int r;
155   if (!cstReader) return ORTE_BAD_HANDLE;
156   //generate csChange for writerSubscriptions
157   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
158   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
159   pthread_rwlock_wrlock(&cstReader->lock);
160   r=ORTESubscriptionDestroyLocked(cstReader);
161   pthread_rwlock_unlock(&cstReader->lock);
162   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
163   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
164   return r;
165 }
166
167
168 /*****************************************************************************/
169 int
170 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
171   if (!cstReader) return ORTE_BAD_HANDLE;
172   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
173   pthread_rwlock_rdlock(&cstReader->lock);
174   *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
175   pthread_rwlock_unlock(&cstReader->lock);
176   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
177   return ORTE_OK;
178 }
179
180 /*****************************************************************************/
181 int
182 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
183   CSChange              *csChange;
184
185   if (!cstReader) return ORTE_BAD_HANDLE;
186   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
187   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
188   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
189   pthread_rwlock_rdlock(&cstReader->lock);
190   csChange=(CSChange*)MALLOC(sizeof(CSChange));
191   parameterUpdateCSChangeFromSubscription(csChange,sp);
192   csChange->guid=cstReader->guid;
193   csChange->alive=ORTE_TRUE;
194   csChange->cdrCodec.buffer=NULL;
195   CSTWriterAddCSChange(cstReader->domain,
196       &cstReader->domain->writerSubscriptions,csChange);
197   pthread_rwlock_unlock(&cstReader->lock);
198   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
199   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
200   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
201   return ORTE_OK;
202 }
203
204 /*****************************************************************************/
205 int
206 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
207     unsigned int retries,unsigned int noPublications) {
208   unsigned int wPublications;
209   uint32_t sec,ms;
210
211   if (!cstReader) return ORTE_BAD_HANDLE;
212   NtpTimeDisAssembToMs(sec,ms,wait);
213   do {
214     pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
215     pthread_rwlock_rdlock(&cstReader->lock);
216     wPublications=cstReader->cstRemoteWriterCounter;
217     pthread_rwlock_unlock(&cstReader->lock);
218     pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
219     if (wPublications>=noPublications)
220       return ORTE_OK;
221     ORTESleepMs(sec*1000+ms);
222   } while (retries--);
223   return ORTE_TIMEOUT;  
224 }
225
226 /*****************************************************************************/
227 int
228 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
229   CSChange *csChange;
230
231   if (!cstReader) return ORTE_BAD_HANDLE;
232   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
233   pthread_rwlock_rdlock(&cstReader->lock);
234   status->strict=cstReader->strictReliableCounter;
235   status->bestEffort=cstReader->bestEffortsCounter;
236   status->issues=0;
237   ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
238     status->issues++;
239   pthread_rwlock_unlock(&cstReader->lock);
240   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
241   return ORTE_OK;
242 }
243
244 /*****************************************************************************/
245 int
246 ORTESubscriptionPull(ORTESubscription *cstReader) {
247   ORTESubsProp         *sp;
248   ORTERecvInfo         info;
249   NtpTime              timeNext;
250   
251   if (!cstReader) return ORTE_BAD_HANDLE;
252   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
253   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
254   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
255   pthread_rwlock_wrlock(&cstReader->lock);
256   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
257   if ((sp->mode==PULLED) && 
258       (cstReader->objectEntryOID->recvCallBack)) {
259     if (NtpTimeCmp(
260           getActualNtpTime(),
261           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
262       memset(&info,0,sizeof(info));
263       info.status=DEADLINE;
264       info.topic=(char*)sp->topic;
265       info.type=(char*)sp->typeName;
266       cstReader->objectEntryOID->recvCallBack(&info,
267           cstReader->objectEntryOID->instance,
268           cstReader->objectEntryOID->callBackParam);
269       NtpTimeAdd(timeNext,
270                 (getActualNtpTime()),
271                 sp->deadline);
272       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
273     }
274     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
275   }
276   pthread_rwlock_unlock(&cstReader->lock);
277   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
278   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
279   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
280   return ORTE_OK;
281 }
282
283
284 /*****************************************************************************/
285 inline void *
286 ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
287   return cstReader->objectEntryOID->instance;
288 }
289