Как стать автором
Обновить
21
0
Евгений Гугучкин @che

Пользователь

Отправить сообщение
Представь уникальный составной ключ (A, B, C) причем уникальных значений в колонке A всего, скажем, дюжина. Данные в таблицы с таким ключем разбиваем на дюжину таблиц с ключем (B, C). Ничего в итоге не дублируется и уникальность по прежнему поддерживается
Я не в коей мере не принижаю достоинств HBase. Возможно вы пропустили это при чтении, но я написал примерно то же что и вы:

В отличие от HBase, в Cassandra нет мастер-ноды и, соответственно, не требуется её дублирование для обеспечения высокой отказоустойчивости


а) данные поступают в систему из kafka, пока блок не окажется в deepstorage то offset в kafka не комитится, данные всегда в очереди доступны
б) так как данные по большему immutable (сегменты полностью immutable, метаданные в реляционке лежат), то гарантии сохранности лежат или на deepstorage (это s3/hdfs зачастую) или как у вас резервирования sql базы сделано.

Но в самом Druid нет (или не было на тот момент?) встроенного механизма позволяющего избежать потери данных при падении реалтайм ноды до того как сегмент сохранился в дип сторадж. Таким образом это должно решаться на уровне приложения, использующего Druid. В частности, при помощи kafka.
Сейчас у нас уже 2 кластера.

Первый, как я говорил, состоит из 9 серверов.
Диски:
— Под систему и commitlog HDD
— Под данные JBOD 4 x HDD
ОЗУ: 128гб. Большая часть используется под Page Cache.
CPU: Intel Xeon. По 32 логическиз ядра на ноду

Второй, новый кластер, на который мы переводим оставшиеся метрики, состоит из 5 серверов:
Диски:
— Под систему и commitlog HDD
— Под данные JBOD 5xSSD
ОЗУ: 64гб
CPU: Intel Xeon. По 24 логическиз ядра на ноду

REST API как я говорил работает на тех же серверах.
К сожалению, я не смогу дать вам детальное описание паттерна чтения. Мы пытались «в лоб» собирать такую статистику, но она слишком «прыгала» и была не показательной. Хотя, если уделить этой задаче должное внимание, мы бы конечно вывели какую-то показательную метрику, для оценки производительности. Можно, например попробовать анализировать, количество прочитанных точек в час.

В данный момент, мы исходим из того что чтения на несколько порядков меньше чем записи, акцентируя таким образом внимание на производительность записи.

Открытые дашборды с авто обновлением — у нас привычная практика, но какая это доля от живых обращений никто не считал, как и зависимость лэтенси от нагрузки.
В целом интересно не только, сколько вы смогли записать, но и как вы читаете? Я имею в виду какого характера запросы, как много метрик и точек вы выбираете и т.п.

Это хороший вопрос! Вернее здесь два вопроса: 1. Какая производительность чтения? и 2. Как мы пользуемся этими данными?

Я начну с последнего вопроса:

У нас есть аномали-детекшн для нескольких тысяч метрик. Причем написанный еще для старого хранилища. Остальные данные мы отображаем в виде графиков. Причем практически все метрики тщательно классифицированы и сгруппированы так что добраться до любой из них можно за несколько кликов. Процесс примерно такой: в случае проблемы срабатывает аномали-детекшн и дальше проблема диагностируется визуальным изучением метрик из проблемной области.

Что касается производительности чтения. При попытке выбрать какую то показательную метрику для измерения скорости чтения всплывает много нюансов от которых зависит результат:
— на сколько много у нас читателей и как интенсивно они читают;
— на сколько часто мы перечитываем одни и те же метрики;
— включают ли запрашиваемые периоды роллапы или состоят только из сырых данных;
— вычитываем ли мы последовательно (для даунсемплинга) или рандомно, и т.д.

Если пропускная способность на запись у нас измерилась естественным образом на реальной нагрузке, то с чтением так не получилось. Реальная нагрузка на чтение — это периодические загрузки дашбордов с графиками. Дашборды разные, в которых задействованы 10 метрик, в некоторых до 5 тысяч. Иногда требуются данные за пол года, но чаще за последние сутки. Так что показателем эффективности для нас является время, которое пользователь проводит в ожидании загрузки.

Проводить специальные объективные эксперименты мы не стали. Для себя мы удовлетворились простым тестом: загрузка дашборда с 100 метриками из старого хранилища и из нового. Получили 700 миллисекунд против 400. При этом, в это значение входит и логика по генерации дашборда. Если приблизительно убрать этот вклад, то получится 600 vs 300, т.е. в два раза лучше чем rrd. Если измерять в точках в секунду, то получается ~ 480000 точек в секунду.

