Как стать автором
Обновить

Комментарии 58

В создании специализированной очереди задач средствами РСУБД нет ничего сложного
… кроме возможных проблем с конкурентным доступом и модификацией данных.

Если вы проектируете очередь, то извольте либо сделать ее thread-safe, либо явным образом объявите, что, скажем, метод получения очередной задачи thread-safe, а отметки об ее завершении — thread-unsafe.
Без этого никогда нельзя делать вот так:
update X set status = 1 where id = @id
, так как другой поток мог уже модифицировать статус, и оба потока отметят задачу дважды. Правильно делать так:
update X set status = 1 where id = @id and status = 0
и далее проверять по числу records affected, удалось это текущему потоку, или нет.

Та же ситуация с переходом задач на следующую стадию — оно все требует либо serializable транзакции поверх всех экзерсисов (что делает систему фактически однотредовой), либо просит явного указания на однопоточное использование. С повтором же возможна ситуация, когда долго работающая задача отмечает свое успешное выполнение уже после того, как внешний скрипт отметил задачу, как подлежащую повтору — и она будет выполнена дважды
select… for update skip locked
Из за этого запроса не может быть ситуации когда задачу получат разные потоки или я не прав?
Получит, естественно, единственная коннекция — поэтому в плане получения задачи все потокобезопасно — я и пишу в исходном комментарии, что метод получения задачи — thread-safe.
Однако, нигде нет утверждения, что а) задача *обрабатывается* в той же коннекции, в которой была получена, и б) что обработка задачи — однопоточная.
Допустим даже, что прикладной уровень, получив от бакенда задачу, распараллелил ее на несколько тредов, грамотно подождал их окончания, и уже в единственном треде вызвал SQL-код, отмечающий ее, как выполненную (это весьма реальный сценарий) — но отсутствие потокобезопасности между фоновым SQL-процессом, который перезапускает задачи и кодом, который их отмечает, как выполненные — это явный фейл. В этом случае *никаким* построением прикладного уровня проблему повторного перезапуска успешно выполненной на грани таймаута задачи не решить — соответственно, бакенд кривой.
Поэтому я утверждаю, что если данное решение очереди рекомендуется в качестве образоцовой реализации очереди средствами SQL-бакенда, в нем должно быть явно указано отсутствие потокобезопасности в перечисленных случаях — а именно: 1) завершение задачи, 2) продвижение задачи в следующую стадию, 3) перезапуск по таймауту (последнее — нерешаемо на уровне архитектуры приложения). Либо… все эти места доработаны так, чтобы быть thread-safe. Тогда этот код можно рекомендовать другим без оговорок частных случаев использования — а это уже гораздо лучше — при том, что доработки нужны минимальные.

PS. автору статьи — тактика «минус-в-коммент, минус-в-карму» — ассоциируется с цитатой из М. Жванецкого — «зачем спорить с хромым об искусстве, если можно сразу ему сказать, что он хромой» :)
Но вот же запрос который атомарно делает недоступной задачу для других конекшинов/потоков:
with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;
Он делает атомарным только *взятие* задачи из очереди — и я с этим не спорю, т.к. for update skip locked гарантирует, что ID одной и той же задачи получит строго один поток — остальные либо не получат ничего (если в очереди больше нет задач на взятие), либо получат ID следующих незалоченных невзятых задач. Взятие сделано грамотно и оно — потокобезопасно.

Я же говорю о дальнейших действиях — по ID возвращенной нам задачи слой приложения выбирает из нее payload, начинает с ним работать, и по завершению работы либо обновляет номер фазы, либо отмечает задачу как сделанную или зафейленную. И проблемы конкурентности возникают как раз здесь.

