28 April

Эффективные надежные микросервисы

Одноклассники corporate blogHigh performanceJavaDistributed systemsMicroservices


В Одноклассниках запросы пользователей обслуживает более 200 видов уникальных типов сервисов. Многие из них совмещают в одном JVM-процессе бизнес-логику и распределенную отказоустойчивую базу данных Cassandra, превращая обычный микросервис в микросервис с состоянием. Это позволяет нам строить высоконагруженные сервисы, управляющие сотнями миллиардов записей с миллионами операций в секунду на них.


Какие преимущества появляются при совмещении бизнес-логики и БД? Какие нюансы надо учесть, прибегая к такому подходу? Что с надёжностью и доступностью сервисов? Расскажем подробно об этом всём.


Те, кому удобнее видеоформат, могут посмотреть запись, а для всех остальных дальше идёт текстовая версия.



Меня зовут Олег Анастаcьев, я работаю главным инженером в Одноклассниках. А кроме меня, в Одноклассниках есть более 6 000 машин, на которых работает 15 000 задач, как просто на железных серверах, так и в виде контейнеров — в наших собственных четырёх облаках, о которых я ранее уже рассказывал.


Эти задачи предоставляют более двух сотен различных сервисов. Сегодня мы вместе с вами попытаемся построить один из таких микросервисов, связанных с задачей мессенджинга.


Итак, в мессенджере каждый пользователь имеет какие-то чаты. Может создать чат один на один с пользователем, а может сразу с несколькими.



Вариантом чата с несколькими пользователями являются каналы (здесь просто есть ограничение, могут ли пользователи писать или только читать).


Очевидно, что средний пользователь участвует в ограниченном количестве чатов одновременно. По нашим данным, 95% активности пользователя приходится на 5% чатов.


Основное, что хранят чаты — это сообщения. В 80% случаев мы видим в чате не более 13 последних сообщений. На примере выше видно только три и часть четвёртого, но для этой иллюстрации окно браузера было уменьшено — при его обычном размере мы видим приблизительно 13 сообщений.


Но эти цифры не работают, когда нужно получить последние сообщения для отображения в списке чатов:



Маловероятно, что эти сообщения будут востребованы второй раз. В web-клиенте такие сообщения нужно достать только для пары десятков видимых чатов, а для мобильных устройств их может понадобиться значительно больше.


Мы будем строить сервис для хранения сообщений, который сам по себе достаточно масштабный и нагруженный:


  • 600 миллиардов сообщений
  • 5 миллиардов чатов
  • Около 100 терабайт чистых данных без учёта репликации
  • Около 120 тысяч запросов в секунду на чтение
  • Около 8 тысяч запросов в секунду на запись

Как и любой современный мессенджер, наш должен поддерживать полнотекстовый поиск. Введя запрос в поле поиска, можно получить все чаты с соответствующими сообщениями. А это значит, что сообщения нужно периодически обходить для индексирования. Есть и другие задачи, связанные с обходом и анализом полного набора данных, вроде антиспама.


Сами сообщения — это не только текст, но и множество другой необходимой информации:



В разных колонках этой таблицы — ID чата, ID сообщения в чате (я обозначил его временем создания этого сообщения), автор, тип сообщения (оно может быть текстом, видеозвонком, может быть помеченным как спам), далее текст или какие-то данные, относящиеся к сообщению определённого типа, и прикреплённые файлы.


То есть у нас вот такая табличка передаёт структуру этих сообщений:


CREATE TABLE Messages (
    chatId, msgId

    user, type, text, attachments[], terminal, deletedBy[], replyTo…

    PRIMARY KEY ( chatId, msgId )
)

И на эти данные у нас поступают следующие операции:


  • getMessages(viewer, chat, from, to), который для указанного пользователя в указанном чате получает список последних сообщений за определённый промежуток времени. Разные пользователи могут видеть разный набор сообщений (например, не видеть спам).
  • getLastMessages(viewer, chats) получает список последних сообщений для большого списка чатов. Это самый тяжёлый запрос в системе, поскольку он идёт по списку чатов.
  • Естественно, для мессенджинга у нас есть операция добавления сообщения add(chat, message), операция поиска search(viewer, text) и операция индексации сообщений для поиска indexMessages().

Давайте попробуем реализовать это всё с использованием микросервисной архитектуры.




Современные микросервисы


Пользователи с мобилки или с веба (либо напрямую, либо через какие-то фронты) работают с нашим сервисом сообщений, в котором реализована бизнес-логика, через какой-нибудь наш любимый протокол.


Данные же мы храним отдельно в постоянном хранилище. Отказоустойчивом и надёжном, естественно. Мы используем Cassandra.


Для администрирования этой самой Кассандры мы наняли солидного бородатого дядьку DBA, а мы, нашей хипстерской персоной в кепке, отвечаем только за свой сервис:



Всё, пора запускаться. Попробуем для начала реализовать самый частый запрос getMessages().


Для этого нам нужно выполнить приблизительно вот такой запрос в базу данных:


SELECT FROM Messages
    WHERE chatId = ? AND
    msgId BETWEEN :from AND :to

Есть одна небольшая проблема: таких запросов поступает более ста тысяч в секунду. Есть некоторая нагрузка, и сервис у нас начинает ложиться.


Ну что ж. Масштабирование — это то, что мы умеем в микросервисах, правильно? Добавляем инстансов нашего сервиса. И обнаруживаем, что нагрузка проваливается в базу и ложится теперь уже она:



Что дальше? Мы можем либо масштабировать базу, либо воспользоваться знанием, что на 5% активных чатов приходится 95% активности. Будем кешировать.


Вот и бородатый DBA подсказывает, что в нашей базе данных есть кеш часто используемых данных. Пробуем, помогает, но немного.


Оказывается, есть тонкости в других методах сервиса. Вызовы getLastMessages() и indexMessages() работают не только с активными, но с абсолютно всеми чатами. И хоть вызовов мало, они приводят к тому, что в кеш попадают те 95% записей, которые нам в общем-то не нужны, и вытесняют оттуда нужные нам 5%. Встроенные алгоритмы кеширования БД не могут учитывать специфику приложения и чаще всего ограничены каким-нибудь общим алгоритмом, как, например, LRU. То есть в нашем сервисе происходит замусоривание кеша ненужными данными.


