Building Concurrent Queue in C++

Queues are one of the most useful data structures. It helps us maintain order and process the tasks as they arrive. A good real life use case if printing documents where order of printing is essential. A basic queue has two primary operations, first is to add data to it using push() and the other is pop() to push data out of the queue.

Even with C++ 11 numerous new additions, we are yet not provided with a concurrent queue. Today let’s try to make a thread safe queue so that we can have concurrent access to it. Basically making a concurrent queue will allow multiple threads to simultaneously read and write data to it.

We will use the latest C++ 11 threads and std::queue (STL Queue) along with mutexes for locking and unlocking the critical sections.

Concurrent Queue

We can directly jump to the implementation of the concurrent queue.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <queue>
#include <mutex>

template<typename T>
class concurrent_queue {
public:
void push(const T& value) {
_mutx.lock();
_queue.push(value);
_mutx.unlock();
}

bool pop(T& out) {
_mutx.lock();
if (_queue.empty()) {
_mutx.unlock();
return false;
}
out = _queue.front();
_queue.pop();
_mutx.unlock();
return true;
}

private:
std::queue<T> _queue;
std::mutex _mutx;
};

That’s it? It was so simple? This was my first implementation before reading some articles and looking at the different implementations.

Technically we can improve a lot of things here, but let me point out the minimal important ones:

  • We are not controlling the capacity. We are not accepting the size of the queue, neither we have a default size.
  • We are not throwing exceptions when operating on empty queues.
  • We are not providing empty() method to the caller of our library.
  • We are manually locking/unlocking which is prone to error.
  • We should use a conditional variable.

Improving Concurrent Queue

Capacity Management

We cannot leave the queue to keep increasing infinitely, for that we must bound it with a fixed capacity. We can take this while initialization via the constructor.

Now a point to note is that if we are defining the constructor, then we must avoid the auto generation of the other constructors like the copy & move constructors. Since std::mutex is non-copyable and non-movable, if the compiler tries to auto generate a copy constructor or assignment operator, the code becomes invalid and unsafe.

1
std::size_t _capacity; // we will use to check the _queue.siz() with _capacity

And we will initialize this _capacity via the constructor and also assign a default value to it.

1
2
3
4
5
6
explicit concurrent_queue(std::size_t capacity = 100)
: _capacity(capacity), _closed(false)
{
if (_capacity == 0)
throw std::invalid_argument("capacity must be positive");
}

Also we explicitly have to avoid the copy and move constructors:

1
2
3
4
5
// avoid auto generation of these constructors via the compiler
concurrent_queue(const concurrent_queue&) = delete;
concurrent_queue& operator=(const concurrent_queue&) = delete;
concurrent_queue(concurrent_queue&&) = delete;
concurrent_queue& operator=(concurrent_queue&&) = delete;

Automatic Lock Management

You may have noticed the manual use of mutex lock() & unlock(), it is verbose and prone to error if we miss that it can lead to deadlock crashing the program. We can use the RAII (Resource Acquisition is Initialization) based lock objects that lock a mutex when they are created and automatically unlock it when they go out of scope.

std::lock_guard<std::mutex> is the simplest automatic lock-management tool in C++11.

1
2
3
4
5
void push(const T& value) {
_mutx.lock();
_queue.push(value);
_mutx.unlock();
}

We can refactor the cove above using:

1
2
3
4
void push(const T& value) {
std::lock_guard<std::mutex> lock(_mutx);
_queue.push(value);
}

Conditional Variables

A condition variable is a synchronization mechanism that lets one thread sleep and wait until another thread signals that something has happened. Currently when we are not using the conditional variable we are abruptly returning a boolean value to the pop method.

The C++ conditional variable provides us three methods:

  • wait() which signals a thread to wait.
  • notify_one() wakes up one sleeping thread.
  • notify_all() this call will wake up all sleeping threads.

In our case we need two conditional variables where they can be used for signaling the producers (the one pushing into the queue) and the other will be used for signaling the consumer (one who is consuming the queue data).

1
2
3
4
void push(const T& value) {
std::lock_guard<std::mutex> lock(_mutx);
_queue.push(value);
}

