]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTESubscription.c
b373929aed42136a5b11d33d1529d191b862b2b7
[orte.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
/*
 * Instantiate the GAVL custom-node routines with the SubscriptionList prefix.
 * NOTE(review): per the usual GAVL_CUST_NODE_INT_IMP argument order this is
 * presumably a tree rooted in PSEntry.subscriptions, holding ObjectEntryOID
 * items linked through psNode and keyed by their GUID_RTPS guid member,
 * compared with gavl_cmp_guid - confirm against the GAVL header.
 */
GAVL_CUST_NODE_INT_IMP(SubscriptionList, PSEntry, ObjectEntryOID, GUID_RTPS,
                       subscriptions, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTESubscription * 
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
32     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33     void *recvCallBackParam) {
34   GUID_RTPS             guid;
35   CSTReader             *cstReader;
36   CSTReaderParams       cstReaderParams;
37   ORTESubsProp          *sp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43   if (!cstReader) return NULL;
44   pthread_rwlock_rdlock(&d->typeEntry.lock);    
45   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
46     pthread_rwlock_unlock(&d->typeEntry.lock);    
47     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
48             ser./deser. function for a given typeName!!!\n");
49     return NULL;
50   }  
51   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
52   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
53   pthread_rwlock_wrlock(&d->subscriptions.lock);
54   //generate new guid of publisher
55   d->subscriptions.counter++;
56   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
57   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
58   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
59   memcpy(sp,&d->subsPropDefault,sizeof(ORTESubsProp));
60   strcpy(sp->topic,topic);
61   strcpy(sp->typeName,typeName);
62   sp->deadline=*deadline;
63   sp->minimumSeparation=*minimumSeparation;
64   switch (sType) {
65     case BEST_EFFORTS:
66       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
67       break;
68     case STRICT_RELIABLE:
69       sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
70       break;
71   }
72   sp->mode=mode;
73   //insert object to structure objectEntry
74   objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
75   objectEntryOID->private=ORTE_TRUE;
76   objectEntryOID->instance=instance;
77   objectEntryOID->recvCallBack=recvCallBack;
78   objectEntryOID->callBackParam=recvCallBackParam;
79   //create writerSubscription
80   cstReaderParams.delayResponceTimeMin=zNtpTime;
81   cstReaderParams.delayResponceTimeMax=zNtpTime;
82   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
83   cstReaderParams.repeatActiveQueryTime=iNtpTime;
84   cstReaderParams.fullAcknowledge=ORTE_FALSE;      
85   CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
86                 &typeNode->typeRegister);
87   //insert cstWriter to list of subscriberes
88   CSTReader_insert(&d->subscriptions,cstReader);
89   //generate csChange for writerSubscriberes
90   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
91   csChange=(CSChange*)MALLOC(sizeof(CSChange));
92   parameterUpdateCSChangeFromSubscription(csChange,sp);
93   csChange->guid=guid;
94   csChange->alive=ORTE_TRUE;
95   csChange->cdrStream.buffer=NULL;
96   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
97   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
98   pthread_rwlock_unlock(&d->subscriptions.lock);
99   pthread_rwlock_unlock(&d->typeEntry.lock);    
100   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
101   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
102   return cstReader;
103 }
104
105 /*****************************************************************************/
106 int
107 ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
108   CSChange              *csChange;
109   
110   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
111   csChange=(CSChange*)MALLOC(sizeof(CSChange));
112   CSChangeAttributes_init_head(csChange);
113   csChange->guid=cstReader->guid;
114   csChange->alive=ORTE_FALSE;
115   csChange->cdrStream.buffer=NULL;
116   CSTWriterAddCSChange(cstReader->domain,
117                        &cstReader->domain->writerSubscriptions,
118                        csChange);
119   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
120   return 0;
121 }
122
123 /*****************************************************************************/
124 int
125 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
126   int r;
127   if (!cstReader) return -1;
128   //generate csChange for writerSubscriptions
129   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
130   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
131   r=ORTESubscriptionDestroyLocked(cstReader);
132   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
133   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
134   return r;
135 }
136
137
138 /*****************************************************************************/
139 int
140 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
141   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
142   pthread_rwlock_rdlock(&cstReader->lock);
143   *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
144   pthread_rwlock_unlock(&cstReader->lock);
145   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
146   return 0;
147 }
148
149 /*****************************************************************************/
150 int
151 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
152   CSChange              *csChange;
153
154   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
155   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
156   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
157   pthread_rwlock_rdlock(&cstReader->lock);
158   csChange=(CSChange*)MALLOC(sizeof(CSChange));
159   parameterUpdateCSChangeFromSubscription(csChange,sp);
160   csChange->guid=cstReader->guid;
161   csChange->alive=ORTE_TRUE;
162   csChange->cdrStream.buffer=NULL;
163   CSTWriterAddCSChange(cstReader->domain,
164       &cstReader->domain->writerSubscriptions,csChange);
165   pthread_rwlock_unlock(&cstReader->lock);
166   pthread_rwlock_unlock(&cstReader->domain->subscriptions.lock);
167   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
168   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
169   return 0;
170 }
171
172 /*****************************************************************************/
173 int
174 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
175     u_int32_t retries,u_int32_t noPublications) {
176   return 0;
177 }
178
179 /*****************************************************************************/
180 int
181 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
182   return 0;
183 }
184
185 /*****************************************************************************/
186 int
187 ORTESubscriptionPull(ORTESubscription *cstReader) {
188   ORTESubsProp         *sp;
189   ORTERecvInfo         info;
190   NtpTime              timeNext;
191   
192   if (!cstReader) return -1;
193   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
194   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
195   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
196   pthread_rwlock_wrlock(&cstReader->lock);
197   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
198   if (sp->mode==PULLED) {
199     if (NtpTimeCmp(
200           getActualNtpTime(),
201           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
202       memset(&info,0,sizeof(info));
203       info.status=DEADLINE;
204       info.topic=sp->topic;
205       info.type=sp->typeName;
206       cstReader->objectEntryOID->recvCallBack(&info,
207           cstReader->objectEntryOID->instance,
208           cstReader->objectEntryOID->callBackParam);
209       NtpTimeAdd(timeNext,
210                 (getActualNtpTime()),
211                 sp->deadline);
212       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
213     }
214     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
215   }
216   pthread_rwlock_unlock(&cstReader->lock);
217   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
218   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
219   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
220   return 0;
221 }