Чтобы не увеличивать на порядок количество памяти, расходуемой под кеш базы данных, мы должны управлять кешами явно из прикладного кода, помещая туда только те сообщения и тогда, когда это нужно приложению.


Для этого мы можем использовать любое внешнее хранилище ключей-значений в оперативной памяти, например, Memcached, Redis или Tarantool, полностью сняв кеширование с базы данных. При росте нагрузки можем масштабировать кластер memcache независимо.


Вот мы и пришли к типовой архитектуре, к которой приходят многие при построении нагруженного сервиса: прикладной микросервис без состояния, реализующий прикладную логику и использующий внешнюю БД для постоянного хранения состояния и внешний кеш для кеширования:



Давайте проанализируем потери, возникающие в такой архитектуре.




Потери типовых микросервисов


Первая проблема — затрата ресурсов CPU на маршалинг и демаршалинг. Маршалинг — это процесс преобразования данных, хранящихся в оперативной памяти, в формат, пригодный для хранения или передачи. Прикинем, где он происходит в нашей схеме.


Наше приложение, обращаясь к memcache, должно замаршаллировать данные и запрос. Кеш должен демаршаллировать эти данные, понять, что это за запрос, обработать, замаршаллировать ответ. Приложение же, получив этот ответ, должно его демаршаллировать на своей стороне.


Те же самые операции маршалинга и демаршалинга будут происходить и при взаимодействии приложения с БД:



Насколько велики потери при этом? Интуитивно кажется, что немного.


Есть недавно проведенные исследования. Atul Adya, Robert Grandl, Daniel Myers
из Google и Henry Qin из Стэнфордского университета, переписывая одну из своих систем с подобной архитектуры, замерили это и обнаружили, что маршалинг и демаршалинг увеличивают нагрузку на процессор на 85%.



И это даёт плюс 27% к медианной задержке, что не очень хорошо, если мы строим low-latency сервис.


Вторая проблема — чрезмерные чтения и запись. Они возникают, когда приложение читает больше данных из memcache, чем это необходимо.


В нашем примере с хранением последних 13 сообщений чата в memcache, чтобы получить последнее сообщение из каждого чата, нам пришлось бы прочитать все 13, сериализовать, передать их по сети, десериализовать на стороне сервиса, после чего отбросить 12 из 13, оставив только одно самое свежее сообщение.


Согласно тому же исследованию, если запись содержит только 10% необходимой информации, то мы тратим на 46% больше ресурсов CPU и на 86% больше ресурсов сети, чем это действительно необходимо.


Третья проблема — сетевые задержки. Тут мы теряем время как просто на удалённые походы на memcache и базу данных, так и на время передачи нужного объёма данных в запросе-ответе, особенно в сочетании с излишними чтениями.


Но хуже всего, что эти потери растут прямо пропорционально тому, сколько обращений в кеш и базу данных необходимо для обслуживания единственного запроса от клиента.



Это число N может быть достаточно большим, оно сильно зависит от креативности разработчика. В нашем примере с получением последних сообщений оно легко может достигать десятков.


Кто виноват — разобрались. Теперь давайте разберёмся, что с этим делать.




Чтожеделать


Поскольку большинство нагрузки попадает на memcache и его связь с прикладной логикой, в индустрии в последнее время большинство усилий было направлено на оптимизацию именно этого: memcache и этой связи.


Некоторые компании концентрируют усилия на оптимизации перцентильных задержек memcache до 99.99.


Redis и Tarantool пытаются уменьшить чрезмерные чтения путём поддержки структуры данных на стороне memcache, что позволяет выбирать не всё значение под ключом, а только часть.


NetCache — решение, которое пытается уменьшить сетевые задержки путём построения memcache прямо на инфраструктуре сетевых свитчей.


KV-Direct борется с демаршаллингом путем переноса задач маршалинга и демаршалинга на сетевые карты.


Тут легко заметить, что большинство проблем возникает именно из-за факта удалённости необходимых данных от приложений. И таким образом, мы можем решить проблемы, просто двинув данные к нашей логике. Так мы приходим к микросервису с состоянием.




Микросервис с состоянием


Микросервис с состоянием объединяет прикладную логику сервиса с кешем в памяти одного и того же процесса. Это специализированный кеш, в нём реализуются операции, необходимые для прикладной логики.



Это решает все перечисленные выше проблемы:


Во-первых, позволяет полностью избавиться от потерь, связанных с сетевыми задержками. Теперь между этими компонентами нет сети.


Во-вторых, прикладной интерфейс встроенного кеша позволяет реализовать нужную функциональность без чрезмерных операций чтения/записи.


В-третьих, так как теперь прикладная логика и кеш обмениваются данными в памяти одного процесса, мы можем выбрать приемлемый для обоих формат данных, что позволяет полностью убрать или сильно снизить затраты на демаршалинг.


Совмещение базы данных в одном процессе с сервисом решает абсолютно те же самые проблемы, но просто на второй «ноге» нашей архитектуры. И позволяет просто интегрировать все компоненты в случае необходимости.


Ну и вредный дядька DBA нам больше не нужен. Теперь мы сами себе девопсы и отвечаем за все компоненты системы.


Таким образом, микросервис с состоянием будет всегда быстрее при одинаковой реализации приложения за счёт экономии почти в два раза на потерях, которых в этом случае просто нет. Но не может же быть всё хорошо, правильно? Может, оно будет глючить, если не будет тормозить?


Давайте посмотрим, что там с отказоустойчивостью.




Отказоустойчивость


Давайте вспомним, какие проблемы возможны в простейшей распределённой системе, состоящей из двух машин — клиента и сервера. Клиент посылает запрос серверу, сервер его обрабатывает и возвращает. Что может пойти не так?


1, 2 Это внезапная пропажа участника (сервера или клиента) из сети: остановка приложения, падение, ребут сервера.
3, 4: Потеря исходящего и входящего сообщения.
5 Таймаут. Участник отвечает вне указанных временных рамок.
6 Неправильный ответ — участник отвечает таким значением, которое другой участник не может понять. Например, неправильный формат данных в memcache.
7 Произвольный отказ, он ещё называется византийским. Участник отправляет произвольные сообщения в произвольное время. Это могут быть какие-то умышленные действия, типа различных атак, а могут быть retries, вышедшие из-под контроля.


