PandaRoot
PndLmdThreadPool.h
Go to the documentation of this file.
1 /*
2  * @author Sehe (https://stackoverflow.com/users/85371/sehe)
3  *
4  * Thread Pool implementation using boost::threads.
5  *
6  * taken from this StackOverflow answer:
7  * https://stackoverflow.com/questions/22685176/boost-group-threads-maximal-number-of-parallel-thread
8  *
9  */
10 
11 #include <boost/thread.hpp>
12 #include <boost/phoenix.hpp>
13 #include <boost/optional.hpp>
14 
15 // using namespace boost;
16 using namespace boost::phoenix::arg_names;
17 
19  private:
20  boost::mutex mx;
21  boost::condition_variable cv;
22 
23  typedef boost::function<void()> job_t;
24  std::deque<job_t> _queue;
25 
26  boost::thread_group pool;
27 
28  boost::atomic_bool shutdown;
29  static void worker_thread(PndLmdThreadPool &q)
30  {
31  while (boost::optional<job_t> job = q.dequeue())
32  (*job)();
33  }
34 
35  public:
36  // create thread pool with maximum possible threads
37  PndLmdThreadPool() : shutdown(false)
38  {
39  for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
40  pool.create_thread(boost::bind(worker_thread, boost::ref(*this)));
41  }
42 
43  // create thread pool with maxThreads threads
44  PndLmdThreadPool(unsigned int maxThreads) : shutdown(false)
45  {
46  if (maxThreads > boost::thread::hardware_concurrency() || maxThreads == 0)
47  maxThreads = boost::thread::hardware_concurrency();
48  for (unsigned i = 0; i < maxThreads; ++i)
49  pool.create_thread(boost::bind(worker_thread, boost::ref(*this)));
50  }
51 
52  void enqueue(job_t job)
53  {
54  boost::lock_guard<boost::mutex> lk(mx);
55  _queue.push_back(job);
56  cv.notify_one();
57  }
58 
59  boost::optional<job_t> dequeue()
60  {
61  boost::unique_lock<boost::mutex> lk(mx);
62  namespace phx = boost::phoenix;
63  cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
64  if (_queue.empty())
65  return boost::none;
66  job_t job = _queue.front();
67  _queue.pop_front();
68  return job;
69  }
70 
71  // wait for all threads to complete
72  void wait()
73  {
74  shutdown = true;
75  {
76  boost::lock_guard<boost::mutex> lk(mx);
77  cv.notify_all();
78  }
79  pool.join_all();
80  }
81 
83  {
84  shutdown = true;
85  {
86  boost::lock_guard<boost::mutex> lk(mx);
87  cv.notify_all();
88  }
89  pool.join_all();
90  }
91 };
void enqueue(job_t job)
unsigned int i
Definition: P4_F32vec4.h:21
PndLmdThreadPool(unsigned int maxThreads)
boost::optional< job_t > dequeue()