Badoo corporate blog
High performance
Programming
NoSQL
Comments 32
+2
Cassandra позволяет последовательно сканировать всю таблицу или какую-то её часть. И за счёт этого удаётся избегать неэффективного рандомного обращения к диску.

Вы как-то специально говорите Кассандре, что нужно последовательно сканировать таблицу, или это автоматически получается?

+7
Да, это специальный тип запросов. Внешне в CQL он почти не отличается о обычных, кроме того что в WHERE секции используется функция token(), которая возвращает хэш от ключа, по которому (по хешу) в реальности и упорядочены данные на диске:

SELECT * FROM Table WHERE token(pk) >= :start_token limit 10
SELECT * FROM Table WHERE token(pk) > token('prev_value_pk') limit 10

Это так же дает возможность разбить все данные таблицы на относительно равные сегменты выбрав правильный start_token и end_token из всего возможного диапазона токенов и вычитывать данные параллельно.
+1
Добрый день!

Вы не упомянули или я пропустил
1) вы используете TTL для записи метрик и TWCS? Или как вы чистите устаревшие данные?
2) Как происходит аггрегация в следующий роллап — вычитываете все данные в нужном диапазоне и аггрегируете? Как организован и распараллелен этот процесс?

Заранее спасибо за ответы. Может еще возникнут вопросы.
+5
TTL решили не использовать что бы избежать ситуации когда данные удалились по TTL, но по какой-то причине не переложились в rollups. Сейчас же мы можем приостанавливать даунсемплинг (на время обслуживания, например, поскольку дайнсемплинг работает с CL=ALL) не опасаясь потери сырых данных. Можем увеличивать временное окно в котором данные хранятся сырыми. В общем, процесс под контролем.

А раз не используем TTL то и TWCS отпадает как не эффетивный при явно удаляемых данных.

Данные в points у нас удаляются целыми сегментами (партициями) при перекладывании в rollups в агрегированном виде. А в rollups как таковой нет операции удаления, есть перезапись.

Предвосхищая следующие вопросы:
— мы используем LCS для индекса имен и STCS для всех остальных данных.
— да, для points мы имеем лаг между удалением данных и реальным пуржингом с диска: ~ 1.5 недели
— для rollups трудно точно оценить какую часть данных составляют устаревшие версии переписанных значений (по грубой оценке это 30-40% от всего объема rollups), но мы следим за «средним размером метрики» и она нас устраивает.

Использование TTL возможно и имело бы смысл, но при этом всплывут новые проблемы которые потребуют решения.

В общем, мы пока не получили негативный опыт с нашей моделью данных что бы реально задумываться над использованием TTL.
+1
А вы расматривали kairosDB, есди да? то почему не подошел.
Мyе кажеться что проект имплеметирует именно вашу задачу.
+4
Больше года назад, когда мы рассматривали готовые решения, мы смотрели и на KairosDB. Некоторые наши претензии к OpenTSDB относились и к KairosDB. Но самое главное, KairosDB как минимум не соответствовал требованию «долговечность» — т.е. терял данные из буфера в памяти, когда не мог сбросить его в Cassandra. Кстати, разработчики, как я сейчас вижу, уже исправили эту проблему пару месяцев назад.
-4
Опять вы везде путаетесь в числах.
Вы в декабре говорили
девять серверов;
10 Тб данных;
100 000 значений в секунду;
140 миллионов метрик.


А теперь в одном месте
300 миллионов метрик
200 000 значений в секунду.
16 Тб на 24 серверах

и тут же рядом
180 миллионов метрик;
объём данных – приблизительно 9 Тб;

Как-то уж определитесь какой у вас большой чем хвастаетесь.

Оставшиеся 15 серверов и 120млн метрик где? На старом кластере в rrd?
+4
Rrd: 100к/сек, 15 серверов, 120кк метрик
Cassandra: 100к/сек, 9 серверов, 180кк метрик.

Вернее уже 190 миллионов в Cassandra

Оставшиеся 15 серверов и 120млн метрик где? На старом кластере в rrd?
— совершенно верно
+2
элементы каждого такого ассоциативного массива хранятся на диске по возможности последовательно


почему «по возможности»? И в sstable, и в memtable данные лежат упорядоченно по колонкам.

А как обрабатываете ситуацию, когда данные записались только в часть таблиц, например, в Points данные записались, а при записи в Meta допустим сервис помер…
+2
почему «по возможности»? И в sstable, и в memtable данные лежат упорядоченно по колонкам.

