]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSHeardBeat.c
Migration to new version of OMK system.
[orte.git] / orte / liborte / RTPSHeardBeat.c
1 /*
2  *  $Id: RTPSHeardBeat.c,v 0.0.0.1      2003/10/07 
3  *
4  *  DEBUG:  section 48                  RTPS message HeardBeat
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /**********************************************************************************/
25 int 
26 RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
27     SequenceNumber *fsn,SequenceNumber *lsn,
28     ObjectId roid,ObjectId woid,Boolean f_bit) 
29 {
30   CDR_Endianness     data_endian;
31   CORBA_octet        flags;
32
33   if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
34
35   /* submessage id */
36   CDR_put_octet(cdrCodec,HEARTBEAT);
37
38   /* flags */
39   flags=cdrCodec->data_endian;
40   if (f_bit) flags|=2;
41   CDR_put_octet(cdrCodec,flags);
42
43   /* length */
44   CDR_put_ushort(cdrCodec,24);
45
46   /* next data are sent in big endianing */
47   data_endian=cdrCodec->data_endian;
48   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
49
50   /* readerObjectId */
51   CDR_put_ulong(cdrCodec,roid);
52   
53   /* writerObjectId */
54   CDR_put_ulong(cdrCodec,woid);
55
56   cdrCodec->data_endian=data_endian;
57
58   /* firstSeqNumber */
59   CDR_put_ulong(cdrCodec,fsn->high);
60   CDR_put_ulong(cdrCodec,fsn->low);
61
62   /* lastSeqNumber */
63   CDR_put_ulong(cdrCodec,lsn->high);
64   CDR_put_ulong(cdrCodec,lsn->low);
65
66   return 28;
67 }
68
69 /**********************************************************************************/
70 void 
71 HeartBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
72     SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
73   CSTRemoteWriter    *cstRemoteWriter;
74   
75   if (!cstReader) return;
76   cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
77   if (!cstRemoteWriter) return;
78
79   cstRemoteWriter->firstSN=*fsn;
80   cstRemoteWriter->lastSN=*lsn;
81   cstRemoteWriter->ACKRetriesCounter=0;
82
83   if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
84     cstRemoteWriter->sn=*lsn;
85   if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
86     if (SeqNumberCmp(*fsn,noneSN)!=0) {
87       SeqNumberDec(cstRemoteWriter->sn,*fsn);
88     }
89   }
90
91   if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
92     CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
93   } else {
94     CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
95   }
96
97   if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
98     char queue=1;
99     cstRemoteWriter->commStateACK=ACKPENDING;
100     if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
101       queue=2;
102     eventDetach(cstReader->domain,
103         cstRemoteWriter->spobject->objectEntryAID,
104         &cstRemoteWriter->repeatActiveQueryTimer,
105         queue); 
106     eventDetach(cstReader->domain,
107         cstRemoteWriter->spobject->objectEntryAID,
108         &cstRemoteWriter->delayResponceTimer,
109         queue);   //metatraffic timer
110     eventAdd(cstReader->domain,
111         cstRemoteWriter->spobject->objectEntryAID,
112         &cstRemoteWriter->delayResponceTimer,
113         queue,    //metatraffic timer
114         "CSTReaderResponceTimer",
115         CSTReaderResponceTimer,
116         &cstRemoteWriter->cstReader->lock,
117         cstRemoteWriter,
118         &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
119   }
120 }
121
122 /**********************************************************************************/
123 void 
124 RTPSHeartBeat(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi) {
125   GUID_RTPS          writerGUID;
126   ObjectId           roid,woid;
127   SequenceNumber     fsn,lsn;
128   CSTReader          *cstReader=NULL;
129   CDR_Endianness     data_endian;
130   CORBA_octet        flags;
131   char               f_bit;
132
133   /* restore flag possition in submessage */
134   cdrCodec->rptr-=3;
135
136   /* flags */
137   CDR_get_octet(cdrCodec,&flags);
138   f_bit=flags & 2;
139
140   /* move reading possition to begin of submessage */
141   cdrCodec->rptr+=2;
142
143   /* next data are sent in big endianing */
144   data_endian=cdrCodec->data_endian;
145   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
146
147   /* readerObjectId */
148   CDR_get_ulong(cdrCodec,&roid);
149   
150   /* writerObjectId */
151   CDR_get_ulong(cdrCodec,&woid);
152
153   cdrCodec->data_endian=data_endian;
154
155   /* firstSeqNumber */
156   CDR_get_ulong(cdrCodec,&fsn.high);
157   CDR_get_ulong(cdrCodec,&fsn.low);
158
159   /* lastSeqNumber */
160   CDR_get_ulong(cdrCodec,&lsn.high);
161   CDR_get_ulong(cdrCodec,&lsn.low);
162
163   if (SeqNumberCmp(fsn,lsn)==1) return;         // lsn<fsn -> break
164   writerGUID.hid=mi->sourceHostId;
165   writerGUID.aid=mi->sourceAppId;
166   writerGUID.oid=woid;
167
168   debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
169                 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
170
171   if ((d->guid.aid & 0x03)==MANAGER) {
172     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
173         ((writerGUID.aid & 0x03)==MANAGER)) {
174       pthread_rwlock_wrlock(&d->readerManagers.lock);
175       cstReader=&d->readerManagers;
176     }
177     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
178          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
179         ((writerGUID.oid==OID_WRITE_APP) &&
180          ((writerGUID.aid & 0x03)==MANAGER))) {
181       pthread_rwlock_wrlock(&d->readerApplications.lock);
182       cstReader=&d->readerApplications;
183     }
184   }
185
186   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
187     switch (writerGUID.oid) {
188       case OID_WRITE_MGR:
189         pthread_rwlock_wrlock(&d->readerManagers.lock);
190         cstReader=&d->readerManagers;
191         break;
192       case OID_WRITE_APP:
193         pthread_rwlock_wrlock(&d->readerApplications.lock);
194         cstReader=&d->readerApplications;
195         break;
196       case OID_WRITE_PUBL:
197         pthread_rwlock_wrlock(&d->readerPublications.lock);
198         cstReader=&d->readerPublications;
199         break;
200       case OID_WRITE_SUBS:
201         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
202         cstReader=&d->readerSubscriptions;
203         break;
204     }
205
206     if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
207       pthread_rwlock_rdlock(&d->subscriptions.lock);
208       gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
209         pthread_rwlock_wrlock(&cstReader->lock);
210         HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
211         pthread_rwlock_unlock(&cstReader->lock);    
212       }
213       pthread_rwlock_unlock(&d->subscriptions.lock);
214       cstReader=NULL;
215     }
216   }  
217
218   HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
219
220   if (cstReader)
221     pthread_rwlock_unlock(&cstReader->lock);
222