Итак, давайте посчитаем, какие сценарии отказов будут у нашей системы, построенной как микросервис без состояния. Запрос поступает на наш сервис, в рамках обработки которого он обращается в memcache и базу данных.



Что в этой схеме может отказать? Может ли отказать memcache? Допустим, memcache пишут волшебные эльфы, и его софт не отказывает. Но может отказать машина или сеть (мы пока не рассматриваем репликацию).


База данных? Ещё более волшебные эльфы пишут ещё более волшебно базу данных, она вообще никогда не крэшится. Но сеть и машина всё равно могут отказать.
И memcache и БД могут отказать либо по-отдельности, либо одновременно, в любой последовательности. Естественно, по закону Мёрфи они будут отказывать в самой неблагоприятной.


Таким образом, на обеих «ногах» архитектуры у нас получается по семь сценариев отказа. И наш микросервис тоже может отказать, там уже волшебных эльфов не водилось, и с этим тоже надо как-то иметь дело.



Интересно здесь то, что все эти отказы связаны между собой, они не независимые.


Поэтому для того, чтобы правильно запрограммировать нужную обработку, нам необходимо отдельно рассматривать все отказы и комбинации между ними. Что не делает наш код проще, а вероятность ошибиться и накосячить в таких случаях всегда не равна нулю.


Давайте посмотрим, что будет с отказами в нашей системе, построенной как микросервис с состоянием. Те же самые три машины, организованные по варианту сервиса с состоянием. Здесь схема шардирования будет немного другой. Если в первом случае все три машины отвечали за множество ключей K, то во втором случае каждая из машин будет отвечать только за диапазон равный 1/3 K, которые не пересекаются друг с другом.


Соответственно, коммуникация будет идти только в прикладную логику, ведь кэш и база данных более не требуют сетевой коммуникации. На каждой из этих связей тоже возможны те же самые сценарии отказа.



Но действия, которые необходимо предпринять разработчику при отказе первой трети и остальных одинаковы. Если вы обработали отказы по первой трети, остальные надо обрабатывать точно также. Это потому, что эти отказы никак не связаны между собой в рамках обработки одной операции. А значит, комбинации отказов рассматривать не надо и код становится проще.


Давайте теперь посчитаем вероятность отказа машины. Сначала для микросервиса без состояния. Пока предположим для простоты, что репликации у нас нет.


Что будет, если откажет база данных? Будет ли работать у нас сервис по всем ключам? Не будет. Если откажет memcache или сервис, тоже не будет. То есть при отказе любого из компонентов мы получим недоступность сервиса по всем ключам из множества K.


Значит, если вероятность отказа любой конкретной машины обозначим p, то общая вероятность отказа P будет приблизительно вот такой:


P(K) = 1 – (1 – p)3


В варианте же сервиса с состоянием при отказе любой машины из трех мы получим недоступность только тех ключей, за которые отвечает эта машина. То есть такой будет вероятность недоступности не всех ключей, а только трети:


P(1/3 K) = 1 – (1 – p)3


А чтобы получить отказ по всему множеству K, должны отказать все три машины. То есть формула вероятности отказа для всех ключей сразу будет приблизительно такой:


P(K) = p3


По формулам не очень очевидно, что лучше, для наглядности подставим цифры.


Допустим, у нас вероятность отказа конкретной машины p составляет 0.1 (то есть 10%), это очень много, но здесь мы возьмем эту величину для наглядности:


Для сервиса без состояния получим
P(K) = 1 – (1 – p)3 = 1 – (1 – 0.1)3 = 0.271


А для сервиса с состоянием:
P(K) = p3 = 0.13 = 0.001


Так получается, что архитектура сервиса с состоянием на порядки надежнее.


Конечно, в реальности никто не ставит по одной реплике. Каждая реплика, добавляемая к БД, memcache или микросервису в идеальном случае снижает вероятность отказа каждого компонента на порядок. Но это непринципиально: взаимосвязи компонентов никуда не делись, а поэтому формулы, по которым мы считаем вероятности тоже — мы просто будем подставлять вместо вероятности отказа компонента p = 0.01 для 2 реплик, p = 0.001 для трех, и т.п. Но точно так же мы можем реплицировать и сервис с состоянием, реплицировав каждую уникальную 1/3 K в две (получив вероятность недоступности каждой трети ключей p = 0.01), три (p = 0.001) реплики.


Получается, что сервис с состоянием всегда будет надёжнее при одинаковом и даже меньшем количестве машин.




Вперёд, к реализации!


С чего начнём? Мы хотим:


  • Чтобы наш сервис был высокодоступным. Значит, должна быть обеспечена репликация и определены какие-то гарантии консистентности данных между репликами.
  • Чтобы он был масштабируемым, и при увеличении нагрузки мы могли на ходу добавлять мощности. А поскольку сервис теперь совмещён с данными, решардинг, то есть перераспределение данных по масштабируемым нодам, тоже должен поддерживаться.

Конечно, это можно реализовать самим, это интересная задача, работы тут примерно на несколько лет, но намного быстрее взять что-то готовое и допилить. Слова «репликация», «консистентность» и «решардинг» намекают, что мы можем взять что-то в современных БД, поэтому в первую очередь надо определиться с ней.


У нас есть еще одно важное требование к базе данных — она должна быть на языке приложения. Это позволит нам минимизировать демаршалинг и упростить интеграцию с нашим приложением. У нас в Одноклассниках почти всё пишется на Java, соответственно, мы хотим иметь базу данных на Java. Также БД должна иметь открытый код: мы собираемся её использовать нестандартным способом — а значит, будем что то допиливать.


Итак: open source, на Java, высокодоступное масштабирование — исходя из всего этого, в качестве БД мы и выбрали Cassandra.


Теперь её нужно встроить в наш сервис. Если посмотреть скрипты запуска Cassandra, становится понятно, что запуском занимается класс CassandraDaemon.