Потому что в случае нескольких sstable часть партиции может оказаться в одном файле, а часть в другом.
+3
А как обрабатываете ситуацию, когда данные записались только в часть таблиц, например, в Points данные записались, а при записи в Meta допустим сервис помер…

Конкретно в ��том сценарии: допустим клиент отправляет одно значение. Мы сперва делаем вставку в meta — запись о том что у нас появился новый сегмент у метрики, потом запись в points. Если сервис упал перед вставкой в points, мы имеем только запись о том что у метрики есть сегмент но он пустой.

Дальше все зависит от клиентского приложения — как он обрабатывает ошибку при отправке в удаленный сервис, ведь ответ об успешной вставке он не получил. В нашем случае мы делаем несколько попыток отправить данные снова. Если лимит попыток исчерпан, то данные остаются в локальном журнале приложения. Для таких случаев у нас есть скрипт который периодически делает replay по таким журналам.

В случае, если по каким-то причинам приложение забивает на ошибку, и не продолжает обновлять метрику, эта метрика остается с пустым сегментом. В нашей модели данных это валидная ситуация, которая не приводит к ошибкам при чтении.

Для метрик, которые давно не обновлялись у нас есть скрипт который периодически сканирует все записи в таблице meta и «подчищает мусор»:

— Во-первых, удаляет все записи из всех таблиц для устаревшей метрики. При этом из таблицы meta данные удаляются в последнюю очередь, так что при падении этого скрипта он в следующий раз все равно дойдет до этой записи и завершит свое дело.
— Во-вторых, проверяются сегменты которые должны были быть давно задаунсеплены, и, в случае пустых сегментов они просто удаляются.

+2
InfluxDB
Это решение казалось идеальным. Не хватало только одного, самого важного, пункта – оно не масштабировалось «из коробки». Впрочем, его open-source-версия не масштабируется и сейчас: авторы работали над кластеризацией больше года и в итоге решили закрыть этот функционал. А жаль, мы очень рассчитывали…

Функционал перенесли в платную Enteprise версию, там есть кластеризация.

Как вариант еще можно нарезать single-instance и балансировать запись в зависимости в зависимости от метрик (метрики Х пишем в этот инстанс, метрики У в другой), а сам интсанс резервировать через Influx Relay, но это конечно не так красиво, как кластеризация из коробки.
+4
Такой вариант уже существовал в виде rrd-кластера (за вычетом резервирования), и менять шило на мыло выглядит не вполне целесообразно.
+1
В сторону kairosdb не смотрели? Это как раз time-series сервис хранящий данные в Кассандре. По сути это дб схема + апи.
+1
В сторону KairosDB смотрели. Я описывал в комментарии выше. Тогда она работала не стабильно. Могла потерять данные, в случае потери коннекта к Cassandra, причем без оповещения клиента. Более подробно мы ее не изучали, так что, подходит ли она нам — это открытый вопрос. В частности, по производительности даунсемплинга. Это надо бы протестировать. Например, в нашей реализации мы не сразу добились приемлемой скорости архивирования.
+2
Справедливости ради, вы сделали некорректную оценку Graphite'а. Конкретнее:
1. У него изначально во всех доступных реализациях есть перезапись точек в прошлом (у вас стоит галочка что нету).
2. У него множество совместимых реализаций с немного отличающимися характеристиками, вы видимо посмотрели бегло на то что в гитхабе graphite-project, без попыток найти решения проблем. Например есть альтернативная реализация демона, осуществляющего запись — go-carbon, которая на одном среднем сервере не напрягаясь выдает 350 тысяч точек в секунду на запись. Кстати проект существует несколько лет уже.
3. Опять же альтернативные реализации стека реализуют и HA и позволяют масштабироваться (см carbonzipper). Проект в OpenSource'е и активно развивается года так с 2013-2014.

В целом интересно не только, сколько вы смогли записать, но и как вы читаете? Я имею в виду какого характера запросы, как много метрик и точек вы выбираете и т.п.
+1
1. У него изначально во всех доступных реализациях есть перезапись точек в прошлом (у вас стоит галочка что нету).

Так действительно заявляют авторы Graphite. Я если честно, засомневался в этом, зная что в whisper-формате хранятся только агрегированные данные. И действительно, когда приходит запрос на запись уже существующей точки, то переписывается вся ячейка архива.

Представьте, что у нас какой-то результат измерения дошел до хранилища с запозданием, когда значение ячейки, в которую он попадает, уже сформировано как AVG от других измерений.

