Pull to refresh

Использование Boost.Asio с Coroutines TS

Reading time7 min
Views17K

Введение


Использование функций обратного вызова (callback) — популярный подход к построению сетевых приложений с использованием библиотеки Boost.Asio (и не только ее). Проблемой этого подхода является ухудшение читабельности и поддерживаемости кода при усложнении логики протокола обмена данными [1].


Как альтернатива коллбекам, сопрограммы (coroutines) можно применить для написания асинхронного кода, уровень читабельности которого будет близок к читабельности синхронного кода. Boost.Asio поддерживает такой подход, предоставляя возможность использования библиотеки Boost.Coroutine для обработки коллбеков.


Boost.Coroutine реализует сопрограммы с помощью сохранения контекста выполнения текущего потока. Этот подход конкурировал за включение в следующую редакцию стандарта C++ с предложением от Microsoft, которое вводит новые ключевые слова co_return, co_yield и co_await. Предложение Microsoft получило статус Technical Specification (TS) [2] и имеет высокие шансы стать стандартом.


Статья [3] демонстрирует использование Boost.Asio с Coroutines TS и boost::future. В своей статье я хочу показать, как можно обойтись без boost::future. Мы возьмем за основу пример асинхронного TCP эхо-сервера из Boost.Asio и будем его модифицировать, используя сопрограммы из Coroutines TS.



На момент написания статьи Coroutines TS реализована в компиляторах Visual C++ 2017 и clang 5.0. Мы будем использовать clang. Необходимо установить флаги компилятора для включения экспериментальной поддержки стандарта C++ 20 (-std=c++2a) и Coroutines TS (-fcoroutines-ts). Также нужно включить заголовочный файл <experimental/coroutine>.



Сопрограмма для чтения из сокета



В оригинальном примере функция для чтения из сокета выглядит так:


void do_read() {
    auto self(shared_from_this());
    socket_.async_read_some(
        boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length) {
            if (!ec) {
                do_write(length);
            }
        });
}

Мы инициируем асинхронное чтение из сокета и задаем коллбек, который будет вызван при получении данных и инициирует их отсылку обратно. Функция записи в оригинале выглядит так:


void do_write(std::size_t length) {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(data_, length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
            if (!ec) {
                do_read();
            }
        });
}

При успешной записи данных в сокет мы снова инициируем асинхронное чтение. По сути, логика программы сводится к циклу (псевдокод):



while (!ec)
{
	ec = read(buffer);
	if (!ec)
	{
		ec = write(buffer);
	}
}

Было бы удобно закодировать это в виде явного цикла, однако в таком случае нам пришлось бы сделать чтение и запись синхронными операциями. Нам это не подходит, поскольку мы хотим обслуживать несколько клиентских сессий в одном потоке выполнения одновременно. На помощь приходят сопрограммы. Перепишем функцию do_read() в следующем виде:



void do_read() {
    auto self(shared_from_this());
    const auto[ec, length] = co_await async_read_some(
        socket_, boost::asio::buffer(data_, max_length));

    if (!ec) {
        do_write(length);
    }
}

Использование ключевого слова co_await (а также co_yield и co_return) превращает функцию в сопрограмму. Такая функция имеет несколько точек (suspension point), где ее выполнение приостанавливается (suspend) с сохранением состояния (значений локальных переменных). Позже выполнение сопрограммы может быть возобновлено (resume), начиная с последней остановки. Ключевое слово co_await в нашей функции создает suspension point: после того, как асинхронное чтение инициировано, выполнение сопрограммы do_read() будет приостановлено до завершения чтения. Возврата из функции при этом не происходит, но выполнение программы продолжается, начиная с точки вызова сопрограммы. Когда клиент подключается, вызывается session::start(), где do_read() вызывается первый раз для этой сессии. После начала асинхронного чтения продолжается выполнение функции start(), происходит возврат из нее и инициируется прием следующего соединения. Далее продолжает выполнение код из Asio, который вызвал обработчик-аргумент async_accept().


Для того, чтобы магия co_await работала, его выражение — в нашем случае функция async_read_some() — должен возвращать объект класса, который соответствует определенному контракту. Реализация async_read_some() взята из комментария к статье [3].



template <typename SyncReadStream, typename DynamicBuffer>
auto async_read_some(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer buffers;

        std::error_code ec;
        size_t sz;

        bool await_ready() { return false; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            s.async_read_some(std::move(buffers),
                              [this, coro](auto ec, auto sz) mutable {
                                  this->ec = ec;
                                  this->sz = sz;
                                  coro.resume();
                              });
        }
        auto await_resume() { return std::make_pair(ec, sz); }
    };
    return Awaiter{s, std::forward<DynamicBuffer>(buffers)};
}