А значит, всё, что нам нужно сделать — включить необходимые библиотеки в classpath нашего сервиса ( -cp cassandra/lib/*.jar ) и вызвать Кассандра-демона вот таким заклинанием:


System.setProperty( "cassandra.config", "file://whatever/cassandra.yaml" );
CassandraDaemon.instance.activate();

В первой строчке мы устанавливаем, откуда CassandraDaemon будет брать конфигурацию (тот, кто запускал Cassandra, знает, что cassandra.yaml — это её конфигурационный файл). Во второй строчке делается вызов activate(), которым мы и запускаем ноду базы данных.


Файл сassandra.yaml использовать необязательно, есть интерфейс ConfigurationLoader, который можно реализовать, чтобы интегрировать ноду Cassandra с системой централизованного управления.


Вот и все — мы встроили базу данных в наш сервис — а значит теперь у нашего сервиса есть состояние. Давайте посмотрим, как это меняет маршрутизацию клиентских запросов.


Маршрутизация запросов


Предположим, у нас есть пользователь Лёха. Он хочет попереписываться в чате, ключ которого попадает в определённое множество ключей B. Для этого Лёха использует какой-то фронт, который должен решить, куда отправить запрос Лёхи для обработки. Для сервиса без состояния фронту всё равно, какой конкретно работоспособный инстанс сервиса выбрать, последовательные запросы Лёхи могут в итоге попасть на каждый из них:



Со всех этих инстансов цена обслуживания запроса будет одинаковой. Каждому нужно будет отправлять запросы за необходимыми для обработки запросов данными по сети на кеши и на базу данных.


А у микросервисов с состоянием каждый из инстансов сервиса имеет какую-то часть данных локально — один из интервалов ключей, которые я обозначил A, B и C. И теперь инстансу, владеющему интервалом ключей В, незачем куда-то ходить по сети: все данные для выполнения запросов доступны либо в памяти процесса, либо на локальных дисках.


У остальных же инстансов нет данных нужного Лёхе интервала B, и им придётся доставать эти данные по сети. А значит, цена обработки запроса сразу увеличится на демаршалинг и сеть.



Для того, чтобы запрос по интервалу ключей обрабатывался эффективно, мы не должны обращаться к нелокальным данным. А сделать мы можем это только в случае, если клиенты будут вызывать сразу нужный инстанс сервиса B. Нам нужна маршрутизация запросов, учитывающая топологию кластера, то есть информацию о распределении данных по нодам.


Мы можем реализовать такой роутинг на фронте в виде библиотеки. Она делала бы запросы по ключам сразу на нужные ноды, и эти ноды могли бы работать только с локальными данными.


Распределение данных диктует база данных. Мы только что встроили Cassandra в наш сервис. Давайте посмотрим, как она распределяет данные.


В Cassandra каждый ряд имеет ключ, составленный из двух компонентов:
Partition Key (chatId) — на основании него выбирается нода.
Clustering Key (msgId), который в распределении данных по нодам не участвует, но определяет порядок, в котором записи будут упорядочены внутри партиции.


CREATE TABLE Messages (
    chatId, msgId

    user, type, text, attachments[], terminal, deletedBy[], replyTo…

    PRIMARY KEY ( chatId, msgId )
) 

Записи с разным Partition Key попадают в разные партиции и могут быть распределены на разные ноды. А записи, где различается только Clustering Key, находятся в одной партиции и никак не могут быть распределены по разным нодам — то есть всегда будут находиться вместе. Подробнее об этом можно почитать тут.


Поскольку в мессенджере порядок следования сообщений в чате важен, а порядок разных чатов между собой — нет, то для нас в качестве Partition Key правильно будет выбрать ID чата, а Clustering Key — ID сообщения.



С точки зрения кода в распределении данных в Cassandra участвуют следующие компоненты:


  • Partitioner — на основании значения Partition Key вычисляет токен, все доступные значения которого принято отображать в виде кольца. Partitioner задаётся один раз на весь кластер и не может быть изменён впоследствии. Среди всех вариантов Partitioner наиболее общеприменим тот, который получает токен путем применения hash-функции murmur3 к значению ключа.
  • TokenMetadata — глобальная структура, в которой хранится разбиение кольца на интервалы значений токенов и соответствие этих интервалов нодам, на которых хранится соответствующая токену партиция.
  • Replication Strategy определяет, как организуется хранение реплик партиции и сколько их вообще должно быть. Можно указать разные Replication Strategy для разных таблиц, поместив их в разные keyspace.

Этой информации достаточно для того, чтобы в три строчки составить полную карту нахождения всех данных и их реплик в нашем кластере.


SortedMap<Token, List<InetAddress>> endpointMap = …

AbstractReplicationStrategy replication = …

for (Token token : tokenMetadata.sortedTokens() ) {
    endpointMap.put( token, replication.getNaturalEndpoints( token ) );
}

В приведённом примере endpointMap будет содержать соответствие между максимальным токеном интервала и адресами всех реплик попадающих в интервал ключей. Передав этот endpointMap клиенту, легко сделать нужную нам маршрутизацию. Стоит иметь в виду, что топология кластера меняется время от времени, ноды добавляются и уходят, поэтому эту информацию нужно периодически обновлять.


Мы, например, обновляем её раз в 30 секунд. Также стоит подумать о том, что делать, если клиент вызывает сервис, пользуясь устаревшей топологией. Например, вскоре после расширения кластера, запросы могут уйти не на те ноды. Тут могут быть несколько вполне очевидных стратегий: и редирект, и push-нотификации с новыми нодами. Стоит подумать, какая конкретно подходит под задачу.


С клиентами теперь всё нормально, давайте разберёмся, как нам работать с данными.


Работаем со встроенной БД


У нас есть вот такая таблица:


CREATE TABLE Messages (
    chatId, msgId

    user, type, text, attachments[], terminal, deletedBy[], replyTo…

    PRIMARY KEY ( chatId, msgId )
) 

И нам нужно получить список сообщений, то есть выполнить вот такой запрос:


SELECT FROM Messages
    WHERE chatId = ? AND
    msgId BETWEEN :from AND :to

Возможно, отфильтровав ненужные записи потом.


Как мы сделали бы это через драйвера базы данных? Вот пример, который я взял прямо из мануала.


import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;

try (CqlSession session = CqlSession.builder().build()) { // (1)
    ResultSet rs = session.execute("select release_version from system.local"); // (2)
    Row row = rs.one();
    System.out.println(row.getString("release_version")); // (3)
}

Что тут происходит (в строках, помеченных числами 1-3):


  1. Строим сессию с кластером, устанавливаем соединение с кластером
  2. Выполняем запрос, получаем ResultSet
  3. Используем его

На самом деле всё немного сложнее, потому что нужно где-то хранить конфигурацию куда на какой кластер ходить, работать с сессиями и прочее.


Для работы же со встроенной в наше приложение Cassandra мы будем работать с классом QueryProcessor, у которого есть метод execute , который достаточно вызвать для выполнения запроса:


package org.apache.cassandra.cql3;

import java.nio.ByteBuffer;

public class QueryProcessor
{
    public static UntypedResultSet execute(String query,
        ConsistencyLevel cl, Object… values)
        throws RequestExecutionException
}

Обратите внимание, что он — статический. Почему? Дело в том, что нам не нужна никакая сессия ни с каким кластером. Наша локальная нода и есть уже член кластера, ей никуда не нужно устанавливать соединение. Поэтому этот метод статический, вы его вызываете и, естественно, вызываете в контексте своего собственного кластера.


Соответственно, для getMessages() получится примерно такой код выполнения этого запроса:


UntypedResultSet rs = QueryProcessor.execute(
        "SELECT * FROM Messages "
        + "WHERE chatId = ? AND msgId < ? AND msgId > ?"
        ConsistencyLevel.QUORUM, chatId, from, to );

rs.forEach ( row -> {} );

Аналогичным способом для add() мы должны выполнить запрос добавления записи приблизительно таким же образом:


QueryProcessor.execute( "INSERT INTO Messages VALUES (?,?,?,...)", cl, values );

Всё просто. Если покопаться в QueryProcessor, можно найти и более эффективный способ получения записей из базы данных. Он не так просто выглядит, поэтому здесь я его не показал, но даже этот вариант будет быстрее стандартного драйвера за счет экономии на маршалинге и сетевой задержке.




Кеш сообщений


Как мы определились ранее, скорости БД нам не хватает и мы решили реализовывать прикладной кеш сообщений. Для начала давайте посчитаем, сколько данных он будет хранить. Итак, у нас всего:


  • 600 миллиардов сообщений
  • 100 терабайт данных
  • 5 миллиардов чатов
  • Из этих чатов только 5% активных
  • И из каждого мы хотим хранить в кеше 13 последних сообщений

Все исходные данные есть, можем посчитать, что в память у нас попадает довольно большое количество сообщений и данных:


  • 3+ миллиарда сообщений
  • 250 миллионов чатов
  • 500 гигабайт данных

Это без учёта репликации. Нужно их ещё поделить на количество нод в кластере и умножить на replication factor.


Такой кеш сообщений должен обладать прикладным интерфейсом, соответственно, мы должны реализовать какие-то методы из нашего списка (getMessages(), getLastMessages(), add(), search(), indexMessages()). На практике нужны только первые три, потому что мы хотим, чтобы поиск и индексация никогда ходили в кеш.


Начнём с реализации getMessages() на примере чатика Лизы с Лёхой:


  • Пользователь Лиза открывает ok.ru и выбирает чат с Лёхой. То есть, мы через веб-фронт получаем на некоторый инстанс запрос getMessages().
  • Далее прикладной код проверяет, есть ли этот чат у нас в кеше.
  • Обнаруживает, что нет, вызывает QueryProcessor, получает данные из БД.
  • Помещает его в кеш.
  • Наконец, возвращает актуальное состояние чата Лизе.

Вот эти пять шагов:



Лёха через мобильное приложение делает то же самое и удачно попадает на другую реплику.


Выполняет тот же самый запрос, точно так же в кеше другой реплики данных нужного чата нет, загружаем его, получаем актуальное состояние чата:



Заботясь о состоянии Лёхи, Лиза предлагает ему сходить пообедать, то есть вызывает метод add(). Нода помещает запись о новом сообщении и в базу данных, и в кеш:



Теперь в кеше этой реплики у нас три сообщения, а в кеше Лёхиной по-прежнему два:



Лёха, получив пуш на телефон, спрашивает getMessages(), обращается в кеш. А там уже есть данные, но они устаревшие. Мы получаем их, и Лёха остаётся голодным:



Актуальность данных


Очевидно, мы должны как-то сообщать другим репликам, что появилось новое сообщение.


Мы можем использовать информацию о топологии кластера: точно так же, как мы это делали на клиенте, вычислить все реплики и нотифицировать их все с новым сообщением. Например, вызвав какой-то спецметод added():



И тогда мы вставим новое сообщение кеши всех остальных реплик, и на всех репликах всегда будет актуальное состояние кешей. Или нет?


Проблема в том, что этот запрос пойдёт через сеть на другую реплику, а, следовательно, он может не пройти:



И что на самом деле произошло с репликой, до которой не удалось доставить это сообщение, мы не знаем. Может быть, она крэшнулась? У нее вылетели диски? Может, какие-то нарушения в сети произошли?


Время восстановления удаленной реплики спрогнозировать невозможно, а ждать до бесконечности завершения операции add() мы не можем. Однозначно то, что при такой схеме не все нотификации будут достигать всех реплик, поэтому для некоторых записей состояние кешей будет устаревшим. А такая несогласованность имеет тенденцию к накапливанию со временем.


Давайте посмотрим, как эту же ситуацию будет отрабатывать Cassandra. При выполнении INSERT она тоже определит необходимые реплики данных, которые нужно изменить, сформирует для них требования об изменении (мутацию) и попытается послать это требование по сети.



Поскольку наш кеш и нода желтой реплики — это один процесс, то такая мутация не может быть доставлена точно в силу тех же самых сетевых и прочих проблем.



В этом случае Cassandra сохраняет такие мутации локально и персистентно.


Когда связь с желтой репликой снова восстановится, такие пропущенные мутации будут отосланы ей при первой же возможности. Это называется Hint.



Обычно такие пропущенные мутации сохраняются некоторое ограниченное время, иначе место на дисках работоспособных нод могло бы закончиться из-за одной-единственной достаточно долго не работающей ноды.


Для восстановления согласованности в случаях долгой неработоспособности ноды служит Read Repair — сравнение реплик при чтении. В результате обнаружения расхождения версий формируется мутация на устаревшие реплики.



И, наконец, Streaming Repair — это пакетный фоновый процесс обхода и сравнения всех или какой-то части данных всех реплик.


В результате такого процесса формируется файл, содержащий несовпадающие между репликами данные, который передаётся на устаревшую ноду целиком через отдельное соединение. Это называется Streaming.



Очевидно, что мы могли бы всё вот это реализовать сами. Это ещё одна интересная задача на несколько лет.


Но мы с вами люди занятые, давайте посмотрим, что мы хотим. Мы хотим, чтобы, когда база данных любым способом (Hint, Read Repair или Streaming Repair) получает данные, локальная нода база данных могла бы нотифицировать нашу прикладную логику, а она в ответ на это могла бы сделать необходимые действия и добавить новые данные в кеш.



То есть мы хотели бы определить такой интерфейс для listener'а изменений в Cassandra и реализовать его в нашей прикладной логике:


interface ApplyMutationListener
{
    void onApply(ByteBuffer key,
        DeletionTime deletion,
        Iterator<Unfiltered> atoms);
}

Естественно, сам по себе такой интерфейс работать не будет, и нам надо дописать в Cassandra немного кода. Оказывается, таких мест всего два.


Все мутации, независимо от того, какая подсистема их сформировала, проходят через вот этот метод.


package org.apache.cassandra.db

public class Keyspace
{
    public void apply( Mutation mutation,
        boolean writeCommitLog,
        boolean updateIndexes.
        boolean isDroppable )

// ...
}

Объект Mutation здесь содержит список новых данных для различных таблиц ноды. Соответственно, можно просто брать эти данные и передавать его в listener.


Весь Streaming, независимо от причины, по которой он был запущен (из-за Repair, бутстрапа новых нод в кластере или из-за выемки нод из кластера), заканчивается вот таким OnCompletionRunnable:


package org.apache.cassandra.streaming

public class StreamReceiveTask extends StreamTask
{
    // holds references to SSTables received
    protected Collection<SSTableReader> sstables;

    private static class OnCompletionRunnable implements Runnable {

    // ...
    }
}

Взяв список таблиц, достаточно просто пройтись по ним итератором и передать данные в наш ApplyMutationListener.


Потеря состояния кеша


Давайте рассмотрим ту же ситуацию с другого ракурса: мы добавляем запись, мутация не может быть доставлена, но не потому, что были какие-то временные проблемы на сети, а потому, что сервер пошел в ребут.



Это означает, что состояние нашего кеша в памяти пропало. И тогда Hints нам не помогут, потому что они содержат только новые данные:



А для получения старых мы должны были бы на getMessages() их дочитать из базы данных. После старта это привело бы к лавинообразному пику чтений из базы данных, и есть шанс, что мы просто уложим весь кластер возросшей нагрузкой.



Очевидно, мы должны сохранять текущее состояние кеша для последующего восстановления. В системах, ориентированных на хранение данных в памяти, для этого часто используют периодический снапшот кеша и постоянно пишущиеся WAL. Тогда в момент старта состояние кеша можно восстановить, загрузив последний снапшот и дочитав изменения с момента его записи из WAL. В Cassandra подобное, оказывается, уже есть, и нам это отдельно программировать не нужно. Мы можем создать вот такой keyspace, указав для него недокументированную стратегию репликации LocalStrategy:


CREATE KEYSPACE Caches
    WITH REPLICATION = {
    'class': 'LocalStrategy'
    }

LocalStrategy устанавливает, что таблицы такого keyspace не нужно реплицировать, это локальные данные ноды.


И в таком keyspace мы можем создать вот такую таблицу:


CREATE TABLE Caches.MessagesSnapshot (
    rowkey blob,
    value blob,
    PRIMARY KEY ( rowkey )
)

Дальше мы на каждый put в кеш можем делать в неё запись о том, что у нас есть новые данные в кеше, а на каждый eviction из кеша — удалять из неё запись:



И тогда при старте приложения мы проверяем, пустой ли наш кеш, если пустой, мы его грузим таким вот способом:


SELECT * FROM MessagesSnapshot

Загружаем все старые данные с диска в кеш, а через хинты к нам приезжают новые данные — те, которые нода приложения не видела, пока лежала.


Всё это происходит достаточно быстро для нашей задачи — около полумиллиона чатов в секунду — что позволяет загрузить кеш с диска за несколько минут.


Но быстро — понятие относительное, и мы не очень хотим ждать вот этой загрузки с диска при плановом рестарте (допустим, из-за выкладки новой версии). Это можно сделать с помощью разделяемой памяти.


Разделяемая память для нас интересна тем, что данные в ней не пропадают при рестарте приложения. Для нас самый простой способ получить туда доступ — открыть mapped byte buffer например на такой файл: /dev/shm/msgs-cache.mem.


Но лучше использовать уже готовую структуру, например, доступную в нашей библиотеке one-nio, которая уже давно есть на GitHub. Обратите внимание на SharedMemoryMap и его потомков, там есть кеши на разный вкус и цвет, умеющие использовать SharedMemory.


Этот класс можно использовать, замонтировав /dev/shm под tmpfs, обычно это делается по дефолту. Но чаще мы используем hugetlbfs, что позволяет нам делать более крупные страницы в памяти на уровне операционной системы, а, значит, уменьшить количество TLB misses и ускорить работу с памятью.


Так, при плановом рестарте данные никуда ниоткуда грузить не нужно, SharedMemoryMap очень быстро только проверяет валидность данных в разделяемой памяти, это происходит практически мгновенно.


Ожидание согласованности


Итак, наша ситуация с рестартом реплики: теперь старт стал быстрым, и часть запросов успевает проходить до того, как придут хинты. Иии… Лёха снова получает устаревшую версию кеша и снова остаётся голодным.



Почему так? Процесс посылки хинтов в Кассандре асинхронный by design. Поскольку при чтении Cassandra обращается к нескольким репликам для определения согласованности данных и может отбросить устаревшие данные с рестартнутой ноды, хинты не обязательны для соблюдения согласованности.


Наше же приложение в 15 раз больше нагружено именно на чтение, чем на запись, и вариант разрешения согласованности при чтении для нас слишком тяжёл.


Поэтому мы делаем так: Мы запускаем сначала сетевые сервисы нашей встроенной БД, ждём, пока она примет хинты от остальных нод кластера, и только после этого запускаем обслуживание прикладных запросов сервиса. С этого момента на сервис могут поступать запросы от клиентов, нода теперь находится в согласованном состоянии.



В принципе, такое ожидание, получение всех нод — это задача распределённой координации. Мы должны координировать все реплики и решить для себя, все ли хинты мы получили.


Мы пробовали применять несколько различных стратегий, но в итоге остановились на самой простой: нода как после старта периодически опрашивает другие ноды, которые могут иметь реплики, «есть ли хинты для меня». И повторяет этот запрос до тех пор, пока все ноды не ответят «Нет». После этого старт продолжается.


Логика на самом деле не настолько простая: нужно учитывать отказы остальных нод, которые могут произойти в момент ожидания, изоляцию ноды от всех реплик тоже нужно учитывать, массовые аварии нужно учитывать, но всё это решаемые вопросы.




Мы хотим большего!


С этим разобрались, поехали дальше.


Теперь мы можем подступиться к самому тяжёлому запросу getLastMessages(chats[]) — получить последнее сообщение для каждого чата из списка.


Этот запрос должен рассмотреть списки сообщений по множеству чатов, разные чаты могут быть расположены на разных нодах. А значит, нет одной ноды, владеющей всеми данными.


В большинстве случаев загружаются данные неактивных чатов, а значит, этих данных в кеше нет. И загружать их в кеш смысла мало, потому что маловероятно, что они станут активными в ближайшее время.


Данные таких чатов, если они не в кеше, скорее всего, уже согласованы. Так как на старых данных, скорее всего, уже произошли все необходимые процедуры согласования в Cassandra: и hints, и read repair, и streaming repair.


А если чат становится активным, то он будет помещён в кеш и перестанет быть старым. Возможно, мы что-то сможем сэкономить на этом в будущем, посмотрим.


Как бы мы реализовали этот метод в сервисе без состояния? Клиент вызвал бы в сервисе метод, прикладная логика которого опросила бы все memcaches для каждого чата из списка, и базы данных для тех старых чатов, которые уже были вытеснены из кеша:



В сервисе с состоянием нет единого сервиса с полным набором данных, а проксировать вызовы с одного инстанса на другие неэффективно. Наиболее эффективно иметь реализацию этого метода на всех участниках — и на сервисах, и на фронте клиента.


Такая «клиентская» реализация использует информацию о топологии кластера и разбивает список ключей чатов на несколько списков (для каждого инстанса сервиса свой), выбирает реплику и вызывает её, передав ей список с её ID чатов.



То же самое с остальными репликами — сервисы отрабатывают запросы и возвращают данные (каждый свою часть), а клиент, дождавшись ответа по всем нужным ключам, объединяет результат.


Такая реализация выглядит сложной, но не обязательно писать её каждый раз руками, мы вполне можем реализовать такой алгоритм один раз для всех подобных случаев.


Локальные чтения


Внутри же каждой реплики выполнение было бы таким. Бизнес-логика обращается в кеш, оттуда получает данные для активных чатов, а из базы данных мы запрашиваем неактивные. Но, поскольку наша база данных распределённая, при обработке запроса будет происходить сетевой запрос для возможного согласования реплик данных, а это дополнительная сетевая коммуникация:



Мы знаем, что согласованность на старых чатах нам не так важна, скорее всего, данные согласованы. Поэтому мы могли бы убрать большую часть сетевой коммуникации в нашем самом тяжёлом запросе, запросив данные у базы данных только с локальной реплики:



В стандартном API Cassandra нет способа сделать такое. Но наш-то сервис работает вместе с Cassandra в одной и той же JVM, поэтому он не ограничен публичным API.


Мы можем выполнить какой-то такой код:


public Message getLastMessage(Long chatId) {

    Prepared prepared = QueryProcessor.prepareInternal( "SELECT … WHERE chatId=?" );
    SelectStatement select = (SelectStatement) prepared.statement;

    QueryOptions opt = QueryProcessor.makeInternalOptions( prepared, chatId );

    ReadQuery query = select.getQuery( opt, FBUtilities.nowInSeconds() );

    UnfilteredPartitionIterator partitions = query.executeLocally( query.executionController() );

    UnfilteredRowIterator partition = partitions.next();
    partition.forEachRemaining( atom -> { /* unmarshalling code */ } );
}

