]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSHeardBeat.c
updated email address - petr@smoliku.cz
[orte.git] / orte / liborte / RTPSHeardBeat.c
1 /*
2  *  $Id: RTPSHeardBeat.c,v 0.0.0.1      2003/10/07 
3  *
4  *  DEBUG:  section 48                  RTPS message HeardBeat
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int 
36 RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
37     SequenceNumber *fsn,SequenceNumber *lsn,
38     ObjectId roid,ObjectId woid,Boolean f_bit) 
39 {
40   CDR_Endianness     data_endian;
41   CORBA_octet        flags;
42
43   if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
44
45   /* submessage id */
46   CDR_put_octet(cdrCodec,HEARTBEAT);
47
48   /* flags */
49   flags=cdrCodec->data_endian;
50   if (f_bit) flags|=2;
51   CDR_put_octet(cdrCodec,flags);
52
53   /* length */
54   CDR_put_ushort(cdrCodec,24);
55
56   /* next data are sent in big endianing */
57   data_endian=cdrCodec->data_endian;
58   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
59
60   /* readerObjectId */
61   CDR_put_ulong(cdrCodec,roid);
62   
63   /* writerObjectId */
64   CDR_put_ulong(cdrCodec,woid);
65
66   cdrCodec->data_endian=data_endian;
67
68   /* firstSeqNumber */
69   CDR_put_ulong(cdrCodec,fsn->high);
70   CDR_put_ulong(cdrCodec,fsn->low);
71
72   /* lastSeqNumber */
73   CDR_put_ulong(cdrCodec,lsn->high);
74   CDR_put_ulong(cdrCodec,lsn->low);
75
76   return 28;
77 }
78
79 /**********************************************************************************/
80 void 
81 HeartBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
82     SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
83   CSTRemoteWriter    *cstRemoteWriter;
84   
85   if (!cstReader) return;
86   cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
87   if (!cstRemoteWriter) return;
88
89   cstRemoteWriter->firstSN=*fsn;
90   cstRemoteWriter->lastSN=*lsn;
91   cstRemoteWriter->ACKRetriesCounter=0;
92
93   if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
94     cstRemoteWriter->sn=*lsn;
95   if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
96     if (SeqNumberCmp(*fsn,noneSN)!=0) {
97       SeqNumberDec(cstRemoteWriter->sn,*fsn);
98     }
99   }
100
101   if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
102     CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
103   } else {
104     CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
105   }
106
107   if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
108     char queue=1;
109     cstRemoteWriter->commStateACK=ACKPENDING;
110     if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
111       queue=2;
112     eventDetach(cstReader->domain,
113         cstRemoteWriter->spobject->objectEntryAID,
114         &cstRemoteWriter->repeatActiveQueryTimer,
115         queue); 
116     eventDetach(cstReader->domain,
117         cstRemoteWriter->spobject->objectEntryAID,
118         &cstRemoteWriter->delayResponceTimer,
119         queue);   //metatraffic timer
120     eventAdd(cstReader->domain,
121         cstRemoteWriter->spobject->objectEntryAID,
122         &cstRemoteWriter->delayResponceTimer,
123         queue,    //metatraffic timer
124         "CSTReaderResponceTimer",
125         CSTReaderResponceTimer,
126         &cstRemoteWriter->cstReader->lock,
127         cstRemoteWriter,
128         &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
129   }
130 }
131
132 /**********************************************************************************/
133 void 
134 RTPSHeartBeat(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi) {
135   GUID_RTPS          writerGUID;
136   ObjectId           roid,woid;
137   SequenceNumber     fsn,lsn;
138   CSTReader          *cstReader=NULL;
139   CDR_Endianness     data_endian;
140   CORBA_octet        flags;
141   char               f_bit;
142
143   /* restore flag possition in submessage */
144   cdrCodec->rptr-=3;
145
146   /* flags */
147   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
148   f_bit=flags & 2;
149
150   /* move reading possition to begin of submessage */
151   cdrCodec->rptr+=2;
152
153   /* next data are sent in big endianing */
154   data_endian=cdrCodec->data_endian;
155   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
156
157   /* readerObjectId */
158   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
159   
160   /* writerObjectId */
161   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
162
163   cdrCodec->data_endian=data_endian;
164
165   /* firstSeqNumber */
166   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.high);
167   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.low);
168
169   /* lastSeqNumber */
170   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.high);
171   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.low);
172
173   if (SeqNumberCmp(fsn,lsn)==1) return;         // lsn<fsn -> break
174   writerGUID.hid=mi->sourceHostId;
175   writerGUID.aid=mi->sourceAppId;
176   writerGUID.oid=woid;
177
178   debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
179                 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
180
181   if ((d->guid.aid & 0x03)==MANAGER) {
182     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
183         ((writerGUID.aid & 0x03)==MANAGER)) {
184       pthread_rwlock_wrlock(&d->readerManagers.lock);
185       cstReader=&d->readerManagers;
186     }
187     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
188          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
189         ((writerGUID.oid==OID_WRITE_APP) &&
190          ((writerGUID.aid & 0x03)==MANAGER))) {
191       pthread_rwlock_wrlock(&d->readerApplications.lock);
192       cstReader=&d->readerApplications;
193     }
194   }
195
196   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
197     switch (writerGUID.oid) {
198       case OID_WRITE_MGR:
199         pthread_rwlock_wrlock(&d->readerManagers.lock);
200         cstReader=&d->readerManagers;
201         break;
202       case OID_WRITE_APP:
203         pthread_rwlock_wrlock(&d->readerApplications.lock);
204         cstReader=&d->readerApplications;
205         break;
206       case OID_WRITE_PUBL:
207         pthread_rwlock_wrlock(&d->readerPublications.lock);
208         cstReader=&d->readerPublications;
209         break;
210       case OID_WRITE_SUBS:
211         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
212         cstReader=&d->readerSubscriptions;
213         break;
214     }
215
216     if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
217       pthread_rwlock_rdlock(&d->subscriptions.lock);
218       gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
219         pthread_rwlock_wrlock(&cstReader->lock);
220         HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
221         pthread_rwlock_unlock(&cstReader->lock);    
222       }
223       pthread_rwlock_unlock(&d->subscriptions.lock);
224       cstReader=NULL;
225     }
226   }  
227
228   HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
229
230   if (cstReader)
231     pthread_rwlock_unlock(&cstReader->lock);
232