]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEPublication.c
Added prerelease of ORTE-0.2 (Real Time Publisher Subscriber communication protocol...
[orte.git] / orte / liborte / ORTEPublication.c
1 /*
2  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
3  *
4  *  DEBUG:  section 31                  Functions working over publication
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 GAVL_CUST_NODE_INT_IMP(PublicationList, 
25                        PSEntry, ObjectEntryOID, GUID_RTPS,
26                        publications, psNode, guid, gavl_cmp_guid);
27
28 /*****************************************************************************/
29 ORTEPublication * 
30 ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
31     void *instance,NtpTime *persistence,int strength,
32     ORTESendCallBack sendCallBack,void *sendCallBackParam,
33     NtpTime *sendCallBackDelay) {
34   GUID_RTPS             guid;
35   CSTWriter             *cstWriter;
36   CSTWriterParams       cstWriterParams;
37   ORTEPublProp          *pp;
38   ObjectEntryOID        *objectEntryOID;   
39   CSChange              *csChange;
40   TypeNode              *typeNode;
41   
42   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
43   if (!cstWriter) return NULL;
44   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
45   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
46   pthread_rwlock_rdlock(&d->typeEntry.lock);    
47   if (!(typeNode=ORTEType_find(&d->typeEntry,&typeName))) {
48     pthread_rwlock_unlock(&d->typeEntry.lock);    
49     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
50     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
51     printf("before call ORTEPublicationCreate is necessary to register \n\
52             ser./deser. function for a given typeName!!!\n");
53     return NULL;
54   }  
55   pthread_rwlock_wrlock(&d->publications.lock);
56   //generate new guid of publisher
57   d->publications.counter++;
58   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
59   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
60   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
61   memcpy(pp,&d->publPropDefault,sizeof(ORTEPublProp));
62   strcpy(pp->topic,topic);
63   strcpy(pp->typeName,typeName);
64   pp->persistence=*persistence;
65   pp->strength=strength;
66   pp->reliabilityOffered=PID_VALUE_RELIABILITY_BEST_EFFORTS | 
67                          PID_VALUE_RELIABILITY_STRICT;
68   //insert object to structure objectEntry
69   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
70   objectEntryOID->private=ORTE_TRUE;
71   objectEntryOID->instance=instance;
72   objectEntryOID->sendCallBack=sendCallBack;
73   objectEntryOID->callBackParam=sendCallBackParam;
74   if (objectEntryOID->sendCallBack!=NULL) {  
75     if (sendCallBackDelay!=NULL) {
76       objectEntryOID->sendCallBackDelay=*sendCallBackDelay;
77       eventAdd(d,
78           objectEntryOID->objectEntryAID,
79           &objectEntryOID->sendCallBackDelayTimer,
80           0,   
81           "PublicationCallBackTimer",
82           PublicationCallBackTimer,
83           &cstWriter->lock,
84           cstWriter,
85           &objectEntryOID->sendCallBackDelay);               
86     }
87   }
88   //create writerPublication
89   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
90   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
91   cstWriterParams.refreshPeriod=iNtpTime;  //cann't refresh csChange(s)
92   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
93   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
94   cstWriterParams.fullAcknowledge=ORTE_TRUE;
95   CSTWriterInit(d,cstWriter,objectEntryOID,guid.oid,&cstWriterParams,
96                 &typeNode->typeRegister);
97   //insert cstWriter to list of publications
98   CSTWriter_insert(&d->publications,cstWriter);
99   //generate csChange for writerPublisher
100   pthread_rwlock_wrlock(&d->writerPublications.lock);
101   csChange=(CSChange*)MALLOC(sizeof(CSChange));
102   parameterUpdateCSChangeFromPublication(csChange,pp);
103   csChange->guid=guid;
104   csChange->alive=ORTE_TRUE;
105   csChange->cdrStream.buffer=NULL;
106   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
107   pthread_rwlock_unlock(&d->writerPublications.lock);
108   pthread_rwlock_unlock(&d->publications.lock);
109   pthread_rwlock_unlock(&d->typeEntry.lock);    
110   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
111   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
112   return cstWriter;
113 }
114
115 /*****************************************************************************/
116 int
117 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
118   CSChange              *csChange;
119
120   if (!cstWriter) return -1;
121   //generate csChange for writerPublisher
122   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
123   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
124   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
125   if (cstWriter->objectEntryOID->sendCallBack!=NULL) {  
126     eventDetach(cstWriter->domain,
127         cstWriter->objectEntryOID->objectEntryAID,
128         &cstWriter->objectEntryOID->sendCallBackDelayTimer,
129         0);
130   }
131   csChange=(CSChange*)MALLOC(sizeof(CSChange));
132   CSChangeAttributes_init_head(csChange);
133   csChange->cdrStream.buffer=NULL;
134   csChange->guid=cstWriter->guid;
135   csChange->alive=ORTE_FALSE;
136   CSTWriterAddCSChange(cstWriter->domain,
137                        &cstWriter->domain->writerPublications,
138                        csChange);
139   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
140   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
141   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
142   return 0;
143 }
144
145
146 /*****************************************************************************/
147 int
148 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
149   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
150   pthread_rwlock_rdlock(&cstWriter->lock);
151   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
152   pthread_rwlock_unlock(&cstWriter->lock);
153   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
154   return 0;
155 }
156
157 /*****************************************************************************/
158 int
159 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
160   CSChange              *csChange;
161
162   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
163   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
164   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
165   pthread_rwlock_rdlock(&cstWriter->lock);
166   csChange=(CSChange*)MALLOC(sizeof(CSChange));
167   parameterUpdateCSChangeFromPublication(csChange,pp);
168   csChange->guid=cstWriter->guid;
169   csChange->alive=ORTE_TRUE;
170   csChange->cdrStream.buffer=NULL;
171   CSTWriterAddCSChange(cstWriter->domain,
172       &cstWriter->domain->writerPublications,csChange);
173   pthread_rwlock_unlock(&cstWriter->lock);
174   pthread_rwlock_unlock(&cstWriter->domain->publications.lock);
175   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
176   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
177   return 0;
178 }
179
180 /*****************************************************************************/
181 int
182 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
183     unsigned int retries,unsigned int noSubscriptions) {
184   return 0;
185 }
186
187 /*****************************************************************************/
188 int
189 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
190   return 0;
191 }
192
193 /*****************************************************************************/
194 int
195 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
196   ORTEPublProp          *pp;
197   
198   if (!cstWriter) return -1;
199   pthread_rwlock_wrlock(&cstWriter->lock);
200   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
201   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
202     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
203       NtpTime             expire,atime=getActualNtpTime();
204       struct timespec     wtime; 
205       //Count max block time
206       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
207       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
208       wtime.tv_nsec*=1000;  //conver to nano seconds
209       pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
210       pthread_rwlock_unlock(&cstWriter->lock);
211       pthread_mutex_timedlock(
212           &cstWriter->mutexCSChangeDestroyed,
213           &wtime);
214       pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
215       pthread_rwlock_wrlock(&cstWriter->lock);    
216       pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
217       if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
218         debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
219                       cstWriter->csChangesCounter);
220         pthread_rwlock_unlock(&cstWriter->lock);
221         return -2;
222       }
223     }
224   }
225   pthread_rwlock_unlock(&cstWriter->lock);
226   return 0;
227 }
228
229 /*****************************************************************************/
230 int
231 ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
232   CSChange              *csChange;
233   SequenceNumber        snNext;
234   
235   if (!cstWriter) return -1;
236   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
237   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
238   if (!CSTRemoteReader_is_empty(cstWriter)) {
239     csChange=(CSChange*)MALLOC(sizeof(CSChange));
240     CSChangeAttributes_init_head(csChange);
241     csChange->guid=cstWriter->guid;
242     csChange->alive=ORTE_FALSE;
243     csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+     //HEADER+INFO_TS+ISSUE
244                               +20+cstWriter->typeRegister->getMaxSize;
245     csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
246     csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
247     SeqNumberInc(snNext,cstWriter->lastSN);
248     RTPSHeaderCreate(csChange->cdrStream.buffer,
249                     cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
250     RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
251                     12,getActualNtpTime());
252     RTPSIssueCreateHeader(csChange->cdrStream.buffer+
253                     RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
254                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
255     //serialization routine
256     if (cstWriter->typeRegister->serialize) {
257       cstWriter->typeRegister->serialize(
258           &csChange->cdrStream,
259           cstWriter->objectEntryOID->instance);
260     } else {
261       //no deserialization -> memcpy
262       memcpy(csChange->cdrStream.bufferPtr,
263             cstWriter->objectEntryOID->instance,
264             cstWriter->typeRegister->getMaxSize);
265       csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
266     }
267     csChange->cdrStream.needByteSwap=ORTE_FALSE;
268     CSTWriterAddCSChange(cstWriter->domain,
269                         cstWriter,
270                         csChange);
271   }
272   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
273   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
274   return 0;
275 }
276
277 /*****************************************************************************/
278 int
279 ORTEPublicationSend(ORTEPublication *cstWriter) {
280   int             r;
281
282   if (!cstWriter) return -1;
283   //PrepareSendingQueue
284   if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
285   //send
286   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
287   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
288   pthread_rwlock_wrlock(&cstWriter->lock);
289   r=ORTEPublicationSendLocked(cstWriter);
290   pthread_rwlock_unlock(&cstWriter->lock);
291   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
292   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
293   return r;
294 }