Например, при взятии многофазной задачи из очереди нужно атомарно возвращать не только task.id, но и номер фазы, и при продвижении на следующую фазу подставлять в условие update не только ее ID, но и полученный номер фазы + все условия, которые говорят о валидности задачи на момент ее продвижения (то есть то, что она не стоит в перезапуске, не снята по таймауту, не поставлена в статус «выполнена» или «зафейлена» итд) — и далее смотреть, сколько строк саффекчено этим update. Если 0 — значит, нас опередили, и тред нашего приложения должен выкинуть ошибку и выбросить результат своей работы в помойку. Вот тогда будет потокобезопасно.
Нормально все у автора с реализацией захватов и обновлений состояний задач. Просто он рассматривает только СУБД часть реализации, но с ней все в порядке. Неважно в сколько потоков будет обрабатываться задача, когда закоммитится транзакция БД в которой было обновление статуса, оно произойдет. Ну важно только чтобы обработка задачи была в рамках одной транзакции БД, не делать коммит транзакции БД до завершения обработки задачи.

Вы подучите как работают транзакции в БД. У автора с этим все в порядке. И не надо там никаких дополнительных проверок статуса при апдейте.

Потокобезопасность это вопрос уровня приложения, и если в приложение неправильно работает с потоками, то
where ... status = 0

вам не поможет
Потокобезопасность это вопрос уровня приложения,
— это пять! Кривой код в БД, который потенциально способен привести к тому, что одна и та же задача может быть выполнена дважды, отмечена и как ошибочная, и как успешная, а для многофазных заданий — еще и проскочить несколько фаз — это вопрос уровня приложения?! Я построил новый дом, но вы заходите туда строго по-одному, а то у него пол провалится :)

по поводу where… status = 0 — это тоже очень наивное заблуждение, что просто «транзакции» нам здесь помогут. Помогут только serializable транзакции — а это сразу прощай параллельность работы очереди. Кроме того, почитайте мой коммент снизу, зачем нужны все эти ухищрения с атомарным взятием, сборщиком подвисших заданий, перезапуском итд

Я конечно никогда не работал с этим, но судя по документации select for update skip locked должен работать с блокировками на уровне строки именно так как это и предполагает автор статьи.

Кривой код в БД

В БД совершенно нормальный код. Вопрос потокобезопасности приложения он не относится к рассматриваемогу автором коду в БД. Да он должен быть потокобезопасным. Но к коду в БД это не относится.
Еще как относится. Вот, например, такой код будет работать до тех пор, пока из двух коннекций одновременно не попытаются вставить одно и то же значение @SomeUniqueField:
IF NOT EXISTS(
   SELECT 1 FROM SomeTable
   WHERE SomeUniqueField = @SomeUniqueField
 )
	INSERT INTO SomeTable (SomeUniqueField)


Первый же студенческий возглас — так надо транзакцию! — все равно не поможет, т.к. оба потока прочитают из таблицы через shared lock, убедятся, что такой записи нет, и затем попытаются ее добавить. И кто-то один по-прежнему обломается.

Следующий шаг (уровень джуна) — давайте сделаем serializable-транзакцию. Она, действительно, помогает, но одновременно с этим ведет себя, как средневековая проказа — любой объект, до которого вы дотронулись под serializable-транзакцией, оказывается заразным (получает exclusive lock). Именно поэтому такой уровень изоляции помогает — другие потоки даже прочитать ничего не могут. Но serializable — это СИЛЬНЕЙШИЙ антибиотик, который лечит нашу болезнь, но одновременно напрочь лишает базу возможности параллельной работы (см выше).

А всего-то надо было написать:
	INSERT INTO SomeTable (SomeUniqueField)
	SELECT Q.SF
	FROM (
		SELECT @SomeUniqueField AS SF
	) Q
	WHERE NOT EXISTS(
		SELECT 1 FROM SomeTable S
		WHERE S.SomeUniqueField = Q.SF
	)


и мы, используя встроеннум атомарность, решим задачу без внешних транзакций.
Если же нам надо узнать, наш ли именно поток фактически вставил запись в таблицу, или это был кто-то другой, а наш обломался, то сразу после стейтмента вставки достаточно проверить:
	IF @@ROWCOUNT > 0 ...		-- наш

	ELSE ...			-- чужой