async_read_some() возвращает объект класса Awaiter, который реализует контракт, требуемый co_await:


  • await_ready() вызывается в начале ожидания для проверки, готов ли уже результат асинхронной операции. Поскольку для получения результата нам всегда нужно дождаться, пока данные будут прочитаны, мы возвращаем false.
  • await_suspend() вызывается перед тем, как вызывающая сопрограмма будет приостановлена. Здесь мы инициируем асинхронное чтение и передаем обработчик, который сохранит результаты выполнения асинхронной операции в переменных-членах класса Awaiter и возобновит сопрограмму.
  • await_resume() — возвращаемое значение этой функции будет результатом выполнения co_await. Просто возвращаем сохраненные ранее результаты выполнения асинхронной операции.

Если теперь попытаться собрать нашу программу, то получим ошибку компиляции:



error: this function cannot be a coroutine: 'std::experimental::coroutines_v1::coroutine_traits<void, session &>' has no member named 'promise_type'
    void do_read() {
         ^

Причина в том, что компилятор требует, чтобы для сопрограммы тоже был реализован некоторый контракт. Это делается с помощью специализации шаблона std::experimental::coroutine_traits:



template <typename... Args>
struct std::experimental::coroutine_traits<void, Args...> {
    struct promise_type {
        void get_return_object() {}
        std::experimental::suspend_never initial_suspend() { return {}; }
        std::experimental::suspend_never final_suspend() { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

Мы специализировали coroutine_traits для сопрограмм с возращаемым значением типа void и любым количеством и типами параметров. Сопрограмма do_read() подходит под это описание. Специализация шаблона содержит тип promise_type с следующими функциями:


  • get_return_object() вызывается для создания объекта, который сопрограмма будет впоследствии заполнять и возвращать. В нашем случае ничего создавать не нужно, так как do_read() ничего не возвращает.
  • initial_suspend() определяет, будет ли сопрограмма приостановлена перед первым вызовом. Аналогия — запуск приостановленного потока в Windows. Нам нужно, чтобы do_read() выполнялась без начальной остановки, поэтому возвращаем suspend_never.
  • final_suspend() определяет, будет ли сопрограмма приостановлена перед возвратом значения и завершением. Возвращаем suspend_never.
  • return_void() указывает компилятору, что сопрограмма ничего не возвращает.
  • unhandled_exception() вызывается, если внутри сопрограммы было сгенерировано исключение, и оно не было обработано внутри сопрограммы. В этом случае аварийно завершаем программу.

Теперь можно запустить сервер и проверить его работоспособность, открыв несколько подключений с помощью telnet.



Сопрограмма для записи в сокет


Функция записи do_write() все еще основывается на использовании коллбека. Исправим это. Перепишем do_write() в следующем виде:



auto do_write(std::size_t length) {
    auto self(shared_from_this());
    struct Awaiter {
        std::shared_ptr<session> ssn;
        std::size_t length;
        std::error_code ec;

        bool await_ready() { return false; }
        auto await_resume() { return ec; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            const auto[ec, sz] = co_await async_write(
                ssn->socket_, boost::asio::buffer(ssn->data_, length));
            this->ec = ec;
            coro.resume();
        }
    };
    return Awaiter{self, length};
}

Напишем awaitable-обертку для записи в сокет:



template <typename SyncReadStream, typename DynamicBuffer>
auto async_write(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer buffers;

        std::error_code ec;
        size_t sz;

        bool await_ready() { return false; }
        auto await_resume() { return std::make_pair(ec, sz); }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            boost::asio::async_write(
                s, std::move(buffers), [this, coro](auto ec, auto sz) mutable {
                    this->ec = ec;
                    this->sz = sz;
                    coro.resume();
                });
        }
    };
    return Awaiter{s, std::forward<DynamicBuffer>(buffers)};
}

Последний шаг — перепишем do_read() в виде явного цикла:



void do_read() {
    auto self(shared_from_this());
    while (true) {
        const auto[ec, sz] = co_await async_read_some(
            socket_, boost::asio::buffer(data_, max_length));
        if (!ec) {
            auto ec = co_await do_write(sz);
            if (ec) {
                std::cout << "Error writing to socket: " << ec << std::endl;
                break;
            }
        } else {
            std::cout << "Error reading from socket: " << ec << std::endl;
            break;
        }
    }
}

Логика программы теперь записана в виде, близком к синхронному коду, однако она выполняется асинхронно. Ложкой дёгтя является то, что нам пришлось написать дополнительный awaitable-класс для возвращаемого значения do_write(). Это иллюстрирует один из недостатков Coroutines TS — распространение co_await вверх по стеку асинхронных вызовов [4].


Переделку функции server::do_accept() в сопрограмму оставим в качестве упражнения. Полный текст программы можно найти на GitHub.



Заключение


Мы рассмотрели использование Boost.Asio с Coroutines TS для программирования асинхронных сетевых приложений. Преимущество такого подхода — улучшение читабельности кода, поскольку он становится близок по форме к синхронному. Недостаток — необходимость в написании дополнительных оберток для поддержки модели сопрограмм, реализованной в Coroutines TS.



Ссылки


  1. Асинхронность: назад в будущее
  2. Working Draft, Technical Specification for C++ Extensions for Coroutines
  3. Using C++ Coroutines with Boost C++ Libraries
  4. Возражения против принятия Coroutines с await в C++17
Tags:
Hubs:
+20
Comments16

Articles

Change theme settings