Хорошо, если перезаписываемое значение попадет в самый детальный архив, с минутными ячейками, например. Мы можем даже не заметить каких то искажений в данных, а если «уйти» далеко в прошлое, то мы перезапишем, скажем, часовую ячейку полностью. Т.е. мы потеряем часть данных, а именно вклад всех предыдущих значений, которые составили результирующее значение часовой ячейки.

Так что, как перезапись — это действительно сработает. Но пользы от этого нет: что бы корректно это использовать надо где-то (где?) хранить еще и все сырые данные из которых при необходимости мы будем вычислять новый агрегат.

Я бы сказал что это не то что нам хотелось, а именно, «дописывать данные задним числом», как и сказано в статье.

2. У него множество совместимых реализаций с немного отличающимися характеристиками, вы видимо посмотрели бегло на то что в гитхабе graphite-project, без попыток найти решения проблем. Например есть альтернативная реализация демона, осуществляющего запись — go-carbon, которая на одном среднем сервере не напрягаясь выдает 350 тысяч точек в секунду на запись. Кстати проект существует несколько лет уже.

Вы правы, мы не рассматривали все альтернативные реализации. Основная причина почему мы восприняли graphite без энтузиазма — это проблема в его протоколе: вы отправляете данные в сокет и у вас нет информации обработаны ли они как вы ожидали или пропали по какой-то причине. Эту же проблему, судя по всему, унаследовал и go-carbon.

Признаю, 350к в секунду (или 21 миллионов в минуту) на одном севере — выглядит очень интересно! Но возникают вопросы:

Обновляется 350к разных метрик? Или это интенсивное обновление нескольких метрик? Интересно сколько обновляется уникальных метрик, скажем, за 10 минут?

Держит ли go-carbon такую скорость постоянно в течении длительного времени или это пиковая скорость пока данные копятся в кэше?

На сколько мне известно whisper-формат очень похож на формат rrdtool. Т.е. это тоже огромное число файлов и как следствие большое число рандомных IOPs. Вы не могли бы рассказать подробнее за счет чего достигается такая высокая скорость записи в go-carbon?

Обычный carbon на питоне выдал мне 20к в секунду и уперся в CPU (на настольном компьютере). Если бы на настольном комьютере были HDD вместо SSD, я, уверен, в первую очередь узким местом были бы диски. С go-carbon CPU по-идее используется на порядок эффективнее, но проблему с IO он вряд ли решает.

3. Опять же альтернативные реализации стека реализуют и HA и позволяют масштабироваться (см carbonzipper). Проект в OpenSource'е и активно развивается года так с 2013-2014.

Вообще, когда мы рассматривали вариант с развитием распределенного rrdtool-кластера, у нас вырисовывалось что то похожее по архитектуре на graphite. Но, как я говорил, от родовых проблем rrdtool это не избавляло. А проблемы частично похожие на проблемы graphite. Интересно, как авторы carbonzipper решают некоторые из них.

Например, как я понял, carbonzipper позволяет на лету объединять результат одной метрики с разных серверов, где лежат ее реплики. Как решаются конфликты при объединении разных значений одной и той же ячейки?

Автоматизирован ли процесс синхронизации данных между репликами, что бы в итоге привести их к одному виду, что бы можно было безболезненно вывести ноду из кластера, и быть уверенным что все ее данные уже хранятся на соседней ноде? По моему мнению, формат whisper не подходит для таких задач.
+1
Так что, как перезапись — это действительно сработает. Но пользы от этого нет: что бы корректно это использовать надо где-то (где?) хранить еще и все сырые данные из которых при необходимости мы будем вычислять новый агрегат.

Да, такая проблема есть если у вас есть тенденция переписывать очень старые данные. В случаи с графитом стараются все таки иметь детальный архив размеров от 1 дня, отчасти чтобы позволить данным приходить с некоторой разумной задержкой. Я честно говоря слабо представляю себе use-case когда данные приходят с задержкой на 2-3 дня.

Обновляется 350к разных метрик? Или это интенсивное обновление нескольких метрик? Интересно сколько обновляется уникальных метрик, скажем, за 10 минут?