Здесь очень много какой-то магии, но на самом деле всё то же самое: мы делаем statement, передаём параметры, а отличие только в вызове executeLocally(). Он значительно быстрее, так как много накладных расходов, связанных с сетевой коммуникацией, просто не возникает.


Полнотекстовый Поиск


Теперь мы добрались до самого сложного: полнотекстового поиска. Мы хотим ввести что-то в поисковой строке и быстро получить список чатов и сообщений.



Искать простым просмотром всех чатов, даже с использованием локального чтения — не вариант. Сто терабайт данных быстро не просмотришь даже на кластере из ста машин (если, конечно, пользователь не один-единственный и очень терпеливый).


Соответственно, нам нужно построить полнотекстовый индекс с использованием, например, Apache Lucene.


Один большой инвертированный индекс на сто терабайт будет глючить и тормозить. Это отдельная интересная тема, но обсуждение этого выйдет далеко за рамки этой статьи ( о проблемах в эксплуатации больших индексов можно посмотреть еще тут ). Поэтому мы хотим строить много отдельных индексов — для каждого чата свой собственный.


Строить индексы заранее ( а значит и хранить их ) стоит только для больших чатов, для маленьких чатов мы можем построить эти индексы в момент поиска.


Итак, как бы мы могли это сделать. Давайте попробуем по-простому. При добавлении записи в методе add(), как только мы вставляем сообщение в таблицу, и, как часть бизнес-логики, вызываем IndexWriter.addDocument(), и IndexWriter.commit():



