Как стать автором
Обновить

Stackless Python и Concurrence

Время на прочтение8 мин
Количество просмотров14K
Перед тем, как перейти собственно к возможностям Stackless и Concurrence, рассмотрим самый простой способ написать сетевое приложение, обрабатывающее несколько одновременных соединений:

socket()
bind()
listen()
accept()
fork() ->
    read()
    write()
    ...
    close()

Под каждое новое входящее соединение процесс создаёт свою копию через fork(). Это чрезвычайно накладный способ, у которого, к тому же, есть сложности с синхронизацией между процессами. В простом случае они решаются через создание каналов (pipes) между родительским и дочерним процессами и сериализацию данных. В более сложных потребуются примитивы межпроцессной синхронизации. Вспомним ещё про затраты на создание, разрушение и переключение процессов. Это очень ресурсоёмкие операции — как по памяти, так и по вычислительной мощности. Поэтому обработать много одновременных соединений будет весьма сложно.

Однако у этого подхода есть одно важное преимущество — код получается исключительно простым. Логика работы приложения напрямую переносится в соответствующие конструкции языка — если нужно в цикле получать по сети какие-то данные, то это будет оператор цикла. Если сначала нужно выполнить одно действие, а за ним другое, то это будут просто два последовательных оператора в программе, и так далее.

Если вместо создания новых процессов создавать отдельные потоки в рамках одного процесса, то от части проблем мы избавимся — обмениваться данными между потоками станет намного проще. Для выделения памяти под общие объекты достаточно будет пользоваться обычными средствами языка, безопасно передавать ссылки на общие объекты между потоками и не тратить ресурсы на сериализацию. Это экономит нам много процессорных ресурсов, однако не избавляет от необходимости явной синхронизациии доступа к общим объектам. Кроме того, каждый поток операционной системы имеет свой собственный стек, который занимает несколько килобайт памяти, которые будучи умноженными на количество одновременных соединений, могут занять несколько сот мегабайт. Но если с потерями памяти можно смириться (она стоит дёшево), то вычислительные затраты на создание и разрушение потоков, на переключение контекста и на синхронизацию будут весьма заметны. Вдобавок, над Python висит проклятие GIL, которое ещё больше снижает эффективность многопоточных приложений.

Следующим шагом инженерной мысли в увеличении производительности стали однопоточные приложения, основанные на явном задании конечного автомата. Каждое соединение представляется автоматом, в каждом из состояний которого входные данные обрабатываются каким-либо своим образом, вызывая, в свою очередь, дальнейшие изменения состояния. Поскольку состояние автомата представляет собой лишь небольшую структуру данных, то создавать, хранить и разрушать их оказывается значительно быстрее, чем отдельные потоки операционной системы. И памяти они занимают намного меньше, чем потоки.

В приложениях, основанных на явном задании конечного автомата, главный цикл представляет собой опрос всех открытых соединений на предмет возникновения каких-либо событий — пришли данные, произошла ошибка или освободилось место в буфере отправки. По каждому из событий вызывается обработчик у конечного автомата. Для оптимизации этого опроса (а быстро опросить десятки тысяч соединений — непростая задача) в современных операционных системах предусмотрены различные очень эффективные, но не совместимые друг с другом интерфейсы (kqueue, epoll и другие). Чтобы писать переносимые сетевые приложения, были разработаны специальные библиотеки типа libevent, скрывающие от программиста детали реализации опроса соединений.

Однако, если мы задаём конечный автомат в явном виде (каждое состояние отдельным участком программы), то структура приложения становится сложной и нечитаемой. Фактически, переход между состояниями в этом случае аналогичен использованию операторов goto — когда меняется состояние, нам нужно заново искать по всему исходнику, где находится обработчик следующего состояния. Вот пример структуры приложения, реализующего простой сетевой протокол:

select()
    -> read_ready ->
        read(cmd)
        if state == "STATE1":
            if cmd == "CMD1":
                state = "STATE2"
            else:
                invalid_command()
        elif state == "STATE2":
            if cmd == "CMD2":
                state = "STATE1"
            else:
                invalid_command()

