Как стать автором
Обновить
0

Celery — распределенная очередь заданий

Время на прочтение 3 мин
Количество просмотров 84K
На этот раз мы решили рассказать о замечательном продукте, который мы используем в нашей работе. Речь пойдет о Celery — «distributed task queue». Это распределенная асинхронная очередь заданий, которая обладает широким функционалом. В нашем конструкторе сайтов нам часто приходиться запускать асинхронные с точки зрения ответа пользователю задачи. На хабре, к сожалению, не много информации по данному продукту, а он заслуживает отдельного упоминания, это мы и хотим исправить.

Итак, что же умеет Celery:

  • Выполнять задания асинхронно или синхронно
  • Выполнять периодические задания(умная замена crond)
  • Выполнять отложенные задания
  • Распределенное выполнение (может быть запущен на N серверах)
  • В пределах одного worker'а возможно конкурентное выполнение нескольких задач(одновременно)
  • Выполнять задание повторно, если вылез exception
  • Ограничивать количество заданий в единицу времени (rate limit, для задания или глобально)
  • Routing заданий (какому worker'у что делать)
  • Несложно мониторить выполнение заданий
  • Выполнять подзадания
  • Присылать отчеты об exception'ах на email
  • Проверять выполнилось ли задание (удобно для построения Ajax приложений, где юзер ждет факта завершения)

Заинтересовало? Просим под кат.

Начнем c конфигурации worker'a. Это демон, который собственно получает задания из очереди и выполняет их. Рекомендуемая очередь — RabbitMQ, но мы пока ограничились ghettoq, через MongoDB. Также поддерживается Redis и РСУБД.

celeryconfig.py:

CARROT_BACKEND = "ghettoq.taproot.MongoDB"

BROKER_HOST = "xxx"  
BROKER_PORT = 27017         
BROKER_VHOST = "celery"    

CELERY_SEND_TASK_ERROR_EMAILS = True
ADMINS = ( ('Admin', 'admin@localhost'), )

CELERYD_MAX_TASKS_PER_CHILD = 5

CELERY_IMPORTS = ("tasks", )
CELERY_DISABLE_RATE_LIMITS = True

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "xxx",
    "port": 27017,
    "database": "celery",
    "taskmeta_collection": "my_taskmeta_collection",
}


Запуск демона: celeryd -l INFO -B
Включаем логгирование в консоль и опция -B запуск демона периодических заданий. Последний можно запустить отдельно коммандой celerybeat

Теперь создадим тестовое задание. В конфиге мы импортируем tasks, поэтому и файл заданий у нас tasks.py:

from celery.decorators import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab

@periodic_task(run_every=timedelta(seconds=60))
def mail_queue():
    
    print "Task is executed every minute"
    
@periodic_task(run_every=crontab(hour=0, minute=10))
def transactions():
    print "Task is executed every day on 0:10"
    

@task
def delayed_function(id):
    some_function()
    
@task
def delayed_heavy_function(id):
    some_heavy_function()


Итак, у нас 4 задания в tasks. Первые два выполняются по расписанию, т.к. они отмечены декоратором @periodic_task. А вот два последних будут вызваны непосредственно из кода программы. Вот таким образом:

from tasks import delayed_function, delayed_heavy_function

delayed_function.apply_async(args=[id], countdown=300) # Будет запущена через 300 секунд

r = delayed_heavy_function.delay(id) #Будет запущена сразу(как только появится возможность), в асинхронном режиме


Теперь для того чтобы отследить результат и факт завершения последнего задания выполним:

r.ready() # Вернет True если задание отработало
r.result # Вернет значение выполненной функции или None если еще не выполнено(асинхронно)
r.get() #Будет ждать выполнения задания и вернет ее результат(синхронно)

Переменную r можно прогнать через cPickle, положить значение в кеш и аяксом опрашивать статус задания. Либо можно получить task id, и положить в кеш его. Кроме того, task id вы можете задавать самостоятельно, главное чтоб он был уникальным.

После плотного использования celery мы обнаружили несколько ошибок, связанных с отложенным выполнением задач, с менеджером очередей ghettoq, но они все были поправлены автором в день создания issue на github, за что ему спасибо.

Не так давно вышла версия 2.0, которая перестала быть django-зависимой, а интеграция с django теперь вынесена в отдельный подпроект celery-django.

Из ограничений celery можно выделить два, точнее это просто особенности: на штатной FreeBSD worker'ы не будут работать, т.к. там нет питоновкого multiprocessing, хотя в сети есть рецепты по сборке ядра для celery; для перегрузки заданий необходимо рестартовать воркер, чтобы он загрузил новый python-код заданий и связанных функций. На linux работает замечательно.
Теги:
Хабы:
+8
Комментарии 27
Комментарии Комментарии 27

Публикации

Информация

Сайт
cms.biggo.ru
Дата регистрации
Дата основания
2008
Численность
2–10 человек
Местоположение
Россия

Истории