Как стать автором
Обновить
223.33
Рейтинг
OTUS
Цифровые навыки от ведущих экспертов

Используем черную магию для создания быстрого кольцевого буфера

OTUSПрограммированиеC
Перевод
Автор оригинала: lo.calho.st

Вчера я заглянул на страницу Википедии, посвященную кольцевому буферу (circular buffer), и был заинтригован предполагаемой техникой оптимизации, с которой до этого не был знаком:

Реализация кольцевого буфера может быть оптимизирована путем отображения нижележащего буфера в двух смежных областях виртуальной памяти. (Естественно, длина нижележащего буфера должна в таком случае равняться некоторому размеру кратному размеру страницы системы.) Чтение и запись в кольцевой буфер могут выполняться в этой реализации с большей эффективностью посредством прямого доступа к памяти; те обращения, которые выходят за пределы первой области виртуальной памяти, автоматически переходят в начало нижележащего буфера. Когда смещение чтения продвигается во вторую область виртуальной памяти, оба смещения - чтения и записи - уменьшаются на длину нижележащего буфера.

В рамках реализации кольцевого буфера нам необходимо обработать случай, когда сообщение попадает на «разрыв» в очереди и должно быть перенесено (wrap around). Очевидная реализация записи в кольцевой буфер может полагаться на побайтовую запись и выглядеть примерно так:

void put(queue_t *q, uint8_t *data, size_t size) {
    for(size_t i = 0; i < size; i++){
        q->buffer[(q->tail + i) % q->buffer_size] = data[i];
    }
    q->tail = (q->tail + size) % q->buffer_size;
}

Тот факт, что для индексации в массиве необходима операция деления, затрудняет (если не исключает) векторизацию этой функции и, следовательно, делает ее неоправданно медленной. Хотя есть и другие оптимизации, которые мы можем здесь применить, методика из Википедии, предложенная выше, превосходит аппаратно-независимые подходы в силу того факта, что блок управления памятью может справиться с львиной долей логики циклического переноса за нас. Я был так воодушевлен этой идеей, что не стал проводить никаких дополнительных исследований и реализовал ее, основываясь только на кратком описании, приведенном выше.

В следующих двух разделах рассматривается конструкция кольцевого буфера и логика работы виртуальной памяти соответственно. Если вы не нуждаетесь в освежении этой информации, можете смело пропустить их.

Кольцевой буфер

Кольцевой (циклический) буфер - это удобный подход к хранению потоковых данных, которые создаются в реальном времени и вскоре после этого потребляются. Он «закольцован» для того, чтобы новые данные могли постоянно перезанимать пространство, ранее занимаемое данными, которые с тех пор уже были использованы.

Кольцевой буфер обычно инкрементируется посредством сохранения двух указателей (или индексов): указателя чтения (я назову его head - голова) и указателя записи (или tail - хвост). Очевидно, мы записываем в буфер по tail, а читаем из head. Структура может быть определена следующим образом:

typedef struct {
    uint8_t *buffer;
    size_t   buffer_size;
    size_t   head;
    size_t   tail;
    size_t   bytes_avail;
} queue_t;

Учитывая это, мы можем написать простые методы чтения (get) и записи (put), используя побайтовый доступ:

bool put(queue_t *q, uint8_t *data, size_t size) {
    if(q->buffer_size - q->bytes_avail < size){
        return false;
    }
    for(size_t i = 0; i < size; i++){
        q->buffer[(q->tail + i) % q->buffer_size] = data[i];
    }
    q->tail = (q->tail + size) % q->buffer_size;
    q->bytes_avail += size;
    return true;
}

bool get(queue_t *q, uint8_t *data, size_t size) {
    if(q->bytes_avail < size){
        return false;
    }
    for(size_t i = 0; i < size; i++){
        data[i] = q->buffer[(q->head + i) % q->buffer_size];
    }
    q->head = (q->head + size) % q->buffer_size;
    q->bytes_avail -= size;
    return true;
}

Обратите внимание, что этот метод put идентичен приведенной ранее варианту, за исключением того, что мы теперь проверяем, что свободного места достаточно прежде чем пытаться писать. Я также намеренно пренебрег логикой синхронизации (которая была бы абсолютно необходима для любого реального применения кольцевого буфера).

