Pull to refresh

Comments 10

Как по мне, так нагородили огород с зоопарком.
А в сухом остатке обычная репликация со standby структурой гетерогенных СУБД.
Отслеживать изменения в бд сложно? Думается, что Через некоторый интервал времени код и правда станет похож на творчество Кафки

Kafka Connect — это не только базы данных. Kafka Connect — это фреймворк, покрывающий наиболее популярные области использования Apache Kafka. Можете подобрать коннектор под свои нуждны, ознакомившись основным репозиторием коннекторов — www.confluent.io/hub
Или посмотреть на github.
Данная статья описывает принцип и некоторые особенности работы только с 2мя коннекторами.
Так же в статье указаны причины, по которым ты стали использовать Kafka Connect в нашей компании. Кратко:
— отказ от shared database
— перенести данные ближе к сервису использования
— выделение контекста домена и переход на рельсы EventBus

Очень интересно, как будет решаться кейс с высоконагруженными большими таблицами в случае, если таки какое-то время прослойка между бд и кафкой, или сама кафка будут лежать, а логи уже уедут вперед и перепишутся. Часть данных никогда не доедет и придется с нуля всю таблицу перекачивать.

Тут вопрос, как вы относитесь к вашей шине событий.
Для надежности у кафки есть такие механизмы как репликация.
Кафка Коннект так же умеет балансировать нагрузку и поддерживать свое состояние (см гифку в статье)

Что касается аварийных случаев.
— Полностью упала Кафка
В этом случае Кафка Коннект не сможет писать сообщения в Кафку и тоже упадет. Следует исправлять причину поломки.
Скорее всего — это проблемы с сетью.
После восстановления коннекторы будут продолжать писать/читать с последнего оффсета.

За то, на сколько долго у вас будут хранится офсеты отвечает настройка retention time. Это каксается и топиков Кафки и binlog'a mysql.

Дефолтное значение 7 дней. Но для наиболее нагруженных мы опустили до 3х дней. За 3 дня точно необходимо решить проблемы с шиной событий.

Если же все таки возникла ситуация, когда пропали оффсеты, то можно перечитать всю таблицу.

Так же стоит учитывать, какие это данные. Для значимых данных, рекомендуется использовать cleanup.policy=compact. Коннекторы используют в качестве key сообщения — primary key таблицы. И это гарантирует, что в топике всегда будет актуальное состояние таблицы.
как Вы решаете проблему обновления схемы в мастер базе? Были ли случай нарушения консистентности данных? Что будете делать, если в целевую базу потребуется добавить новый столбец из мастер базы, который ранее не синхронизировался?
как Вы решаете проблему обновления схемы в мастер базе?

Да, есть проблема обновления схемы. И это проблема не Кафка Коннект, а в целом проблема обратной совместимости сообщений в шине событий.
На данном этапе, у нас есть общие рекомендации для внесения изменений в схему базы. Это общие рекомендации:
— не менять тип данных у полей
— не добавлять поля без дефолтных значений
— не удалять поля без дефолтных значений
Так же у нас все коннекторы запущены на stage окружении, и при возникновении проблем со схемой, мы это заблаговременно это увидим и примем меры, чтобы коллеги исправили эти миграции.

Были ли случай нарушения консистентности данных?

Да, есть проблема с foreign key. Так как кафка не гарантирует доставку сообщений из разных топиков (и партиций тоже) в том порядке, что они былы записаны, то может произойти ситуация, когда запись ссылкается на другую таблицу, а та еще не получила данных. Для решения данной проблемы можно добавить ретраи и задержки. Но мы решили просто не использовать foreign key в базе приемнике.

Что будете делать, если в целевую базу потребуется добавить новый столбец из мастер базы, который ранее не синхронизировался?

Эта ситуация решается при сохранении прямой/обратной совместимости (см выше)
То есть, если добавить поле в таблицу, то у нее обязательно должно быть значение по умолчанию. И в базе приемнике тоже. После того, как это поле будет наполнятся значениями, то это уже будут обычные UPDATE, с которыми коннекторы прекрасно работают
Kafka Connect (а точнее воркеры) это действительно stateless приложение и деплоить сами воркеры не составляет никаких проблем. А вот как автоматизировать lifecycle самих connector-ов не очень понятно. Они то как раз statefull и например персоздав коннектор с новыми настройками нет никаких гарантий что всё будет гладко. Потому что топики и схемы данных сами не пересоздаются, а остаются прежними. У вас для этого есть какое-то решение?
Все таки коннектор — это именно процессы на воркерах. И они так же stateless.
И свое состояние коннекторы хранят в спец топиках в кафке.
Вы можете сделать edit коннектора. Но конечно настройки будут применены начаная с того момента, когда вы сделали edit.
Соответственно, и сообщения в соответствии с новыми настройками будут обрабатываться с этого момента.
И в качестве идентификатора в кафке используется имя коннектора.
Если же необхоимо перелить всю таблицу с новыми настройками, то вам необходимо создать новый коннектор.
Для этого мы приняли правило именования, где добавляем префикс с версией (v1, v2 и тд). И просто создаем новый коннектор.
Но вообще, тут вопрос именно в том, как вы работаете с версионированием схемы данных.

По поводу централизованного управления коннекторами в компании, мои коллеги готовят внутренний фреймвор для этого. Возможно, про это будет в будущем статья или проект на github.
Спасибо за статью!
Вы пишете что вы хотите перейти на событийную архитектуру. Но при этом, вроде бы, решили другую задачу — синхронизировали таблицы (те по сути построили такую большую мат вьюху).

Мне кажется если мы говорим про события, то речь идет о следующем: сервис генерирует событие (как бы факт), это событие кидается в топик и куча других сервисов подписывается на эти события и как-то на них реагируют. Суть именно в возможности отреагировать на событие.

Если мы говорим о синхронизации таблиц — то мы просто хотим чтобы данные были одинаковые и мы никак нам особо ничего не надо делать при их изменении. Например, НСИ часто нам нужно просто синхронизировать, но никак не надо реагировать на изменение позиций в справочнике. Я правильно понимаю, что вы решали именно задачу синхронизации НСИ?

Есть ли у вас именно события? Если да, то интересно как вы обеспечиваете целостность при записи в Kafka?
Следует сказать, что просто добавить source коннекторов — это не полноценный переход на событийную архитектуру. Но это важный шаг в этом направлении.

Да наши source коннекторы синкают данные из базы монолита в Кафку. И да некоторые данные мы просто синкаем в нашу таблицу. Но так же другие сервисы могут подписываться на эти топики, и реагировать на сообщение каким угодно способом.

Например, добавление ресторана для сервиса Каталог означает, что просто этот ресторан появится в выдаче.
Но есть сервис по рассылке писем, и при появлении ресторана, он может рассылать различные письма менеджерам. Но топик в Кафке — тот же.

По поводу обеспечения целостности данных, в Кафке есть идемпотентная доставка сообщений. Но так же стоит иметь в виду, что коннекторы друг про друга ничего не знают. Если вам необходимо на стороне Кафки выполнить связь данных, вы можете воспользоваться другим фреймворком — Kafka Streams. То есть скомбинировав Stream и Connect можно решить большинство кейсов по связи и доставке данных.

И так же хотелось еще раз отметить, что Kafka Connect — это именно фреймворк от Apache Kafka, а не наша поделка. И нам этот инструмент очень помог в решении конкретной задачи. Возможно для решения ваших задач он так же будет полезен. Но может и нет.
Sign up to leave a comment.