]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/plr/server/src/redundancy/dmr.cc
update
[l4.git] / l4 / pkg / plr / server / src / redundancy / dmr.cc
1 #include "../log"
2 #include "../redundancy.h"
3 #include "../app_loading"
4 #include "../fault_observers"
5
6 #define MSG() DEBUGf(Romain::Log::Redundancy)
7 #define MSGi(inst) MSG() << "[" << (inst)->id() << "] "
8
9 /* Replication protocol:
10  * =====================
11  *
12  * Everyone goes to sleep, except the last thread to enter. This thread becomes
13  * the 'leader'. The leader returns from this function with the First_syscall return
14  * value. It then goes on to execute the system call (in manager.cc). Depending on
15  * its return value,
16  *
17  * a) For replicatable calls: it stores its VCPU state after the system call using
18  *    the function put(). All other replicas then use get() to obtain this state.
19  *
20  * b) For non-replicatable calls: it sets the other replicas' return value to 
21  *    Repeat_syscall. The replicas then perform handling themselves.
22  *
23  * After all the handling, everyone waits in resume() until the last replica reaches
24  * the resumption point. Then each VCPU goes back to where it came from.
25  *
26  *
27  * Detection and recovery:
28  * =======================
29  *
30  * Before executing the fault handler, the leader checksums all VCPU states. If a
31  * mismatch is found, it calls the recover() function. recover() sets things straight
32  * so that after the handler is done, everyone is in an identical state again. The leader
33  * then goes on to execute the call.
34  */
35
36 Romain::DMR::DMR(unsigned instances)
37         : _leave_count(0), _enter_count(0), _block_count(0),
38       _rv(Romain::RedundancyCallback::Invalid),
39       _num_instances(instances), _num_instances_bak(0)
40 {
41         for (unsigned i = 0; i < _num_instances; ++i)
42                 _orig_vcpu[i] = 0;
43         _check(pthread_mutex_init(&_enter_mtx, NULL) != 0, "error initializing mtx");
44         _check(pthread_cond_init(&_enter, NULL) != 0,      "error initializing condvar");
45         _check(pthread_mutex_init(&_leave_mtx, NULL) != 0, "error initializing mtx");
46         _check(pthread_cond_init(&_leave, NULL) != 0,      "error initializing condvar");
47         _check(pthread_mutex_init(&_block_mtx, NULL) != 0, "error initializing mtx");
48         _check(pthread_cond_init(&_block, NULL) != 0,      "error initializing condvar");
49 }
50
51
52 void
53 Romain::Replicator::put(Romain::App_thread *t)
54 {
55         //memset(&_regs, 0, sizeof(_regs)); // XXX
56 #define PUT(field) _regs.field = t->vcpu()->r()->field
57         PUT(es); PUT(ds); PUT(gs); PUT(fs);
58         PUT(di); PUT(si); PUT(bp); PUT(pfa);
59         PUT(ax); PUT(bx); PUT(cx); PUT(dx);
60         PUT(trapno); PUT(err); PUT(ip); PUT(flags);
61         PUT(sp); PUT(ss);
62 #undef PUT
63         l4_utcb_t *addr = reinterpret_cast<l4_utcb_t*>(t->remote_utcb()); 
64         memcpy(&_utcb, addr, L4_UTCB_OFFSET);
65 }
66
67
68 void
69 Romain::Replicator::get(Romain::App_thread *t)
70 {
71 #define PUT(field) t->vcpu()->r()->field = _regs.field
72         PUT(es); PUT(ds); PUT(gs); PUT(fs);
73         PUT(di); PUT(si); PUT(bp); PUT(pfa);
74         PUT(ax); PUT(bx); PUT(cx); PUT(dx);
75         PUT(trapno); PUT(err); PUT(ip); PUT(flags);
76         PUT(sp); PUT(ss);
77 #undef PUT
78         l4_utcb_t *addr = reinterpret_cast<l4_utcb_t*>(t->remote_utcb()); 
79         memcpy(addr, &_utcb, L4_UTCB_OFFSET);
80 }
81
82 bool
83 Romain::DMR::checksum_replicas()
84 {
85         unsigned long csums[MAX_REPLICAS] = {0, };
86         unsigned idx;
87
88         // calc checksums
89         for (idx = 0; idx < _num_instances; ++idx)
90                 csums[idx] = _orig_vcpu[idx]->csum_state();
91
92         // validate checksums
93         for (idx = 1; idx < _num_instances; ++idx)
94                 if (csums[idx] != csums[idx-1]) {
95 #if 1
96                         ERROR() << "State mismatch detected!";
97                         ERROR() << "=== vCPU states ===";
98                         for (unsigned cnt = 0; cnt < _num_instances; ++cnt) {
99                                 ERROR() << "--- instance " << cnt << " @ "
100                                         << _orig_vcpu[cnt]->vcpu() << " (cs: "
101                                         << std::hex << csums[cnt] << ") ---";
102                                 if (_orig_vcpu[cnt])
103                                         _orig_vcpu[cnt]->vcpu()->print_state();
104                         }
105                         //enter_kdebug("checksum");
106 #endif
107                         return false;
108                 }
109
110         return true;
111 }
112
113
/*
 * Recovery strategy of last resort: terminate the whole master.
 *
 * Used when error recovery is impossible (fewer than 3 replicas, so no
 * majority vote can identify the faulty one). Throws a plain C string,
 * which is expected to unwind out of the fault-handling path; callers
 * rely on this never returning.
 */