go-carbon естественно имеет некоторый in-memory кэш, в котором для каждой метрики копятся данные (с целью в том числе снизить рандомность записи). То есть в таком тесте это некоторое разумное количество записей в файл в секунду (мы у себя ограничиваем например 30к, что примерно соответствует 25% загрузке дисков наших). В целом 350к точек в секунду может быть и 350к разных метрик и это стабильная скорость при наличии достаточного количества памяти чтобы иметь кэш (я бы сказал 350к при наличии средненьких ССДшек можно получить при 16-32ГБ оперативной памяти под кэш, вероятно даже меньше). Естественно в жертву приносится некоторая надежность данных в случаи смерти железки (все что в кэше, но не на диске будет потеряно). Ноиз плюсов — при 128-256ГБ оперативной памяти оно может довольно спокойно жить на чем-то типа SAN на обычных дисках.
В случаи чтения, данные выбираются как с диска, так и из кэша.

Например, как я понял, carbonzipper позволяет на лету объединять результат одной метрики с разных серверов, где лежат ее реплики. Как решаются конфликты при объединении разных значений одной и той же ячейки?

На текущий момент считается что первая вернувшаяся и есть правильная.

Процесс «лечения» данных идет в фоне фактически набором скриптов — периодически процесс пробегает и сравнивает файлы.

Для миграции можно использовать buckytools, там проблема переноса данных на новую ноду довольно неплохо решена.
+1
В целом интересно не только, сколько вы смогли записать, но и как вы читаете? Я имею в виду какого характера запросы, как много метрик и точек вы выбираете и т.п.

Это хороший вопрос! Вернее здесь два вопроса: 1. Какая производительность чтения? и 2. Как мы пользуемся этими данными?

Я начну с последнего вопроса:

У нас есть аномали-детекшн для нескольких тысяч метрик. Причем написанный еще для старого хранилища. Остальные данные мы отображаем в виде графиков. Причем практически все метрики тщательно классифицированы и сгруппированы так что добраться до любой из них можно за несколько кликов. Процесс примерно такой: в случае проблемы срабатывает аномали-детекшн и дальше проблема диагностируется визуальным изучением метрик из проблемной области.

Что касается производительности чтения. При попытке выбрать какую то показательную метрику для измерения скорости чтения всплывает много нюансов от которых зависит результат:
— на сколько много у нас читателей и как интенсивно они читают;
— на сколько часто мы перечитываем одни и те же метрики;
— включают ли запрашиваемые периоды роллапы или состоят только из сырых данных;
— вычитываем ли мы последовательно (для даунсемплинга) или рандомно, и т.д.

Если пропускная способность на запись у нас измерилась естественным образом на реальной нагрузке, то с чтением так не получилось. Реальная нагрузка на чтение — это периодические загрузки дашбордов с графиками. Дашборды разные, в которых задействованы 10 метрик, в некоторых до 5 тысяч. Иногда требуются данные за пол года, но чаще за последние сутки. Так что показателем эффективности для нас является время, которое пользователь проводит в ожидании загрузки.

Проводить специальные объективные эксперименты мы не стали. Для себя мы удовлетворились простым тестом: загрузка дашборда с 100 метриками из старого хранилища и из нового. Получили 700 миллисекунд против 400. При этом, в это значение входит и логика по генерации дашборда. Если приблизительно убрать этот вклад, то получится 600 vs 300, т.е. в два раза лучше чем rrd. Если измерять в точках в секунду, то получается ~ 480000 точек в секунду.

Но, повторюсь, этот показатель полученный на коленке и не учитывающий многие факторы.
0
Интересно, спасибо.

Еще хотелось бы уточнить про характер нагрузки. Насколько принято у вас держать dashboard'ы постоянно открытыми с автообновлением, например? Или мониторы в офисе и т.п.

То есть насколько сильный постоянная составляющая для запросов, а насколько часто приходят живые люди с их задачами?

Еще не сравнивали ли вы как меняется скорость запросов в случаи вашей базы и rrd в зависимости от разных time frame'ов и количества метрик и от количества одновременных пользователей? Например довольно частая проблема систем что у них разная зависимость времени выполнения работы от этих параметров. На 1 запросе по 100 метрик может победить одна система, а на 50 запросах в секунду по 1000 метрик в среднем — уже другая (а та которая победила в первом случаи вообще сложиться и откинуть копыта).
0
О, прошу прощения что еще отдельным комментарием, но интересно еще примерно какое железо вы использовали под backend/frontend.
0
Сейчас у нас уже 2 кластера.

Первый, как я говорил, состоит из 9 серверов.
Диски:
— Под систему и commitlog HDD
— Под данные JBOD 4 x HDD
ОЗУ: 128гб. Большая часть используется под Page Cache.
CPU: Intel Xeon. По 32 логическиз ядра на ноду