Но, повторюсь, этот показатель полученный на коленке и не учитывающий многие факторы.
1. У него изначально во всех доступных реализациях есть перезапись точек в прошлом (у вас стоит галочка что нету).

Так действительно заявляют авторы Graphite. Я если честно, засомневался в этом, зная что в whisper-формате хранятся только агрегированные данные. И действительно, когда приходит запрос на запись уже существующей точки, то переписывается вся ячейка архива.

Представьте, что у нас какой-то результат измерения дошел до хранилища с запозданием, когда значение ячейки, в которую он попадает, уже сформировано как AVG от других измерений.

Хорошо, если перезаписываемое значение попадет в самый детальный архив, с минутными ячейками, например. Мы можем даже не заметить каких то искажений в данных, а если «уйти» далеко в прошлое, то мы перезапишем, скажем, часовую ячейку полностью. Т.е. мы потеряем часть данных, а именно вклад всех предыдущих значений, которые составили результирующее значение часовой ячейки.

Так что, как перезапись — это действительно сработает. Но пользы от этого нет: что бы корректно это использовать надо где-то (где?) хранить еще и все сырые данные из которых при необходимости мы будем вычислять новый агрегат.

Я бы сказал что это не то что нам хотелось, а именно, «дописывать данные задним числом», как и сказано в статье.

2. У него множество совместимых реализаций с немного отличающимися характеристиками, вы видимо посмотрели бегло на то что в гитхабе graphite-project, без попыток найти решения проблем. Например есть альтернативная реализация демона, осуществляющего запись — go-carbon, которая на одном среднем сервере не напрягаясь выдает 350 тысяч точек в секунду на запись. Кстати проект существует несколько лет уже.

Вы правы, мы не рассматривали все альтернативные реализации. Основная причина почему мы восприняли graphite без энтузиазма — это проблема в его протоколе: вы отправляете данные в сокет и у вас нет информации обработаны ли они как вы ожидали или пропали по какой-то причине. Эту же проблему, судя по всему, унаследовал и go-carbon.

Признаю, 350к в секунду (или 21 миллионов в минуту) на одном севере — выглядит очень интересно! Но возникают вопросы:

Обновляется 350к разных метрик? Или это интенсивное обновление нескольких метрик? Интересно сколько обновляется уникальных метрик, скажем, за 10 минут?

Держит ли go-carbon такую скорость постоянно в течении длительного времени или это пиковая скорость пока данные копятся в кэше?

На сколько мне известно whisper-формат очень похож на формат rrdtool. Т.е. это тоже огромное число файлов и как следствие большое число рандомных IOPs. Вы не могли бы рассказать подробнее за счет чего достигается такая высокая скорость записи в go-carbon?

Обычный carbon на питоне выдал мне 20к в секунду и уперся в CPU (на настольном компьютере). Если бы на настольном комьютере были HDD вместо SSD, я, уверен, в первую очередь узким местом были бы диски. С go-carbon CPU по-идее используется на порядок эффективнее, но проблему с IO он вряд ли решает.

3. Опять же альтернативные реализации стека реализуют и HA и позволяют масштабироваться (см carbonzipper). Проект в OpenSource'е и активно развивается года так с 2013-2014.

Вообще, когда мы рассматривали вариант с развитием распределенного rrdtool-кластера, у нас вырисовывалось что то похожее по архитектуре на graphite. Но, как я говорил, от родовых проблем rrdtool это не избавляло. А проблемы частично похожие на проблемы graphite. Интересно, как авторы carbonzipper решают некоторые из них.

Например, как я понял, carbonzipper позволяет на лету объединять результат одной метрики с разных серверов, где лежат ее реплики. Как решаются конфликты при объединении разных значений одной и той же ячейки?

Автоматизирован ли процесс синхронизации данных между репликами, что бы в итоге привести их к одному виду, что бы можно было безболезненно вывести ноду из кластера, и быть уверенным что все ее данные уже хранятся на соседней ноде? По моему мнению, формат whisper не подходит для таких задач.
В сторону KairosDB смотрели. Я описывал в комментарии выше. Тогда она работала не стабильно. Могла потерять данные, в случае потери коннекта к Cassandra, причем без оповещения клиента. Более подробно мы ее не изучали, так что, подходит ли она нам — это открытый вопрос. В частности, по производительности даунсемплинга. Это надо бы протестировать. Например, в нашей реализации мы не сразу добились приемлемой скорости архивирования.
А как обрабатываете ситуацию, когда данные записались только в часть таблиц, например, в Points данные записались, а при записи в Meta допустим сервис помер…