Указанные проблемы конечно имеют место быть, но к указанной теме (неблокируемый забор задачи из очереди) они отношения не имеют.
Указанная тема не
неблокируемый забор задачи из очереди
— он-то как раз сделан правильно, а
Очередь задач в PostgreSQL
— в реализации которой есть несколько методов, часть из которых thread-unsafe. Я даю иллюстрацию к тому, что использовать встроенную атомарность можно и нужно для того, чтобы простыми средствами сделать эти методы *тоже* потокобезопасными. Ведь ни у кого не вызывает вопросов способ, которым взятие задания сделано потокобезопасным, хотя он сделан без всяких транзакций :)

Тот факт, что в конкретном случае мой пример для PostgreSQL решается через ON CONFLICT, не имеет значения, т.к. суть метода в том, чтобы проверить входное условие перед модификацией записи, чтобы убедиться, что запись не была изменена конкурентно. Я пишу под MS SQL, там нет ON CONFLICT ;)

Методика сходна с проверкой на модификацию записи перед обновлением добавлением значения rowversion к значению ключа
в предикат условия выборки/обновления — что гарантирует отсуствие изменений в период между взятием записи и ее обновлением
Во-первых, как правильно уже отметили, ваш пример никак не относится к задаче в статье.
Во-вторых, ваш пример правильно решается кодом
INSERT INTO SomeTable (SomeUniqueField) VALUES (@SomeUniqueField)
 ON CONFLICT (SomeUniqueField) DO NOTHING;

а не тем что вы наизобретали.
Ну и в-третьих, без транзакций не получится, БД работают только с транзакциями, это обязательная часть.

Идите и учите как работают базы данных, и не будет для вас открытий и странных изобретений ugly кода

Краткое содержание:
Статья "как плохо решить задачу неподходящими инструментами"
Комментарий "Оно просто не работает как очередь задач (MQ), потому как не соответствует требованиям MQ"
Ответ: "а тот факт, что оно не выполняет возложенных задач, никак не относится к статье"


От себя добавлю, что выбор постгреса в качестве очереди задач (помимо того, что решение попросту неэффективно) крайне неудачен ввиду особенностей реализации MVCC: постгрес на каждый update создаёт новую запись. Режим работы — минимум 1 update для каждой записи.


Прекратите собирать троллейбус из буханки хлеба, и после с умным видом рассказывать о корректности использования ржаного хлеба. Умные люди уже придумали, как правильно решать такие задачи


Изюминка ситуации в том, что в моём текущем проекте заказчик настоял на использовании БД (постгрес) в качестве резервной очереди задач. И я реализовал нечто подобное.
Разница в том, что с самого начала я знал, что творю маразм

Работа инженера — выбрать подходящий инструмент. В некоторых случаях нет готового хорошего инструмента, приходится выбирать из плохих, как-то адаптировать и как-то с этим жить. Статья как раз о тех случаях, в которых MQ не удовлетворяет требованиям, и приходится делать самописную очередь.

В самом начале, как раз, указан пример требований (очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач) и отмечены ограничения применимости.

Вот давайте попробуем выполнить эти требования при помощи MQ. Давайте даже ослабим требования, исключив приоритет задач.

Возьмём MQ, которая умеет отложенную доставку сообщений. Например, ActiveMQ.

Шлём упавшие задачи в ту же очередь с заданной задержкой доставки. Всё ok, но однажды возникает вопрос: сколько в этой очереди новых задач и сколько повторных?

Тогда давайте разделим очередь на две: в одну пишем новые, в другую — упавшие с задержкой. Отлично. Как теперь балансировать нагрузку на воркеры? 50% на новые, 50% на упавшие? А если упавших мало, то 50% мощностей будут простаивать? Брать задачи из каждой очереди по очереди?

Допустим, как-то решили этот вопрос. Далее. Как узнать, какие основные причины, по которым задачи попадают в очередь упавших? Ну, наверное, надо логи погрепать…

Ладно. А как быть уверенным, что в очереди находятся действительно все упавшие задачи, и какая-то не потерялась? Ну, например, воркер может писать в какую-нибудь БД, когда он взял в работу задачу. И поискать в этой БД невыполненные задачи, которые давно не обновлялись (не брались в работу).