Но есть одна большая проблема: мы сделали слишком много коммитов, восемь тысяч в секунду.


А слишком частые коммиты приводят к тому, что у нас будет слишком много сегментов этого индекса. А для того, чтобы сегменты слить, требуется дисковая операция. То есть мы таким способом уложим диски нашей ноды.


Не получается. Хорошо. Вызовем IndexWriter.addDocument(), но вызывать IndexWriter.commit() не будем. Будет работать? Недолго, поскольку переписка идёт в реальной системе по множеству чатов одновременно, у нас будет открыто очень много индексных сегментов множества чатов. Они будут в открытом состоянии и соответственно будут жрать память, и мы уложим нашу ноду по памяти.


Что же делать? То, что мы на самом деле хотели бы, это какой-то фоновый процесс, который предварительно объединял бы множество сообщений одного чата в один пакет по мере их появления. Но не очень часто, так, чтобы мы могли успевать коммитить индексы, чтобы они успевали мерджиться друг с другом.


Такой процесс в Cassandra уже есть, нам не нужно делать его самим. Он называется Compaction. Это процесс слияния различных поколений данных, который Cassandra делает сама по себе для поддержания необходимой ей структуры данных на диске.


Данные при этом в пределах чата просматриваются в порядке следования их Clustering Key, что для нас означает, что, вклинившись в этот процесс, мы получим данные в нужном нам порядке.


