[ VIGRA Homepage | Function Index | Class Index | Namespaces | File List | Main Page ]

threadpool.hxx
1/************************************************************************/
2/* */
3/* Copyright 2014-2015 by Thorsten Beier, Philip Schill */
4/* and Ullrich Koethe */
5/* */
6/* This file is part of the VIGRA computer vision library. */
7/* The VIGRA Website is */
8/* http://hci.iwr.uni-heidelberg.de/vigra/ */
9/* Please direct questions, bug reports, and contributions to */
10/* ullrich.koethe@iwr.uni-heidelberg.de or */
11/* vigra@informatik.uni-hamburg.de */
12/* */
13/* Permission is hereby granted, free of charge, to any person */
14/* obtaining a copy of this software and associated documentation */
15/* files (the "Software"), to deal in the Software without */
16/* restriction, including without limitation the rights to use, */
17/* copy, modify, merge, publish, distribute, sublicense, and/or */
18/* sell copies of the Software, and to permit persons to whom the */
19/* Software is furnished to do so, subject to the following */
20/* conditions: */
21/* */
22/* The above copyright notice and this permission notice shall be */
23/* included in all copies or substantial portions of the */
24/* Software. */
25/* */
26/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
27/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
28/* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
29/* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
30/* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
31/* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
32/* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
33/* OTHER DEALINGS IN THE SOFTWARE. */
34/* */
35/************************************************************************/
36#ifndef VIGRA_THREADPOOL_HXX
37#define VIGRA_THREADPOOL_HXX
38
39#include <vector>
40#include <queue>
41#include <stdexcept>
42#include <cmath>
43#include "mathutil.hxx"
44#include "counting_iterator.hxx"
45#include "threading.hxx"
46
47
48namespace vigra
49{
50
51/** \addtogroup ParallelProcessing
52*/
53
54//@{
55
56 /**\brief Option base class for parallel algorithms.
57
58 <b>\#include</b> <vigra/threadpool.hxx><br>
59 Namespace: vigra
60 */
62{
63 public:
64
65 /** Constants for special settings.
66 */
67 enum {
68 Auto = -1, ///< Determine number of threads automatically (from <tt>threading::thread::hardware_concurrency()</tt>)
69 Nice = -2, ///< Use half as many threads as <tt>Auto</tt> would.
70 NoThreads = 0 ///< Switch off multi-threading (i.e. execute tasks sequentially)
71 };
72
74 : numThreads_(actualNumThreads(Auto))
75 {}
76
77 /** \brief Get desired number of threads.
78
79 <b>Note:</b> This function may return 0, which means that multi-threading
80 shall be switched off entirely. If an algorithm receives this value,
81 it should revert to a sequential implementation. In contrast, if
82 <tt>numThread() == 1</tt>, the parallel algorithm version shall be
83 executed with a single thread.
84 */
85 int getNumThreads() const
86 {
87 return numThreads_;
88 }
89
90 /** \brief Get desired number of threads.
91
92 In contrast to <tt>numThread()</tt>, this will always return a value <tt>>=1</tt>.
93 */
95 {
96 return std::max(1,numThreads_);
97 }
98
99 /** \brief Set the number of threads or one of the constants <tt>Auto</tt>,
100 <tt>Nice</tt> and <tt>NoThreads</tt>.
101
102 Default: <tt>ParallelOptions::Auto</tt> (use system default)
103
104 This setting is ignored if the preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>
105 is defined. Then, the number of threads is set to 0 and all tasks revert to
106 sequential algorithm implementations. The same can be achieved at runtime
107 by passing <tt>n = 0</tt> to this function. In contrast, passing <tt>n = 1</tt>
108 causes the parallel algorithm versions to be executed with a single thread.
109 Both possibilities are mainly useful for debugging.
110 */
112 {
113 numThreads_ = actualNumThreads(n);
114 return *this;
115 }
116
117
118 private:
119 // helper function to compute the actual number of threads
120 static size_t actualNumThreads(const int userNThreads)
121 {
122 #ifdef VIGRA_SINGLE_THREADED
123 return 0;
124 #else
125 return userNThreads >= 0
126 ? userNThreads
127 : userNThreads == Nice
128 ? threading::thread::hardware_concurrency() / 2
129 : threading::thread::hardware_concurrency();
130 #endif
131 }
132
133 int numThreads_;
134};
135
136/********************************************************/
137/* */
138/* ThreadPool */
139/* */
140/********************************************************/
141
142 /**\brief Thread pool class to manage a set of parallel workers.
143
144 <b>\#include</b> <vigra/threadpool.hxx><br>
145 Namespace: vigra
146 */
148{
149 public:
150
151 /** Create a thread pool from ParallelOptions. The constructor just launches
152 the desired number of workers. If the number of threads is zero,
153 no workers are started, and all tasks will be executed in synchronously
154 in the present thread.
155 */
157 : stop(false)
158 {
159 init(options);
160 }
161
162 /** Create a thread pool with n threads. The constructor just launches
163 the desired number of workers. If \arg n is <tt>ParallelOptions::Auto</tt>,
164 the number of threads is determined by <tt>threading::thread::hardware_concurrency()</tt>.
165 <tt>ParallelOptions::Nice</tt> will create half as many threads.
166 If <tt>n = 0</tt>, no workers are started, and all tasks will be executed
167 synchronously in the present thread. If the preprocessor flag
168 <tt>VIGRA_SINGLE_THREADED</tt> is defined, the number of threads is always set
169 to zero (i.e. synchronous execution), regardless of the value of \arg n. This
170 is useful for debugging.
171 */
172 ThreadPool(const int n)
173 : stop(false)
174 {
175 init(ParallelOptions().numThreads(n));
176 }
177
178 /**
179 * The destructor joins all threads.
180 */
181 ~ThreadPool();
182
183 /**
184 * Enqueue a task that will be executed by the thread pool.
185 * The task result can be obtained using the get() function of the returned future.
186 * If the task throws an exception, it will be raised on the call to get().
187 */
188 template<class F>
189 auto enqueueReturning(F&& f) -> threading::future<decltype(f(0))>;
190
191 /**
192 * Enqueue function for tasks without return value.
193 * This is a special case of the enqueueReturning template function, but
194 * some compilers fail on <tt>std::result_of<F(int)>::type</tt> for void(int) functions.
195 */
196 template<class F>
197 threading::future<void> enqueue(F&& f) ;
198
199 /**
200 * Block until all tasks are finished.
201 */
203 {
204 threading::unique_lock<threading::mutex> lock(queue_mutex);
205 finish_condition.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
206 }
207
208 /**
209 * Return the number of worker threads.
210 */
211 size_t nThreads() const
212 {
213 return workers.size();
214 }
215
216private:
217
218 // helper function to init the thread pool
219 void init(const ParallelOptions & options);
220
221 // need to keep track of threads so we can join them
222 std::vector<threading::thread> workers;
223
224 // the task queue
225 std::queue<std::function<void(int)> > tasks;
226
227 // synchronization
228 threading::mutex queue_mutex;
229 threading::condition_variable worker_condition;
230 threading::condition_variable finish_condition;
231 bool stop;
232 threading::atomic_long busy, processed;
233};
234
235inline void ThreadPool::init(const ParallelOptions & options)
236{
237 busy.store(0);
238 processed.store(0);
239
240 const size_t actualNThreads = options.getNumThreads();
241 for(size_t ti = 0; ti<actualNThreads; ++ti)
242 {
243 workers.emplace_back(
244 [ti,this]
245 {
246 for(;;)
247 {
248 std::function<void(int)> task;
249 {
250 threading::unique_lock<threading::mutex> lock(this->queue_mutex);
251
252 // will wait if : stop == false AND queue is empty
253 // if stop == true AND queue is empty thread function will return later
254 //
255 // so the idea of this wait, is : If where are not in the destructor
256 // (which sets stop to true, we wait here for new jobs)
257 this->worker_condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
258 if(!this->tasks.empty())
259 {
260 ++busy;
261 task = std::move(this->tasks.front());
262 this->tasks.pop();
263 lock.unlock();
264 task(ti);
265 ++processed;
266 --busy;
267 finish_condition.notify_one();
268 }
269 else if(stop)
270 {
271 return;
272 }
273 }
274 }
275 }
276 );
277 }
278}
279
281{
282 {
283 threading::unique_lock<threading::mutex> lock(queue_mutex);
284 stop = true;
285 }
286 worker_condition.notify_all();
287 for(threading::thread &worker: workers)
288 worker.join();
289}
290
291template<class F>
292inline auto
293ThreadPool::enqueueReturning(F&& f) -> threading::future<decltype(f(0))>
294{
295 typedef decltype(f(0)) result_type;
296 typedef threading::packaged_task<result_type(int)> PackageType;
297
298 auto task = std::make_shared<PackageType>(f);
299 auto res = task->get_future();
300
301 if(workers.size()>0){
302 {
303 threading::unique_lock<threading::mutex> lock(queue_mutex);
304
305 // don't allow enqueueing after stopping the pool
306 if(stop)
307 throw std::runtime_error("enqueue on stopped ThreadPool");
308
309 tasks.emplace(
310 [task](int tid)
311 {
312 (*task)(std::move(tid));
313 }
314 );
315 }
316 worker_condition.notify_one();
317 }
318 else{
319 (*task)(0);
320 }
321
322 return res;
323}
324
325template<class F>
326inline threading::future<void>
328{
329#if defined(USE_BOOST_THREAD) && \
330 !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
331 // Without variadic templates, boost:thread::packaged_task only
332 // supports the signature 'R()' (functions with no arguments).
333 // We bind the thread_id parameter to 0, so this parameter
334 // must NOT be used in function f (fortunately, this is the case
335 // for the blockwise versions of convolution, labeling and
336 // watersheds).
337 typedef threading::packaged_task<void()> PackageType;
338 auto task = std::make_shared<PackageType>(std::bind(f, 0));
339#else
340 typedef threading::packaged_task<void(int)> PackageType;
341 auto task = std::make_shared<PackageType>(f);
342#endif
343
344 auto res = task->get_future();
345 if(workers.size()>0){
346 {
347 threading::unique_lock<threading::mutex> lock(queue_mutex);
348
349 // don't allow enqueueing after stopping the pool
350 if(stop)
351 throw std::runtime_error("enqueue on stopped ThreadPool");
352
353 tasks.emplace(
354 [task](int tid)
355 {
356#if defined(USE_BOOST_THREAD) && \
357 !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
358 (*task)();
359#else
360 (*task)(std::move(tid));
361#endif
362 }
363 );
364 }
365 worker_condition.notify_one();
366 }
367 else{
368#if defined(USE_BOOST_THREAD) && \
369 !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
370 (*task)();
371#else
372 (*task)(0);
373#endif
374 }
375 return res;
376}
377
378/********************************************************/
379/* */
380/* parallel_foreach */
381/* */
382/********************************************************/
383
384// nItems must be either zero or std::distance(iter, end).
385// NOTE: the redundancy of nItems and iter,end here is due to the fact that, for forward iterators,
386// computing the distance from iterators is costly, and, for input iterators, we might not know in advance
387// how many items there are (e.g., stream iterators).
388template<class ITER, class F>
389inline void parallel_foreach_impl(
390 ThreadPool & pool,
391 const std::ptrdiff_t nItems,
392 ITER iter,
393 ITER end,
394 F && f,
395 std::random_access_iterator_tag
396){
397 std::ptrdiff_t workload = std::distance(iter, end);
398 vigra_precondition(workload == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
399 const float workPerThread = float(workload)/pool.nThreads();
400 const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
401
402 std::vector<threading::future<void> > futures;
403 for( ;iter<end; iter+=chunkedWorkPerThread)
404 {
405 const size_t lc = std::min(workload, chunkedWorkPerThread);
406 workload-=lc;
407 futures.emplace_back(
408 pool.enqueue(
409 [&f, iter, lc]
410 (int id)
411 {
412 for(size_t i=0; i<lc; ++i)
413 f(id, iter[i]);
414 }
415 )
416 );
417 }
418 for (auto & fut : futures)
419 {
420 fut.get();
421 }
422}
423
424
425
426// nItems must be either zero or std::distance(iter, end).
427template<class ITER, class F>
428inline void parallel_foreach_impl(
429 ThreadPool & pool,
430 const std::ptrdiff_t nItems,
431 ITER iter,
432 ITER end,
433 F && f,
434 std::forward_iterator_tag
435){
436 if (nItems == 0)
437 nItems = std::distance(iter, end);
438
439 std::ptrdiff_t workload = nItems;
440 const float workPerThread = float(workload)/pool.nThreads();
441 const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
442
443 std::vector<threading::future<void> > futures;
444 for(;;)
445 {
446 const size_t lc = std::min(chunkedWorkPerThread, workload);
447 workload -= lc;
448 futures.emplace_back(
449 pool.enqueue(
450 [&f, iter, lc]
451 (int id)
452 {
453 auto iterCopy = iter;
454 for(size_t i=0; i<lc; ++i){
455 f(id, *iterCopy);
456 ++iterCopy;
457 }
458 }
459 )
460 );
461 for (size_t i = 0; i < lc; ++i)
462 {
463 ++iter;
464 if (iter == end)
465 {
466 vigra_postcondition(workload == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
467 break;
468 }
469 }
470 if(workload==0)
471 break;
472 }
473 for (auto & fut : futures)
474 fut.get();
475}
476
477
478
479// nItems must be either zero or std::distance(iter, end).
480template<class ITER, class F>
481inline void parallel_foreach_impl(
482 ThreadPool & pool,
483 const std::ptrdiff_t nItems,
484 ITER iter,
485 ITER end,
486 F && f,
487 std::input_iterator_tag
488){
489 std::ptrdiff_t num_items = 0;
490 std::vector<threading::future<void> > futures;
491 for (; iter != end; ++iter)
492 {
493 auto item = *iter;
494 futures.emplace_back(
495 pool.enqueue(
496 [&f, &item](int id){
497 f(id, item);
498 }
499 )
500 );
501 ++num_items;
502 }
503 vigra_postcondition(num_items == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
504 for (auto & fut : futures)
505 fut.get();
506}
507
508// Runs foreach on a single thread.
509// Used for API compatibility when the numbe of threads is 0.
510template<class ITER, class F>
511inline void parallel_foreach_single_thread(
512 ITER begin,
513 ITER end,
514 F && f,
515 const std::ptrdiff_t nItems = 0
516){
517 std::ptrdiff_t n = 0;
518 for (; begin != end; ++begin)
519 {
520 f(0, *begin);
521 ++n;
522 }
523 vigra_postcondition(n == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
524}
525
526/** \brief Apply a functor to all items in a range in parallel.
527
528 <b> Declarations:</b>
529
530 \code
531 namespace vigra {
532 // pass the desired number of threads or ParallelOptions::Auto
533 // (creates an internal thread pool accordingly)
534 template<class ITER, class F>
535 void parallel_foreach(int64_t nThreads,
536 ITER begin, ITER end,
537 F && f,
538 const uint64_t nItems = 0);
539
540 // use an existing thread pool
541 template<class ITER, class F>
542 void parallel_foreach(ThreadPool & pool,
543 ITER begin, ITER end,
544 F && f,
545 const uint64_t nItems = 0);
546
547 // pass the integers from 0 ... (nItems-1) to the functor f,
548 // using the given number of threads or ParallelOptions::Auto
549 template<class F>
550 void parallel_foreach(int64_t nThreads,
551 uint64_t nItems,
552 F && f);
553
554 // likewise with an existing thread pool
555 template<class F>
556 void parallel_foreach(ThreadPool & threadpool,
557 uint64_t nItems,
558 F && f);
559 }
560 \endcode
561
562 Create a thread pool (or use an existing one) to apply the functor \arg f
563 to all items in the range <tt>[begin, end)</tt> in parallel. \arg f must
564 be callable with two arguments of type <tt>size_t</tt> and <tt>T</tt>, where
565 the first argument is the thread index (starting at 0) and T is convertible
566 from the iterator's <tt>reference_type</tt> (i.e. the result of <tt>*begin</tt>).
567
568 If the iterators are forward iterators (<tt>std::forward_iterator_tag</tt>), you
 569 can provide the optional argument <tt>nItems</tt> to avoid a
570 <tt>std::distance(begin, end)</tt> call to compute the range's length.
571
572 Parameter <tt>nThreads</tt> controls the number of threads. <tt>parallel_foreach</tt>
573 will split the work into about three times as many parallel tasks.
574 If <tt>nThreads = ParallelOptions::Auto</tt>, the number of threads is set to
575 the machine default (<tt>std::thread::hardware_concurrency()</tt>).
576
577 If <tt>nThreads = 0</tt>, the function will not use threads,
578 but will call the functor sequentially. This can also be enforced by setting the
579 preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>, ignoring the value of
580 <tt>nThreads</tt> (useful for debugging).
581
582 <b>Usage:</b>
583
584 \code
585 #include <iostream>
586 #include <algorithm>
587 #include <vector>
588 #include <vigra/threadpool.hxx>
589
590 using namespace std;
591 using namespace vigra;
592
593 int main()
594 {
595 size_t const n_threads = 4;
596 size_t const n = 2000;
597 vector<int> input(n);
598
599 auto iter = input.begin(),
600 end = input.end();
601
602 // fill input with 0, 1, 2, ...
603 iota(iter, end, 0);
604
605 // compute the sum of the elements in the input vector.
606 // (each thread computes the partial sum of the items it sees
607 // and stores the sum at the appropriate index of 'results')
608 vector<int> results(n_threads, 0);
609 parallel_foreach(n_threads, iter, end,
610 // the functor to be executed, defined as a lambda function
611 // (first argument: thread ID, second argument: result of *iter)
612 [&results](size_t thread_id, int items)
613 {
614 results[thread_id] += items;
615 }
616 );
617
618 // collect the partial sums of all threads
619 int sum = accumulate(results.begin(), results.end(), 0);
620
621 cout << "The sum " << sum << " should be equal to " << (n*(n-1))/2 << endl;
622 }
623 \endcode
624 */
625doxygen_overloaded_function(template <...> void parallel_foreach)
626
627template<class ITER, class F>
628inline void parallel_foreach(
629 ThreadPool & pool,
630 ITER begin,
631 ITER end,
632 F && f,
633 const std::ptrdiff_t nItems = 0)
634{
635 if(pool.nThreads()>1)
636 {
637 parallel_foreach_impl(pool,nItems, begin, end, f,
638 typename std::iterator_traits<ITER>::iterator_category());
639 }
640 else
641 {
642 parallel_foreach_single_thread(begin, end, f, nItems);
643 }
644}
645
646template<class ITER, class F>
647inline void parallel_foreach(
648 int64_t nThreads,
649 ITER begin,
650 ITER end,
651 F && f,
652 const std::ptrdiff_t nItems = 0)
653{
654
655 ThreadPool pool(nThreads);
656 parallel_foreach(pool, begin, end, f, nItems);
657}
658
659template<class F>
660inline void parallel_foreach(
661 int64_t nThreads,
662 std::ptrdiff_t nItems,
663 F && f)
664{
665 auto iter = range(nItems);
666 parallel_foreach(nThreads, iter, iter.end(), f, nItems);
667}
668
669
670template<class F>
671inline void parallel_foreach(
672 ThreadPool & threadpool,
673 std::ptrdiff_t nItems,
674 F && f)
675{
676 auto iter = range(nItems);
677 parallel_foreach(threadpool, iter, iter.end(), f, nItems);
678}
679
680//@}
681
682} // namespace vigra
683
684#endif // VIGRA_THREADPOOL_HXX
Option base class for parallel algorithms.
Definition: threadpool.hxx:62
@ Nice
Use half as many threads as Auto would.
Definition: threadpool.hxx:69
@ NoThreads
Switch off multi-threading (i.e. execute tasks sequentially)
Definition: threadpool.hxx:70
@ Auto
Determine number of threads automatically (from threading::thread::hardware_concurrency())
Definition: threadpool.hxx:68
ParallelOptions & numThreads(const int n)
Set the number of threads or one of the constants Auto, Nice and NoThreads.
Definition: threadpool.hxx:111
int getNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:85
int getActualNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:94
Thread pool class to manage a set of parallel workers.
Definition: threadpool.hxx:148
auto enqueueReturning(F &&f) -> threading::future< decltype(f(0))>
Definition: threadpool.hxx:293
size_t nThreads() const
Definition: threadpool.hxx:211
ThreadPool(const ParallelOptions &options)
Definition: threadpool.hxx:156
threading::future< void > enqueue(F &&f)
Definition: threadpool.hxx:327
ThreadPool(const int n)
Definition: threadpool.hxx:172
void waitFinished()
Definition: threadpool.hxx:202
~ThreadPool()
Definition: threadpool.hxx:280
Int32 roundi(FixedPoint16< IntBits, OverflowHandling > v)
rounding to the nearest integer.
Definition: fixedpoint.hxx:1775
void parallel_foreach(...)
Apply a functor to all items in a range in parallel.

© Ullrich Köthe (ullrich.koethe@iwr.uni-heidelberg.de)
Heidelberg Collaboratory for Image Processing, University of Heidelberg, Germany

html generated using doxygen and Python
vigra 1.11.1