А зачем нам тогда очередь, если у нас уже есть таблица с задачами, их статусом и временем последней попытки выполнения?

По поводу update-ов в БД. Для одной задачи в лучшем случае выполняется 1 insert и 2 update (взять задачу, завершить задачу). Во время выполнения задачи в БД могут совершаться десятки и сотни запросов на вставку и изменение строк.

Эти программисты так любят что-то менять в БД, они что, не понимают, что при этом каждый раз создаётся новая запись?! Давайте им запретим.

Изюминка ситуации в том, что в одном из моих текущих проектов очень активно используется ActiveMQ. И её использование в некоторых случаях не соответствует эксплуатационным требованиям. И я с самого начала знал, что она не будет им соответствовать. И вот сейчас, когда самописная очередь прошла проверку на менее критичных задачах в менее критичных проектах, придёт, наконец, время заменить один не очень хороший инструмент на другой, тоже не очень хороший, но получше.
В некоторых случаях нет готового хорошего инструмента

Правда чтоль? https://en.wikipedia.org/wiki/Message_queue#See_also
И правда, ни одного готового хорошего инструмента


В самом начале, как раз, указан пример требований (очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач) и отмечены ограничения применимости.

Тут следует оговориться: я щупал MQ только через API Celery (по ссылке выше его кстати отнесли к MQ, хотя формально он пользователь MQ):
https://github.com/kai3341/celery-pool-asyncio/blob/master/celery_pool_asyncio/executors.py#L108
Из всего этого важно: accept_callback, callback (on_success), timeout_callback, error_callback. Всё это позволяет в полном объёме обработать исключения. API Celery также умеет в отложенную доставку.


сколько в этой очереди новых задач и сколько повторных?

Смотрю в DEBUG-лог Celery. Перед попаданием таски в пул в stdout печатается само сообщение. Там фигурирует 'retries': 0. Здесь тело интересующего нас сообщения инкапсулировано в структуру, передающую также метаданные.


Тогда давайте разделим очередь на две: в одну пишем новые, в другую — упавшие с задержкой. Отлично. Как теперь балансировать нагрузку на воркеры? 50% на новые, 50% на упавшие? А если упавших мало, то 50% мощностей будут простаивать? Брать задачи из каждой очереди по очереди?

Если мне не изменяет память, о балансировке рассказывал Олег Чуркин в докладе "50 оттенков Celery".


Как узнать, какие основные причины, по которым задачи попадают в очередь упавших? Ну, наверное, надо логи погрепать…

Ну помимо логов, в error_callback передаётся также информация об ошибке


А как быть уверенным, что в очереди находятся действительно все упавшие задачи, и какая-то не потерялась?

В этом смысл accept_callback, success_callback и error_callback. Все они предназначены в том числе и для отслеживания выполнения задач


А зачем нам тогда очередь, если у нас уже есть таблица с задачами

Допустим, Вы решаете задачу электрификации. Для того, чтобы провести линию в некий удалённый посёлок, нужно очистить площадь от деревьев, поставить столбы, на них поставить изоляторы, и уже на изоляторы положить кабель. Очистка территории от леса дело дорогое, столбы тоже дорогие. И тут вам приходит светлая мысль: А зачем нам вырезать деревья и после ставить столбы, если можно прикрутить изоляторы прямо к деревьям?


Мораль грузинский басня прост: из схожести интерфейсов не следует, что одно решение можно заменить другим.


Эти программисты так любят что-то менять в БД, они что, не понимают, что при этом каждый раз создаётся новая запись?! Давайте им запретим.

Ближайший аналог — конкатенация иммутабельных строк в python. Вам никто не запрещает выполнять их конкатенацию в цикле. Только вычислительная сложность под капотом выходит O(N!). А что пользователи? Подождут, куда денутся


активно используется ActiveMQ. И её использование в некоторых случаях не соответствует эксплуатационным требованиям

Ну так не используйте её в этих некоторых случаях.

, не делать коммит транзакции БД до завершения обработки задачи.

Такие транзакции являются https://en.wikipedia.org/wiki/Long-lived_transaction и это скорее плохо чем хорошо. Такие транзакции очень часто используются в сообществе программистов Interbase/Firebird судя по информации с некоторых форумов.