Конкретно в этом сценарии: допустим клиент отправляет одно значение. Мы сперва делаем вставку в meta — запись о том что у нас появился новый сегмент у метрики, потом запись в points. Если сервис упал перед вставкой в points, мы имеем только запись о том что у метрики есть сегмент но он пустой.

Дальше все зависит от клиентского приложения — как он обрабатывает ошибку при отправке в удаленный сервис, ведь ответ об успешной вставке он не получил. В нашем случае мы делаем несколько попыток отправить данные снова. Если лимит попыток исчерпан, то данные остаются в локальном журнале приложения. Для таких случаев у нас есть скрипт который периодически делает replay по таким журналам.

В случае, если по каким-то причинам приложение забивает на ошибку, и не продолжает обновлять метрику, эта метрика остается с пустым сегментом. В нашей модели данных это валидная ситуация, которая не приводит к ошибкам при чтении.

Для метрик, которые давно не обновлялись у нас есть скрипт который периодически сканирует все записи в таблице meta и «подчищает мусор»:

— Во-первых, удаляет все записи из всех таблиц для устаревшей метрики. При этом из таблицы meta данные удаляются в последнюю очередь, так что при падении этого скрипта он в следующий раз все равно дойдет до этой записи и завершит свое дело.
— Во-вторых, проверяются сегменты которые должны были быть давно задаунсеплены, и, в случае пустых сегментов они просто удаляются.

почему «по возможности»? И в sstable, и в memtable данные лежат упорядоченно по колонкам.

Потому что в случае нескольких sstable часть партиции может оказаться в одном файле, а часть в другом.
Rrd: 100к/сек, 15 серверов, 120кк метрик
Cassandra: 100к/сек, 9 серверов, 180кк метрик.

Вернее уже 190 миллионов в Cassandra

Оставшиеся 15 серверов и 120млн метрик где? На старом кластере в rrd?
— совершенно верно
Больше года назад, когда мы рассматривали готовые решения, мы смотрели и на KairosDB. Некоторые наши претензии к OpenTSDB относились и к KairosDB. Но самое главное, KairosDB как минимум не соответствовал требованию «долговечность» — т.е. терял данные из буфера в памяти, когда не мог сбросить его в Cassandra. Кстати, разработчики, как я сейчас вижу, уже исправили эту проблему пару месяцев назад.
TTL решили не использовать что бы избежать ситуации когда данные удалились по TTL, но по какой-то причине не переложились в rollups. Сейчас же мы можем приостанавливать даунсемплинг (на время обслуживания, например, поскольку дайнсемплинг работает с CL=ALL) не опасаясь потери сырых данных. Можем увеличивать временное окно в котором данные хранятся сырыми. В общем, процесс под контролем.

А раз не используем TTL то и TWCS отпадает как не эффетивный при явно удаляемых данных.

Данные в points у нас удаляются целыми сегментами (партициями) при перекладывании в rollups в агрегированном виде. А в rollups как таковой нет операции удаления, есть перезапись.

Предвосхищая следующие вопросы:
— мы используем LCS для индекса имен и STCS для всех остальных данных.
— да, для points мы имеем лаг между удалением данных и реальным пуржингом с диска: ~ 1.5 недели
— для rollups трудно точно оценить какую часть данных составляют устаревшие версии переписанных значений (по грубой оценке это 30-40% от всего объема rollups), но мы следим за «средним размером метрики» и она нас устраивает.

Использование TTL возможно и имело бы смысл, но при этом всплывут новые проблемы которые потребуют решения.

В общем, мы пока не получили негативный опыт с нашей моделью данных что бы реально задумываться над использованием TTL.
Да, это специальный тип запросов. Внешне в CQL он почти не отличается о обычных, кроме того что в WHERE секции используется функция token(), которая возвращает хэш от ключа, по которому (по хешу) в реальности и упорядочены данные на диске:

SELECT * FROM Table WHERE token(pk) >= :start_token limit 10
SELECT * FROM Table WHERE token(pk) > token('prev_value_pk') limit 10