Побайтовый тип доступа и модульная арифметика, встроенные в каждую итерацию цикла, делают этот код ужасающе медленным. Вместо этого мы можем реализовать каждую операцию чтения или записи через два вызова memcpy, где один обрабатывает часть сообщения в конце буфера, а другой обрабатывает часть в начале, если мы закольцевались.

static inline off_t min(off_t a, off_t b) {
    return a < b ? a : b;
}
bool put(queue_t *q, uint8_t *data, size_t size) {
    if(q->buffer_size - q->bytes_avail < size){
        return false;
    }
const size_t part1 = min(size, q->;buffer_size - q->;tail);
const size_t part2 = size - part1;

memcpy(q->;buffer + q->;tail, data,         part1);
memcpy(q->;buffer,           data + part1, part2);

q->;tail = (q->;tail + size) % q->;buffer_size;
q->;bytes_avail += size;
return true;

}
bool get(queue_t *q, uint8_t *data, size_t size) {
    if(q->bytes_avail < size){
        return false;
    }
const size_t part1 = min(size, q->;buffer_size - q->;head);
const size_t part2 = size - part1;

memcpy(data,         q->;buffer + q->;head, part1);
memcpy(data + part1, q->;buffer,           part2);

q->;head = (q->;head + size) % q->;buffer_size;
q->;bytes_avail -= size;
return true;

}

А вот пример использования этого кольцевого буфера:

int main() {
    queue_t queue;
queue.buffer      = malloc(128);
queue.buffer_size = 128;
queue.head        = 0;
queue.tail        = 0;
queue.bytes_avail = 0;

put(&amp;q, "hello ", 6);
put(&amp;q, "world\n", 7);

char s[13];
get(&amp;q, (uint8_t *) s, 13);

printf(s); // prints "hello world"

}

Наш код прост и отлично работает. Но почему бы не усложнить его немного?

Встречайте таблицу страниц

На заре персональных компьютеров компьютеры могли одновременно запускать только одну программу. Какую бы программу мы ни запустили, она будет иметь полный прямой доступ к физической памяти. Оказывается, если мы хотим запустить несколько программ вместе, они будут бороться за то, какие области этой памяти они хотят использовать. Решением этого конфликта является виртуальная память, благодаря которой каждая программа считает, что она контролирует всю память, но на самом деле операционная система решает, кто какую память получает.

Ключом к виртуальной памяти является таблица страниц, с помощью которой операционная система сообщает ЦП о сопоставлениях между виртуальными и физическими страницами. Страница - это непрерывная выровненная область памяти (обычно) фиксированного размера, скажем, около 4 КиБ. Когда программе требуется доступ к некоторой (виртуальной) памяти, ЦП определяет, какой странице она принадлежит, а затем использует таблицу страниц для индексации в физической памяти.

Таблица страниц для некоторой программы А может выглядеть следующим:

Номер виртуальной страницы

Номер физической страницы

0

null

1

25

2

12

3

null

4

56

B то же время, для программы B она могла бы выглядеть следующим:

Номер виртуальной страницы

Номер физической страницы

0

11

1

null

2

92

3

21

4

null

Ключевой момент заключается в том, что нет никакой закономерности или скрытого смысла в том, как физические страницы назначены виртуальным страницам. Они могут быть не упорядочены по отношению к виртуальным страницам, а некоторым виртуальным страницам вообще не требуется назначенная им физическая память. Обе наши программы могут использовать один и тот же номер виртуальной страницы для ссылки на разные физические страницы, и поэтому им не нужно беспокоиться о конфликте с памятью друг друга.

Подробности определения номеров страниц, выделения памяти и индексации в таблице страниц выходят за рамки этой статьи, но не являются необходимыми для понимания того, что будет изложено дальше.

Несколько обычных системных вызовов и один, о котором glibc не хотел бы, чтобы вы знали

Прежде чем приступить к использованию таблицы страниц для оптимизации нашего кольцевого буфера, давайте рассмотрим некоторые стандартные системные вызовы Linux.

Системный вызов

Описание

int getpagesize()