В Cassandra этим процессом занимается вот такой класс:


package org.apache.cassandra.db.compaction;

public class CompactionManager implements CompactionManagerMBean {
// …
}

Дописать туда небольшой дополнительный процессинг нетрудно, и теперь у нас есть бесплатная массовая обработка данных, причём локальная, которую можно использовать в этом и других подобных случаях.


Когда пользователь делает поиск, мы используем основной индекс, сформированный в процессе Compaction, а для того, чтобы искать и среди свежих сообщений — доиндексируем сообщения добавленные с момента последнего компакшена непосредственно перед поиском. Надеюсь в будущем расскажем о том как работает поиск более подробно.




Эксплуатация


Понаписали мы тут кода, давайте теперь попробуем с этим всем пожить и поэксплуатировать.


Первое, с чем столкнёмся — увеличившееся время раскатки новой версии.


Это происходит из-за того, что теперь наш сервис стартует дольше. Почему?


  • Потому что теперь у нас в сервисе есть база данных, а ей нужно инициализировать свои данные на дисках. То есть скорость старта зависит от скорости диска с нашими данными и скорости проигрывания write-ahead лога.
  • Вторая часть, которую нам нужно ждать — загрузка кеша из той самой локальной таблицы Caches.MessagesSnapshot. Тут зависит от размера этого кеша — чем больше, тем дольше ждать. Ещё влияют коллизии ключей в кеше и мощность CPU.
  • Мы должны подождать согласованности, то есть, подождать, пока все пропущенные хинты к нам придут.
  • И время выкладки растет пропорционально количеству обновляемых реплик.

