std::condition_variable (Live Hacking Multithreaded Queue)

Overview

#include <condition_variable>

Generic communication device

  • Communicate changes in shared state ⟶ notification/wakeup

  • Used together with a mutex

Operations

Operation and description

  • wait

    • Atomically unlocks the mutex and puts the calling thread to sleep until the condition is signalled

    • On wakeup, re-acquires the mutex before returning to the caller

  • signal (notify)

    • Signal the condition variable, i.e. wake one (or all) waiters
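The wait/signal pair can be sketched as a minimal handshake between two threads. The helper name handshake_demo and the variables ready and payload are made up for this illustration; the sketch assumes C++17.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: one thread waits for a flag, the other sets it and
// notifies. Returns the payload the waiter observed after waking up.
int handshake_demo()
{
    std::mutex lock;
    std::condition_variable ready_cond;
    bool ready = false;   // shared state, protected by lock
    int payload = 0;      // shared state, protected by lock
    int seen = -1;        // written by the waiter, read after join()

    std::thread waiter([&]() {
        std::unique_lock<std::mutex> guard(lock);
        // wait() unlocks the mutex while sleeping and re-locks it before
        // returning; the loop re-checks the shared state after every wakeup
        while (!ready)
            ready_cond.wait(guard);
        seen = payload;   // the mutex is held here
    });

    {
        std::scoped_lock guard(lock);
        payload = 42;
        ready = true;
    }                     // unlock before notifying
    ready_cond.notify_one();

    waiter.join();
    return seen;
}
```

The waiter checks ready before sleeping, so the handshake also works if the notification happens before the waiter ever reaches wait().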

Sample communication scenarios

Overview: Operations

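Operation and description

  • Constructor

    • Default only; a std::condition_variable can be neither copied nor moved

  • wait(lock, stop_waiting)

    • lock: a std::unique_lock<std::mutex> that currently holds the mutex

    • stop_waiting: callable (predicate) that returns bool; waiting ends once it returns true (see below)

  • wait_for(), wait_until()

    • Like wait(), but give up after a relative timeout or at an absolute point in time

  • notify_one()

    • Notify (and wake up) one waiting thread

  • notify_all()

    • Notify (and wake up) all waiting threads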
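A sketch of the timeout variant, assuming C++17: wait_for() with a predicate returns the predicate's value, so false means the timeout expired before the condition became true. The helper wait_with_timeout and its parameters delay and timeout are hypothetical names chosen for this example.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a second thread sets a flag after `delay`; the caller
// waits at most `timeout` for it. Returns true if the flag was seen in time.
bool wait_with_timeout(std::chrono::milliseconds delay,
                       std::chrono::milliseconds timeout)
{
    std::mutex lock;
    std::condition_variable cond;
    bool done = false;    // shared state, protected by lock

    std::thread setter([&]() {
        std::this_thread::sleep_for(delay);
        {
            std::scoped_lock guard(lock);
            done = true;
        }
        cond.notify_one();
    });

    bool seen;
    {
        std::unique_lock<std::mutex> guard(lock);
        // returns the value of the predicate: false means "timed out"
        seen = cond.wait_for(guard, timeout, [&]() { return done; });
    }

    setter.join();
    return seen;
}
```

With a short delay and a generous timeout the call is expected to return true; with the two swapped it is expected to return false.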

Note

Broadcasting (notify_all()) wakes up all threads waiting on the condition variable. If only one of them can handle the event, the others wake up, find nothing to do, and go back to sleep, which wastes resources. In that case, use notify_one() instead.

See also: thundering herd problem
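For contrast, here is a sketch of a case where notify_all() is the better fit: a "start gate" that every waiting thread may pass once it opens. The helper start_gate_demo and its variables are made up for this illustration; the sketch assumes C++17.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: nworkers threads block on a gate; opening the gate
// concerns all of them. Returns the number of workers that passed the gate.
int start_gate_demo(int nworkers)
{
    std::mutex lock;
    std::condition_variable gate;
    bool open = false;    // shared state, protected by lock
    int started = 0;      // shared state, protected by lock

    std::vector<std::thread> workers;
    for (int i = 0; i < nworkers; ++i)
        workers.emplace_back([&]() {
            std::unique_lock<std::mutex> guard(lock);
            gate.wait(guard, [&]() { return open; });
            ++started;    // still holding the mutex
        });

    {
        std::scoped_lock guard(lock);
        open = true;
    }
    gate.notify_all();    // every waiter can proceed, so wake them all

    for (auto& w : workers)
        w.join();
    return started;
}
```

With notify_one() in place of notify_all(), only one of the workers already sleeping in wait() would be woken, and join() could block forever on the others.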

Communication, Polling: Thread-Safe Queue

  • Double-ended queue (std::deque)

  • Producer adds to tail

  • Consumer removes from head

  • Protected by a mutex

  • Corner cases ⟶ conditions

    • Queue is full

    • Queue is empty

    • ⟶ poll on either side while the condition holds (sleep, then check again)

#include <thread>
#include <mutex>
#include <deque>
#include <chrono>
#include <iostream>


template <typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue(unsigned maxelem) : _maxelem(maxelem) {}

    template<typename dur>
    void push(T elem, dur d)
    {
        while (true) {
            {
                std::scoped_lock guard(_lock);
                if (_queue.size() < _maxelem) {   // room left? (<= would admit _maxelem+1 elements)
                    _queue.push_back(elem);
                    return;
                }
            }
            std::this_thread::sleep_for(d);
        }
    }

    template <typename dur>
    T pop(dur d)
    {
        while (true) {
            {
                std::scoped_lock guard(_lock);
                if (_queue.size() > 0) {
                    T elem = _queue.front();
                    _queue.pop_front();
                    return elem;
                }
            }
            std::this_thread::sleep_for(d);
        }
    }

private:
    std::deque<T> _queue;
    unsigned _maxelem;
    std::mutex _lock;
};


using namespace std::chrono_literals;

int main()
{
    ThreadSafeQueue<int> queue(10);
    std::thread producer([&queue](){
        int i = 0;
        while (true) {
            queue.push(i++, 2ms);
            std::this_thread::sleep_for(1ms);
        }
    });
    std::thread consumer1([&queue](){
        while (true)
            std::cout << "1: " << queue.pop(1ms) << std::endl;
    });
    std::thread consumer2([&queue](){
        while (true)
            std::cout << "2: " << queue.pop(2ms) << std::endl;
    });

    producer.join();
    consumer1.join();
    consumer2.join();

    return 0;
}

Anti-Polling: Thread-Safe Queue, And POSIX Condition Variables

#include <thread>
#include <deque>
#include <chrono>
#include <iostream>
#include <pthread.h>


template <typename T>
class ThreadSafeQueue
{
public:
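    ThreadSafeQueue(unsigned maxelem)
    : _maxelem(maxelem)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_not_empty, nullptr);
        pthread_cond_init(&_not_full, nullptr);
    }

    // release the POSIX objects again (no thread may still be waiting)
    ~ThreadSafeQueue()
    {
        pthread_cond_destroy(&_not_full);
        pthread_cond_destroy(&_not_empty);
        pthread_mutex_destroy(&_lock);
    }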

    void push(T elem)
    {
        pthread_mutex_lock(&_lock);
        while (_queue.size() == _maxelem)
            pthread_cond_wait(&_not_full, &_lock);
        _queue.push_back(elem);
        pthread_mutex_unlock(&_lock);

        pthread_cond_signal(&_not_empty);
    }

    T pop()
    {
        pthread_mutex_lock(&_lock);
        while (_queue.size() == 0)
            pthread_cond_wait(&_not_empty, &_lock);
        T elem = _queue.front();
        _queue.pop_front();
        pthread_mutex_unlock(&_lock);

        pthread_cond_signal(&_not_full);
        return elem;
    }

private:
    std::deque<T> _queue;
    unsigned _maxelem;

    pthread_mutex_t _lock;
    pthread_cond_t _not_empty;
    pthread_cond_t _not_full;
};


using namespace std::chrono_literals;

int main()
{
    ThreadSafeQueue<int> queue(10);
    std::thread producer([&queue](){
        int i = 0;
        while (true) {
            queue.push(i++);
            std::this_thread::sleep_for(1ms);
        }
    });
    std::thread consumer1([&queue](){
        while (true)
            std::cout << "1: " << queue.pop() << std::endl;
    });
    std::thread consumer2([&queue](){
        while (true)
            std::cout << "2: " << queue.pop() << std::endl;
    });

    producer.join();
    consumer1.join();
    consumer2.join();

    return 0;
}

Discussion: Signalling And Waiting, Predicates: Separation Of Concerns

  • Waiting (and atomically releasing a mutex) is one thing

    • Necessary to implement any kind of parallel handshake

    • ⟶ OS building block

  • The condition itself is part of the concrete problem: the predicate

    Here:

    • Full: _queue.size() == _maxelem ⟶ wait until not full

    • Empty: _queue.size() == 0 ⟶ wait until not empty

  • Signalling wakes a waiter (possibly immediately, depending on scheduling parameters such as realtime attributes)

    • Signalling while holding the mutex ⟶ the woken waiter immediately blocks again, this time on the mutex

    • ⟶ best done when not holding the mutex

  • Mutex: scoped locking would be nice to have, too
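The separation of waiting and predicate can be made concrete with a sketch: the predicate overload of std::condition_variable::wait() behaves like the hand-written loop below, which is the same loop the POSIX version spells out with while and pthread_cond_wait(). The names wait_for_predicate and predicate_loop_demo are hypothetical, chosen for this example; the sketch assumes C++17.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hand-written equivalent of cond.wait(guard, pred): the condition variable
// only does the waiting, the predicate carries the problem-specific condition.
template <typename Predicate>
void wait_for_predicate(std::condition_variable& cond,
                        std::unique_lock<std::mutex>& guard,
                        Predicate pred)
{
    while (!pred())        // re-check after every wakeup, spurious or not
        cond.wait(guard);
}

// Hypothetical demo: wait until another thread has incremented a counter
// three times. Returns the counter value seen by the waiting side.
int predicate_loop_demo()
{
    std::mutex lock;
    std::condition_variable changed;
    int counter = 0;       // shared state, protected by lock

    std::thread incrementer([&]() {
        for (int i = 0; i < 3; ++i) {
            {
                std::scoped_lock guard(lock);
                ++counter;
            }
            changed.notify_one();   // signal without holding the mutex
        }
    });

    int seen;
    {
        std::unique_lock<std::mutex> guard(lock);
        wait_for_predicate(changed, guard, [&]() { return counter >= 3; });
        seen = counter;
    }

    incrementer.join();
    return seen;
}
```

The first two notifications may wake the waiter before the predicate holds; the loop simply puts it back to sleep, which is why the condition must always be re-checked after wait() returns.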

That being said …

Thread-Safe Queue, And C++ Condition Variables

Wish list

  • Scoped locking: std::unique_lock

  • Predicate: parameter (usually a lambda) to condvar.wait()

#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <chrono>
#include <iostream>


template <typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue(unsigned maxelem)
    : _maxelem(maxelem)
    {}

    void push(T elem)
    {
        {
            std::unique_lock<std::mutex> guard(_lock);
            _not_full.wait(guard, [this](){return _queue.size() < _maxelem;});
            _queue.push_back(elem);
        } // scoped locking: unlock before signal
        _not_empty.notify_one();
    }

    T pop()
    {
        T elem;
        {
            std::unique_lock<std::mutex> guard(_lock);
            _not_empty.wait(guard, [this](){return _queue.size() > 0;});
            elem = _queue.front();
            _queue.pop_front();
        }
        _not_full.notify_one();

        return elem;
    }

private:
    std::deque<T> _queue;
    unsigned _maxelem;

    std::mutex _lock;
    std::condition_variable _not_empty;
    std::condition_variable _not_full;
};


using namespace std::chrono_literals;

int main()
{
    ThreadSafeQueue<int> queue(10);
    std::thread producer([&queue](){
        int i = 0;
        while (true) {
            queue.push(i++);
            std::this_thread::sleep_for(1ms);
        }
    });
    std::thread consumer1([&queue](){
        while (true)
            std::cout << "1: " << queue.pop() << std::endl;
    });
    std::thread consumer2([&queue](){
        while (true)
            std::cout << "2: " << queue.pop() << std::endl;
    });

    producer.join();
    consumer1.join();
    consumer2.join();

    return 0;
}