Второй, новый кластер, на который мы переводим оставшиеся метрики, состоит из 5 серверов:
Диски:
— Под систему и commitlog HDD
— Под данные JBOD 5xSSD
ОЗУ: 64гб
CPU: Intel Xeon. По 24 логическиз ядра на ноду

REST API как я говорил работает на тех же серверах.
0
К сожалению, я не смогу дать вам детальное описание паттерна чтения. Мы пытались «в лоб» собирать такую статистику, но она слишком «прыгала» и была не показательной. Хотя, если уделить этой задаче должное внимание, мы бы конечно вывели какую-то показательную метрику, для оценки производительности. Можно, например попробовать анализировать, количество прочитанных точек в час.

В данный м��мент, мы исходим из того что чтения на несколько порядков меньше чем записи, акцентируя таким образом внимание на производительность записи.

Открытые дашборды с авто обновлением — у нас привычная практика, но какая это доля от живых обращений никто не считал, как и зависимость лэтенси от нагрузки.
0
Хорошо, спасибо.

Жаль конечно что не изучали детально чтение.

И наверное последний вопрос который пришел в голову — а вы считали сколько у вас байт занимает 1 точка сейчас и сколько было раньше? Тоже интересный показатель как по мне.
0
пару замечаний:

1) HBase — Есть мастер нода – SPOF. мастеров в кластере несколько, выбираются кворумом, выпадение даже текущего не влияет на работу кластера (сам делал rolling restart всего кластера под нагрузкой, вышестоящие приложения лишь замечали небольшой скачек latency, драйвер сам детектил поведение и повторял запрос в новую ноду)

2) Druid — Долгове́чность — немного не понял, так как:
а) данные поступают в систему из kafka, пока блок не окажется в deepstorage то offset в kafka не комитится, данные всегда в очереди доступны
б) так как данные по большему immutable (сегменты полностью immutable, метаданные в реляционке лежат), то гарантии сохранности лежат или на deepstorage (это s3/hdfs зачастую) или как у вас резервирования sql базы сделано.

если я что-то неправильно понял, то уточните
0
Я не в коей мере не принижаю достоинств HBase. Возможно вы пропустили это при чтении, но я написал примерно то же что и вы:

В отличие от HBase, в Cassandra нет мастер-ноды и, соответственно, не требуется её дублирование для обеспечения высокой отказоустойчивости


а) данные поступают в систему из kafka, пока блок не окажется в deepstorage то offset в kafka не комитится, данные всегда в очереди доступны
б) так как данные по большему immutable (сегменты полностью immutable, метаданные в реляционке лежат), то гарантии сохранности лежат или на deepstorage (это s3/hdfs зачастую) или как у вас резервирования sql базы сделано.

Но в самом Druid нет (или не было на тот момент?) встроенного механизма позволяющего избежать потери данных при падении реалтайм ноды до того как сегмент сохранился в дип сторадж. Таким образом это должно решаться на уровне приложения, использующего Druid. В частности, при помощи kafka.
0
ну дублирование не совсем корректное название, особенно что она очень легковесная, но хорошо.

по поводу друида: вы данные в него предполагали напрямую заливать?
просто везде где сталкивался с метриками мы гнали их через очередь, мало ли как индексатор себя поведет
независимо это cassandra/hbase/druid
+1
> Elasticsearch
> И ещё мы столкнулись со странной особенностью: чем больше шардов в индексе, тем медленнее он работает на запись (хотя здравый смысл подсказывает, что должно быть наоборот).

Есть такое. Лечится группировкой данных в bulk запросах по шардам. То есть в одном bulk запросе все документы должны отправляться в один шард. Этого поидее должно быть достаточно, чтобы избавиться от горлышка на синхронизации запросов в шардах. Но в идеале — смотрим в исходниках elasticsearch алгоритм вычисления номера шарда по routing (https://gist.github.com/ei-grad/c6794b151f6df49b6b6d94befff877e6), получаем через allocation explain API на какой ноде сейчас находится primary-реплика этого шарда, и шлём этот bulk запрос напрямую в неё, а не через Client-ноду (и уж тем более не через произвольную data-ноду).

У меня при такой схеме количество шардов влияет на скорость индексации линейно (на самом деле нет, я просто стал в питонячий клиент упираться на подготовке данных :(, увеличивать число шардов дальше не хочется, и так уже по 10 шардов на 4-ядерную ноду с учетом реплик, а текущих 100к записей в секунду, до которых успевает разогнаться импорт на получасовых батчах вполне хватает).
Only those users with full accounts are able to leave comments., please.