Комментарии 17
Для того, чтобы прерывать блокирующие IO-операции, необходимо реализовать соответствующий cancellation_handler. Для POSIX'а можно использовать такую схему — вместо того, чтобы ждать данных в read
, места во write
, подключения в accept
и т.д. — можно всё ожидание делать в вызове poll
. Тогда для отмены блокирующиего IO достаточно использовать прерываемую версию poll
, которая уже есть в rethread: https://github.com/bo-on-software/rethread/blob/master/rethread/poll.hpp
Хотелось бы добавить немного подробностей относительно pthread_cancel. Безусловно, использование данного механизма может приводить к проблемам с освобождением мьютексов и ресурсов. Однако, стоило также упомянуть про механизм pthread_cleanup_push() / pthread_cleanup_pop (), позволяющий устанавливать функции-обработчики для выполнения процедуры корректного освобождения ресурсов при завершении работы потока. Я сталкивался с тем, что отмена потока при помощи pthread_cancel () приводит к тому, что мьютекс остаётся не освобождённым. И добавление простейшего обработчика, который мьютекс освобождает, решило проблему. Безусловно, написание такого рода обработчиков достаточно трудоёмко. Тем не менее, упомянуть о таком варианте, на мой взгляд, стоило.
Также хотелось бы уточнить, что означает фраза «Нельзя применить для прерывания отдельных функций или задач»? Меня она поставила в тупик)
За статью большое спасибо, мне очень понравилось)
Добрый день.
pthread_cleanup_push/pthread_cleanup_pop я упоминать не стал, потому что плюсах им есть отличная альтернатива: RAII.
Проблема с обработчиками прерывания в Си в том, что их не все пишут. Из-за этого минусы использования pthread_cancel могут проявиться не сразу, а с задержкой.
Реальная история: в проекте появляется ещё одна 3-party библиотека, написанная на Си (обёртка над специфичным железом). Без исходного кода и связи с разработчиками библиотеки. Через несколько месяцев обнаруживаются странные дэдлоки, которых быть не должно. После нескольких дней дебага выясняется, что если вызвать pthread_cancel для потока в тот момент, когда этот поток находится внутри этой библиотеки — то он просто тихо умирает, даже не разматывая свой стек.
Насчёт отдельных задач — я имел в виду отмену долгой задачи без прерывания всего потока. Например, в приложении-навигаторе может быть отдельный поток для того, чтобы рассчитывать маршрут. Рассчёт маршрута из точки А в точку Б — это отдельная задача. Если пользователь изменил начальную и конечную точки — расчёт можно отменить, но прерывать весь поток смысла нет.
Извиняюсь за орфографию.
Относительно RAII согласен, механизм прекрасный. Недавно открыл его для себя. Я имею в виде не теоритически, но на практике.
Про отдельные задачи теперь понял, спасибо) В моём восприятии всё упёрлось в то, что отдельная задача выполняется целиком отдельным потоком, который целиком останавливается.
Так что мешает сделать что-нибудь в стиле
struct cancel_aware_thread {
template <class Function, class... Args>
cancel_aware_thread(std::function<void(std::thread&)> && cancelFunction, Function && threadFunc, Args&& ... args) {
_impl(threadFunc, args);
cancellation = cancelFunction;
}
~cancel_aware_thread() {
cancellation();
}
private:
std::thread _impl;
std::function<void(std::thread &)> cancellation;
}
И пусть себе программист думает, как именно его поток должен завершаться. В частности, в первом коде выше будет
void not_so_dangerous_thread()
{
cancel_aware_thread t([] (std::thread & t) {t.join();}, [] { do_something(); });
do_another_thing(); // may throw - will not cause a termination
}
//пардон за ошибки синтаксиса, if any
P.S. explicit operator bool() — зачем?
Проблема в том, что сам по себе join() ситуацию не улучшит (это обсуждается в разделе "Серьёзная проблема").
Больше всего информации о том, как прервать какую-нибудь блокирующую функцию есть у автора этой функции. В месте создания потока разработчик может и не знать о том, в каких именно вызовах этот поток может уснуть, и как его там прервать. Вот если поток ожидает на многопоточной очереди — как его правильно прервать? Если положить в очередь специальный объект, то его может вытащить другой поток, а не тот, который мы пытались разбудить.
Explicit оператору приведения к булеану нужен для того, чтобы не компилировался следующий код:
standalone_cancellation_token token;
int i = token + 5;
Ну, тогда нужно сделать класс cancelable_function<...>, который будет знать, что выполнять, и как это остановить. Тогда мой cancel_aware_thread будет принимать её в конструктор и запоминать, как её остановить.
В Вашем коде, кажется, токен отмены пытается решить все проблемы — отмена ожиданий (системных и плюсовых), выставление флага. Тут тоже надо знать, что безопасно для той или иной функции в том или ином контексте. В частности, если функция, которая запускается в потоке, не известна программисту, нужно будет от автора получить или cancellation_handler, который потом использовать при создании токена, или просто мою cancelable_function. Кажется, второе нативнее. Код будет каким-то таким:
cancel_aware_thread(get_cancelable_function());
И это решит сразу все проблемы — и исключения в конструкторах, и перед деструктором.
Архитектурно, токен (вернее, хендлер) — это разрыв логики работы функции и её прерывания, на мой взгляд.
Ну, тогда нужно сделать класс cancelable_function<...>, который будет знать, что выполнять, и как это остановить
Проблема в том, что все блокирующие функции, которые будут вызваны из cancellable_function, тоже должны быть представлены в виде cancellable_function, которые должны каким-то образом связываться с вызывающей cancellable_function, и т.д. В моём подходе аналог cancellable_function — это обычная функция, которая принимает ссылку на cancellation_token и передаёт её вызываемым блокирующим функциям. Мне кажется, что так гораздо лаконичнее.
Тут тоже надо знать, что безопасно для той или иной функции в том или ином контексте. В частности, если функция, которая запускается в потоке, не известна программисту, нужно будет от автора получить или cancellation_handler, который потом использовать при создании токена <...>
Я не вполне понял, что вы имеете в виду под "известна программисту", но от автора ничего получать не надо. Токены не требуют для своего создания cancellation_handler, токен создаётся и живёт вместе с потоком. Когда в этом потоке вызывается прерываемая функция — она использует токен для того, чтобы проверять статус прерывания. Если эта функция вызывает другую прерываемую функцию — она передаёт ей ссылку на этот токен.
Вот небольшой пример программы, которая обрабатывает задачи от пользователя в отдельном потоке:
void do_subtask(const subtask_data& data, const cancellation_token& token)
{
// ...
}
void do_task(const task_data& data, const cancellation_token& token)
{
for (int i = 0; i < Subtasks && token; ++i)
do_subtask(data.get_subtask(i), token);
}
void do_work(concurrent_queue<task_data>& tasks, const cancellation_token& token)
{
while(token)
{
auto data = tasks.pop(token);
if (data)
do_task(*data, token);
}
}
int main()
{
concurrent_queue<task_data> tasks;
rethread::thread t{ [&tasks] (const cancellation_token& token) { do_work(tasks, token); } };
for (auto i = read_input(); i; i = read_input())
tasks.push(*i);
return 0;
}
То есть токен создаётся вместе с потоком, а потом передаётся в каждую блокирующую (или просто длительную) функцию.
Our preference would be to employ a task-based design for this (see Item 35), but let’s assume we’d like to set the priority of the thread doing the filtering. Item 35 explains that that requires use of the thread’s native handle, and that’s accessible only through the std::thread API; the task-based API (i.e., futures) doesn’t provide it. Our approach will therefore be based on threads, not tasks.
We could come up with code like this:
constexpr auto tenMillion = 10000000; // see Item 15
// for constexpr
bool doWork(std::function<bool(int)> filter, // returns whether
int maxVal = tenMillion) // computation was
{ // performed; see
// Item 2 for
// std::function
std::vector<int> goodVals; // values that
// satisfy filter
std::thread t([&filter, maxVal, &goodVals] // populate
{ // goodVals
for (auto i = 0; i <= maxVal; ++i)
{ if (filter(i)) goodVals.push_back(i); }
});
auto nh = t.native_handle(); // use t's native
// handle to set
// t's priority
...
if (conditionsAreSatisfied()) {
t.join(); // let t finish
performComputation(goodVals);
return true; // computation was
// performed
}
return false; // computation was
} // not performed
Before I explain why this code is problematic, I’ll remark that tenMillion’s initializing value can be made more readable in C++14 by taking advantage of C++14’s ability to use an apostrophe as a digit separator:
constexpr auto tenMillion = 10'000'000; // C++14
I’ll also remark that setting t’s priority after it has started running is a bit like closing the proverbial barn door after the equally proverbial horse has bolted. A better design would be to start t in a suspended state (thus making it possible to adjust its priority before it does any computation), but I don’t want to distract you with that code. If you’re more distracted by the code’s absence, turn to Item 39, because it shows how to start threads suspended.
But back to doWork. If conditionsAreSatisfied() returns true, all is well, but if it returns false or throws an exception, the std::thread object t will be joinable when its destructor is called at the end of doWork. That would cause program execution to be terminated.
You might wonder why the std::thread destructor behaves this way. It’s because the two other obvious options are arguably worse. They are:
- An implicit join. In this case, a std::thread’s destructor would wait for its underlying asynchronous thread of execution to complete. That sounds reasonable, but it could lead to performance anomalies that would be difficult to track down. For example, it would be counterintuitive that doWork would wait for its filter to be applied to all values if conditionsAreSatisfied() had already returned false.
- An implicit detach. In this case, a std::thread’s destructor would sever the connection between the std::thread object and its underlying thread of execution. The underlying thread would continue to run. This sounds no less reasonable than the join approach, but the debugging problems it can lead to are worse. In doWork, for example, goodVals is a local variable that is captured by reference. It’s also modified inside the lambda (via the call to push_back). Suppose, then, that while the lambda is running asynchronously, conditionsAreSatisfied() returns false. In that case, doWork would return, and its local variables (including goodVals) would be destroyed. Its stack frame would be popped, and execution of its thread would continue at doWork’s call site. Statements following that call site would, at some point, make additional function calls, and at least one such call would probably end up using some or all of the memory that had once been occupied by the doWork stack frame. Let’s call such a function f. While f was running, the lambda that doWork initiated would still be running asynchronously. That lambda could call push_back on the stack memory that used to be goodVals but that is now somewhere inside f’s stack frame. Such a call would modify the memory that used to be goodVals, and that means that from f’s perspective, the content of memory in its stack frame could spontaneously change! Imagine the fun you’d have debugging that.
The Standardization Committee decided that the consequences of destroying a joinable thread were sufficiently dire that they essentially banned it (by specifying that destruction of a joinable thread causes program termination).
This puts the onus on you to ensure that if you use a std::thread object, it’s made unjoinable on every path out of the scope in which it’s defined. But covering every path can be complicated. It includes flowing off the end of the scope as well as jumping out via a return, continue, break, goto or exception. That can be a lot of paths.
TLDR: альтернативы хуже.
В статье этот вопрос рассмотрен, в разделе "Серьёзная проблема". Кратко — так слишком просто получить дедлоки.
Ещё рекомендую посмотреть P0379R0 Why joining_thread from P0206 is a Bad Idea
Если кому-то любопытно — видео можно посмотреть здесь: meetingcpp.ru/?page_id=1050
Решаем проблемы с RAII у std::thread: cancellation_token как альтернатива pthread_cancel и boost::thread::interrupt