]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTESubscription.c
Migration to new version of OMK system.
[orte.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
25                        PSEntry, ObjectEntryOID, GUID_RTPS,
26                        subscriptions, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTESubscription * 
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
32     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33     void *recvCallBackParam, IPAddress multicastIPAddress) {
34   GUID_RTPS             guid;
35   CSTReader             *cstReader;
36   CSTReaderParams       cstReaderParams;
37   ORTESubsProp          *sp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43   if (!cstReader) return NULL;
44   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46   pthread_rwlock_rdlock(&d->typeEntry.lock);    
47   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48     pthread_rwlock_unlock(&d->typeEntry.lock);    
49     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
52             ser./deser. function for a given typeName!!!\n");
53     return NULL;
54   }  
55   pthread_rwlock_wrlock(&d->subscriptions.lock);
56   // join to multicast group
57   if (IN_MULTICAST(multicastIPAddress)) {
58     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
59     struct ip_mreq mreq;
60
61     mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
62     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
63     if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
64           IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
65         debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
66                       IPAddressToString(multicastIPAddress,sIPAddress));
67     }
68   }
69   //generate new guid of publisher
70   d->subscriptions.counter++;
71   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
72   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
73   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
74   memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
75   strcpy(sp->topic,topic);
76   strcpy(sp->typeName,typeName);
77   sp->deadline=*deadline;
78   sp->minimumSeparation=*minimumSeparation;
79   sp->multicast=multicastIPAddress;
80   switch (sType) {
81     case BEST_EFFORTS:
82       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
83       break;
84     case STRICT_RELIABLE:
85       sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
86       break;
87   }
88   sp->mode=mode;
89   //insert object to structure objectEntry
90   objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
91   objectEntryOID->privateCreated=ORTE_TRUE;
92   objectEntryOID->instance=instance;
93   objectEntryOID->recvCallBack=recvCallBack;
94   objectEntryOID->callBackParam=recvCallBackParam;
95   //create writerSubscription
96   cstReaderParams.delayResponceTimeMin=zNtpTime;
97   cstReaderParams.delayResponceTimeMax=zNtpTime;
98   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
99   cstReaderParams.repeatActiveQueryTime=iNtpTime;
100   cstReaderParams.fullAcknowledge=ORTE_FALSE;      
101   CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
102                 &typeNode->typeRegister);
103   //insert cstWriter to list of subscriberes
104   CSTReader_insert(&d->subscriptions,cstReader);
105   //generate csChange for writerSubscriberes
106   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
107   csChange=(CSChange*)MALLOC(sizeof(CSChange));
108   parameterUpdateCSChangeFromSubscription(csChange,sp);
109   csChange->guid=guid;
110   csChange->alive=ORTE_TRUE;
111   CDR_codec_init_static(&csChange->cdrCodec);
112   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
113   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
114   pthread_rwlock_unlock(&d->subscriptions.lock);
115   pthread_rwlock_unlock(&d->typeEntry.lock);    
116   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
117   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
118
119   return cstReader;
120 }
121
122 /*****************************************************************************/
123 int
124 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
125   CSChange              *csChange;
126   
127   if (!cstReader) return ORTE_BAD_HANDLE;
128   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
129   csChange=(CSChange*)MALLOC(sizeof(CSChange));
130   CSChangeAttributes_init_head(csChange);
131   csChange->guid=cstReader->guid;
132   csChange->alive=ORTE_FALSE;
133   csChange->cdrCodec.buffer=NULL;
134   CSTWriterAddCSChange(cstReader->domain,
135                        &cstReader->domain->writerSubscriptions,
136                        csChange);
137   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
138   return ORTE_OK;
139 }
140
141 /*****************************************************************************/
142 int
143 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
144   int r;
145   if (!cstReader) return ORTE_BAD_HANDLE;
146   //generate csChange for writerSubscriptions
147   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
148   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
149   pthread_rwlock_wrlock(&cstReader->lock);
150   r=ORTESubscriptionDestroyLocked(cstReader);
151   pthread_rwlock_unlock(&cstReader->lock);
152   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
153   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
154   return r;
155 }
156
157
158 /*****************************************************************************/
159 int
160 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
161   if (!cstReader) return ORTE_BAD_HANDLE;
162   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
163   pthread_rwlock_rdlock(&cstReader->lock);
164   *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
165   pthread_rwlock_unlock(&cstReader->lock);
166   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
167   return ORTE_OK;
168 }
169
170 /*****************************************************************************/
171 int
172 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
173   CSChange              *csChange;
174
175   if (!cstReader) return ORTE_BAD_HANDLE;
176   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
177   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
178   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
179   pthread_rwlock_rdlock(&cstReader->lock);
180   csChange=(CSChange*)MALLOC(sizeof(CSChange));
181   parameterUpdateCSChangeFromSubscription(csChange,sp);
182   csChange->guid=cstReader->guid;
183   csChange->alive=ORTE_TRUE;
184   csChange->cdrCodec.buffer=NULL;
185   CSTWriterAddCSChange(cstReader->domain,
186       &cstReader->domain->writerSubscriptions,csChange);
187   pthread_rwlock_unlock(&cstReader->lock);
188   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
189   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
190   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
191   return ORTE_OK;
192 }
193
194 /*****************************************************************************/
195 int
196 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
197     unsigned int retries,unsigned int noPublications) {
198   unsigned int wPublications;
199   uint32_t sec,ms;
200
201   if (!cstReader) return ORTE_BAD_HANDLE;
202   NtpTimeDisAssembToMs(sec,ms,wait);
203   do {
204     pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
205     pthread_rwlock_rdlock(&cstReader->lock);
206     wPublications=cstReader->cstRemoteWriterCounter;
207     pthread_rwlock_unlock(&cstReader->lock);
208     pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
209     if (wPublications>=noPublications)
210       return ORTE_OK;
211     ORTESleepMs(sec*1000+ms);
212   } while (retries--);
213   return ORTE_TIMEOUT;  
214 }
215
216 /*****************************************************************************/
217 int
218 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
219   CSChange *csChange;
220
221   if (!cstReader) return ORTE_BAD_HANDLE;
222   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
223   pthread_rwlock_rdlock(&cstReader->lock);
224   status->strict=cstReader->strictReliableCounter;
225   status->bestEffort=cstReader->bestEffortsCounter;
226   status->issues=0;
227   ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
228     status->issues++;
229   pthread_rwlock_unlock(&cstReader->lock);
230   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
231   return ORTE_OK;
232 }
233
234 /*****************************************************************************/
235 int
236 ORTESubscriptionPull(ORTESubscription *cstReader) {
237   ORTESubsProp         *sp;
238   ORTERecvInfo         info;
239   NtpTime              timeNext;
240   
241   if (!cstReader) return ORTE_BAD_HANDLE;
242   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
243   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
244   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
245   pthread_rwlock_wrlock(&cstReader->lock);
246   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
247   if ((sp->mode==PULLED) && 
248       (cstReader->objectEntryOID->recvCallBack)) {
249     if (NtpTimeCmp(
250           getActualNtpTime(),
251           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
252       memset(&info,0,sizeof(info));
253       info.status=DEADLINE;
254       info.topic=sp->topic;
255       info.type=sp->typeName;
256       cstReader->objectEntryOID->recvCallBack(&info,
257           cstReader->objectEntryOID->instance,
258           cstReader->objectEntryOID->callBackParam);
259       NtpTimeAdd(timeNext,
260                 (getActualNtpTime()),
261                 sp->deadline);
262       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
263     }
264     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
265   }
266   pthread_rwlock_unlock(&cstReader->lock);
267   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
268   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
269   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
270   return ORTE_OK;
271 }
272
273
274 /*****************************************************************************/
275 inline void *
276 ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
277   return cstReader->objectEntryOID->instance;
278 }
279