Что будем делать?


Если раньше мы выкладывали реплики по одной, то имеет смысл параллелизовать выкладку реплик по зонам доступности.


Мы делаем так: сначала выкладываем одну реплику для проверки того, что новая версия работает, а затем все остальные реплики первого дата-центра мы выкладываем параллельно. Потом параллельно перекладываются все реплики второго дата-центра, затем все третьего.


Вторая проблема: более частые рестарты базы данных.


Понятно: база данных теперь вместе с приложением, и когда мы обновляем приложение, то выключаем и его базу данных и перезапускаем по дата-центрам.


Внутренний DBA нам говорит: нельзя перезапускать базу данных.



Можно уволить DBA из компании, но нужно ещё выжечь его внутри себя. Что же делать?


А на самом деле, ничего не делать, это хорошо. Это позволяет нам регулярно тестировать отказоустойчивость нашего сервиса в контролируемых условиях — в тот момент, когда мы на него смотрим и делаем контролируемые перезапуски. Лучше так, чем Вселенная протестирует отказоустойчивость сама, ночью, когда у нас внезапно выключится дата-центр.


А вот что является реальной проблемой — взаимное влияние прикладного кода на базу данных.


Опять же, база данных у нас теперь вместе с приложением, в той же самой JVM. И в нашей прикладной логике можно легко написать что-нибудь такое, от чего, например, закончится хип. И, выложив на все базы данных наш новый классный код, который пожирает память огромными темпами, мы можем уложить весь кластер базы данных.


Что делать? Когда есть такой код, или есть подозрение на него, или просто в качестве дефолтной практики можно делать так: нам нужно выложить наш новый код на один дата-центр и некоторое время подождать, что же будет.


Максимум, что мы можем сломать — это один дата-центр, но все остальные будут работать и обслуживать клиентов. А мы сможем откатить проблемную версию, если что-то пошло не так.


Второй вариант — если есть какая-то совсем опасная ситуация, то у вас должны быть рубильники, которым вы можете на ходу отключить опасную функциональность.


Ну и ещё одна фобия, с которой придётся столкнуться — это апгрейд на новую версию встроенной базы данных.


Мы тут понаписали всякого кода и поиспользовали внутренние API, которые незадокументированны. Что говорит наш внутренний DBA: они же всё поменяют там в этом незадокументированном API, мы застрянем на старой версии и всё потеряем! Что же делать?


На самом деле, архитектура базы данных остается неизменной годами. Наблюдение за Cassandra, за развитием их внутреннего API, показало, что сущности как были, так и остались. Максимум, что поменялось — они переименовываются время от времени.


Хорошие распределённые базы данных, в том числе и Cassandra, поддерживают rolling upgrades. Для них нормальная ситуация, когда на некоторых нодах кластера стоят разные версии базы данных. И это хорошо. Потому что для нас в микросервисах c состоянием апгрейд базы данных означает просто то, что мы подключаем нашу новую базу данных как новую версию встраиваемой библиотеки, выкатываем всё это на один дата-центр, ждём, что сломается, и откатываем, если что-то сломалось.


Если мы ещё и данные умудрились запороть — тоже небольшая проблема, мы можем их откатить и восстановить эти данные из других дата-центров с помощью штатной процедуры замены отказавшей ноды.




Итог


  • Микросервисы с состоянием эффективнее и надёжнее, потому что у них отсутствуют потери на маршалинг и нет сети между компонентами.
  • Теперь у нас есть бо́льшие гарантии консистентности кеша. Поскольку мы совмещаем кеш и базу данных в одном процессе, мы можем гарантировать, что наш кеш будет консистентен с базой данных. Вопросы консистентности кеша часто вообще не адресуются в микросервисах без состояния.
  • Всё это относительно просто реализуется, самое сложное есть в Cassandra и в one-nio.
  • Мы это давно и широко используем. Множество сервисов построены именно по этому паттерну, есть высоконагруженные, есть сервисы, в которых очень много данных: лента, обсуждения, посты, нотификации, классы.

Если вы хотите узнать больше об Одноклассниках или о том, как применяется этот паттерн, то вы можете посмотреть еще вот эти доклады:


Tags:микросервисыstatefulcassandra
Hubs: Одноклассники corporate blog High performance Java Distributed systems Microservices
+35
9.2k 87
Comments 22
Top of the last 24 hours