2 This file is part of Valgrind, a dynamic binary instrumentation
5 Copyright (C) 2008-2008 Google Inc
8 This program is free software; you can redistribute it and/or
9 modify it under the terms of the GNU General Public License as
10 published by the Free Software Foundation; either version 2 of the
11 License, or (at your option) any later version.
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
23 The GNU General Public License is contained in the file COPYING.
26 // Author: Konstantin Serebryany <opensource@google.com>
28 // Here we define a few simple classes that wrap pthread primitives.
30 // We need this to create unit tests for helgrind (or similar tool)
31 // that will work with different threading frameworks.
33 // If one needs to test helgrind's support for another threading library,
34 // he/she can create a copy of this file and replace pthread_ calls
35 // with appropriate calls to his/her library.
37 // Note that some of the methods defined here are annotated with
38 // ANNOTATE_* macros defined in dynamic_annotations.h.
40 // DISCLAIMER: the classes defined in this header file
41 // are NOT intended for general use -- only for unit tests.
44 #ifndef THREAD_WRAPPERS_PTHREAD_H
45 #define THREAD_WRAPPERS_PTHREAD_H
48 #include <semaphore.h>
52 #include <limits.h> // INT_MAX
55 #include <libkern/OSAtomic.h>
66 #include "../../drd/drd.h"
67 #define ANNOTATE_NO_OP(arg) do { } while(0)
68 #define ANNOTATE_EXPECT_RACE(addr, descr) \
69 ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race")
70 static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; }
74 # error "Pleeease, do not define NDEBUG"
78 /// Set this to true if malloc() uses mutex on your platform as this may
79 /// introduce a happens-before arc for a pure happens-before race detector.
80 const bool kMallocUsesMutex = false;
82 /// Current time in milliseconds.
// Returns the current wall-clock time in milliseconds since the Unix epoch.
// NOTE(review): the declaration of `now` (a struct timeval) falls in a line
// not visible in this listing (original line 84) -- confirm it exists.
83 static inline int64_t GetCurrentTimeMillis() {
85 gettimeofday(&now, NULL);
// NOTE(review): now.tv_sec * 1000 is evaluated in time_t's width before
// widening to int64_t; on platforms with 32-bit time_t this multiplication
// can overflow.  Consider (int64_t)now.tv_sec * 1000.
86 return now.tv_sec * 1000 + now.tv_usec / 1000;
89 /// Copy tv to ts adding offset in milliseconds.
// Converts *tv (seconds + microseconds) into an absolute timespec that lies
// offset_milli milliseconds in the future, suitable for
// pthread_cond_timedwait().
// NOTE(review): the output parameter line (`timespec *ts`, original line 91)
// is not visible in this listing -- confirm against the full source.
90 static inline void timeval2timespec(timeval *const tv,
92 int64_t offset_milli) {
// Powers of ten used for unit conversion: ns/s, ns/ms, ns/us.
93 const int64_t ten_9 = 1000000000LL;
94 const int64_t ten_6 = 1000000LL;
95 const int64_t ten_3 = 1000LL;
// Total current time in nanoseconds, computed in 64 bits to avoid overflow.
96 int64_t now_nsec = (int64_t)tv->tv_sec * ten_9;
97 now_nsec += (int64_t)tv->tv_usec * ten_3;
98 int64_t then_nsec = now_nsec + offset_milli * ten_6;
// Split back into seconds + nanoseconds; for non-negative times the div/mod
// pair leaves tv_nsec normalized to [0, 1e9).
99 ts->tv_sec = then_nsec / ten_9;
100 ts->tv_nsec = then_nsec % ten_9;
107 /// helgrind does not (yet) support spin locks, so we annotate them.
// pthread_spinlock_t-backed SpinLock (the class header and method signatures
// fall in listing gaps).  Because helgrind does not intercept spin locks,
// each operation is paired with an ANNOTATE_RWLOCK_* client request so race
// detectors still see the lock/unlock happens-before edges.
// Constructor body: init the spinlock, then tell the tool it exists.
113 CHECK(0 == pthread_spin_init(&mu_, 0));
114 ANNOTATE_RWLOCK_CREATE((void*)&mu_);
// Destructor body: annotation first, then destroy the real lock.
117 ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
118 CHECK(0 == pthread_spin_destroy(&mu_));
// Lock body: acquire for real, then report the acquisition (is_w == 1).
121 CHECK(0 == pthread_spin_lock(&mu_));
122 ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
// Unlock body: report the release BEFORE actually releasing, so the
// annotation is still ordered inside the critical section.
125 ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
126 CHECK(0 == pthread_spin_unlock(&mu_));
// The underlying lock.
129 pthread_spinlock_t mu_;
// Mac OS X variant of SpinLock built on OSSpinLock (presumably guarded by an
// #ifdef not visible in this listing -- confirm).  Same ANNOTATE_RWLOCK_*
// pattern as the pthread version: create/destroy bracket the lifetime,
// ACQUIRED follows the real lock, RELEASED precedes the real unlock.
137 SpinLock() : mu_(OS_SPINLOCK_INIT) {
138 ANNOTATE_RWLOCK_CREATE((void*)&mu_);
// Destructor body (signature not visible here).
141 ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
// Lock body.
144 OSSpinLockLock(&mu_);
145 ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
// Unlock body.
148 ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
149 OSSpinLockUnlock(&mu_);
156 #endif // NO_SPINLOCK
158 /// Just a boolean condition. Used by Mutex::LockWhen and similar.
// Type-erased predicate: a bool-returning function plus an opaque argument.
// Used by Mutex::LockWhen/Await and friends.
161 typedef bool (*func_t)(void*);
// Wrap a predicate taking a T*; the function pointer is reinterpret_cast to
// the void* form and the argument stored untyped.
// NOTE(review): calling a bool(*)(T*) through a bool(*)(void*) is formally
// undefined behavior in ISO C++, though it works on the ABIs these unit
// tests target.
163 template <typename T>
164 Condition(bool (*func)(T*), T* arg)
165 : func_(reinterpret_cast<func_t>(func)), arg_(arg) {}
// Wrap a zero-argument predicate; arg_ is NULL and simply ignored-by-cast.
167 Condition(bool (*func)())
168 : func_(reinterpret_cast<func_t>(func)), arg_(NULL) {}
// Evaluate the stored predicate against the stored argument.
170 bool Eval() { return func_(arg_); }
178 /// Wrapper for pthread_mutex_t.
180 /// pthread_mutex_t is *not* a reader-writer lock,
181 /// so the methods like ReaderLock() aren't really reader locks.
182 /// We can not use pthread_rwlock_t because it
183 /// does not work with pthread_cond_t.
185 /// TODO: We still need to test reader locks with this class.
186 /// Implement a mode where pthread_rwlock_t will be used
187 /// instead of pthread_mutex_t (only when not used with CondVar or LockWhen).
// Mutex wraps a pthread_mutex_t together with a companion pthread_cond_t
// (cv_) so that condition-based locking (LockWhen/Await/...) can be layered
// on top.  NOTE(review): the class header, several method signatures,
// closing braces and the cv_/mu_ member declarations fall in gaps of this
// listing; comments below describe only what is visible.
// CondVar::Wait needs direct access to mu_.
190 friend class CondVar;
// Constructor body: create the mutex and its companion condition variable.
193 CHECK(0 == pthread_mutex_init(&mu_, NULL));
194 CHECK(0 == pthread_cond_init(&cv_, NULL));
195 signal_at_unlock_ = true; // Always signal at Unlock to make
196 // Mutex more friendly to hybrid detectors.
// Destructor body: destroy the condvar before the mutex it pairs with.
199 CHECK(0 == pthread_cond_destroy(&cv_));
200 CHECK(0 == pthread_mutex_destroy(&mu_));
202 void Lock() { CHECK(0 == pthread_mutex_lock(&mu_));}
// Returns true iff the lock was acquired without blocking.
203 bool TryLock() { return (0 == pthread_mutex_trylock(&mu_));}
// Unlock body: signal cv_ while still holding mu_ (POSIX permits this), so
// any waiter in WaitLoop/WaitLoopWithTimeout wakes and re-checks its
// condition after every unlock.
205 if (signal_at_unlock_) {
206 CHECK(0 == pthread_cond_signal(&cv_));
208 CHECK(0 == pthread_mutex_unlock(&mu_));
// "Reader" operations are plain exclusive operations: pthread_mutex_t is not
// a reader-writer lock (see the comment block above this class).
210 void ReaderLock() { Lock(); }
211 bool ReaderTryLock() { return TryLock();}
212 void ReaderUnlock() { Unlock(); }
// Acquire the lock, then block until cond evaluates true (lock held on
// return).  Await assumes the lock is already held.
214 void LockWhen(Condition cond) { Lock(); WaitLoop(cond); }
215 void ReaderLockWhen(Condition cond) { Lock(); WaitLoop(cond); }
216 void Await(Condition cond) { WaitLoop(cond); }
// Timed variants; the bool result comes from WaitLoopWithTimeout (see its
// note below).  The lock is held on return regardless of the result.
218 bool ReaderLockWhenWithTimeout(Condition cond, int millis)
219 { Lock(); return WaitLoopWithTimeout(cond, millis); }
220 bool LockWhenWithTimeout(Condition cond, int millis)
221 { Lock(); return WaitLoopWithTimeout(cond, millis); }
222 bool AwaitWithTimeout(Condition cond, int millis)
223 { return WaitLoopWithTimeout(cond, millis); }
// Wait (mu_ held) until cond is true; every pthread_cond_wait re-check is
// annotated for the race detector.
227 void WaitLoop(Condition cond) {
228 signal_at_unlock_ = true;
229 while(cond.Eval() == false) {
230 pthread_cond_wait(&cv_, &mu_);
232 ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
// Timed wait: builds an absolute deadline `millis` ms from now and loops on
// pthread_cond_timedwait until cond holds or the wait errors out
// (ETIMEDOUT).  NOTE(review): the declarations of `now`/`retcode` and the
// final return statement are in listing gaps; presumably it returns whether
// cond held -- confirm against the full source.
235 bool WaitLoopWithTimeout(Condition cond, int millis) {
237 struct timespec timeout;
239 gettimeofday(&now, NULL);
240 timeval2timespec(&now, &timeout, millis);
242 signal_at_unlock_ = true;
243 while (cond.Eval() == false && retcode == 0) {
244 retcode = pthread_cond_timedwait(&cv_, &mu_, &timeout);
247 ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
252 // A hack. cv_ should be the first data member so that
253 // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) works.
254 // (See also racecheck_unittest.cc)
257 bool signal_at_unlock_; // Set to true if Wait was called.
// RAII guard for Mutex -- per the trailing comment it locks on construction
// and unlocks on destruction; the member definitions are not visible in
// this listing.
261 class MutexLock { // Scoped Mutex Locker/Unlocker
275 /// Wrapper for pthread_cond_t.
// Thin wrapper for pthread_cond_t.  Wait/WaitWithTimeout take the companion
// Mutex whose mu_ they release/reacquire (CondVar is a friend of Mutex).
278 CondVar() { CHECK(0 == pthread_cond_init(&cv_, NULL)); }
279 ~CondVar() { CHECK(0 == pthread_cond_destroy(&cv_)); }
280 void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); }
// Wait at most `millis` ms.  Returns true iff pthread_cond_timedwait
// returned nonzero, i.e. the wait timed out (or otherwise failed) without a
// signal.  NOTE(review): the declaration of `now` (original line 282) is
// not visible in this listing.
281 bool WaitWithTimeout(Mutex *mu, int millis) {
283 struct timespec timeout;
284 gettimeofday(&now, NULL);
285 timeval2timespec(&now, &timeout, millis);
286 return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout);
// Wake one waiter / all waiters respectively.
288 void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); }
289 void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); }
295 // pthreads do not allow to use condvar with rwlock so we can't make
296 // ReaderLock method of Mutex to be the real rw-lock.
297 // So, we need a special lock class to test reader locks.
298 #define NEEDS_SEPERATE_RW_LOCK
// Real reader-writer lock built on pthread_rwlock_t; exists because Mutex's
// ReaderLock() is only a plain exclusive lock (pthread condvars cannot be
// used with rwlocks -- see the comment above).
301 RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); }
302 ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); }
// Exclusive (writer) lock.
303 void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); }
// Shared (reader) lock.
304 void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); }
// pthread_rwlock_unlock releases whichever mode is held, so both unlock
// methods are identical.
305 void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
306 void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
// NOTE(review): `dummy` appears unused by the visible code -- the trailing
// comment suggests it is a placeholder awaiting a redesign.
308 pthread_cond_t dummy; // Damn, this requires some redesign...
309 pthread_rwlock_t mu_;
// RAII guards for RWLock.  The constructor/destructor bodies fall in gaps of
// this listing; presumably the ctor acquires (reader side for
// ReaderLockScoped, writer side for WriterLockScoped) and the dtor releases
// -- TODO confirm against the full source.
312 class ReaderLockScoped { // Scoped RWLock Locker/Unlocker
314 ReaderLockScoped(RWLock *mu)
318 ~ReaderLockScoped() {
325 class WriterLockScoped { // Scoped RWLock Locker/Unlocker
327 WriterLockScoped(RWLock *mu)
331 ~WriterLockScoped() {
341 /// Wrapper for pthread_create()/pthread_join().
// Canonical worker signature expected by pthread_create.
344 typedef void *(*worker_t)(void*);
// Constructors accept workers of three shapes -- void*(void*), void(void)
// and void(void*) -- the latter two are reinterpret_cast to worker_t.
// NOTE(review): calling through a cast function-pointer type is formally UB
// in ISO C++, though it works on the ABIs these tests target.
346 MyThread(worker_t worker, void *arg = NULL, const char *name = NULL)
347 :w_(worker), arg_(arg), name_(name) {}
348 MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL)
349 :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
350 MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL)
351 :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
// Clears the pointers only; does NOT join -- callers must call Join().
353 ~MyThread(){ w_ = NULL; arg_ = NULL;}
// Launch the thread; ThreadBody receives `this` as the pthread argument.
354 void Start() { CHECK(0 == pthread_create(&t_, NULL, (worker_t)ThreadBody, this));}
355 void Join() { CHECK(0 == pthread_join(t_, NULL));}
356 pthread_t tid() const { return t_; }
// Trampoline: names the thread for the race-detector (if a name was given)
// and then invokes the stored worker with the stored argument.
358 static void ThreadBody(MyThread *my_thread) {
359 if (my_thread->name_) {
360 ANNOTATE_THREAD_NAME(my_thread->name_);
362 my_thread->w_(my_thread->arg_);
371 /// Just a message queue.
// Unbounded message queue of void* items protected by a Mutex; Get() blocks
// until an item is available via Mutex::LockWhen.  NOTE(review): several
// interior lines (Get()'s signature, unlock calls, returns, the mu_ member)
// fall in gaps of this listing.
372 class ProducerConsumerQueue {
// `unused` presumably mirrors a capacity parameter of some other queue
// implementation -- it is not used by the visible code.
374 ProducerConsumerQueue(int unused) {
375 //ANNOTATE_PCQ_CREATE(this);
377 ~ProducerConsumerQueue() {
379 //ANNOTATE_PCQ_DESTROY(this);
// Put never blocks: push the item and signal any Get() blocked in LockWhen.
383 void Put(void *item) {
386 ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get()
387 //ANNOTATE_PCQ_PUT(this);
392 // Blocks if the queue is empty.
// Get body: wait until IsQueueNotEmpty holds, then pop under the lock.
394 mu_.LockWhen(Condition(IsQueueNotEmpty, &q_));
396 bool ok = TryGetInternal(&item);
402 // If queue is not empty,
403 // remove an element from queue, put it into *res and return true.
404 // Otherwise return false.
405 bool TryGet(void **res) {
407 bool ok = TryGetInternal(res);
414 std::queue<void*> q_; // protected by mu_
// Pops the front element into *item_ptr; caller must hold mu_.
417 bool TryGetInternal(void ** item_ptr) {
420 *item_ptr = q_.front();
422 //ANNOTATE_PCQ_GET(this);
// Predicate used with Condition/LockWhen above.
426 static bool IsQueueNotEmpty(std::queue<void*> * queue) {
427 return !queue->empty();
433 /// Function pointer with zero, one or two parameters.
// Type-erased callback: the three supported arities of function pointer.
435 typedef void (*F0)();
436 typedef void (*F1)(void *arg1);
437 typedef void (*F2)(void *arg1, void *arg2);
// Fragment of the dispatcher (presumably Closure::Execute): selects the cast
// and call based on n_params.  The n_params == 0 branch and the one-arg call
// fall in listing gaps; only the two-arg call is visible.
446 } else if (n_params == 1) {
449 CHECK(n_params == 2);
450 (F2(f))(param1, param2);
// Factory overloads producing heap-allocated Closures for 0-, 1- and 2-arg
// functions.  NOTE(review): the lines storing n_params/f and the return
// statements fall in gaps of this listing.  The CHECKs guard the cast of an
// arbitrary P1/P2 into the void* slot (must fit in a pointer).
456 Closure *NewCallback(void (*f)()) {
457 Closure *res = new Closure;
466 Closure *NewCallback(void (*f)(P1), P1 p1) {
467 CHECK(sizeof(P1) <= sizeof(void*));
468 Closure *res = new Closure;
471 res->param1 = (void*)p1;
// NOTE(review): template parameter `T` is not used anywhere in the visible
// lines of this overload -- possibly a leftover; confirm against the full
// source.
476 template <class T, class P1, class P2>
477 Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
478 CHECK(sizeof(P1) <= sizeof(void*));
479 Closure *res = new Closure;
482 res->param1 = (void*)p1;
483 res->param2 = (void*)p2;
487 /*! A thread pool that uses ProducerConsumerQueue.
490 ThreadPool pool(n_workers);
492 pool.Add(NewCallback(func_with_no_args));
493 pool.Add(NewCallback(func_with_one_arg, arg));
494 pool.Add(NewCallback(func_with_two_args, arg1, arg2));
495 ... // more calls to pool.Add()
497 // the ~ThreadPool() is called: we wait workers to finish
498 // and then join all threads in the pool.
// Thread pool fed by a ProducerConsumerQueue of Closure*.  NOTE(review): the
// class header, the ctor's init list, the bodies of Add() and the shutdown
// method, and several closing braces fall in gaps of this listing.
503 //! Create n_threads threads, but do not start.
504 explicit ThreadPool(int n_threads)
506 for (int i = 0; i < n_threads; i++) {
507 MyThread *thread = new MyThread(&ThreadPool::Worker, this);
508 workers_.push_back(thread);
512 //! Start all threads.
513 void StartWorkers() {
514 for (size_t i = 0; i < workers_.size(); i++) {
515 workers_[i]->Start();
// Enqueue a task; body not visible (presumably queue_.Put(closure)).
520 void Add(Closure *closure) {
524 int num_threads() { return workers_.size();}
526 //! Wait workers to finish, then join all threads.
// Shutdown fragment: one pass over workers_ (presumably posting NULL
// sentinels -- see the NULL check in Worker below) and one pass joining.
528 for (size_t i = 0; i < workers_.size(); i++) {
531 for (size_t i = 0; i < workers_.size(); i++) {
537 std::vector<MyThread*> workers_;
538 ProducerConsumerQueue queue_;
// Per-thread loop: pull Closures from the queue; a NULL closure is the
// shutdown sentinel.  The execute/delete lines fall in a listing gap.
540 static void *Worker(void *p) {
541 ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
543 Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
544 if(closure == NULL) {
553 /// Wrapper for pthread_barrier_t.
// Thin wrapper for pthread_barrier_t; Block() (signature in a listing gap)
// rendezvouses n_threads callers.
556 explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));}
557 ~Barrier() {CHECK(0 == pthread_barrier_destroy(&b_));}
// The commented-out annotations were needed only when helgrind lacked a
// barrier interceptor (per the comment, 3.3.0 did; the local build does not).
559 // helgrind 3.3.0 does not have an interceptor for barrier.
560 // but our current local version does.
561 // ANNOTATE_CONDVAR_SIGNAL(this);
562 pthread_barrier_wait(&b_);
563 // ANNOTATE_CONDVAR_WAIT(this, this);
566 pthread_barrier_t b_;
// Countdown latch: starts at initial_count; DecrementCount() lowers it under
// mu_, and a Wait-style method (its signature is in a listing gap) blocks
// via Mutex::LockWhen until the count reaches zero.
571 class BlockingCounter {
573 explicit BlockingCounter(int initial_count) :
574 count_(initial_count) {}
// NOTE(review): the decrement and return lines fall in a listing gap;
// presumably it returns whether the count hit zero -- confirm.
575 bool DecrementCount() {
576 MutexLock lock(&mu_);
// Blocks until IsZero(&count_) holds (lock released in a non-visible line).
581 mu_.LockWhen(Condition(&IsZero, &count_));
// Predicate used with Condition/LockWhen above.
585 static bool IsZero(int *arg) { return *arg == 0; }
// Atomically adds `increment` to *value and returns the NEW value.  Two
// platform implementations follow; the #if/#elif guards selecting between
// them fall in gaps of this listing.
590 int AtomicIncrement(volatile int *value, int increment);
// GCC builtins variant: __sync_add_and_fetch returns the post-add value.
593 inline int AtomicIncrement(volatile int *value, int increment) {
594 return __sync_add_and_fetch(value, increment);
// Mac OS X variant: OSAtomicAdd32 likewise returns the post-add value (note
// the swapped argument order: amount first, address second).
599 inline int AtomicIncrement(volatile int *value, int increment) {
600 return OSAtomicAdd32(increment, value);
// Shims for platforms lacking memalign/posix_memalign (presumably inside a
// platform #ifdef not visible here).
// NOTE(review): this macro DISCARDS the alignment argument A -- malloc's
// default alignment is all callers get.  Acceptable for these unit tests
// only.
603 // TODO(timurrrr) this is a hack
604 #define memalign(A,B) malloc(B)
606 // TODO(timurrrr) this is a hack
// NOTE(review): because of the macro above, `al` is ignored, and the return
// statement falls in a listing gap (presumably returns 0 unconditionally,
// never reporting allocation failure -- confirm).
607 int posix_memalign(void **out, size_t al, size_t size) {
608 *out = memalign(al, size);
613 #endif // THREAD_WRAPPERS_PTHREAD_H
614 // vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker