Как стать автором
Обновить
253.52
Тензор
Разработчик системы СБИС

Агрегаты в БД — эффективная обработка потока «фактов»

Время на прочтение5 мин
Количество просмотров4.7K

Предположим, вам надо обработать на PostgreSQL большое (не, не так... БОЛЬШОЕ) количество записей, чтобы посчитать какие-нибудь агрегаты. В предыдущей статье были разобраны различные варианты, как это можно организовать, а в этой посмотрим, как при этом особо никого не заблокировать, включая "набегающий поток" данных.

Например, это может быть пересчет остатков и ведение сводных продаж по товарам при их постоянных отгрузках, или агрегация сальдо и оборотов по бухгалтерским счетам, при массовых изменениях проводок, или что-то еще... В любой управленческой системе подобных задач наберется горка, и СБИС тоже не является исключением.

Но у всех этих ситуаций есть общий момент - количество изменений сильно больше количества целевых агрегатов. Например: тысячи товаров, по каждому десятки тысяч отгрузок в день.

В дальнейших рассуждениях на эту модель "Amazon с товарами" и будем опираться.

Целевая задача - сводные продажи за день

Хотим иметь агрегаты по продажам в разрезе товар/день/кол-во.

Конкретно в этом случае мы будем делать агрегаты "прямо в БД", чтобы иметь возможность быстро и целостно получать их для различных отчетов.

Тут, конечно, можно бы немного пожертвовать целостностью и выгрузить данные во внешнюю БД, которая позволяет получать агрегаты дешевле - ClickHouse, например. Но, представьте, что у нас данные не по товарам, а по бухгалтерским проводкам, среди которых одна, конкретно ваша зарплатная, при экспорте исказилась в неприятную сторону, и сальдо теперь не в вашу пользу...

Поэтому приходится либо обвешивать такую нетранзакционную в целом схему массой кросс-сверок (что очень сильно сказывается на производительности), либо строить 2PC-схемы с координаторами распределенных транзакций, или все-таки оставаться в рамках одной БД.

Первый момент, на который обратим внимание - разных товаров у нас достаточно много, поэтому обрабатывать последовательно, один за другим - очень долго, и не наш вариант. С другой стороны, самих "фактов" отгрузок очень много, поэтому обработать их все за одну итерацию мы тоже не сможем.

Что же делать?.. Тут нам на помощь приходит...

Конкурентная обработка очередей

Про общую модель работы с очередями я уже рассказывал, демонстрировал, как разбираться с проблемами "распухания" таблицы-очереди - тоже, поэтому нам необходимо совсем чуть-чуть - определиться, что будет являться независимыми "квантами" расчета, и что будет ключом "параллелизации" между потоками расчета.

В нашей модели явно напрашивается сделать ключом - "товар", а квантом - "товар-на-дату".

Почему бывает выгодно "прибивать" ключ к одному потоку, я рассказывал в презентации, посвященной алгоритмам расчета себестоимости (видео).

Трансляция операций в "факты"

В нашей модели конечный пользователь работает с "отгрузками". То есть создает, как-то корректирует, а иногда и вовсе удаляет их.

Каждую такую операцию мы будем триггером транслировать в таблицу "фактов" нашего потока данных. То есть в исходной таблице происходит INSERT, UPDATE или DELETE, но в таблице "потока" - всегда только INSERT. Как известно, в PostgreSQL две параллельные вставки записей в одну таблицу не блокируют друг друга, если не пересекаются по какому-нибудь unique-индексу.

Безусловно, "лить в поток" надо только операции, которые хоть как-то влияют на целевые агрегаты. Всякие изменения комментария, времени в рамках одного дня и прочее - незачем, они только "раздуют" flow-таблицу.

Трансляция "фактов" в "очередь"

На flow-таблице тоже висит триггер, который в таблицу очереди вставляет/обновляет "заголовочную" запись "такой-то товар в такую-то дату надо посчитать".

Почему "обновляет"? Для защиты от конфликта "я удалил из очереди, пока ты вставлял" - про это можно посмотреть слайд 58 из упомянутой выше презентации о расчете себестоимости.

Вот тут нас может поджидать блокировка, если у нас пересекутся во времени две длительные транзакции, которые будут работать с одним и тем же товаром в рамках одной даты. Но раз мы предполагаем, что таких операций даже в рамках одного товара у нас тысячи-в-день, то мы их делаем очень быстро, и вероятность такой блокировки мала.