Также такие транзакции нередко реализованы в некоторых ORM (например sequelize/node.js)


Вцелом, "хорошим", более обычным применением транзакций является их выполнение в одном запросе с клиента к серверу.

Например, при взятии многофазной задачи из очереди нужно атомарно возвращать не только task.id, но и номер фазы,
id задачи — это первичный ключ, поэтому не может существовать задачи с тем же id, но другой фазой, поэтому возвращать фазу нет необходимости.
и при продвижении на следующую фазу подставлять в условие update не только ее ID, но и полученный номер фазы + все условия, которые говорят о валидности задачи на момент ее продвижения (то есть то, что она не стоит в перезапуске, не снята по таймауту, не поставлена в статус «выполнена» или «зафейлена» итд) — и далее смотреть, сколько строк саффекчено этим update. Если 0 — значит, нас опередили
Но вы же согласились, что взятие задачи тут выполнено корректно и потокобезопасно. А это значит, никакой другой поток/клиент не может работать с этой задачей (в том числе, менять её статус или фазу), если следует соглашениям и взял задачу корректно.
Конечно взятие реализовано корректно. Но нигде не оговорено, что *обработка* задачи всегда должна идти в том же потоке, который осуществил взятие. Поэтому я и утверждаю, что в данном случае желательно делать все методы работы с очередью потокобезопасными уже на уровне БД, тем более, что а) это практически ничего не стоит, и б) очередь — довольно стандартный широкоиспользуемый компонент, и вы никогда не знаете, насколько корректно код приложения будет работать с вами.

Если вы в приложении взяли задачу одним потоком, а обрабатывайте его в другом, то это не проблема РСУБД и не зона его ответсвенности, т. к. ни чего она про ваше приложение не знает и знать не должна. Да и в принципе не может. На уровне соединений при конкурентной доступе она безопасность обеспечивает.

нигде не оговорено, что *обработка* задачи всегда должна идти в том же потоке, который осуществил взятие
Проблема возникает не тогда, когда обработка выполнится не в том потоке, который осуществил взятие (это как раз нормально и проблем не вызовет), а когда два потока пытаются обновить одновременно одну строку таблицы.

А этого не будет, если только программист специально не напишет такой сценарий: взяли элемент, стартовали два потока, которые поставят этому элементу признак «завершён».

Но зачем такое писать в здравом уме? Можно просто написать «delete from tablename» без «where», и сказать «ваша система небезопасна, раз позволяет такое»?
Очень дельное замечание (как и другие ваши комментарии в ветке), спасибо.

Я не утверждаю эталонность подхода. Считаю, что нельзя просто копировать код из этих ваших интернетов в свой проект. Всё нужно обдумывать, критически оценивать и адаптировать к своим условиям.

При подготовке статьи код берётся из работающей системы, обобщается и упрощается. Здесь легко потерять некоторые нюансы или не донести контекст.

Например, отмеченные вами проблемы завершения задач в контексте процессов моей системы проблемами не являются, поскольку

1) перезапуск уже завершённой задачи допустим и к катастрофе не приведёт;

2) за исполнителями наблюдает супервизор и принудительно завершает их по таймауту, перезапуская взятые исполнителями задачи. Пример перезапуска задач в статье нужен для ситуации, когда аварийно завершился супервизор (и при этом были убиты все подчинённые ему исполнители).

Я искренне рад, что вы понимаете критичные для построения очередей в БД моменты (и вообще, критичные для эффективно работы с БД принципы), и упрощения в конкретной реализации очереди являются не результатом технического просчета, а частным минимально-необходимым решением задачи.

Поведение нормальной СУБД от потоков клиента не зависит (в общем то она и не знает есть ли они), для изоляции есть a — транзакции и b — явные блокировки, что и описано в статье.
Флап соединения с БД и приехали. Соседний воркер берет ту же задачу и начинает выполнять.
Нужно добавлять приличное количество костылей для защиты от таких ситуаций.
не знаю что такое «флап», но что по вашему делает следующий запрос?
with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Если поток упал, то задача так и будет в статусе «в работе».