Возвращает количество байтов в странице памяти. (Технически не всегда системный вызов, но в любом случае где-то рядом)

int ftruncate(int fd, off_t length)

Устанавливает размер файла равным length, используя его файловый дескриптор fd.

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset)

Отображает файл, описанный fd, в памяти и возвращает указатель на новую область памяти. Можно заставить делать очень плохие штуки посредством конфигурационных флагов (flags).

А теперь менее распространенный системный вызов, который glibc (наш дружественный пользовательский интерфейс ядра) не предоставляет нам:

System Call

Системный вызов

Description

Описание

int memfd_create(const char *name, unsigned int flags)

Возвращает файловый дескриптор анонимного файла (anonymous file), который существует только в памяти.

Интересно! Поскольку glibc его не реализует, нам нужно будет написать небольшую обертку, если мы захотим его использовать.

int memfd_create(const char *name, unsigned int flags) {
    return syscall(__NR_memfd_create, name, flags);
}

Отлично, теперь мы сможем вызывать ее, как любую другую функцию. Это позволит нам выделить память и манипулировать ей как файлом. Поскольку она ведет себя как файл, мы сможем разместить ее в любом месте (внимание спойлер).

Жуткая черная магия взлома таблицы

Ну что ж, перейдем к делу. Давайте сделаем то, что сказано в Википедии, и сделаем так, чтобы две соседние страницы указывали на одну и ту же память. Нам нужно будет изменить структуру нашей очереди и расширить ее API удобным методом инициализации.

typedef struct {
    uint8_t *buffer;
    size_t   buffer_size;
    int      fd;
    size_t   head;
    size_t   tail;
} queue_t;
bool init(queue *q, size_t size){
// Для начала убедимся, что размер кратен размеру страницы
if(size % getpagesize() != 0){
    return 0;
}

//Создаем анонимный файл и устанавливаем его размер
q->;fd = memfd_create("queue_buffer", 0);
ftruncate(q->;fd, size);

// Запрашиваем у mmap адрес локации, где мы можем отобразить обе виртуальные копии буфера
q->;buffer = mmap(NULL, 2 * size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);

// Отображаем буфер по этому адресу
mmap(q->;buffer, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, q->;fd, 0);

// Теперь снова отображаем его на следующей виртуальной странице
mmap(q->;buffer + size, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, q->;fd, 0);

// Инициализируем индексы нашего буфера
q->;head = 0;
q->;tail = 0;

}

Хорошо, что тут происходит? Что ж, сначала мы вызываем memfd_create, который делает именно то, что написано на этикетке. Мы находим место для двух копий буфера и сохраняем его как адрес нашего буфера. Затем мы отображаем виртуальный файл в память по адресу, который нам дал mmap. После этого в игру вступает наша черная магия.

Мы просим mmap снова отобразить наш файл по адресу в size байтах после его исходного местоположения. Теперь таблица страниц будет выглядеть следующим образом (предполагается, что size больше одной страницы):

Виртуальный адрес

Физический адрес

q->buffer

анонимный файл q->fd, смещение 0

q->buffer + getpagesize()

анонимным файл q->fd, смещение getpagesize()

q->buffer + size

анонимным файл q->fd, смещение 0

q->buffer + size + getpagesize()

анонимный файл q->fd, смещение getpagesize()

Да, тут у нас по две виртуальные страницы, которые указывают на одну и ту же физическую страницу для каждой страницы в нашем буфере. Посмотрим, как это повлияет на нашу логику кольцевого буфера:

bool put(queue_t *q, uint8_t *data, size_t size) {
    if(q->buffer_size - (q->tail - q->head) < size){
        return false;
    }
    memcpy(&q->buffer[q->tail], data, size);
    q->tail += size;
    return true;
}
bool get(queue_t *q, uint8_t *data, size_t size) {
    if(q->tail - q->head < size){
        return false;
    }
    memcpy(data, &q->buffer[q->head], size);
    q->head += size;
    if(q->head > q->buffer_size) {
       q->head -= q->buffer_size;
       q->tail -= q->buffer_size;
    }
    return true;
}