Let’s see how using the two conditional variables will change the push method.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void push(const T& value) {
std::unique_lock<std::mutex> lock(_mutx);

// Wait for Space
// Blocks the producer thread if the queue is full AND not closed.
// The condition variable releases the lock while waiting and re-acquires it
// before the thread wakes up and re-evaluates the predicate.
_producer_cv.wait(lock, [this] {
return _queue.size() < _capacity;
});

_queue.push(value);

// Notify Consumers
// Release the lock immediately after modifying the state and before notifying
// for a slight performance gain, allowing the waiting consumer to acquire the lock sooner.
lock.unlock();
_consumer_cv.notify_one();
}

Throwing Exceptions

We should throw runtime exceptions for cases like pushing to the queue which is full or trying to pop from the empty queue. So we can create the runtime exceptions which we can throw:

1
2
3
4
5
6
7
8
9
10
11
class queue_empty_exp : public std::runtime_error {
public:
explicit queue_empty_exp(const std::string& err)
: std::runtime_error(err) {}
};

class queue_full_exp : public std::runtime_error {
public:
explicit queue_full_exp(const std::string& err)
: std::runtime_error(err) {}
};

Graceful Shutdown

Suppose we want to terminate the program, we cannot simply quit the terminal. We have to make sure that there is no data corruption and not threads are actively waiting/starving or in deadlock. Graceful shutdown is the ability of the program to stop all ongoing work and terminate all threads in a controlled manner without data loss, deadlocks, or abrupt crashes.

We have two scenarios to cover:

  • If the system needs to shut down while the queue is full, the producer thread is stuck indefinitely on _producer_cv.wait().
  • If the producers have stopped, the queue is empty, and the system needs to shut down, the consumer thread is stuck indefinitely on _consumer_cv.wait()

We can do this by adding a boolean flag say _close so that we can carry out graceful shutdown.

Complete Implementation

Here’s the complete code with all the required improvements.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdexcept>

class queue_empty_exp : public std::runtime_error {
public:
explicit queue_empty_exp(const std::string& err)
: std::runtime_error(err) {}
};

class queue_full_exp : public std::runtime_error {
public:
explicit queue_full_exp(const std::string& err)
: std::runtime_error(err) {}
};

template<typename T>
class concurrent_queue {
public:
explicit concurrent_queue(std::size_t capacity = 100)
: _capacity(capacity), _closed(false)
{
if (_capacity == 0)
throw std::invalid_argument("capacity must be positive");
}

concurrent_queue(const concurrent_queue&) = delete;
concurrent_queue& operator=(const concurrent_queue&) = delete;
concurrent_queue(concurrent_queue&&) = delete;
concurrent_queue& operator=(concurrent_queue&&) = delete;

~concurrent_queue() {
close();
}

void close() {
{
std::lock_guard<std::mutex> lock(_mutx);
_closed = true;
}
// wake everyone
_producer_cv.notify_all();
_consumer_cv.notify_all();
}

void push(const T& value) {
std::unique_lock<std::mutex> lock(_mutx);

_producer_cv.wait(lock, [this] {
return _queue.size() < _capacity || _closed;
});
if (_closed)
throw queue_full_exp("push on closed queue");
_queue.push(value);

// notify one consumer
lock.unlock();
_consumer_cv.notify_one();
}

T pop() {
std::lock_guard<std::mutex> lock(_mutx);
if (_queue.empty())
throw queue_empty_exp("pop from empty queue");
T val = std::move(_queue.front());
_queue.pop();
_producer_cv.notify_one();
return val;
}

std::size_t size() const {
std::lock_guard<std::mutex> lock(_mutx);
return _queue.size();
}

bool empty() const {
std::lock_guard<std::mutex> lock(_mutx);
return _queue.empty();
}

bool closed() const {
std::lock_guard<std::mutex> lock(_mutx);
return _closed;
}

private:
std::queue<T> _queue;
mutable std::mutex _mutx;
std::condition_variable _producer_cv; // waits when queue is full
std::condition_variable _consumer_cv; // waits when queue is empty
std::size_t _capacity;
bool _closed;
};

Closing Notes

I was reading about thread pools and to build that we need a concurrent queue.

When I started with Concurrent Queue, I just added the simple implementation but after doing some R&D about it I got to know that I knew very little about both C++ and the concepts of concurrency. Now that the implementation of the concurrent queue is complete I can move on to the thread pool implementation and let’s see what all new things I can learn there.

References