]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/libstdc++-v3/contrib/libstdc++-v3-4.3.3/include/parallel/workstealing.h
Inital import
[l4.git] / l4 / pkg / libstdc++-v3 / contrib / libstdc++-v3-4.3.3 / include / parallel / workstealing.h
1 // -*- C++ -*-
2
3 // Copyright (C) 2007, 2008 Free Software Foundation, Inc.
4 //
5 // This file is part of the GNU ISO C++ Library.  This library is free
6 // software; you can redistribute it and/or modify it under the terms
7 // of the GNU General Public License as published by the Free Software
8 // Foundation; either version 2, or (at your option) any later
9 // version.
10
11 // This library is distributed in the hope that it will be useful, but
12 // WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // General Public License for more details.
15
16 // You should have received a copy of the GNU General Public License
17 // along with this library; see the file COPYING.  If not, write to
18 // the Free Software Foundation, 59 Temple Place - Suite 330, Boston,
19 // MA 02111-1307, USA.
20
21 // As a special exception, you may use this file as part of a free
22 // software library without restriction.  Specifically, if other files
23 // instantiate templates or use macros or inline functions from this
24 // file, or you compile this file and link it with other files to
25 // produce an executable, this file does not by itself cause the
26 // resulting executable to be covered by the GNU General Public
27 // License.  This exception does not however invalidate any other
28 // reasons why the executable file might be covered by the GNU General
29 // Public License.
30
31 /** @file parallel/workstealing.h
32  *  @brief Parallelization of embarrassingly parallel execution by
33  *  means of work-stealing.
34  *
35  *  Work stealing is described in
36  *
37  *  R. D. Blumofe and C. E. Leiserson.
38  *  Scheduling multithreaded computations by work stealing.
39  *  Journal of the ACM, 46(5):720–748, 1999.
40  *
41  *  This file is a GNU parallel extension to the Standard C++ Library.
42  */
43
44 // Written by Felix Putze.
45
46 #ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
47 #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1
48
49 #include <parallel/parallel.h>
50 #include <parallel/random_number.h>
51 #include <parallel/compatibility.h>
52
53 namespace __gnu_parallel
54 {
55
56 #define _GLIBCXX_JOB_VOLATILE volatile
57
58 /** @brief One job for a certain thread. */
59 template<typename _DifferenceTp>
60   struct Job
61   {
62     typedef _DifferenceTp difference_type;
63
64     /** @brief First element.
65      *
66      *  Changed by owning and stealing thread. By stealing thread,
67      *  always incremented. */
68     _GLIBCXX_JOB_VOLATILE difference_type first;
69
70     /** @brief Last element.
71      *
72      *  Changed by owning thread only. */
73     _GLIBCXX_JOB_VOLATILE difference_type last;
74
75     /** @brief Number of elements, i. e. @c last-first+1.
76      *
77      *  Changed by owning thread only. */
78     _GLIBCXX_JOB_VOLATILE difference_type load;
79   };
80
81 /** @brief Work stealing algorithm for random access iterators.
82   *
83   *  Uses O(1) additional memory. Synchronization at job lists is
84   *  done with atomic operations.
85   *  @param begin Begin iterator of element sequence.
86   *  @param end End iterator of element sequence.
87   *  @param op User-supplied functor (comparator, predicate, adding
88   *  functor, ...).
89   *  @param f Functor to "process" an element with op (depends on
90   *  desired functionality, e. g. for std::for_each(), ...).
91   *  @param r Functor to "add" a single result to the already
92   *  processed elements (depends on functionality).
93   *  @param base Base value for reduction.
94   *  @param output Pointer to position where final result is written to
95   *  @param bound Maximum number of elements processed (e. g. for
96   *  std::count_n()).
97   *  @return User-supplied functor (that may contain a part of the result).
98   */
99 template<typename RandomAccessIterator,
100          typename Op,
101          typename Fu,
102          typename Red,
103          typename Result>
104   Op
105   for_each_template_random_access_workstealing(RandomAccessIterator begin,
106                                                RandomAccessIterator end,
107                                                Op op, Fu& f, Red r,
108                                                Result base, Result& output,
109                                                typename std::iterator_traits
110                                                <RandomAccessIterator>::
111                                                difference_type bound)
112   {
113     _GLIBCXX_CALL(end - begin)
114
115     typedef std::iterator_traits<RandomAccessIterator> traits_type;
116     typedef typename traits_type::difference_type difference_type;
117     
118     const _Settings& __s = _Settings::get();
119
120     difference_type chunk_size = static_cast<difference_type>(__s.workstealing_chunk_size);
121
122     // How many jobs?
123     difference_type length = (bound < 0) ? (end - begin) : bound;
124
125     // To avoid false sharing in a cache line.
126     const int stride = __s.cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
127
128     // Total number of threads currently working.
129     thread_index_t busy = 0;
130
131     Job<difference_type> *job;
132
133     omp_lock_t output_lock;
134     omp_init_lock(&output_lock);
135
136     // Write base value to output.
137     output = base;
138
139     // No more threads than jobs, at least one thread.
140     thread_index_t num_threads =
141         __gnu_parallel::max<thread_index_t>(1,
142             __gnu_parallel::min<difference_type>(length, get_max_threads()));
143
144 #   pragma omp parallel shared(busy) num_threads(num_threads)
145       {
146
147 #       pragma omp single
148           {
149             num_threads = omp_get_num_threads();
150
151             // Create job description array.
152             job = new Job<difference_type>[num_threads * stride];
153           }
154
155         // Initialization phase.
156
157         // Flags for every thread if it is doing productive work.
158         bool iam_working = false;
159
160         // Thread id.
161         thread_index_t iam = omp_get_thread_num();
162
163         // This job.
164         Job<difference_type>& my_job = job[iam * stride];
165
166         // Random number (for work stealing).
167         thread_index_t victim;
168
169         // Local value for reduction.
170         Result result = Result();
171
172         // Number of elements to steal in one attempt.
173         difference_type steal;
174
175         // Every thread has its own random number generator
176         // (modulo num_threads).
177         random_number rand_gen(iam, num_threads);
178
179         // This thread is currently working.
180 #       pragma omp atomic
181           ++busy;
182
183         iam_working = true;
184
185         // How many jobs per thread? last thread gets the rest.
186         my_job.first =
187             static_cast<difference_type>(iam * (length / num_threads));
188
189         my_job.last = (iam == (num_threads - 1)) ?
190             (length - 1) : ((iam + 1) * (length / num_threads) - 1);
191         my_job.load = my_job.last - my_job.first + 1;
192
193         // Init result with first value (to have a base value for reduction).
194         if (my_job.first <= my_job.last)
195           {
196             // Cannot use volatile variable directly.
197             difference_type my_first = my_job.first;
198             result = f(op, begin + my_first);
199             ++my_job.first;
200             --my_job.load;
201           }
202
203         RandomAccessIterator current;
204
205 #       pragma omp barrier
206
207         // Actual work phase
208         // Work on own or stolen start
209         while (busy > 0)
210           {
211             // Work until no productive thread left.
212 #           pragma omp flush(busy)
213
214             // Thread has own work to do
215             while (my_job.first <= my_job.last)
216               {
217                 // fetch-and-add call
218                 // Reserve current job block (size chunk_size) in my queue.
219                 difference_type current_job =
220                   fetch_and_add<difference_type>(&(my_job.first), chunk_size);
221
222                 // Update load, to make the three values consistent,
223                 // first might have been changed in the meantime
224                 my_job.load = my_job.last - my_job.first + 1;
225                 for (difference_type job_counter = 0;
226                      job_counter < chunk_size && current_job <= my_job.last;
227                      ++job_counter)
228                   {
229                     // Yes: process it!
230                     current = begin + current_job;
231                     ++current_job;
232
233                     // Do actual work.
234                     result = r(result, f(op, current));
235                   }
236
237 #               pragma omp flush(busy)
238               }
239
240             // After reaching this point, a thread's job list is empty.
241             if (iam_working)
242               {
243                 // This thread no longer has work.
244 #               pragma omp atomic
245                 --busy;
246
247                 iam_working = false;
248               }
249
250             difference_type supposed_first, supposed_last, supposed_load;
251             do
252               {
253                 // Find random nonempty deque (not own), do consistency check.
254                 yield();
255 #               pragma omp flush(busy)
256                 victim = rand_gen();
257                 supposed_first = job[victim * stride].first;
258                 supposed_last = job[victim * stride].last;
259                 supposed_load = job[victim * stride].load;
260               }
261             while (busy > 0
262               && ((supposed_load <= 0)
263                 || ((supposed_first + supposed_load - 1) != supposed_last)));
264
265             if (busy == 0)
266               break;
267
268             if (supposed_load > 0)
269               {
270                 // Has work and work to do.
271                 // Number of elements to steal (at least one).
272                 steal = (supposed_load < 2) ? 1 : supposed_load / 2;
273
274                 // Push victim's start forward.
275                 difference_type stolen_first =
276                     fetch_and_add<difference_type>(
277                         &(job[victim * stride].first), steal);
278                 difference_type stolen_try =
279                     stolen_first + steal - difference_type(1);
280
281                 my_job.first = stolen_first;
282                 my_job.last = __gnu_parallel::min(stolen_try, supposed_last);
283                 my_job.load = my_job.last - my_job.first + 1;
284
285                 // Has potential work again.
286 #               pragma omp atomic
287                   ++busy;
288                 iam_working = true;
289
290 #               pragma omp flush(busy)
291               }
292 #           pragma omp flush(busy)
293           } // end while busy > 0
294             // Add accumulated result to output.
295         omp_set_lock(&output_lock);
296         output = r(output, result);
297         omp_unset_lock(&output_lock);
298       }
299
300     delete[] job;
301
302     // Points to last element processed (needed as return value for
303     // some algorithms like transform)
304     f.finish_iterator = begin + length;
305
306     omp_destroy_lock(&output_lock);
307
308     return op;
309   }
310 } // end namespace
311
312 #endif