Pull to refresh

Каспийские монстры многопоточности

C++


Мне очень хочется показать, что в многопоточности С++ «очень быстро» не исключает «очень безопасно». А значит можно писать эффективные и стабильные программы со сколь угодно большим количеством потоков и при этом избегать траты кучи времени на отладку многопоточности. Если Вам интересно, как мне удаётся не выстрелить себе в ногу, и чем я за это плачу, добро пожаловать

Когда лет 7-8 назад пришлось писать всё более многопоточные программы, мой друг — Капитан Очевидность — обратил моё внимание на следующий факт: чем больше потоков, чем активнее они взаимодействуют, тем больше требуется объектов синхронизации и тем больше бессонных ночей я трачу на этапе тестирования. Ситуация усложняется тем, что многопоточные ошибки подобны красивым барышням: они всегда случайно появляются в поле зрения, но встретить какую-то конкретно ещё раз — гораздо труднее.

В общем, когда число потоков в программах стабильно ушло за 5-6, я понял, что с этим нужно что-то делать, и делать очень быстро, пока их не стало 10 и больше.
При этом, если в том же Win API есть печеньки вроде WaitForMultipleObjects, то при переходе в кросс-платформенную среду, у нас остаются только мьютексы, критические секции и небезусловная поддержка сигналов (а так и случилось, когда я перешёл на замечательный кроссплатформенный фреймворк U++).

Прошло несколько месяцев поиска решений, когда мой взгляд упал на описание замечательного языка программирования Эрланг. Он предлагал далеко не новую, но очень элегантную систему межпоточного взаимодействия, которая сильно повышает стабильность работы программы.

Если своими словами и вкратце, речь идёт о том, что каждая подзадача работает со своим «адресным пространством» (вернее, набором данных), то есть изолирована от других. И единственный способ взаимодействия подзадач — обмен асинхронными сообщениями. Это всё имеет под собой серьёзную теорию, массу умных названий и особенностей, но мой друг Капитан Очевидность очень не любит все эти скучные детали и просит меня перейти сразу к сути.

Мы представляем каждый наш поток в виде конвейера. Это значит, что поток крутится в некотором «бесконечном» цикле, который ожидает прихода нового сообщения. Как только сообщение обрабатывается, конвейерный поток либо обрабатывает следующее, либо засыпает в ожидании нового.

Давайте на секунду забудем, что входящие сообщения с аргументами передаются нам из многопоточной среды. Внутри конвейера мы имеем простейший однопоточный цикл обработки входящих потокобезопасных аргументов с использованием внутренних данных класса. Понятно, что для такой работы объекты синхронизации не нужны. Это обычный код, который и знать ничего не знает про многопоточность. Напоминаю, что наш класс — мизантроп, не поделился ни с кем своими данными, а значит на них не может повлиять ни один объект из другого потока (что, кстати, красиво ложится в требования инкапсуляции).

Теперь остаётся решить, как и в каком виде мы будем передавать эти самые асинхронные сообщения. На этом месте мой друг К.О. упорно замолчал, так что решение мне пришлось принимать самому. Довольно быстро стало понятно, что любое перечисление видов сообщений для каждого такого класса
enum {MESSAGE_....., MESSAGE_....., MESSAGE_.....}
будет выглядеть жестоко, а их обработка
switch (messageType)
{
case MESSAGE_.....: ....... break;
case MESSAGE_.....: ....... break;
case MESSAGE_.....: ....... break;
case MESSAGE_.....: ....... break;
}
будет кастовать законные лучи ненависти и поноса от благодарных потомков.

В общем, класть в очередь на обработку я стал сразу колбэки с аргументами. Так я избавился от ненужных перечислений и сделал код хорошо читаемым. Выглядит это следующим образом:
//объект, который превращён в отдельный конвейерный поток
class SomeJobThread : public CallbackThread
{
public: //thread SomeJobThread
	void DoSomeJob(String arg1, double arg2)
	{
		//обработка аргументов с учётом моих данных
		//в простейшем однопоточном стиле
	}
	
private:
	//мои данные
};

SomeJobThread someJob;
someJob.Request(&SomeJobThread::DoSomeJob, "OMG!", -1);


Такой код выглядит слегка необычно, но лишь поначалу. При этом он довольно выразителен, не перегружен объектами синхронизации, циклами и прочим.
По сути, я объединяю данные потока в единый класс, путём наследования делаю поток конвейерным и пишу в нём обработчики, состоящие из простого однопоточного кода. И всё!

Применение такого подхода слегка меняет подход к построению многопоточного приложения, но, поверьте, оно того стоит.

Здесь мой друг К.О., ласково прищурившись, замечает, что за потокобезопасность я плачу:
  • наличием цикла обработки сообщений
  • синхронизацией потоков с очередью
  • необходимостью копировать аргументы
  • памятью, которую съедает очередь потока


На самом же деле происходит следующее. Цикл обработки пользуется внутренним объектом синхронизации и «спит» всё время, пока на входе нет сообщений. Что не только очищает карму разработчика, но и вносит посильный вклад в борьбу за процессорные такты и экологию планеты. Вызов колбэка в моей реализации равен вызову виртуальной функции плюс вызов невиртуальной функции с аргументами, что не так много.

Синхронизация очереди с потоком делается простейшим образом через объект синхронизации, работа с которым абсолютно прозрачна для пользовательского кода. Лок — вытащить из очереди указатель — анлок. Не так уж много ресурсов это и отнимает. Плюс, можно воспользоваться неблокирующими очередями.

По поводу копирования аргументов. Копирование pod-типов — довольно быстрая операция, которая в общем-то незаметна в абсолютном большинстве применений. Когда речь идёт о более сложных аргументах, то имеет смысл воспользоваться разрушающим копированием. Например, строку можно передавать через разрушающую передачу указателя на внутренние данные (этот механизм активно используется в U++). Поэтому передача в виде аргумента списка или ассоциативного массива — очень дешёвая операция. По сути, передача сколь угодно сложных параметров в реальных программах сводится, как максимум, к копированию нескольких POD-переменных.

Ну и наконец, использование памяти. Каждый элемент очереди — небольшая структура, содержащая, кроме аргументов, указатель и виртуальную таблицу из двух указателей (всего == (1+1+2)*sizeof(void *) ), что совсем немного.

Ну и наконец, надо понимать, что любой подход не является панацеей на все случаи жизни. Например, основной поток высокопроизводительного веб-сервера — это задача из другой области. А вот практически любая многопоточная работа в десктопном приложении ложится на этот подход как влитая. В реальной жизни очередь потока из более чем тысячи-двух колбэков — это результат ошибки проектирования или ошибки при кодировании. Обе ловятся ограничением длины очереди и отладочным ассертом. А это значит, что каждая очередь занимает менее 64К памяти, что по нынешним меркам — величина почти незаметная.

Мало того, я обнаружил, что конвейерный подход привёл к заметному сокращению общего количества объектов синхронизации и, как следствие, уменьшению числа блокировок, а следовательно и росту производительности программ при повышении их стабильности.

Что тут ещё скажешь… очереди — это круто, не бойтесь очередей!


P.S. В статье я лишь описал принцип, обозначил подход. Есть много моментов, связанных с проектированием проектов с применением конвейерных потоков, обратной нотификацией, созданием пула конвейерных потоков для высоконагруженных приложений. Если статья будет интересна сообществу, эти моменты я раскрою подробнее.
Tags:с++многопоточностьочередиобъекты синхронизации
Hubs: C++
Total votes 88: ↑81 and ↓7 +74
Views6.8K

Comments 75

Only those users with full accounts are able to leave comments. Log in, please.