]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTESubscription.c
Added prerelease of ORTE-0.2 (Real Time Publisher Subscriber communication protocol...
[orte.git] / orte / liborte / ORTESubscription.c
1 /*
2  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
3  *
4  *  DEBUG:  section 33                  Functions working over subscription
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
25                        PSEntry, ObjectEntryOID, GUID_RTPS,
26                        subscriptions, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTESubscription * 
30 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
31     char *topic,char *typeName,void *instance,NtpTime *deadline,
32     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
33     void *recvCallBackParam) {
34   GUID_RTPS             guid;
35   CSTReader             *cstReader;
36   CSTReaderParams       cstReaderParams;
37   ORTESubsProp          *sp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   cstReader=(CSTReader*)MALLOC(sizeof(CSTReader));
43   if (!cstReader) return NULL;
44   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46   pthread_rwlock_rdlock(&d->typeEntry.lock);    
47   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48     pthread_rwlock_unlock(&d->typeEntry.lock);    
49     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
52             ser./deser. function for a given typeName!!!\n");
53     return NULL;
54   }  
55   pthread_rwlock_wrlock(&d->subscriptions.lock);
56   //generate new guid of publisher
57   d->subscriptions.counter++;
58   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
59   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
60   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
61   memcpy(sp,&d->subsPropDefault,sizeof(ORTESubsProp));
62   strcpy(sp->topic,topic);
63   strcpy(sp->typeName,typeName);
64   sp->deadline=*deadline;
65   sp->minimumSeparation=*minimumSeparation;
66   switch (sType) {
67     case BEST_EFFORTS:
68       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
69       break;
70     case STRICT_RELIABLE:
71       sp->reliabilityRequested=PID_VALUE_RELIABILITY_STRICT;
72       break;
73   }
74   sp->mode=mode;
75   //insert object to structure objectEntry
76   objectEntryOID=objectEntryAdd(d,&guid,(void*)sp);
77   objectEntryOID->private=ORTE_TRUE;
78   objectEntryOID->instance=instance;
79   objectEntryOID->recvCallBack=recvCallBack;
80   objectEntryOID->callBackParam=recvCallBackParam;
81   //create writerSubscription
82   cstReaderParams.delayResponceTimeMin=zNtpTime;
83   cstReaderParams.delayResponceTimeMax=zNtpTime;
84   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
85   cstReaderParams.repeatActiveQueryTime=iNtpTime;
86   cstReaderParams.fullAcknowledge=ORTE_FALSE;      
87   CSTReaderInit(d,cstReader,objectEntryOID,guid.oid,&cstReaderParams,
88                 &typeNode->typeRegister);
89   //insert cstWriter to list of subscriberes
90   CSTReader_insert(&d->subscriptions,cstReader);
91   //generate csChange for writerSubscriberes
92   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
93   csChange=(CSChange*)MALLOC(sizeof(CSChange));
94   parameterUpdateCSChangeFromSubscription(csChange,sp);
95   csChange->guid=guid;
96   csChange->alive=ORTE_TRUE;
97   csChange->cdrStream.buffer=NULL;
98   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
99   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
100   pthread_rwlock_unlock(&d->subscriptions.lock);
101   pthread_rwlock_unlock(&d->typeEntry.lock);    
102   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
103   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
104   return cstReader;
105 }
106
107 /*****************************************************************************/
108 int
109 ORTESubscriptionDestroy(ORTESubscription *cstReader) {
110   CSChange              *csChange;
111
112   if (!cstReader) return -1;
113   //generate csChange for writerSubscriptions
114   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
115   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
116   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
117   csChange=(CSChange*)MALLOC(sizeof(CSChange));
118   CSChangeAttributes_init_head(csChange);
119   csChange->guid=cstReader->guid;
120   csChange->alive=ORTE_FALSE;
121   csChange->cdrStream.buffer=NULL;
122   CSTWriterAddCSChange(cstReader->domain,
123                        &cstReader->domain->writerSubscriptions,
124                        csChange);
125   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
126   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
127   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
128   return 0;
129 }
130
131
132 /*****************************************************************************/
133 int
134 ORTESubscriptionPropertiesGet(ORTESubscription *cstReader,ORTESubsProp *sp) {
135   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
136   pthread_rwlock_rdlock(&cstReader->lock);
137   *sp=*(ORTESubsProp*)cstReader->objectEntryOID->attributes;
138   pthread_rwlock_unlock(&cstReader->lock);
139   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
140   return 0;
141 }
142
143 /*****************************************************************************/
144 int
145 ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
146   CSChange              *csChange;
147
148   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
149   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
150   pthread_rwlock_wrlock(&cstReader->domain->writerSubscriptions.lock);
151   pthread_rwlock_rdlock(&cstReader->lock);
152   csChange=(CSChange*)MALLOC(sizeof(CSChange));
153   parameterUpdateCSChangeFromSubscription(csChange,sp);
154   csChange->guid=cstReader->guid;
155   csChange->alive=ORTE_TRUE;
156   csChange->cdrStream.buffer=NULL;
157   CSTWriterAddCSChange(cstReader->domain,
158       &cstReader->domain->writerSubscriptions,csChange);
159   pthread_rwlock_unlock(&cstReader->lock);
160   pthread_rwlock_unlock(&cstReader->domain->subscriptions.lock);
161   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
162   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
163   return 0;
164 }
165
166 /*****************************************************************************/
167 int
168 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
169     u_int32_t retries,u_int32_t noPublications) {
170   return 0;
171 }
172
173 /*****************************************************************************/
174 int
175 ORTESubscriptionGetStatus(ORTESubscription *cstReader,ORTESubsStatus *status) {
176   return 0;
177 }
178
179 /*****************************************************************************/
180 int
181 ORTESubscriptionPull(ORTESubscription *cstReader) {
182   ORTESubsProp         *sp;
183   ORTERecvInfo         info;
184   NtpTime              timeNext;
185   
186   if (!cstReader) return -1;
187   pthread_rwlock_rdlock(&cstReader->domain->objectEntry.objRootLock);
188   pthread_rwlock_wrlock(&cstReader->domain->objectEntry.htimRootLock);
189   pthread_rwlock_rdlock(&cstReader->domain->writerSubscriptions.lock);
190   pthread_rwlock_wrlock(&cstReader->lock);
191   sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
192   if (sp->mode==PULLED) {
193     if (NtpTimeCmp(
194           getActualNtpTime(),
195           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
196       memset(&info,0,sizeof(info));
197       info.status=DEADLINE;
198       info.topic=sp->topic;
199       info.type=sp->typeName;
200       cstReader->objectEntryOID->recvCallBack(&info,
201           cstReader->objectEntryOID->instance,
202           cstReader->objectEntryOID->callBackParam);
203       NtpTimeAdd(timeNext,
204                 (getActualNtpTime()),
205                 sp->deadline);
206       htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
207     }
208     CSTReaderProcCSChangesIssue(cstReader->cstRemoteWriterSubscribed,ORTE_TRUE);
209   }
210   pthread_rwlock_unlock(&cstReader->lock);
211   pthread_rwlock_unlock(&cstReader->domain->writerSubscriptions.lock);
212   pthread_rwlock_unlock(&cstReader->domain->objectEntry.htimRootLock);
213   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
214   return 0;
215 }