Building a Thread Pool with C++ and STL
Thread Pool Implementation in Plain C++ Using STL: A Comprehensive Guide for Beginners and Experienced Developers
Thread pools are a fundamental concept in concurrent programming, providing a way to efficiently manage and execute a large number of tasks concurrently. Thread pools are a group of worker threads that can be used to execute tasks concurrently. Instead of creating a new thread for each task, a thread pool reuses existing threads, which can significantly improve performance and resource utilization.
Boost’s asio::thread_pool is a powerful tool in the Boost library that makes asynchronous I/O and concurrency easier. Inspired by Boost, this article carefully shows how to create a thread pool using only the STL and C++. This article also includes code breakdowns and sequence diagrams to help readers understand the implementation better.
The complete example can be found here or at the end of the article.
Boost Library
The Boost libraries are a collection of open-source C++ libraries that provide a wide range of functionality, including networking, concurrency, filesystem, regular expressions, and testing. Boost aims to provide portable, efficient, and reusable code for C++ developers. The Boost libraries are a valuable source of new features and functionality for the C++ language, and many Boost components have been adopted into the C++ Standard Library over the years.
Boost Asio Library
Boost Asio is a part of the Boost library that provides support for asynchronous I/O and concurrency. It’s commonly used for building network and communication applications. Asynchronous I/O allows the program to perform other tasks while waiting for I/O operations to complete, improving efficiency and responsiveness.
boost::asio::thread_pool
The Asio library in Boost includes a thread pool implementation starting with 1.66 version. boost::asio::thread_pool
is a component within boost::asio
that provides a simple way to manage a pool of threads.
Here’s a basic overview of using boost::asio::thread_pool
:
#include <iostream>
#include <boost/asio/thread_pool.hpp>
int main() {
boost::asio::thread_pool pool;
for (int i = 0; i < 5; ++i) {
boost::asio::post(pool, [i]() {
std::cout << "Task " << i
<< " executed by thread " << std::this_thread::get_id() << std::endl;
});
}
std::future<int> r1 = boost::asio::post(pool, asio::use_future([]() { return 2; }));
std::cout << "Result = " << r1.get() << '\n';
// Optional: Wait for all tasks to complete
pool.join();
return 0;
}
Rewrite It with STL
When I saw the new thread_pool
usage and the new definition in the Asio library, I was impressed. To understand how they were able to get return values from asynchronous operations using a single wrapper function and to improve my skills, I decided to rewrite the thread pool implementation in plain C++ using only the STL. I set a requirement for myself to replicate the boost::asio::thread_pool
implementation as closely as possible, including the function calls and wrapper. When I finished my implementation, it looked like this:
class thread_pool
{
private:
std::atomic_bool is_active {true};
std::vector<std::thread> pool;
std::condition_variable cv;
std::mutex guard;
std::deque<std::packaged_task<void()>> pending_jobs;
public:
explicit thread_pool(int num_threads = 1)
{
for (int i = 0; i < num_threads; i++) {
pool.emplace_back(&thread_pool::run, this);
}
}
~thread_pool() {
is_active = false;
cv.notify_all();
for (auto& th : pool) {
th.join();
}
}
void post(std::packaged_task<void()> job) {
std::unique_lock lock(guard);
pending_jobs.emplace_back(std::move(job));
cv.notify_one();
}
private:
void run() noexcept
{
while (is_active)
{
thread_local std::packaged_task<void()> job;
{
std::unique_lock lock(guard);
cv.wait(lock, [&]{ return !pending_jobs.empty() || !is_active; });
if (!is_active) break;
job.swap(pending_jobs.front());
pending_jobs.pop_front();
}
job();
}
}
};
Let’s break down the key components of the implementation:
thread_pool
Class: This class encapsulates the entire thread pool.
It maintains a set of threads, a condition variable (cv
), and a mutex (guard
) to synchronize access to the task queue.post
Method: Enqueues a new task into the task queue and notifies a waiting thread.run
Method: The actual function executed by each worker thread. It continuously waits for tasks in the queue and executes them. In this function, thread_pool doesn’t know and does not care whether the executing function has return value or not. This will be handled by the globalpost
function by usinguse_future
wrapper function.
Note: where the line
thread_local std::packaged_task<void()> job;
I usedthread_local
to make sure that the ownership of the executing task is moved to the thread where it will be called.
Let’s write the global post
and use_future
functions to complete the thread pool implementation:
template <class Executor, class Fn>
void post(Executor& exec, Fn&& func)
{
using return_type = decltype(func());
static_assert(std::is_void_v<return_type>, "posting functions with return types must be used with \"use_future\" tag.");
std::packaged_task<void()> task(std::forward<Fn>(func));
exec.post(std::move(task));
}
struct use_future_tag {};
template <class Fn>
constexpr auto use_future(Fn&& func) {
return std::make_tuple(use_future_tag{}, std::forward<Fn>(func));
}
template <class Executor, class Fn>
[[nodiscard]] decltype(auto)
post(Executor& exec, std::tuple<use_future_tag, Fn>&& tpl)
{
using return_type = std::invoke_result_t<Fn>;
auto&& [_, func] = tpl;
if constexpr (std::is_void_v<return_type>)
{
std::packaged_task<void()> tsk(std::move(func));
auto ret_future = tsk.get_future();
exec.post(std::move(tsk));
return ret_future;
}
else
{
struct forwarder_t {
forwarder_t(Fn&& fn) : tsk(std::forward<Fn>(fn)) {}
void operator()(std::shared_ptr<std::promise<return_type>> promise) noexcept
{
promise->set_value(tsk());
}
private:
std::decay_t<Fn> tsk;
} forwarder(std::forward<Fn>(func));
auto promise = std::make_shared<std::promise<return_type>>();
auto ret_future = promise->get_future();
std::packaged_task<void()> tsk([promise = std::move(promise), forwarder = std::move(forwarder)] () mutable {
forwarder(promise);
});
exec.post(std::move(tsk));
return ret_future;
}
}
post
Function: this is the plain function which only creates apackaged_task
from the given lambda or function pointer and puts into queue of the providedExecutor
class, which is ourthread_pool
in this instance.use_future
Function: Instead of Boost implementation (which they have created an allocator aware wrapper class that holds the callable) what I did here is I simply moved the callable object into a tuple withuse_future_tag
struct. Therefore I can create an overload version ofpost
function returns astd::future<>
object with the return type of the callable object.
All problems in computer science can be solved by another level of indirection. Butler Lampson, 1972
- Second
post
Function: As thethread_pool
class only acceptsstd::packaged_task<void()>
type of callable into its queue, I had to create a higher order function, a local callable classforwarder_t
which makes the actual function call, stores the return value into a promise object and returns void. When thethread_pool
run the task which has return value, the following sequence will be executed:
Complete Example
I learned a lot by studying how thread pools work, exploring the Boost boost::asio::thread_pool
library, and writing my own thread pool implementation in plain C++. I think this experience has helped me to understand concurrent programming better, especially how thread pools work.
I hope this article has been helpful to you, whether you are using Boost’s thread pool or are looking for a simpler, dependency-free solution.
Happy coding.
Complete example can be found Compiler Explorer
#include <iostream>
#include <memory>
#include <thread>
#include <mutex>
#include <future>
#include <condition_variable>
#include <functional>
#include <vector>
#include <deque>
#include <type_traits>
class thread_pool
{
private:
std::atomic_bool is_active {true};
std::vector<std::thread> pool;
std::condition_variable cv;
std::mutex guard;
std::deque<std::packaged_task<void()>> pending_jobs;
public:
explicit thread_pool(int num_threads = 1)
{
for (int i = 0; i < num_threads; i++) {
pool.emplace_back(&thread_pool::run, this);
}
}
~thread_pool() {
is_active = false;
cv.notify_all();
for (auto& th : pool) {
th.join();
}
}
void post(std::packaged_task<void()> job) {
std::unique_lock lock(guard);
pending_jobs.emplace_back(std::move(job));
cv.notify_one();
}
private:
void run() noexcept
{
while (is_active)
{
thread_local std::packaged_task<void()> job;
{
std::unique_lock lock(guard);
cv.wait(lock, [&]{ return !pending_jobs.empty() || !is_active; });
if (!is_active) break;
job.swap(pending_jobs.front());
pending_jobs.pop_front();
}
job();
}
}
};
struct use_future_tag {};
template <class Fn>
constexpr auto use_future(Fn&& func) {
return std::make_tuple(use_future_tag {}, std::forward<Fn>(func));
}
template <class Executor, class Fn>
void post(Executor& exec, Fn&& func)
{
using return_type = decltype(func());
static_assert(std::is_void_v<return_type>, "posting functions with return types must be used with \"use_future\" tag.");
std::packaged_task<void()> task(std::forward<Fn>(func));
exec.post(std::move(task));
}
template <class Executor, class Fn>
[[nodiscard]] decltype(auto)
post(Executor& exec, std::tuple<use_future_tag, Fn>&& tpl)
{
using return_type = std::invoke_result_t<Fn>;
auto&& [_, func] = tpl;
if constexpr (std::is_void_v<return_type>)
{
std::packaged_task<void()> tsk(std::move(func));
auto ret_future = tsk.get_future();
exec.post(std::move(tsk));
return ret_future;
}
else
{
struct forwarder_t {
forwarder_t(Fn&& fn) : tsk(std::forward<Fn>(fn)) {}
void operator()(std::shared_ptr<std::promise<return_type>> promise) noexcept
{
promise->set_value(tsk());
}
private:
std::decay_t<Fn> tsk;
} forwarder(std::forward<Fn>(func));
auto promise = std::make_shared<std::promise<return_type>>();
auto ret_future = promise->get_future();
std::packaged_task<void()> tsk([promise = std::move(promise), forwarder = std::move(forwarder)] () mutable {
forwarder(promise);
});
exec.post(std::move(tsk));
return ret_future;
}
}
using namespace std::chrono_literals;
int main()
{
thread_pool pool {2};
auto waiter =
post(pool, use_future([]
{
std::this_thread::sleep_for(1s);
return 2;
}));
auto test_lmbda = [] {
thread_local int count = 1;
std::cout
<< "working thread: " << std::this_thread::get_id()
<< "\tcount: " << count++ << std::endl;
std::this_thread::sleep_for(50ms);
};
for (size_t i = 0; i < 10; i++)
{
post(pool, test_lmbda);
}
return waiter.get();
}
Originally published at https://nixiz.github.io on October 6, 2023.