Мы позволяем нашему tail-смещению продвигаться во вторую область виртуальной памяти за пределы «реального» буфера. Однако он по-прежнему записывает в ту же физическую память. Точно так же мы можем читать за пределами первой области, а перейдя во вторую область, мы в конечном итоге читаем ту же физическую память, как если бы мы вручную закольцевали ее.

Обратите внимание, что нам нужно делать только один вызов memcpy вместо двух. После чтения данных мы проверяем, что указатель head не переместился во вторую виртуальную копию буфера. Если он все же переместился, мы можем уменьшить оба значения на размер буфера; они будут указывать на одну и ту же физическую память, хотя виртуальные адреса будут разными. API остался прежним.

Насколько это хорошо?

Ключевое отличие здесь в том, что мы можем всегда получить доступ к непрерывным блокам виртуальной памяти, вместо того, чтобы беспокоиться о переносе в середине сообщения. Мы можем написать простой микро-бенчмарк, который проверит эту логику следующим образом:

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>
#define BUFFER_SIZE (1024)
#define MESSAGE_SIZE (32)
#define NUMBER_RUNS (1000000)
static inline double microtime() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return 1e6 * tv.tv_sec + tv.tv_usec;
}
static inline off_t min(off_t a, off_t b) {
    return a < b ? a : b;
}
int main() {
    uint8_t message[MESSAGE_SIZE];
    uint8_t buffer[2 * BUFFER_SIZE];
    size_t offset;
double slow_start = microtime();
offset = 0;
for(int i = 0; i &lt; NUMBER_RUNS; i++){
    const size_t part1 = min(MESSAGE_SIZE, BUFFER_SIZE - offset);
    const size_t part2 = MESSAGE_SIZE - part1;
    memcpy(buffer + offset, message, part1);
    memcpy(buffer, message + part1, part2);
    offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE;
}
double slow_stop = microtime();

double fast_start = microtime();
offset = 0;
for(int i = 0; i &lt; NUMBER_RUNS; i++){
    memcpy(&amp;buffer[offset], message, MESSAGE_SIZE);
    offset = (offset + MESSAGE_SIZE) % BUFFER_SIZE;
}
double fast_stop = microtime();

printf("slow: %f microseconds per write\n", (slow_stop - slow_start) / NUMBER_RUNS);
printf("fast: %f microseconds per write\n", (fast_stop - fast_start) / NUMBER_RUNS);

return 0;

}

На моем i5-6400 я получаю довольно единообразные результаты, выглядящие примерно так:

slow: 0.012196 microseconds per write
fast: 0.004024 microseconds per write

Обратите внимание, что код с одним memcpy примерно в три раза быстрее, чем код с двумя memcpy. У нас гораздо большее преимущество по сравнению с наивным побайтовым копированием, которое составило 0,104943 микросекунды на запись. Этот микро-бенчмарк не полностью отражает всю логику очереди (на которую могут влиять такие условия, как сбои страниц и промахи TLB), однако дает нам хорошее указание на то, что наша оптимизация была успешной.

Что только что произошло?

Судя по всему, этот трюк давно известен, но используется редко. Тем не менее, это отличный пример использования низкоуровневого поведения машины для оптимизации программного обеспечения.

Если вам интересно увидеть мою полную реализацию кольцевого буфера, вы можете найти ее на GitHub. В этой версии есть синхронизация, проверка ошибок и доступ к деталям сообщений произвольного размера.


В преддверии старта онлайн-курса «Программист С» приглашаем всех желающих на открытый демо-урок «Жизненный цикл программы на C под UNIX». На этом вебинаре мы рассмотрим полный жизненный цикл программы на языке C под UNIX-подобной ОС на примере системы из семейства BSD. Начнём с исходного кода и закончим загрузкой готового выполняемого файла. По ходу дела посмотрим «под капот» различным низкоуровневым механизмам операционной системы и тулчейна компиляции и познакомимся с инструментарием UNIX для анализа программ. Присоединяйтесь!

- Узнать подробнее о курсе «Программист С»

- Смотреть вебинар «Жизненный цикл программы на C под UNIX»

Теги:кольцевой буферязык cжизненный циклunix
Хабы: OTUS Программирование C
Всего голосов 15: ↑14 и ↓1 +13
Просмотры4.8K

Похожие публикации

Лучшие публикации за сутки