Каждый обработчик никогда не блокируется, и за счёт этого достигается асинхронность обработки данных. Если при такой архитектуре приложению потребуется сделать запрос к базе данных, ему нужно будет отправить запрос, перейти в состояние ожидания ответа и вернуть управление главному циклу. Когда от базы придёт ответ, будет вызван обработчик, который примет данные и обработает их. Если схема запрос-ответ будет многофазной (например в SMTP вызвать connect, отдать управление, дождаться установления соединения, начать ждать данные, дождаться HELO от сервера, отправить свой HELO, отдать управление, дождаться ответа, прочитать ответ и т.д.), то явное задание состояний превращается в кошмар программиста.

При всей сложности реализации, такой подход обладает несомненным преимуществом — он не требует практически никакой синхронизации между обработчиками соединений. В самом деле, переключение между ними производится кооперативно — лишь в моменты, когда они явно отдают управление главному циклу. Обработчик ни при каких обстоятельствах не может быть прерван, а значит, все его операции по доступу к глобальным объектам гарантированно атомарны. Забываем про мьютексы, семафоры и прочие неприятности, на которые тратились драгоценные такты процессора.

Stackless

Есть способ совместить производительность конечных автоматов и простоту первого решения. Как раз для этого нам и понадобится Stackless Python. Stackless Python — это усовершенствованная версия интерпретатора Python. Она позволяет программисту пользоваться преимуществами многопоточного программирования без снижения производительности на примитивах синхронизации и без проблем с «гонками» (race conditions). Если правильно использовать дешёвые и лёгкие микропотоки Stackless, они позволяют улучшить структуру программы, получить более читаемый код и увеличить производительность труда программиста. Посмотрим, как это работает.

С точки зрения программиста, создание тасклета (микропотока в терминах Stackless) ничем не отличается от создания нового потока операционной системы: stackless.tasklet(your_func)(1, 2, 3). Мы запускаем выполнение функции your_func(1, 2, 3) в контексте нового тасклета. Выполнение этой функции продолжится до тех пор, пока тасклет явно не отдаст управление ядру (stackless.schedule()), либо заблокируется, ожидая отправки или получения какой-то информации. Например, тасклет хочет получить данные из сетевого сокета, а они ещё недоступны. В этот момент тасклет встаёт в очередь ожидания ввода-вывода, и управление передаётся следующему по очереди тасклету. Когда придут ожидаемые данные, первый тасклет получит управление и продолжит обработку данных.

По сути, та же самая логика работала и в схеме с конечными автоматами (кооперативная многозадачность, необходимость использования диспетчера сокетов и отсутствие необходимости в примитивах синхронизации для доступа к общим структурам данных), главное отличие — в том, что задача описывается обычным линейным кодом на Python. Для примера, обращения к сетевым сервисам могут описываться так:

val = memcached.get("some-object-123")
if val is None:
    res = list(mysql.query("select val from tbl where id=%d", 123))
    if len(res):
       val = res[0]
       memcached.set("some-object-123", val)

Каждая из сетевых операций (обращения к memcached, базе данных, выполнение HTTP-запросов, отправка писем по SMTP и т.д.) будет приостанавливать выполнение тасклета до получения её результа. Во время ожидания будут выполняться другие тасклеты.

Тасклеты могут отправлять друг другу данные, пользуясь каналами (channels). Канал — это объект, имеющий два основных метода — send() и receive(). Если один тасклет отправляет в канал данные ch.send(some_object), то другой может эти данные получить: some_object = ch.receive(). Если на канале нет ожидающего данных тасклета, то отправляющий будет заблокирован до тех пор, пока данные не будут получены. И наборот, если в канале нет ожидающих данных, то принимающий тасклет заблокируется до их появления. Одним каналом может пользоваться множество тасклетов, каждый из которых может принимать или отправлять данные. Каналы — это основной метод синхронизации между тасклетами. К примеру, если вы хотите реализовать пул из ограниченного числа постоянных соединений с базой данных, то операция взятия соединения из пула может быть такой:

def get():
    if len(self._pool):
        return self._pool.pop(0)
    else:
        return self._wait_channel.receive()