class RecoverAbort
{
	public:
		static __attribute__((noreturn)) void recover()
		{
			ERROR() << "Aborting after error.";
			// Deliberately throws a const char*, not std::exception —
			// the catch site elsewhere must match this type.
			throw("ERROR -> abort");
		}
};
123
124
125 class RedundancyAbort
126 {
127         public:
128                 static void recover(Romain::App_thread** threads, unsigned count,
129                                                         unsigned *good, unsigned *bad)
130                 {
131                         unsigned long csums[count];
132                         unsigned idx;
133
134                         // calc checksums
135                         for (idx = 0; idx < count; ++idx)
136                                 csums[idx] = threads[idx]->csum_state();
137
138                         // find mismatch
139                         for (idx = 1; idx < count; ++idx)
140                                 if (csums[idx] != csums[idx-1]) { // mismatch
141                                         if (csums[idx] == csums[(idx + 1) % count]) {
142                                                 *good = idx;
143                                                 *bad  = idx-1;
144                                         } else {
145                                                 *good = idx-1;
146                                                 *bad  = idx;
147                                         }
148                                 }
149                 }
150 };
151
152
153 void
154 Romain::DMR::recover(Romain::App_model *am)
155 {
156         if (_num_instances < 3)
157                 RecoverAbort::recover(); // noreturn
158
159         unsigned good = ~0, bad = ~0;
160         RedundancyAbort::recover(_orig_vcpu, _num_instances, &good, &bad);
161         DEBUG() << "good " << good << ", bad " << bad;
162
163         // XXX: This does not suffice. We also need to copy memory content
164         //      from a correct replica to the incorrect one
165         replicator().put(_orig_vcpu[good]);
166         replicator().get(_orig_vcpu[bad]);
167         am->rm()->replicate(good, bad);
168
169         DEBUG() << "after recovery:";
170         for (unsigned i = 0; i < _num_instances; ++i)
171                 DEBUG() << i << " " << std::hex << _orig_vcpu[i]->csum_state();
172 }
173
174
/*
 * Rendezvous point on fault entry: elect a leader among the replicas.
 *
 * Every replica thread calls this when it traps. All but the last
 * arrival sleep on the _enter condvar; the last one becomes the leader,
 * checksums all replica states (triggering recovery on mismatch) and
 * returns First_syscall. Woken followers return whatever value the
 * leader published in _rv (Repeat_syscall / Skip_syscall).
 *
 * i: faulting replica instance, t: its thread state, a: address space
 * model (used for recovery). Returns the protocol decision for this
 * replica.
 */
