Pull to refresh

Потоки, блокировки и условные переменные в C++11 [Часть 2]

Reading time 7 min
Views 159K
Для более полного понимания этой статьи, рекомендуется прочитать ее первую часть, где основное внимание было уделено потокам и блокировкам, в ней объяснено много моментов (терминов, функций и т.д.), которые без пояснения будут использованы здесь.
В данной статье будут рассмотрены условные переменные…

Условные переменные


Помимо описанных ранее способов синхронизации, C++11 предоставляет поддержку условных переменных, которые позволяют блокировать один или более потоков, пока либо не будет получено уведомление от другого потока, либо не произойдет мифическое spurious wakeup («ложное/случайное пробуждение»).
Есть две реализации условных переменных, доступных в заголовке <condition_variable>:
  • condition_variable: требует от любого потока перед ожиданием сначала выполнить std::unique_lock
  • condition_variable_any: более общая реализация, которая работает с любым типом, который можно заблокировать. Эта реализация может быть более дорогим (с точки зрения ресурсов и производительности) для использования, поэтому ее следует использовать только если необходима те дополнительные возможности, которые она обеспечивает

Опишу, как работают условные переменные:
  • Должен быть хотя бы один поток, ожидающий, пока какое-то условие станет истинным. Ожидающий поток должен сначала выполнить unique_lock. Эта блокировка передается методу wait(), который освобождает мьютекс и приостанавливает поток, пока не будет получен сигнал от условной переменной. Когда это произойдет, поток пробудится и снова выполнится lock.
  • Должен быть хотя бы один поток, сигнализирующий о том, что условие стало истинным. Сигнал может быть послан с помощью notify_one(), при этом будет разблокирован один (любой) поток из ожидающих, или notify_all(), что разблокирует все ожидающие потоки.
  • В виду некоторых сложностей при создании пробуждающего условия, которое может быть предсказуемых в многопроцессорных системах, могут происходить ложные пробуждения (spurious wakeup). Это означает, что поток может быть пробужден, даже если никто не сигнализировал условной переменной. Поэтому необходимо еще проверять, верно ли условие пробуждение уже после то, как поток был пробужден. Т.к. ложные пробуждения могут происходить многократно, такую проверку необходимо организовывать в цикле.

Код ниже демонстрирует пример использования условной переменной, для синхронизации потоков: во время работы некоторых потоков (назовем их «рабочими») могут произойти ошибку, при этом они помещаются в очередь. Поток «регистратора» обрабатывает эти ошибки (получая их из очереди) и печатает их. «Рабочие» сигнализируют «регистратору», когда происходит ошибка. Регистратор ожидает сигнала условной переменной. Чтобы избежать ложных пробуждений, ожидание происходит в цикле, где проверяется булевское условие.
#include <condition_variable>
#include <iostream>
#include <random>
#include <thread>
#include <mutex>
#include <queue>

std::mutex              g_lockprint;
std::mutex              g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int>         g_codes;
bool                    g_done;
bool                    g_notified;

void workerFunc(int id, std::mt19937 &generator)
{
     // стартовое сообщение
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "[worker " << id << "]\trunning..." << std::endl;
     }
     // симуляция работы
     std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
     // симуляция ошибки
     int errorcode = id*100+1;
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout  << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
     }
     // сообщаем об ошибке
     {
          std::unique_lock<std::mutex> locker(g_lockqueue);
          g_codes.push(errorcode);
          g_notified = true;
          g_queuecheck.notify_one();
      }
}

void loggerFunc()
{
     // стартовое сообщение
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "[logger]\trunning..." << std::endl;
     }
     // до тех пор, пока не будет получен сигнал
     while(!g_done)
     {
          std::unique_lock<std::mutex> locker(g_lockqueue);
          while(!g_notified) // от ложных пробуждений
               g_queuecheck.wait(locker);
          // если есть ошибки в очереди, обрабатывать их
          while(!g_codes.empty())
          {
               std::unique_lock<std::mutex> locker(g_lockprint);
               std::cout << "[logger]\tprocessing error:  " << g_codes.front()  << std::endl;
               g_codes.pop();
          }
          g_notified = false;
     }
}

int main()
{
     // инициализация генератора псевдо-случайных чисел
     std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());
     // запуск регистратора
     std::thread loggerThread(loggerFunc);
     // запуск рабочих
     std::vector<std::thread> threads;
     for(int i = 0; i < 5; ++i)
          threads.push_back(std::thread(workerFunc, i+1, std::ref(generator)));
     for(auto &t: threads)
          t.join();
     // сообщаем регистратору о завершении и ожидаем его
     g_done = true;
     loggerthread.join();
     return 0;
}

Выполнение этого кода даст примерно следующий результат (результат каждый раз будет разным, т.к. рабочие потоки работают (точнее спят) случайные интервалы времени):
[logger]        running...
[worker 1]      running...
[worker 2]      running...
[worker 3]      running...
[worker 4]      running...
[worker 5]      running...
[worker 1]      an error occurred: 101
[worker 2]      an error occurred: 201
[logger]        processing error:  101
[logger]        processing error:  201
[worker 5]      an error occurred: 501
[logger]        processing error:  501
[worker 3]      an error occurred: 301
[worker 4]      an error occurred: 401
[logger]        processing error:  301
[logger]        processing error:  401