Обработка очереди с ограничением времени

Теперь у нас все хорошо, данные текут, очередь заполняется, мы "даем" потоку расчета пару товар-дата и… И вот тут нас ждет грандиозный fail, если мы просто делаем обработку потока примерно так:

DELETE FROM flow WHERE (it, dt) = (1, '2018-07-29') RETURNING *;

Давайте вспомним, что в предыдущем пункте мы сформулировали одно из ключевых требований - "нет длинных транзакций". Но тут мы сами такую транзакцию и сделаем, если внезапно товара стало продаваться не 1K/день, а 10K/день.

Давайте поступим поумнее:

SET statement_timeout = 1000;

Все хорошо, теперь ни один запрос не сможет выполняться дольше секунды! Но, вот незадача, данные-то из flow-таблицы так никуда и не делись, а обработать их мы теперь не можем вовсе. Печаль…

Чтение из курсора

Ключевая проблема в предыдущем пункте в том, что мы попытались "проглотить" весь пирог одним куском, хотя нас никто не заставлял так делать. Если "рубить хвост по частям", то все проходит гораздо успешнее.

В пределе, можно читать из flow вообще по одной записи, и сразу ее удалять, но этот метод имеет слишком уж сильный оверхед на открытие каждой новой транзакции и "изолированную" вычитку записи.

Но, оказывается, обе проблемы вполне решаемы! Мы можем начать транзакцию однократно, а потом читать данные блоками с помощью курсора:

DECLARE curs CURSOR FOR SELECT ctid, * FROM flow WHERE (it, dt) = (1, '2018-07-29') FOR UPDATE;
-- в цикле, блоками, пока не кончатся данные
    FETCH %d FROM curs;
    DELETE FROM flow WHERE ctid = ANY(...);

Здесь мы вместе с каждой записью flow вычитываем ее ctid - "физический" идентификатор для максимально быстрого доступа при последующем удалении обработанных записей.

SAVEPOINT спешит на помощь

Но мы ведь не знаем, сколько будет занимать обработка каждого отдельного кусочка "пирога". Каким поставить значение %d для FETCH? Слишком мало - вырастет оверхед, слишком много - сработает таймаут и откатит всю транзакцию... Или не всю?

PostgreSQL поддерживает так называемые "точки сохранения" с помощью операторов SAVEPOINT/ROLLBACK TO, когда после словленного исключения можно оперативно откатиться на "запомненное" в рамках транзакции состояние и продолжить как ни в чем не бывало.

Теперь наш алгоритм выглядит так:

  1. Читаем записи из курсора, на каждой итерации удваивая их количество (до какого-то разумного предела, конечно).

  2. Генерируем точку сохранения.

  3. Если в процессе чтения/обработки словили таймаут - откатываемся на последнюю точку и COMMIT'им транзакцию.

  4. Если просто вышло время, которое мы готовы выделить на всю транзакцию обработки - просто COMMIT'им.

  5. Если вычитанных записей оказалось меньше, чем просили - то они кончились, и "заголовочную" запись можно смело удалять из очереди (с проверкой версии, конечно).

  6. Конечно же, не забываем закрывать курсор!

BEGIN;
  DECLARE curs CURSOR FOR SELECT ctid, * FROM flow WHERE (it, dt) = (1, '2018-07-29') FOR UPDATE;

  FETCH 1 FROM curs;
  DELETE FROM flow WHERE ctid = ANY(...);
  -- processing
  INSERT INTO agg ...
  SAVEPOINT _1;
  
  FETCH 2 FROM curs;
  DELETE FROM flow WHERE ctid = ANY(...);
  -- processing
  INSERT INTO agg ...
  SAVEPOINT _2;
  
  FETCH 4 FROM curs;
  DELETE FROM flow WHERE ctid = ANY(...);
  -- processing...
  INSERT INTO agg ...
  -- oops! timeout exception!
  ROLLBACK TO _2;
  
  CLOSE curs;
COMMIT;

На этом сегодняшняя магия заканчивается. Знайте, умейте, применяйте!


Мини-серия "Агрегаты в БД":

Теги:
Хабы:
+13
Комментарии0

Публикации

Информация

Сайт
sbis.ru
Дата регистрации
Дата основания
Численность
1 001–5 000 человек
Местоположение
Россия