Если в пуле есть свободные соединения, то будет взято одно из них. Иначе тасклет заблокируется на канале и будет ждать, пока кто-нибудь не освободит соединение. Заблокированные на каналах тасклеты не потребляют ни такта машинного времени. Логика каналов автоматически поставит тасклет в очередь планировщика, как только в канал будут положены данные. Операция помещения соединения обратно в пул будет такой:

def put(conn):
    if self._wait_channel.balance < 0:
        self._wait_channel.send(conn)
    else:
        self._pool.append(conn)

Если «баланс канала» меньше нуля, то это означает, что на этом канале ждут данные какие-то тасклеты. В этом случае возвращаемое в пул соединение будет помещено в канал, откуда его тут же выхватит тасклет, который первым встал в очередь, и именно он продолжит выполнение.

Сам Stackless представляет собой систему переключения контекста тасклетов, планировщик, механизм каналов и сериализацию тасклетов, позволяющую сохранять их на диск, передавать по сети, а затем продолжать выполнение с того места, где он был прерван. Есть ещё пакет greenlets, который представляет собой урезанную версию Stackless. В нём реализованы только микропотоки (собственно greenlets), а вся остальная логика, включая планировщик, ложится на плечи программиста. Из-за этого Greenlets немного (процентов на 10-25) медленее, чем Stackless, зато они не требуют специальной версии интерпретатора.

Concurrence

Для написания реальных сетевых приложений нужна библиотека для работы с неблокирующими сокетами, которая будет включать в себя диспетчер сокетов, блокирующий тасклеты на сетевых операциях и продолжающий их исполнение при возникновании сетевых событий. Таких библиотек есть несколько: простая мелочь, Eventlet (только для Greenlets), gevent (только для Greenlets) и Concurrence (для Greenlets и Stackless). Именно про последнюю я и хочу рассказать.

Concurrence основан на libevent, его главный цикл и система буферов соединений реализованы на C и дают отличную производительность сетевых операций. Кроме собственно диспетчера сокетов, Concurrence предоставляет возможность создания таймеров, использования функций типа sleep(s), в нём реализованы многие популярные протоколы (HTTP-клиенты, HTTP-серверы (WSGI), Memcached, MySQL — да-да, настоящая асинхронная клиентская библиотека MySQL, XMPP). Пример, приведённый выше (с обращениями к Memcached и MySQL) написан именно на Concurrence. Вот как сделать с его помощью минимальный веб-сервер:

def hello_world(environ, start_response):
    start_response("200 OK", [])
    return ["<html>Hello, world!</html>"]

def main():
    server = WSGIServer(hello_world)
    server.serve(('localhost', 8000))

dispatch(main)

Функция dispatch запускает главный цикл Concurrence и ставит в очередь самый первый тасклет, выполняющий функцию main. Далее запускается WSGIServer, который и будет принимать соединения. Под каждое соединение запускается отдельный тасклет, выполняющий функцию hello_world. Последняя может быть произвольной сложности и включать в себя любые асинхронные операции. Пока система будет ожидать их выполнения, будут продолжать приниматься новые соединения.

Теперь ложка дёгтя. К сожалению, похоже, Concurrence заброшен и более не поддерживается. На письма автор не отвечает, в том числе и на багрепорты с патчами. Поэтому я взял на себя смелость опубликовать свою версию Concurrence с исправленными багами, которые нашёл, и с несколькими дописанными фичами, в частности, с поддержкой HTTP PUT для WebDAV, с реализованным SMTP-клиентом и поддержкой Thrift. Репозиторий лежит на github.

Всех, кто планирует использовать Stackless, Concurrence или другие технологии асинхронного программирования на Python, приглашаю подписаться на список рассылки ru-python-async.

Ссылки

Stackless Python — www.stackless.com
Истории успеха — www.stackless.com/wiki/Applications
Concurrence — opensource.hyves.org/concurrence
Моя версия Concurrence — github.com/JoyTeam/concurrence
Теги:
Хабы:
+58
Комментарии58

Публикации

Истории

Работа

Python разработчик
142 вакансии
Data Scientist
63 вакансии

Ближайшие события