]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSHeardBeat.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / RTPSHeardBeat.c
1 /*
2  *  $Id: RTPSHeardBeat.c,v 0.0.0.1      2003/10/07
3  *
4  *  DEBUG:  section 48                  RTPS message HeartBeat
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
37                     SequenceNumber *fsn, SequenceNumber *lsn,
38                     ObjectId roid, ObjectId woid, Boolean f_bit)
39 {
40   CDR_Endianness     data_endian;
41   CORBA_octet        flags;
42
43   if (cdrCodec->buf_len < cdrCodec->wptr+28)
44     return -1;
45
46   /* submessage id */
47   CDR_put_octet(cdrCodec, HEARTBEAT);
48
49   /* flags */
50   flags = cdrCodec->data_endian;
51   if (f_bit)
52     flags |= 2;
53   CDR_put_octet(cdrCodec, flags);
54
55   /* length */
56   CDR_put_ushort(cdrCodec, 24);
57
58   /* next data are sent in big endianing */
59   data_endian = cdrCodec->data_endian;
60   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
61
62   /* readerObjectId */
63   CDR_put_ulong(cdrCodec, roid);
64
65   /* writerObjectId */
66   CDR_put_ulong(cdrCodec, woid);
67
68   cdrCodec->data_endian = data_endian;
69
70   /* firstSeqNumber */
71   CDR_put_ulong(cdrCodec, fsn->high);
72   CDR_put_ulong(cdrCodec, fsn->low);
73
74   /* lastSeqNumber */
75   CDR_put_ulong(cdrCodec, lsn->high);
76   CDR_put_ulong(cdrCodec, lsn->low);
77
78   return 28;
79 }
80
81 /**********************************************************************************/
82 void
83 HeartBeatProc(CSTReader *cstReader, GUID_RTPS *writerGUID,
84               SequenceNumber *fsn, SequenceNumber *lsn, char f_bit)
85 {
86   CSTRemoteWriter    *cstRemoteWriter;
87
88   if (!cstReader)
89     return;
90   cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
91   if (!cstRemoteWriter)
92     return;
93
94   cstRemoteWriter->firstSN = *fsn;
95   cstRemoteWriter->lastSN = *lsn;
96   cstRemoteWriter->ACKRetriesCounter = 0;
97
98   if (SeqNumberCmp(cstRemoteWriter->sn, *lsn) > 0)
99     cstRemoteWriter->sn = *lsn;
100   if (SeqNumberCmp(cstRemoteWriter->sn, *fsn) < 0) {
101     if (SeqNumberCmp(*fsn, noneSN) != 0) {
102       SeqNumberDec(cstRemoteWriter->sn, *fsn);
103     }
104   }
105
106   if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
107     CSTReaderProcCSChangesIssue(cstRemoteWriter, ORTE_FALSE);
108   } else {
109     CSTReaderProcCSChanges(cstReader->domain, cstRemoteWriter);
110   }
111
112   if ((!f_bit) && (cstRemoteWriter->commStateACK == WAITING)) {
113     char queue = 1;
114     cstRemoteWriter->commStateACK = ACKPENDING;
115     if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
116       queue = 2;
117     eventDetach(cstReader->domain,
118                 cstRemoteWriter->spobject->objectEntryAID,
119                 &cstRemoteWriter->repeatActiveQueryTimer,
120                 queue);
121     eventDetach(cstReader->domain,
122                 cstRemoteWriter->spobject->objectEntryAID,
123                 &cstRemoteWriter->delayResponceTimer,
124                 queue); //metatraffic timer
125     eventAdd(cstReader->domain,
126              cstRemoteWriter->spobject->objectEntryAID,
127              &cstRemoteWriter->delayResponceTimer,
128              queue, //metatraffic timer
129              "CSTReaderResponceTimer",
130              CSTReaderResponceTimer,
131              &cstRemoteWriter->cstReader->lock,
132              cstRemoteWriter,
133              &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
134   }
135 }
136
137 /**********************************************************************************/
138 void
139 RTPSHeartBeat(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi)
140 {
141   GUID_RTPS          writerGUID;
142   ObjectId           roid, woid;
143   SequenceNumber     fsn, lsn;
144   CSTReader          *cstReader = NULL;
145   CDR_Endianness     data_endian;
146   CORBA_octet        flags;
147   char               f_bit;
148
149   /* restore flag possition in submessage */
150   cdrCodec->rptr -= 3;
151
152   /* flags */
153   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
154   f_bit = flags & 2;
155
156   /* move reading possition to begin of submessage */
157   cdrCodec->rptr += 2;
158
159   /* next data are sent in big endianing */
160   data_endian = cdrCodec->data_endian;
161   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
162
163   /* readerObjectId */
164   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
165
166   /* writerObjectId */
167   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
168
169   cdrCodec->data_endian = data_endian;
170
171   /* firstSeqNumber */
172   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.high);
173   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.low);
174
175   /* lastSeqNumber */
176   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.high);
177   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.low);
178
179   if (SeqNumberCmp(fsn, lsn) == 1)
180     return;                                     // lsn<fsn -> break
181   writerGUID.hid = mi->sourceHostId;
182   writerGUID.aid = mi->sourceAppId;
183   writerGUID.oid = woid;
184
185   debug(48, 3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
186                 f_bit ? 'F' : 'f', woid, mi->sourceHostId, mi->sourceAppId);
187
188   if ((d->guid.aid & 0x03) == MANAGER) {
189     if ((writerGUID.oid == OID_WRITE_APPSELF) &&
190         ((writerGUID.aid & 0x03) == MANAGER)) {
191       pthread_rwlock_wrlock(&d->readerManagers.lock);
192       cstReader = &d->readerManagers;
193     }
194     if (((writerGUID.oid == OID_WRITE_APPSELF) &&
195          ((writerGUID.aid & 0x03) == MANAGEDAPPLICATION)) ||
196         ((writerGUID.oid == OID_WRITE_APP) &&
197          ((writerGUID.aid & 0x03) == MANAGER))) {
198       pthread_rwlock_wrlock(&d->readerApplications.lock);
199       cstReader = &d->readerApplications;
200     }
201   }
202
203   if ((d->guid.aid & 3) == MANAGEDAPPLICATION) {
204     switch (writerGUID.oid) {
205       case OID_WRITE_MGR:
206         pthread_rwlock_wrlock(&d->readerManagers.lock);
207         cstReader = &d->readerManagers;
208         break;
209       case OID_WRITE_APP:
210         pthread_rwlock_wrlock(&d->readerApplications.lock);
211         cstReader = &d->readerApplications;
212         break;
213       case OID_WRITE_PUBL:
214         pthread_rwlock_wrlock(&d->readerPublications.lock);
215         cstReader = &d->readerPublications;
216         break;
217       case OID_WRITE_SUBS:
218         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
219         cstReader = &d->readerSubscriptions;
220         break;
221     }
222
223     if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
224       pthread_rwlock_rdlock(&d->subscriptions.lock);
225       gavl_cust_for_each(CSTReader, &d->subscriptions, cstReader) {
226         pthread_rwlock_wrlock(&cstReader->lock);
227         HeartBeatProc(cstReader, &writerGUID, &fsn, &lsn, f_bit);
228         pthread_rwlock_unlock(&cstReader->lock);
229       }
230       pthread_rwlock_unlock(&d->subscriptions.lock);
231       cstReader = NULL;
232     }
233   }
234
235   HeartBeatProc(cstReader, &writerGUID, &fsn, &lsn, f_bit);
236
237   if (cstReader)
238     pthread_rwlock_unlock(&cstReader->lock);
239 }