Евгений Гугучкин @che
Пользователь
Информация
- В рейтинге
- Не участвует
- Откуда
- Москва, Москва и Московская обл., Россия
- Зарегистрирован
- Активность
Специализация
Backend Developer
Senior
От 350 000 ₽
Git
MySQL
PHP
Nginx
Golang
High-loaded systems
Пользователь
Но в самом Druid нет (или не было на тот момент?) встроенного механизма позволяющего избежать потери данных при падении реалтайм ноды до того как сегмент сохранился в дип сторадж. Таким образом это должно решаться на уровне приложения, использующего Druid. В частности, при помощи kafka.
Первый, как я говорил, состоит из 9 серверов.
Диски:
— Под систему и commitlog HDD
— Под данные JBOD 4 x HDD
ОЗУ: 128гб. Большая часть используется под Page Cache.
CPU: Intel Xeon. По 32 логическиз ядра на ноду
Второй, новый кластер, на который мы переводим оставшиеся метрики, состоит из 5 серверов:
Диски:
— Под систему и commitlog HDD
— Под данные JBOD 5xSSD
ОЗУ: 64гб
CPU: Intel Xeon. По 24 логическиз ядра на ноду
REST API как я говорил работает на тех же серверах.
В данный момент, мы исходим из того что чтения на несколько порядков меньше чем записи, акцентируя таким образом внимание на производительность записи.
Открытые дашборды с авто обновлением — у нас привычная практика, но какая это доля от живых обращений никто не считал, как и зависимость лэтенси от нагрузки.
Это хороший вопрос! Вернее здесь два вопроса: 1. Какая производительность чтения? и 2. Как мы пользуемся этими данными?
Я начну с последнего вопроса:
У нас есть аномали-детекшн для нескольких тысяч метрик. Причем написанный еще для старого хранилища. Остальные данные мы отображаем в виде графиков. Причем практически все метрики тщательно классифицированы и сгруппированы так что добраться до любой из них можно за несколько кликов. Процесс примерно такой: в случае проблемы срабатывает аномали-детекшн и дальше проблема диагностируется визуальным изучением метрик из проблемной области.
Что касается производительности чтения. При попытке выбрать какую то показательную метрику для измерения скорости чтения всплывает много нюансов от которых зависит результат:
— на сколько много у нас читателей и как интенсивно они читают;
— на сколько часто мы перечитываем одни и те же метрики;
— включают ли запрашиваемые периоды роллапы или состоят только из сырых данных;
— вычитываем ли мы последовательно (для даунсемплинга) или рандомно, и т.д.
Если пропускная способность на запись у нас измерилась естественным образом на реальной нагрузке, то с чтением так не получилось. Реальная нагрузка на чтение — это периодические загрузки дашбордов с графиками. Дашборды разные, в которых задействованы 10 метрик, в некоторых до 5 тысяч. Иногда требуются данные за пол года, но чаще за последние сутки. Так что показателем эффективности для нас является время, которое пользователь проводит в ожидании загрузки.
Проводить специальные объективные эксперименты мы не стали. Для себя мы удовлетворились простым тестом: загрузка дашборда с 100 метриками из старого хранилища и из нового. Получили 700 миллисекунд против 400. При этом, в это значение входит и логика по генерации дашборда. Если приблизительно убрать этот вклад, то получится 600 vs 300, т.е. в два раза лучше чем rrd. Если измерять в точках в секунду, то получается ~ 480000 точек в секунду.
Но, повторюсь, этот показатель полученный на коленке и не учитывающий многие факторы.
Так действительно заявляют авторы Graphite. Я если честно, засомневался в этом, зная что в whisper-формате хранятся только агрегированные данные. И действительно, когда приходит запрос на запись уже существующей точки, то переписывается вся ячейка архива.
Представьте, что у нас какой-то результат измерения дошел до хранилища с запозданием, когда значение ячейки, в которую он попадает, уже сформировано как AVG от других измерений.
Хорошо, если перезаписываемое значение попадет в самый детальный архив, с минутными ячейками, например. Мы можем даже не заметить каких то искажений в данных, а если «уйти» далеко в прошлое, то мы перезапишем, скажем, часовую ячейку полностью. Т.е. мы потеряем часть данных, а именно вклад всех предыдущих значений, которые составили результирующее значение часовой ячейки.
Так что, как перезапись — это действительно сработает. Но пользы от этого нет: что бы корректно это использовать надо где-то (где?) хранить еще и все сырые данные из которых при необходимости мы будем вычислять новый агрегат.
Я бы сказал что это не то что нам хотелось, а именно, «дописывать данные задним числом», как и сказано в статье.
Вы правы, мы не рассматривали все альтернативные реализации. Основная причина почему мы восприняли graphite без энтузиазма — это проблема в его протоколе: вы отправляете данные в сокет и у вас нет информации обработаны ли они как вы ожидали или пропали по какой-то причине. Эту же проблему, судя по всему, унаследовал и go-carbon.
Признаю, 350к в секунду (или 21 миллионов в минуту) на одном севере — выглядит очень интересно! Но возникают вопросы:
Обновляется 350к разных метрик? Или это интенсивное обновление нескольких метрик? Интересно сколько обновляется уникальных метрик, скажем, за 10 минут?
Держит ли go-carbon такую скорость постоянно в течении длительного времени или это пиковая скорость пока данные копятся в кэше?
На сколько мне известно whisper-формат очень похож на формат rrdtool. Т.е. это тоже огромное число файлов и как следствие большое число рандомных IOPs. Вы не могли бы рассказать подробнее за счет чего достигается такая высокая скорость записи в go-carbon?
Обычный carbon на питоне выдал мне 20к в секунду и уперся в CPU (на настольном компьютере). Если бы на настольном комьютере были HDD вместо SSD, я, уверен, в первую очередь узким местом были бы диски. С go-carbon CPU по-идее используется на порядок эффективнее, но проблему с IO он вряд ли решает.
Вообще, когда мы рассматривали вариант с развитием распределенного rrdtool-кластера, у нас вырисовывалось что то похожее по архитектуре на graphite. Но, как я говорил, от родовых проблем rrdtool это не избавляло. А проблемы частично похожие на проблемы graphite. Интересно, как авторы carbonzipper решают некоторые из них.
Например, как я понял, carbonzipper позволяет на лету объединять результат одной метрики с разных серверов, где лежат ее реплики. Как решаются конфликты при объединении разных значений одной и той же ячейки?
Автоматизирован ли процесс синхронизации данных между репликами, что бы в итоге привести их к одному виду, что бы можно было безболезненно вывести ноду из кластера, и быть уверенным что все ее данные уже хранятся на соседней ноде? По моему мнению, формат whisper не подходит для таких задач.
Конкретно в этом сценарии: допустим клиент отправляет одно значение. Мы сперва делаем вставку в meta — запись о том что у нас появился новый сегмент у метрики, потом запись в points. Если сервис упал перед вставкой в points, мы имеем только запись о том что у метрики есть сегмент но он пустой.
Дальше все зависит от клиентского приложения — как он обрабатывает ошибку при отправке в удаленный сервис, ведь ответ об успешной вставке он не получил. В нашем случае мы делаем несколько попыток отправить данные снова. Если лимит попыток исчерпан, то данные остаются в локальном журнале приложения. Для таких случаев у нас есть скрипт который периодически делает replay по таким журналам.
В случае, если по каким-то причинам приложение забивает на ошибку, и не продолжает обновлять метрику, эта метрика остается с пустым сегментом. В нашей модели данных это валидная ситуация, которая не приводит к ошибкам при чтении.
Для метрик, которые давно не обновлялись у нас есть скрипт который периодически сканирует все записи в таблице meta и «подчищает мусор»:
— Во-первых, удаляет все записи из всех таблиц для устаревшей метрики. При этом из таблицы meta данные удаляются в последнюю очередь, так что при падении этого скрипта он в следующий раз все равно дойдет до этой записи и завершит свое дело.
— Во-вторых, проверяются сегменты которые должны были быть давно задаунсеплены, и, в случае пустых сегментов они просто удаляются.
Потому что в случае нескольких sstable часть партиции может оказаться в одном файле, а часть в другом.
Cassandra: 100к/сек, 9 серверов, 180кк метрик.
Вернее уже 190 миллионов в Cassandra
— совершенно верно
А раз не используем TTL то и TWCS отпадает как не эффетивный при явно удаляемых данных.
Данные в points у нас удаляются целыми сегментами (партициями) при перекладывании в rollups в агрегированном виде. А в rollups как таковой нет операции удаления, есть перезапись.
Предвосхищая следующие вопросы:
— мы используем LCS для индекса имен и STCS для всех остальных данных.
— да, для points мы имеем лаг между удалением данных и реальным пуржингом с диска: ~ 1.5 недели
— для rollups трудно точно оценить какую часть данных составляют устаревшие версии переписанных значений (по грубой оценке это 30-40% от всего объема rollups), но мы следим за «средним размером метрики» и она нас устраивает.
Использование TTL возможно и имело бы смысл, но при этом всплывут новые проблемы которые потребуют решения.
В общем, мы пока не получили негативный опыт с нашей моделью данных что бы реально задумываться над использованием TTL.
SELECT * FROM Table WHERE token(pk) >= :start_token limit 10
SELECT * FROM Table WHERE token(pk) > token('prev_value_pk') limit 10
Это так же дает возможность разбить все данные таблицы на относительно равные сегменты выбрав правильный start_token и end_token из всего возможного диапазона токенов и вычитывать данные параллельно.
Так что разработчику достаточно расставить в коде в нужных местах отправку события с вложенной в него структурой данных, и, например, сразу же иметь возможность видеть на дашборде графики, скажем, с количеством уникальных за сутки/час пользователей мужского пола из Парижа, к которым относится отправляемое событие. Внутри мы называет этот продукт Product Metrics, а его авторы выделились в отдельную команду Data-team. Они обещают подготовить для хабра отдельную статью про свое детище.
Но вдохновлялись мы больше, как это ни странно, rrdtool который, кстати, мы пока тоже продолжаем использовать.
Например, у нас есть целый отдел мониторинга и один из инструментов которым они пользуются является zabbix. К сожалению, я не могу рассказать более подробно, потому как не знаю всех нюансов.
Множество технических и бизнес метрик собирают для себя сами разработчики. Что бы следить за состоянием своей части приложения и понимать что происходит.
Каким образом:
— Очень много метрик мы получаем из Pinba.
— Большую часть данных отправляем прям из приложения и собираем при помощи LSD (см. раздел статьи LSD: Live Streaming Daemon).
— Если говорить про c/go сервисы, каждый сервис написанный у нас предоставляет довольно подробную информацию о своем состоянии.
А уже непосредственно за перекладывание данных в time series хранилище, а так же за навигацию и отображение отвечает наш собственный фреймворк (конечно же на php)
— сырые данные пишем с кворумом, но с DowngradingConsistency RetryPolicy
— ролапы пишем с CL = ALL
— читаем с CL = ONE
>> выход из строя одной ноды в кластере не блокирует ни чтение, ни запись
> Как это работает при RF=2?
Сырые данные пытаемся записать во все живые ноды, но если не получается, то в одну. Для time series мы сочли это приемлемым.
>Ну и вот это уж тоже как всё-таки на самом деле, то
>> около 200 000 значений в секунду
>или
>> 100 000 значений в секунду; 140 миллионов метрик.
Текущий поток данных 100к в секунду но справляемся и с более чем 200к
Если кратко по вашим вопросам:
1. Храним последнюю неделю сырых данных, остальные перекладываем в ролапы. Поддерживаются разные агрегирующие функции, но используем пока только AVG. Все вычисляется на php.
2. Сырые данные партицируются по идентификатору метрики и левой границе периода (сутки). Данные для разных метрик поступают с разной периодичностью, минимальная — раз в минуту. Так что, по факту, размер партиции от 1440 строк и меньше. Но есть и сильно разреженные метрики.
Если говорить про ролапы, то сейчас одна партиция — один архив. У метрики может быть несколько архивов. Размер зависит от retention и детализации.
3. Все верно, при запросе данных возвращается результат объединения партиций. А именно, данные из подходящего ролапа + сырые данные свернутые с такой же детализацией и агрегирующей функцией как и выбранный архив.
4. DateTieredCompactionStrategy — сейчас является depricated. Он заменен на TWCS. Но и от него мы отказались в пользу обычного STCS, в первую очередь потому что данные мы удаляем при перекладывании в ролапы, а не при помощи TTL. Да, требуется еженедельный major compaction.
5. Не используем, есть мнение что SASI-индексы пока не достаточно стабильные.
С ClickHouse не сравнивали, он вышел после того как мы запустили наш прототип.
Кластер Cassandra у нас в одном ДЦ.