Как стать автором
Обновить

Очередь задач в PostgreSQL

Время на прочтение 7 мин
Количество просмотров 30K

Очередь слонов - pixabay.com


Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.


Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.


Ограничения применимости


Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.


Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.


Суть метода в пяти словах


select ... for update skip locked

Базовая очередь


Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.


Таблица для простейшей очереди содержит саму задачу и её статус:


create table task
(
    id          bigint not null primary key,
    status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена
);

create index task__status__idx on task (status);

Добавление задачи:


insert into task (id) values ($1) on conflict (id) do nothing;

Получение следующей задачи:


with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи:


update task
set
    status = 2
where id = $1;

Очередь с приоритетами


В простом случае id задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id).


Либо для приоритета добавляется отдельный столбец:


create table task
(
    id          bigint not null primary key,
    priority    integer not null,
    status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена
);

create index task__status__priority__idx on task (status, priority);

Добавление задачи:
insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;

Получение следующей задачи:
with next_task as (
    select id from task
    where status = 0
    order by priority
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Выделенный столбец позволяет менять приоритет задачи "на лету".


Очередь с повтором "упавших" задач


В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.


create table task
(
    id          bigint not null primary key,
    status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    attempt     integer not null default 0,
    delayed_to  timestamp null,
    error_text  text null
);

create index task__status__delayed_to__idx on task (status, delayed_to);

Как видно, расширился список статусов и добавились новые столбцы:


  • attempt — номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на 10 * attempt минут);
  • delayed_to — время следующей попытки выполнения задачи;
  • error_text — текст ошибки.

Текст ошибки нужен для группировки по типам ошибки.


Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом "ошибка". Выполняем запрос:


select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;

За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.


Получение следующей новой задачи:
with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    delayed_to = null,
    error_text = null
from next_task
where task.id = next_task.id
returning task.id;

Получение следующей отложенной из-за ошибки задачи:
with next_task as (
    select id from task
    where status = 3
      and delayed_to < localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    delayed_to = null,
    error_text = null
from next_task
where task.id = next_task.id
returning task.id;

Успешное завершение задачи:
update task
set
    status = 2,
    delayed_to = null,
    error_text = null
where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:
update task
set
    status = 3,
    delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
    error_text = $2
where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:
update task
set
    status = 4,
    delayed_to = null,
    error_text = $2
where id = $1;

Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or может очень плохо сочетаться с сортировкой order by.


Сбор метрик


Добавляем такие атрибуты:


  • время создания задачи;
  • время изменения задачи;
  • время начала и завершения выполнения задачи.

create table task
(
    id          bigint not null primary key,
    status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    attempt     integer not null default 0,
    begin_time  timestamp null,
    end_time    timestamp null,
    delayed_to  timestamp null,
    error_text  text null,
    created     timestamp not null default localtimestamp,
    updated     timestamp not null default localtimestamp
);

create index task__status__delayed_to__idx on task (status, delayed_to);
create index task__updated__idx on task (updated);

Учитываем добавленные столбцы во всех запросах.


Получение следующей новой задачи:
with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    begin_time = localtimestamp,
    end_time = null,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;

Получение следующей отложенной из-за ошибки задачи:
with next_task as (
    select id from task
    where status = 3
      and delayed_to < localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    begin_time = localtimestamp,
    end_time = null,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;

Успешное завершение задачи:
update task
set
    status = 2,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:
update task
set
    status = 3,
    end_time = localtimestamp,
    delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
    error_text = $2,
    updated = localtimestamp
where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:
update task
set
    status = 4,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = $2,
    updated = localtimestamp
where id = $1;

Примеры, для чего это может быть нужно


Поиск и перезапуск повисших задач:


update task
set
    status = 3,
    end_time = localtimestamp,
    delayed_to = localtimestamp,
    error_text = 'hanged',
    updated = localtimestamp
where status = 1
  and updated < localtimestamp - interval '1 hour';

Удаление старых задач:


delete from task
where updated < localtimestamp - interval '30 days';

Статистика по выполнению задач:


select
    date_trunc('hour', end_time),
    count(*),
    sum(end_time - begin_time),
    avg(end_time - begin_time)
from task
where status = 2
  and end_time >= '2019-12-16'
group by 1
order by 1;

Повторный запуск ранее выполненных задач


Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.


create table task
(
    id              bigint not null primary key,
    task_updated_at timestamp not null default localtimstamp,
    status          integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    begin_time      timestamp null,
    end_time        timestamp null,
    delayed_to      timestamp null,
    error_text      text null,
    created         timestamp not null default localtimestamp,
    updated         timestamp not null default localtimestamp
);

Здесь для времени обновления задачи добавлен столбец task_updated_at, но можно было бы использовать поле created.


Добавление или обновление (перезапуск) задачи:


insert into task (id, task_updated_at) values ($1, $2)
on conflict (id) do update
set
    task_updated_at = excluded.task_updated_at,
    status = case when status = 1 then 1 else 0 end,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where task_updated_at < excluded.task_updated_at;

Что здесь происходит. Задача становится "новой", если она сейчас не исполняется.


В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.


Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.


Успешное завершение задачи:


update task
set
    status = case when begin_time >= updated then 2 else 0 end,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where id = $1;

Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус "новая".


Pipeline


Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.


Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.


create table task
(
    id      bigint not null primary key,
    stage   integer not null default 0,
    status  integer not null default 0
);

create index task__stage__status__idx on task (stage, status);

Получение следующей задачи на заданной стадии:


with next_task as (
    select id from task
    where stage = $1
      and status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи с переходом на указанную стадию:


update task
set
    stage = $2,
    status = 2
where id = $1;

Или переход на следующую по порядку стадию:


update task
set
    stage = stage + 1,
    status = 2
where id = $1;

Задачи по расписанию


Это вариация очереди с повтором.


У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).


create table task
(
    id              bigint not null primary key,
    period          integer not null,               -- периодичность запуска в секундах
    status          integer not null default 0,     -- 0 - новая, 1 - в работе
    next_run_time   timestamp not null default localtimestamp
);

create index task__status__next_run_time__idx on task (status, next_run_time);

Добавление задачи:


insert into task (id, period, next_run_time) values ($1, $2, $3);

Получение следующей задачи:


with next_task as (
    select id from task
    where status = 0
      and next_run_time <= localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи и планирование следующего запуска:


update task
set
    status = 0,
    next_run_time = next_run_time + make_interval(secs => period)
where id = $1

Вместо заключения


В создании специализированной очереди задач средствами РСУБД нет ничего сложного.


"Самопальная" очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.


Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.

Теги:
Хабы:
+24
Комментарии 58
Комментарии Комментарии 58

Публикации

Истории

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн