]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTESubscription.c
5f0e041f429424f821374b0f7be81994f32f806c
[orte.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
25                        PSEntry, ObjectEntryOID, GUID_RTPS,
26                        subscriptions, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTESubscription * 
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
32     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33     void *recvCallBackParam) {
34   GUID_RTPS             guid;
35   CSTReader             *cstReader;
36   CSTReaderParams       cstReaderParams;
37   ORTESubsProp          *sp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43   if (!cstReader) return NULL;
44   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46   pthread_rwlock_rdlock(&d->typeEntry.lock);    
47   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48     pthread_rwlock_unlock(&d->typeEntry.lock);    
49     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
52             ser./deser. function for a given typeName!!!\n");
53     return NULL;
54   }  
55   pthread_rwlock_wrlock(&d->subscriptions.lock);
56   //generate new guid of publisher
57   d->subscriptions.counter++;
58   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
59   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
60   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
61   memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
62   strcpy(sp->topic,topic);
63   strcpy(sp->typeName,typeName);
64   sp->deadline=*deadline;
65   sp->minimumSeparation=*minimumSeparation;
66   switch (sType) {
67     case BEST_EFFORTS:
68       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
69       break;
70     case STRICT_RELIABLE:
71       sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
72       break;
73   }
74   sp->mode=mode;
75   //insert object to structure objectEntry
76   objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
77   objectEntryOID->privateCreated=ORTE_TRUE;
78   objectEntryOID->instance=instance;
79   objectEntryOID->recvCallBack=recvCallBack;
80   objectEntryOID->callBackParam=recvCallBackParam;
81   //create writerSubscription
82   cstReaderParams.delayResponceTimeMin=zNtpTime;
83   cstReaderParams.delayResponceTimeMax=zNtpTime;
84   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
85   cstReaderParams.repeatActiveQueryTime=iNtpTime;
86   cstReaderParams.fullAcknowledge=ORTE_FALSE;      
87   CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
88                 &typeNode->typeRegister);
89   //insert cstWriter to list of subscriberes
90   CSTReader_insert(&d->subscriptions,cstReader);
91   //generate csChange for writerSubscriberes
92   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
93   csChange=(CSChange*)MALLOC(sizeof(CSChange));
94   parameterUpdateCSChangeFromSubscription(csChange,sp);
95   csChange->guid=guid;
96   csChange->alive=ORTE_TRUE;
97   csChange->cdrStream.buffer=NULL;
98   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
99   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
100   pthread_rwlock_unlock(&d->subscriptions.lock);
101   pthread_rwlock_unlock(&d->typeEntry.lock);    
102   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
103   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
104   return cstReader;
105 }
106
107 /*****************************************************************************/
108 int
109 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
110   CSChange              *csChange;
111   
112   if (!cstReader) return ORTE_BAD_HANDLE;
113   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
114   csChange=(CSChange*)MALLOC(sizeof(CSChange));
115   CSChangeAttributes_init_head(csChange);
116   csChange->guid=cstReader->guid;
117   csChange->alive=ORTE_FALSE;
118   csChange->cdrStream.buffer=NULL;
119   CSTWriterAddCSChange(cstReader->domain,
120                        &cstReader->domain->writerSubscriptions,
121                        csChange);
122   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
123   return ORTE_OK;
124 }
125
126 /*****************************************************************************/
127 int
128 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
129   int r;
130   if (!cstReader) return ORTE_BAD_HANDLE;
131   //generate csChange for writerSubscriptions
132   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
133   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
134   pthread_rwlock_wrlock(&cstReader->lock);
135   r=ORTESubscriptionDestroyLocked(cstReader);
136   pthread_rwlock_unlock(&cstReader->lock);
137   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
138   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
139   return r;
140 }
141
142
143 /*****************************************************************************/
144 int
145 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
146   if (!cstReader) return ORTE_BAD_HANDLE;
147   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
148   pthread_rwlock_rdlock(&cstReader->lock);
149   *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
150   pthread_rwlock_unlock(&cstReader->lock);
151   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
152   return ORTE_OK;
153 }
154
155 /*****************************************************************************/
156 int
157 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
158   CSChange              *csChange;
159
160   if (!cstReader) return ORTE_BAD_HANDLE;
161   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
162   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
163   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
164   pthread_rwlock_rdlock(&cstReader->lock);
165   csChange=(CSChange*)MALLOC(sizeof(CSChange));
166   parameterUpdateCSChangeFromSubscription(csChange,sp);
167   csChange->guid=cstReader->guid;
168   csChange->alive=ORTE_TRUE;
169   csChange->cdrStream.buffer=NULL;
170   CSTWriterAddCSChange(cstReader->domain,
171       &cstReader->domain->writerSubscriptions,csChange);
172   pthread_rwlock_unlock(&cstReader->lock);
173   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
174   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
175   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
176   return ORTE_OK;
177 }
178
179 /*****************************************************************************/
180 int
181 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
182     unsigned int retries,unsigned int noPublications) {
183   unsigned int wPublications;
184   uint32_t sec,ms;
185
186   if (!cstReader) return ORTE_BAD_HANDLE;
187   NtpTimeDisAssembToMs(sec,ms,wait);
188   do {
189     pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
190     pthread_rwlock_rdlock(&cstReader->lock);
191     wPublications=cstReader->cstRemoteWriterCounter;
192     pthread_rwlock_unlock(&cstReader->lock);
193     pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
194     if (wPublications>=noPublications)
195       return ORTE_OK;
196     ORTESleepMs(sec*1000+ms);
197   } while (retries--);
198   return ORTE_TIMEOUT;  
199 }
200
201 /*****************************************************************************/
202 int
203 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
204   CSChange *csChange;
205
206   if (!cstReader) return ORTE_BAD_HANDLE;
207   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
208   pthread_rwlock_rdlock(&cstReader->lock);
209   status->strict=cstReader->strictReliableCounter;
210   status->bestEffort=cstReader->bestEffortsCounter;
211   status->issues=0;
212   ul_list_for_each(CSTReaderCSChange,cstReader,csChange)
213     status->issues++;
214   pthread_rwlock_unlock(&cstReader->lock);
215   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
216   return ORTE_OK;
217 }
218
219 /*****************************************************************************/
220 int
221 ORTESubscriptionPull(ORTESubscription *cstReader) {
222   ORTESubsProp         *sp;
223   ORTERecvInfo         info;
224   NtpTime              timeNext;
225   
226   if (!cstReader) return ORTE_BAD_HANDLE;
227   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
228   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
229   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
230   pthread_rwlock_wrlock(&cstReader->lock);
231   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
232   if ((sp->mode==PULLED) && 
233       (cstReader->objectEntryOID->recvCallBack)) {
234     if (NtpTimeCmp(
235           getActualNtpTime(),
236           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
237       memset(&info,0,sizeof(info));
238       info.status=DEADLINE;
239       info.topic=sp->topic;
240       info.type=sp->typeName;
241       cstReader->objectEntryOID->recvCallBack(&info,
242           cstReader->objectEntryOID->instance,
243           cstReader->objectEntryOID->callBackParam);
244       NtpTimeAdd(timeNext,
245                 (getActualNtpTime()),
246                 sp->deadline);
247       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
248     }
249     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
250   }
251   pthread_rwlock_unlock(&cstReader->lock);
252   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
253   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
254   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
255   return ORTE_OK;
256 }