У метода wait, обозначенного выше, есть две перегрузки:
  • та, что использует только unique_lock; он (метод) блокирует поток и добавляет его в очередь потоков, ожидающих сигнала от этой условной переменной; поток пробуждается, когда будет получен сигнал от условной переменной или в случае ложного пробуждения.
  • та, что в дополнении к unique_lock, принимает предикат, используемый в цикле до тех пор, пока он не вернет false; эта перегрузка может использоваться, чтобы избежать ложных пробуждений. В общем случае это эквивалентно такому циклу:
    while(!predicate()) 
         wait(lock);
    

Таким образом, используя вторую перегрузку, можно избежать использования булевского флага g_notified в примере выше:
void workerFunc(int id, std::mt19937 &generator)
{
     // стартовое сообщение
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "[worker " << id << "]\trunning..." << std::endl;
     }
     // симуляция работы
     std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
     // симуляция ошибки
     int errorcode = id*100+1;
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
     }
     // сообщаем об ошибке
     {
          std::unique_lock<std::mutex> locker(g_lockqueue);
          g_codes.push(errorcode);
          g_queuecheck.notify_one();
     }
}

void loggerFunc()
{
     // стартовое сообщение
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "[logger]\trunning..." << std::endl;
     }
     // до тех пор, пока не будет получен сигнал
     while(!g_done)
     {
          std::unique_lock<std::mutex> locker(g_lockqueue);
          g_queuecheck.wait(locker, [&](){return !g_codes.empty();});
          // если есть ошибки в очереди, обрабатывать их
          while(!g_codes.empty())
          {
               std::unique_lock<std::mutex> locker(g_lockprint);
               std::cout << "[logger]\tprocessing error:  " << g_codes.front() << std::endl;
               g_codes.pop();
          }
     }
}

В дополнении к перегруженному методу wait(), есть еще два похожих метода с такой же перегрузкой для предиката:
  • wait_for: блокирует поток до тех пор, пока не будет получен сигнал условной переменной
  • wait_until: блокирует поток до тех пор, пока не будет получен сигнал условной переменной или не будет достигнут определенный момент времени

Перегрузка этих методов без предиката возвращает cv_status, показывающий, произошел ли таймаут, или пробуждение произошло из-за сигнала условной переменной, или это ложное пробуждение.

Std также предоставляет функцию notify_all_at_thread_exit, которая реализует механизм уведомления других потоков о том, что данных поток завершил свою работу, включая уничтожение всех объектов thread_local. Ожидание потоков механизмом, отличным от join, может привести к неправильному поведению, когда thread_locals уже были использованы, а их деструкторы могли вызываться после того, как поток был пробужден или после того, как уже завершился (см. N3070 и N2880. Как правило, вызов этой функции должен произойти непосредственно до того, как поток начнет свое существование. Ниже приведен пример, как notify_all_at_thread_exit может использоваться с условными переменными для синхронизации двух потоков:
std::mutex              g_lockprint;
std::mutex              g_lock;
std::condition_variable g_signal;
bool                    g_done;

void workerFunc(std::mt19937 &generator)
{
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "worker running..." << std::endl;
     }
     std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "worker finished..." << std::endl;
     }
     std::unique_lock<std::mutex> lock(g_lock);
     g_done = true;
     std::notify_all_at_thread_exit(g_signal, std::move(lock));
}

int main()
{
     std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());
     std::cout << "main running..." << std::endl;
     std::thread worker(workerFunc, std::ref(generator));
     worker.detach();
     std::cout << "main crunching..." << std::endl;
     std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
     {
          std::unique_lock<std::mutex> locker(g_lockprint);
          std::cout << "main waiting for worker..." << std::endl;
     }
     std::unique_lock<std::mutex> lock(g_lock);
     while(!g_done) // против ложных пробуждений
          g_signal.wait(lock);
     std::cout << "main finished..." << std::endl;
     return 0;
}

Если worker заканчивает свою работу перед потоком main, то результат будет таким:
main running...
worker running...
main crunching...
worker finished...
main waiting for worker...
main finished...

Если поток main заканчивает свою работу перед потоком worker, то результат будет таким:
main running...
worker running...
main crunching...
main waiting for worker...
worker finished...
main finished...


В качестве заключения


Стандарт C++11 позволяет разработчикам C++ писать многопоточный код стандартным, платформонезависимым способом. Эта статья — всего лишь «пробежка» по потокам и механизмам синхронизации от std. Заголовок <thread> предоставляет класс с тем же именем (и много дополнительных функций), представляющий потоки. Заголовок <mutex> обеспечивает реализацию нескольких мьютексов и «оберток» для синхронизации доступа к потокам. Заголовок <condition_variable> предоставляет две реализации условных переменных, которые позволяют блокировать один или более потоков, до получение уведомления от другого потока или до ложного пробуждения. Для более подробной информации и понимания сути дела, конечно же, рекомендуется прочитать дополнительную литературу :)
Tags:
Hubs:
+54
Comments 8
Comments Comments 8

Articles