С такой реализацией при попадании соединения или при обычном падении воркера таск залипнет. И костыли нужны для разруливания этой ситуации.


А в ситуации select for update и ничего не апдейтим пока таск выполняется будут дубли при пропадании соединения. И тоже костыли нужны.


Проблема всех простых очередей именно в таких ситуациях. Разруливание всех ситуаций когда что-то идет не так.

будут дубли при пропадании соединения
А что значит при пропадании соединения? Ведь если не закоммитим и пропадет соединение то БД просто сделает ролбек, а воркер не сможет сделать коммит или я в чем то неправ?

Таск перевели в статус запущен. Закомитили. А в комплит перевести не смогли. Ошибка сети, ошибка БД, ошибка воркера, любая другая ошибка. Теперь статус таска неизвестен (вечный работает) и что с ним делать непонятно.


Если не комитить а держать открытую транщакцию, то при падении сессии в БД и соотвественно транзакции другой воркер возьмет этот таск и будет выполнять. Что при это будет делать первый воркер непонятно.

Так нет идеальных очередей, например с RabbitMQ тоже есть нюансы.
Если все происходит в рамках одного сервера, то можно в базу писать PID процесса и по нему отлавливать грохнувшийся задачи и их перезапускать. Главно при реализации не забыть, что PID является переиспользуемым и чисто проверка на PID может вызывать проблемы.

Обычно для задач устанавливается и таймаут для отслеживания аварийно завершившихся тасков (которые не смогли перевести себя в статус=3 в статусах автора статьи). При этом начинаются проблемы когда таск может быть равно как и долгоиграющим так и зависшим.

И по закону Мерфи такие проблемы будут точно.

Не нужно никаких костылей. Нужно просто выучить как работают транзакции в СУБД. Все там нормально будет, соседний воркер не получит ту же задачу
Нужно просто понимать, что данный набор методов *не может* идти под сквозной транзакцией от получения задачи до отметки о ее выполнении — иначе все механизмы, связанные с очередью, станут однозадачными.
Вся идея такой реализации очереди — в том, что используются не внешние транзакции, а встроенная атомарность SQL-движка, и в результате реализуется механизм двухфазных транзакций:
1) задание атомарно достается из очереди и отмечается, как взятое без всяких внешних транзакций
2) в течение времени, не превышающем время таймаута поток-обработчик задания его выполняет. Обработчик может использовать транзакции, а может и нет — это зависит от его логики обработки. Главное, чтобы действия, связанные с манипуляциями с очередью, либо не использовали внешних транзакций, либо совершались в транзакции с минимальным количеством затронутых объектов БД
3) поток-обработчик атомарно отмечает задание, как выполненное. Здесь тоже должна использоваться встроенная атомарность, чтобы не было необходимости во внешней транзакции. Как-то так

Я недавно создал пост https://habr.com/ru/post/458608/ о нескольких реализациях job queue.
Вцелом хорошо что Вы рассматриваете разные типы задач, в том числе и задачи выполняемые по расписанию. А также хорошо что предусматриваете повторное выполнение задач в случае ошибки.


По опыту тестирования некоторых готовых библиотек, в частности agenda, которая основана также на базе данных (mongodb) могу сказать что решение с тяжелой базой данных существенно медленнее чем решение основанные например на redis. Но даже не это главное.Та же agenda, которая по идее должна работать медленно но стабильно в реальности после какого-то предела просто останавливается и перестает выбирать задачи из очереди. Я это говорю к тому, что в боевых условиях умозрительные построения могут начать работать не так как задумывал их автор.


Так же у очередей заданий желательно иметь масштабирующий и "сглаживающий" функционал, то есть возможность запускать несколько воркеров на выполнение одной задачи. А также на сглаживание "пиков" задерживая выполнение очередной задачи пока не завершаться текущие задачи.

Проблема в том что постоянно надо пуллить и смотреть появились ли задачи в очереди. Вот бы как-нибудь сделать так чтобы запускать задачу только при наличии задачи в очереди без постоянного опроса ее. Такое можно как-нибудь сделать?