Это так же дает возможность разбить все данные таблицы на относительно равные сегменты выбрав правильный start_token и end_token из всего возможного диапазона токенов и вычитывать данные параллельно.
Хочу сказать еще немного про сбор данных. Поскольку в нашей компании паттерн использования LSD + time series фреймворк практически один, ребята из нашего отдела написали решение, которое не только автоматизирует этот процесс, но и добавили туда несколько просто офигенных фич.

Так что разработчику достаточно расставить в коде в нужных местах отправку события с вложенной в него структурой данных, и, например, сразу же иметь возможность видеть на дашборде графики, скажем, с количеством уникальных за сутки/час пользователей мужского пола из Парижа, к которым относится отправляемое событие. Внутри мы называет этот продукт Product Metrics, а его авторы выделились в отдельную команду Data-team. Они обещают подготовить для хабра отдельную статью про свое детище.
Мы уже анонсировали чуть выше, что планируем описать наше решение подробнее в полноценной статье.
На Cyanite наше решение похоже мало. Только, пожалуй, тем что это тоже time series на Cassandra. Кроме этого мы смотрели и http://blueflood.io/ и http://opennms.github.io/newts/
Но вдохновлялись мы больше, как это ни странно, rrdtool который, кстати, мы пока тоже продолжаем использовать.
Как у нас устроен сбор данных тема довольно объемная. Если отвечать кратко, то по разному :)

Например, у нас есть целый отдел мониторинга и один из инструментов которым они пользуются является zabbix. К сожалению, я не могу рассказать более подробно, потому как не знаю всех нюансов.

Множество технических и бизнес метрик собирают для себя сами разработчики. Что бы следить за состоянием своей части приложения и понимать что происходит.
Каким образом:
— Очень много метрик мы получаем из Pinba.
— Большую часть данных отправляем прям из приложения и собираем при помощи LSD (см. раздел статьи LSD: Live Streaming Daemon).
— Если говорить про c/go сервисы, каждый сервис написанный у нас предоставляет довольно подробную информацию о своем состоянии.

А уже непосредственно за перекладывание данных в time series хранилище, а так же за навигацию и отображение отвечает наш собственный фреймворк (конечно же на php)
> А пишете/читаете с кворумом или как?
— сырые данные пишем с кворумом, но с DowngradingConsistency RetryPolicy
— ролапы пишем с CL = ALL
— читаем с CL = ONE

>> выход из строя одной ноды в кластере не блокирует ни чтение, ни запись
> Как это работает при RF=2?
Сырые данные пытаемся записать во все живые ноды, но если не получается, то в одну. Для time series мы сочли это приемлемым.

>Ну и вот это уж тоже как всё-таки на самом деле, то
>> около 200 000 значений в секунду
>или
>> 100 000 значений в секунду; 140 миллионов метрик.
Текущий поток данных 100к в секунду но справляемся и с более чем 200к
Подробное описание не уместилось бы в текущий формат, поэтому мы готовим полноценную статью на эту тему.

Если кратко по вашим вопросам:

1. Храним последнюю неделю сырых данных, остальные перекладываем в ролапы. Поддерживаются разные агрегирующие функции, но используем пока только AVG. Все вычисляется на php.

2. Сырые данные партицируются по идентификатору метрики и левой границе периода (сутки). Данные для разных метрик поступают с разной периодичностью, минимальная — раз в минуту. Так что, по факту, размер партиции от 1440 строк и меньше. Но есть и сильно разреженные метрики.

Если говорить про ролапы, то сейчас одна партиция — один архив. У метрики может быть несколько архивов. Размер зависит от retention и детализации.

3. Все верно, при запросе данных возвращается результат объединения партиций. А именно, данные из подходящего ролапа + сырые данные свернутые с такой же детализацией и агрегирующей функцией как и выбранный архив.

4. DateTieredCompactionStrategy — сейчас является depricated. Он заменен на TWCS. Но и от него мы отказались в пользу обычного STCS, в первую очередь потому что данные мы удаляем при перекладывании в ролапы, а не при помощи TTL. Да, требуется еженедельный major compaction.

5. Не используем, есть мнение что SASI-индексы пока не достаточно стабильные.
В текущем виде решение в opensource не готово, но мы планируем прийти к этому.
С ClickHouse не сравнивали, он вышел после того как мы запустили наш прототип.
Кластер Cassandra у нас в одном ДЦ.
1

Информация

В рейтинге
Не участвует
Откуда
Москва, Москва и Московская обл., Россия
Зарегистрирован
Активность

Специализация

Backend Developer
Senior
От 350 000 ₽
Git
MySQL
PHP
Nginx
Golang
High-loaded systems