Romain::RedundancyCallback::EnterReturnVal
Romain::DMR::enter(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
{
	(void)a;
	MSGi(i) << "DMR::enter act(" << _enter_count << ")";

	// Default: assume we become the leader (first/only executor of the call).
	Romain::RedundancyCallback::EnterReturnVal ret = Romain::RedundancyCallback::First_syscall;

	// enter ourselves into the list of faulted threads
	// NOTE(review): this write happens before taking _enter_mtx —
	// presumably safe because each replica only writes its own slot;
	// confirm against checksum_replicas() running under the lock.
	_orig_vcpu[i->id()] = t;

	pthread_mutex_lock(&_enter_mtx);

	if (++_enter_count < _num_instances) {
		//MSGi(i) << "I'm not the last instance -> going to wait.";
		// wait for the leader
		// NOTE(review): no predicate loop around cond_wait — a spurious
		// wakeup would read a stale _rv; verify the protocol tolerates this.
		pthread_cond_wait(&_enter, &_enter_mtx);
		// get the return value set by the leader
		ret = _rv;
	} else {
		// everyone is here, so checksum the VCPUs now
		if (!checksum_replicas())
			recover(a);
		// at this point, recovery has made sure that all replicas
		// are in the same state.
	}

	--_enter_count;

	pthread_mutex_unlock(&_enter_mtx);

	/*
	 * If the leader told us to skip the syscall, get replicated VCPU and
	 * UTCB states here.
	 */
	if (ret == Romain::RedundancyCallback::Skip_syscall) {
		replicator().get(t);
	}

	return ret;
}
216
217
/*
 * Leader decision for non-replicatable system calls: publish
 * Repeat_syscall in _rv so each follower, once woken in enter(),
 * performs the syscall handling itself.
 */
void Romain::DMR::leader_repeat(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
{
	(void)i; (void)t; (void)a;
	MSGi(i) << __func__;
	_rv = Romain::RedundancyCallback::Repeat_syscall;
}
224
225
/*
 * Leader decision for replicatable system calls: store the leader's
 * post-syscall vCPU/UTCB state in the replicator and publish
 * Skip_syscall, so followers clone this state in enter() instead of
 * executing the call themselves.
 */
void Romain::DMR::leader_replicate(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
{
	(void)i; (void)t; (void)a;
	MSGi(i) << __func__;
	_rv = Romain::RedundancyCallback::Skip_syscall;

	//t->vcpu()->print_state();
	replicator().put(t);
}
235
236
237 void Romain::DMR::resume(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
238 {
239         (void)i; (void)t; (void)a;
240         //MSGi(i) << "[l] acquiring leave mtx";
241         pthread_mutex_lock(&_leave_mtx);
242         if (_leave_count == 0) {
243                 pthread_mutex_lock(&_enter_mtx);
244                 pthread_cond_broadcast(&_enter);
245                 pthread_mutex_unlock(&_enter_mtx);
246         }
247
248         //MSGi(i) << "++_leave_count " << _leave_count;
249         if (++_leave_count < _num_instances) {
250                 MSGi(i) << "Waiting for other replicas to commit their syscall.";
251                 //MSGi(i) << "cond_wait(leave)";
252                 pthread_cond_wait(&_leave, &_leave_mtx);
253                 //MSGi(i) << "success: cond_wait(leave)";
254         } else {
255                 for (unsigned i = 0; i < _num_instances; ++i)
256                         _orig_vcpu[i] = 0;
257                 pthread_cond_broadcast(&_leave);
258         }
259         //MSGi(i) << "counts @ resume: " << _enter_count << " " << _leave_count;
260         --_leave_count;
261         pthread_mutex_unlock(&_leave_mtx);
262
263         //enter_kdebug("DMR::resume");
264 }
265
266 void Romain::DMR::wait(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
267 {
268         MSGi(i) << __func__;
269         pthread_mutex_lock(&_block_mtx);
270         ++_block_count;
271         MSGi(i) << "going to wait. block_count: " << _block_count;
272         pthread_cond_broadcast(&_enter);
273         pthread_cond_wait(&_block, &_block_mtx);
274         pthread_mutex_unlock(&_block_mtx);
275 }
276
277 void Romain::DMR::silence(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
278 {
279         MSGi(i) << __func__;
280         // 1. Tell anyone who is still waiting to enter that he can now do so.
281         //    These replicas will all run until they block on _block_mtx.
282         pthread_cond_broadcast(&_enter);
283
284         while (_block_count < (_num_instances - 1))
285                 l4_sleep(20); // XXX handshake
286
287         _num_instances_bak = _num_instances;
288         _num_instances     = 1;
289 }
290
291 void Romain::DMR::wakeup(Romain::App_instance *i, Romain::App_thread *t, Romain::App_model *a)
292 {
293         MSGi(i) << __func__;
294         _block_count   = 0;
295         _num_instances = _num_instances_bak;
296         pthread_cond_broadcast(&_block);
297 }