Поллинг обычно не проблема, если задач в очереди много. Т.е. за время обработки предыдущих выбранных задач данным воркером накапливаются новые, которые он счастливо и поллит.

мой комментарий прямо под вашим )

у постгреса есть механизм LISTEN/NOTIFY, очень удобен для малоактивных очередей (обработчик не должен постоянно опрашивать сервер в ожидании задания).


create index task__status__priority__idx on task (status, priority);

иногда бывает полезно делать вот так:


create index task__status0__priority__idx on task (priority) where status=0;
Есть, но как PG запустит приложение?

речь не про это.


вот у вас есть воркер, которые ждёт задачу. а очередь пуста. что остаётся делать воркеру? периодически слать select bla-bla-bla в ожидании задания.
будешь слать часто — ненужная нагрузка на сервер. редко — время реакции на новое задание неприемлемо высоко.


LISTEN/NOTIFY как раз и решает эту проблему — воркер делает LISTEN и ждёт.

А есть ли вариант послать NOTIFY из триггера на вставку строки и насколько это рабочий вариант?
Нормальный это вариант. Работает. Но только для закомиченых транзакций (т.е. не сработает пока не будет COMMIT). Если слушателя при этом нет (отвалился), то ошибки не будет, но событие потеряется, т.е. PG в себе их не хранит.
Вы, кстати, зря про то, что этот способ плохо масштабируется. Нормально он масштабируется — вернее, упирается только в одновременное кол-во коннектов к бд (которых можно поставить и 5000 штук для таких простых запросов). Выгребать очередь в 5000 воркеров — это очень и очень неплохо.

Громадный плюс данного подхода (вернее, не в точности его, а когда блокировка строки не отпускается воркером при захвате задачи — так тоже можно сделать) в том, что при смерти процесса-воркера (kill -9) его коннекшены автоматом закрываются, и все блокировки отпускаются. Т.е. у данного решения нет проблемы, когда одну и ту же задачу могут взять два воркера — а значит, не нужно никакого геморроя с heartbeat-ами и т.д. (Естественно, при условии, что воркер существенно использует тот же самый коннекшн в своей работе.)

И еще одно. Можно же иметь не одну таблицу, а 10. Хоть бы и партицированную. И натравливать N воркеров не на одну таблицу, а на 10. Это практически линейно все отмасштабирует (потому что для субд все эти запросы очень просты, она их как орешки щелкает, они все попадают в индекс).

Да, еще вот https://cadenceworkflow.io — на нем весь Uber держится, и там очереди именно так реализованы (правда, на MySQL, но сути не меняет).

Извините, но когда вы пишете про 5000 коннектов к БД, вы понимаете, как PG устроен под капотом?
Держать блокировку воркером на протяжении всего времени выполнения задачи — это совсем не масштабируемое решение. Для использования в продакшене оно мало подходит.
А если отпускать блокировку после захвата (как правильно делать), то нужно очень аккуратно реализовывать перезапуск\смену статуса у тасок, что, собственно, обсуждается в треде выше.

5000 коннектов — это, например, дефолтный лимит в AWS Aurora Postgres. 5000 практически ничего не делающих процессов — не так и много: если каждый съест по 10М, то получится 50Г в памяти — по современным меркам приемлемо. (Но, опять же, 5000 — очень большая цифра, если блокировки отпускать, то коннектов потребуется на 1-2 порядка меньше.)


Насчет перезапуска тасков — это очень тонкий момент, нужно 10 раз подумать, действительно ли так уж много заданий и нельзя обойтись долгими сессионными блокировками (pg_try_advisory_lock). Работает воркер — захватил блокировку, умер — его коннект тоже умер, блокировка снялась, новый воркер может подхватить таску. (Это, правда, выходит за рамки данной статьи.) Реально очень сильно облегчает жизнь, особенно если данные, с которыми работает воркер, находятся в той же бд, где захватывается блокировка (тогда блип коннекта не страшен).

Прочитал статью и проглядел комменты, но так и не увидел упоминания PGQ. Почему бы его не взять? Более того, Алексей Лесовский в своём докладе «Топ ошибок со стороны разработки при работе с PostgreSQL» явно жалуется на самописные очереди.
Ночь потому что, хорошие dba спят ;-)

Самописная реализация очереди привлекает разработчиков, «что тут делать, фигак фигак и в продакшен». А что случается с продакшеном даже всего под сотней запросов к очереди в секунду — узнают потом. И то бывает игнорируют. Ну и что что 99% времени база лопатит эту недоочередь, ну и что что любая транзакция свыше десятка минут кладёт проект — но работает же!
Да вот не просто. Хотите очередь в базе (а это действительно удобно из-за помещения события в очередь в той же самой транзакции, rollback транзакции и consumer не увидит событие как будто ничего и не было) — то вам нужен pgq. У него большие проблемы с документацией — но это та реализация очереди-в-базе которая под нагрузкой живёт нормально потому что учитывает как именно работает эта СУБД.

По крайней мере пока не появилась реализация table access method с UNDO. Или вовсе что-то ещё более специфичное именно под очереди. Появившиеся в pg12 api tableam это уже позволяют сделать.
1.
create index task__status__idx on task (status);

Бессмысленный индекс в таком виде. Значение 2 (выполнена) из него нужно выкинуть сразу. В штатном режиме все задачи будут иметь финальный статус 2, их будет много, индекс не будет использоваться.

2.
error_text  text null

Лучше так не делать в production'е.
Люди начинают писать в поле длинные стек-трейсы, таблица тостируется, вы теряете в перфомансе.
Ограничивайте длину поля явно.

3. Что касается очереди с повторами «упавших» задач, то не увидел самого главного: как отличить упавший воркер от долго работающего воркера?
Если воркер упал во время работы, кто перезапустит таску на выполнение? Кто проставит ошибку?
1. По финальному статусу обычно не ищут. Для нефинальных — cardinality обычно хороший.
3. Это «беда» любых воркеров. Если он не сообщает свой статус, долгоиграющий не отличить от зависшего. Вариант — контролировать таймауты и принудительно прерывать, с соответствующими статусами.

В общем случае это называется heartbeats. Раз в N секунд воркер с номером W пытается отрепортить, что он еще живой. Если во время такого репорта ему говорят, что появился какой-то другой, более молодой, воркер с тем же номером W, то старый воркер совершает сепукку.


Правда, можно представить, какую нагрузку это все создает, когда воркеров много, и что все равно есть промежуток времени, когда оба воркера могут работать одновременно. Это-то как раз и можно решить advisory-блокировкой на значение W в основной бд, с которой работает воркер W. В этом случае сам коннект к бд выступает таким своеобразным каналом heartbeat-ов.

НЛО прилетело и опубликовало эту надпись здесь
Если появился молодой, значит старый исчезал на какое-то время.
Лучше прибить старого, у него какие-то проблемы.

Вообще такая ситуация ненормальна и не должна возникать. Лучше всего когда воркер не получает ответ на хартбит он самоубивается. Естесвенно с некоторой защитой от флапа. Например, таймаут у воркера до смерти 5 секунд без хартбитов. А на планировщике таймаут до появления молодого воркера 10 секунд.

И тогда старый и молодой работающие одновременно становятся черезвычайно редкой и как правило ошибочной ситуацией.

Это все применимо к надежным каналам. Пропадение хартибитов не должно быть частой ситуацией. Допустим 1 раз на 100 средних задач.
3. Супервизор.
Супервизор запускает и поддерживает заданное количество работающих исполнителей.
Супервизор следит за исполнителями. При аварийном завершении исполнителя перезапускает задачу. При превышении заданного времени работы исполнитель убивается, задача перезапускается.
Спасибо за статью. Сначало из-за слоников подумал что тут про php :D
Реализация подобной очереди давно используется в Яндекс.Деньгах и выложена на Гитхабе: github.com/yandex-money-tech/db-queue
Её тестировали на нагрузке до 1000 транзакций (авторизаций) в секунду, и очередь вполне справляется.
Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Изменить настройки темы

Истории