Pull to refresh

Доставка миллиардов сообщений строго один раз

Reading time 14 min
Views 21K
Original author: Amir Abu Shareb
Единственное требование ко всем системам передачи данных состоит в том, что нельзя потерять данные. Данные обычно могут поступить с опозданием или их можно запросить заново, но их никогда нельзя терять.

Чтобы удовлетворить этому требованию, большинство распределённых систем гарантирует по крайней мере однократную доставку. Техники обеспечения «по крайней мере однократной доставки» обычно сводятся к «повторам, повторам и повторам». Вы никогда не считаете сообщение доставленным, пока не получите чёткое подтверждение от клиента.

Но как пользователю по крайней мере однократная доставка — это не совсем то, что я хочу. Я хочу, чтобы сообщения доставлялись один раз. И только один раз.

К сожалению, для достижения чего-то близкого к точно однократной доставке нужен непробиваемый дизайн. Архитектура предусматривает, что каждый случай сбоя нужно тщательно обдумывать — нельзя будет просто прописать его как часть существующей реализации после самого факта сбоя. И даже тогда практически невозможно реализовать систему, в которой сообщения доставляются только один раз.

За последние три месяца мы разработали абсолютно новую систему дедупликации и максимально близко приблизились к точно однократной доставке, столкнувшись при этом с большим количеством разнообразных сбоев.

Новая система способна отслеживать в 100 раз больше сообщений, чем старая, отличается от неё повышенной надёжностью и более низкой стоимостью. Вот как мы это сделали.

Проблема


Большинство внутренних систем Segment изящно обрабатывают сбои с помощью повторов, вторичной отправки сообщений, блокировки и двухэтапных коммитов. Но есть одно примечательное исключение: клиенты, которые присылают данные напрямую в наш публичный API.

У клиентов (особенно мобильных) часто возникают перебои со связью, когда они могут отправить данные, но пропустить ответ от нашего API.

Представьте, что вы едете на автобусе и забронировали комнату из приложения HotelTonight на своём iPhone. Приложение начинает загружать данные на серверы Segment, но автобус неожиданно въезжает в тоннель и вы теряете связь. Некоторые из событий, которые вы отправили, уже обработаны, но клиент никогда не получит ответ от сервера.

В таких случаях клиенты повторяют отправку тех же событий в Segment API несмотря на то, что сервер технически уже получил ранее в точности такие же сообщения.

Судя по статистике нашего сервера, примерно 0,6% полученных событий за последние четыре недели — это повторные сообщения, которые мы уже получали.

Уровень ошибок может показаться незначительным. Но для приложения электронной коммерции, которое генерирует миллиарды долларов дохода, разница в 0,6% может означать разницу между прибылью и убытками в миллионы долларов.

Дедупликация сообщений


Итак, мы понимаем суть проблемы — нужно удалить дубликаты сообщений, которые отправляются в API. Но как это сделать?

На теоретическом уровне высокоуровневый API любой системы дедупликации кажется простым. На Python (aka псевдо-псевдокод) мы можем представить его следующим образом:

def dedupe(stream):
  for message in stream:
    if has_seen(message.id): 
      discard(message)
    else:
      publish_and_commit(message)

Для каждого сообщения в потоке сначала проверяется, встречалось ли такое сообщение раньше (по его уникальному идентификатору). Если встречалось, то отбрасываем его. Если не встречалось, то перевыпускаем сообщение и атомарно осуществляем передачу.

Чтобы не хранить постоянно все сообщения, действует «окно дедупликации», которое определяется как время хранения наших ключей до истечения их срока действия. Если сообщения не вписываются в окно, они считаются устаревшими. Мы хотим гарантировать, что в окне отправляется только одно сообщение с данным ID.

Такое поведение легко описать, но есть две детали, требующие особого внимания: производительность чтения/записи и точность.

Мы хотим, чтобы система осуществляла дедупликацию миллиардов событий в нашем потоке данных — и делала это одновременно с низкой задержкой и эффективно по стоимости.

Более того, мы хотим убедиться, что информация о зарегистрированных событиях надёжно сохраняется, так что мы можем восстановить её в случае сбоя, и что в выдаче никогда не встретятся повторные сообщения.

Архитектура


Чтобы добиться такого, мы создали «двухэтапную» архитектуру, которая считывает данные из Kafka и удаляет дубликаты событий, уже зарегистрированных в четырёхнедельном окне.


Высокоуровневая архитектура дедупликации

Топология Kafka


Для понимания работы такой архитектуры сначала взглянем на топологию потока Kafka. Все входящие вызовы API разделяются на отдельные сообщения и чётко выражают входной раздел Kafka.

Сначала каждое входящее сообщение помечается уникальным messageId, который генерируется на стороне клиента. Обычно это UUIDv4 (хотя мы рассматриваем переход на ksuid). Если клиент не сообщает messageId, то мы автоматически присваиваем его на уровне API.

Мы не используем векторные часы или порядковые номера, потому что не хотим усложнять клиентскую сторону. Применение идентификаторов UUID позволяет кому угодно легко отправить данные в наш API, потому что почти каждый основной язык программирования поддерживает их.

{
  "messageId": "ajs-65707fcf61352427e8f1666f0e7f6090",
  "anonymousId": "e7bd0e18-57e9-4ef4-928a-4ccc0b189d18",
  "timestamp": "2017-06-26T14:38:23.264Z",
  "type": "page"
}

Отдельные сообщения заносятся в журнал Kafka для долговечности и возможности повтора. Они распределяются по messageId, так что мы можем быть уверенными, что один и тот же messageId всегда поступит одному и тому же обработчику.

Это важная деталь, когда речь идёт об обработке данных. Вместо поиска в центральной базе данных ключа среди сотен миллиардов сообщений мы смогли на порядки сузить поисковое пространство просто за счёт перенаправления поискового запроса в конкретный раздел.

Воркер дедупликации — это программа на Go, которая считывает входные разделы Kafka. Она отвечает за чтение сообщений, проверку дубликатов, а если сообщения новые — за отправку их в выходную тему Kafka.

По нашему опыту, воркерами и топологией Kafka исключительно просто управлять. У нас больше нет множества больших инстансов Memcached, которым требуются отказоустойчивые реплики. Вместо этого мы применили встроенные базы данных RocksDB, которые вообще не нуждаются в координации и дают нам стойкое хранилище по исключительно низкой цене. Сейчас подробнее об этом.

Воркер RocksDB


Каждый воркер хранит локальную базу данных RocksDB на своём локальном жёстком диске EBS. RocksDB — это встроенное key-value хранилище, разработанное в Facebook, и оно оптимизировано для экстремально высокой производительности.

Всякий раз, когда событие извлекается из входных разделов, потребитель запрашивает RocksDB на предмет того, встречался ли ранее такой messageId.

Если сообщение отсутствует в RocksDB, мы добавляем ключ в базу данных, а затем публикуем сообщение в выходных разделах Kafka.

Если сообщение уже имеется в RocksDB, воркер просто не публикует его в выходные разделы и обновляет данные входного раздела с уведомлением, что он обработал сообщение.

Производительность


Чтобы добиться от нашей базы данных высокой производительности, мы должны соответствовать трём типам запросов для каждого обрабатываемого события:

  1. Обнаружение существования случайных ключей, которые поступают на входе, но вряд ли хранятся в нашей БД. Они могут располагаться где угодно в пространстве ключей.
  2. Запись новых ключей с высокой производительностью.
  3. Признание устаревшими старых ключей, которые не попали в наше «окно дедупликации».

В результате, мы должны непрерывно сканировать всю базу данных, добавлять новые ключи и признавать устаревшими старые ключи. А в идеале это должно происходить в рамках прежней модели данных.


Наша база данных должна удовлетворять трём очень разным типам запросов

Вообще говоря, основную часть прироста производительности даёт производительность БД — так что имеет смысл разобраться в устройстве RocksDB, из-за чего она настолько хорошо выполняет работу.

RocksDB — это журнально-структурированное дерево (LSM-дерево), то есть оно непрерывно добавляет новые ключи в журнал опережающей записи на диске, а также хранит отсортированные ключи в памяти как часть memtable.


Ключи отсортированы в памяти как часть memtable

Запись ключей — чрезвычайно быстрый процесс. Новые элементы записываются прямо на диск путём добавления к журналу (для непосредственного сохранения и восстановления в случае сбоя), а записи данных сортируются в памяти, чтобы обеспечить быстрый поиск и порционную запись.

Всякий раз, когда достаточное количество записей поступают в memtable, они сохраняются на диск как SSTable (таблица сортированных строк). Поскольку строки уже были отсортированы в памяти, их можно напрямую сбросить на диск.


Текущее состояние memtable сбрасывается на диск как SSTable на нулевом уровне (Level 0)

Вот пример такого сброса из наших рабочих логов:

[JOB 40] Syncing log #655020
[default] [JOB 40] Flushing memtable with next log file: 655022
[default] [JOB 40] Level-0 flush table #655023: started
[default] [JOB 40] Level-0 flush table #655023: 15153564 bytes OK
[JOB 40] Try to delete WAL files size 12238598, prev total WAL file size 24346413, number of live WAL files 3.


Каждая таблица SSTable остаётся неизменной — после того, как её создали, она никогда не изменяется — благодаря этому запись новых ключей происходит настолько быстро. Не нужно обновлять файлы, и запись не порождает новые записи. Вместо этого несколько таблиц SSTable на одинаковом «уровне» сливаются в один файл во время внеполосной фазы уплотнения.



Когда уплотняются отдельные таблицы SSTable с одного уровня, их ключи сливаются вместе, а затем новый файл переносится на более высокий уровень. В наших рабочих логах можно найти примеры таких уплотнений. В данном случае процесс 41 уплотняет четыре файла нулевого уровня и объединяет их в больший файл первого уровня.

/data/dedupe.db$ head -1000 LOG | grep "JOB 41"
[JOB 41] Compacting 4@0 + 4@1 files to L1, score 1.00
[default] [JOB 41] Generated table #655024: 1550991 keys, 69310820 bytes
[default] [JOB 41] Generated table #655025: 1556181 keys, 69315779 bytes
[default] [JOB 41] Generated table #655026: 797409 keys, 35651472 bytes
[default] [JOB 41] Generated table #655027: 1612608 keys, 69391908 bytes
[default] [JOB 41] Generated table #655028: 462217 keys, 19957191 bytes
[default] [JOB 41] Compacted 4@0 + 4@1 files to L1 => 263627170 bytes


После завершения уплотнения объединённые таблицы SSTable становятся окончательным множеством записей базы данных, а старые таблицы SSTable расцепляются.

Если посмотреть на рабочий инстанс, то мы увидим, как обновляется этот журнал опережающей записи, а также как записываются, считываются и сливаются отдельные таблицы SSTable.


Журнал и самые последние таблицы SSTable занимают львиную долю операций I/O

Если посмотреть на статистику SSTable на рабочем сервере, то увидим четыре «уровня» файлов, с увеличением размеров файлов на каждом более высоком уровне.

** Compaction Stats [default] **
Level    Files   Size(MB} Score Read(GB}  Rn(GB} Rnp1(GB} Write(GB} Wnew(GB} Moved(GB} W-Amp
--------------------------------------------------------------------------------------------
  L0      1/0      14.46   0.2      0.0     0.0      0.0       0.1      0.1       0.0   0.0
  L1      4/0     194.95   0.8      0.5     0.1      0.4       0.5      0.1       0.0   4.7
  L2     48/0    2551.71   1.0      1.4     0.1      1.3       1.4      0.1       0.0  10.7
  L3    351/0   21735.77   0.8      2.0     0.1      1.9       1.9     -0.0       0.0  14.3
 Sum    404/0   24496.89   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2
 Int      0/0       0.00   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2
Rd(MB/s} Wr(MB/s} Comp(sec} Comp(cnt} Avg(sec} KeyIn KeyDrop
    0.0     15.6         7         8    0.925       0      0
   20.9     20.8        26         2   12.764     12M     40
   19.4     19.4        73         2   36.524     34M     14
   18.1     16.9       112         2   56.138     52M  3378K
   18.2     18.1       218        14   15.589     98M  3378K
   18.2     18.1       218        14   15.589     98M  3378K

RocksDB хранит индексы и фильтры Блума конкретных таблиц SSTable в самих этих таблицах — и они загружаются в память. Эти фильтры и индексы потом опрашиваются для поиска конкретного ключа, а затем полная таблица SSTable загружается в память как часть LRU.

В абсолютном большинстве случаев мы видим новые сообщения, которые делают из нашей системы дедупликации классический случай применения фильтров Блума.

Фильтры Блума говорят, находится ли ключ «вероятно принадлежит множеству» или «определённо не принадлежит множеству». Чтобы выдать ответ, фильтр сохраняет множество битов после применения различных хеш-функций для каждого элемента, который встречался ранее. Если все биты от хеш-функции сходятся с множеством, то он выдаёт ответ «вероятно принадлежит множеству».


Запрос буквы w в фильтре Блума, когда наше множество содержит только {x, y, z}. Фильтр возвращает ответ «не принадлежит множеству», поскольку один из битов не сходится

Если получен ответ «вероятно принадлежит множеству», то RocksDB может запросить исходные данные из наших таблиц SSTable и определить, действительно ли элемент присутствует в множестве. Но в большинстве случаев мы можем вообще избежать запросов к любым таблицам, потому что фильтр возвращает ответ «определённо не принадлежит множеству».

Когда мы обращаемся к RocksDB, то создаём запрос MultiGet для всех релевантных messageId, которые хотим запросить. Мы создаём его в составе пакета ради производительности и чтобы избежать многих параллельных блокирующих операций. Он также позволяет нам пакетировать данные, поступающие от Kafka, и обычно избегает случайных записей в пользу последовательных.

Это объясняет, как задачи чтения/записи демонстрируют высокую производительность — но всё ещё остаётся вопрос, как несвежие данные признаются устаревшими.

Удаление: привязка к размеру, а не ко времени


В нашем процессе дедупликации нужно определиться, ограничивать систему по строгому «окну дедупликации» или по общему размеру базы данных на диске.

Чтобы избежать падения системы из-за чрезмерной дедупликации для всех пользователей, мы решили выбрать ограничение по размеру, а не ограничение по интервалу времени. Это позволяет установить максимальный размер для каждого инстанса RocksDB и справляться с внезапными скачками или увеличением нагрузки. Побочный эффект состоит в том, что интервал времени может сократиться менее чем до 24 часов, а на этой границе происходит вызов нашего дежурного инженера.

Мы периодически признаём устаревшими старые ключи из RocksDB, чтобы не дать её увеличиться до безграничного размера. Для этого мы храним вторичный индекс ключей по порядковому номеру, так что можем удалять первыми самые старые ключи.

Вместо использования RocksDB TTL, что потребовало бы сохранения фиксированного TTL при открытии базы данных, мы удаляем сами объекты по порядковому номеру каждого вложенного ключа.

Поскольку порядковые номера хранятся как вторичный индекс, мы можем быстро запрашивать их и «помечать» как удалённые. Вот наша функция удаления после передачи ей порядкового номера.

func (d *DB) delete(n int) error {
        // open a connection to RocksDB
        ro := rocksdb.NewDefaultReadOptions()
        defer ro.Destroy()

        // find our offset to seek through for writing deletes
        hint, err := d.GetBytes(ro, []byte("seek_hint"))
        if err != nil {
                return err
        }

        it := d.NewIteratorCF(ro, d.seq)
        defer it.Close()

        // seek to the first key, this is a small
        // optimization to ensure we don't use `.SeekToFirst()`
        // since it has to skip through a lot of tombstones.
        if len(hint) > 0 {
                it.Seek(hint)
        } else {
                it.SeekToFirst()
        }

        seqs := make([][]byte, 0, n)
        keys := make([][]byte, 0, n)

        // look through our sequence numbers, counting up
        // append any data keys that we find to our set to be
        // deleted
        for it.Valid() && len(seqs) < n {
                k, v := it.Key(), it.Value()
                key := make([]byte, len(k.Data()))
                val := make([]byte, len(v.Data()))

                copy(key, k.Data())
                copy(val, v.Data())
                seqs = append(seqs, key)
                keys = append(keys, val)

                it.Next()
                k.Free()
                v.Free()
        }

        wb := rocksdb.NewWriteBatch()
        wo := rocksdb.NewDefaultWriteOptions()
        defer wb.Destroy()
        defer wo.Destroy()

        // preserve next sequence to be deleted.
        // this is an optimization so we can use `.Seek()`
        // instead of letting `.SeekToFirst()` skip through lots of tombstones.
        if len(seqs) > 0 {
                hint, err := strconv.ParseUint(string(seqs[len(seqs)-1]), 10, 64)
                if err != nil {
                        return err
                }

                buf := []byte(strconv.FormatUint(hint+1, 10))
                wb.Put([]byte("seek_hint"), buf)
        }

        // we not only purge the keys, but the sequence numbers as well
        for i := range seqs {
                wb.DeleteCF(d.seq, seqs[i])
                wb.Delete(keys[i])
        }

        // finally, we persist the deletions to our database
        err = d.Write(wo, wb)
        if err != nil {
                return err
        }

        return it.Err()
}

Для дальнейшей гарантии высокой скорости записи RocksDB не возвращается немедленно и не удаляет ключ (вы же помните, что таблицы SSTable неизменные!). Вместо этого RocksDB добавляет ключу «надгробие», которое затем удаляется в процессе уплотнения базы. Поэтому мы можем быстро признавать устаревшими записи во время последовательных операций записи и избегать засорения памяти при удалении старых элементов.

Обеспечение корректности данных


Мы уже обсудили, как обеспечивается скорость, масштабирование и дешёвый поиск в миллиардах сообщений. Остался последний фрагмент — как обеспечивается корректность данных при различных сбоях.

EBS-снимки и приложения


Чтобы защитить наши инстансы RocksDB от повреждения из-за ошибки программиста или сбоя в работе EBS, мы периодически делаем снимки каждого из наших жёстких дисков. Хотя EBS реплицируется сама по себе, но эта мера защищает от повреждения, вызванного неким внутренним механизмом. Если нам нужен конкретный инстанс — клиента можно поставить на паузу, а в это время соответствующий диск EBS открепляется, а затем повторно прикрепляется уже к новому инстансу. До тех пор, пока мы сохраняем неизменным ID раздела, повторное прикрепление диска остаётся вполне безболезненной процедурой, которая по-прежнему обеспечивает корректность данных.

В случае сбоя воркера мы полагаемся на встроенный в RocksDB журнал опережающей записи, чтобы не терять сообщения. Сообщения не допускаются из входного раздела, пока у нас нет гарантии, что RocksDB надёжно сохранила сообщение в журнале.

Чтение выходного раздела


Вы могли заметить, что до сего момента не было того «атомарного» шага, который позволяет гарантировать, что сообщения доставляются строго однократно. В любой момент есть вероятность сбоя нашего воркера: при записи в RocksDB, при публикации в выходной раздел или при подтверждении входящих сообщений.

Нам нужна атомарная точка «фиксации», которая будет однозначно покрывать транзакции для всех этих отдельных систем. Требуется некий «источник истины» для наших данных.

Вот где вступает в дело чтение из выходного раздела.

Если по какой-то причине происходит сбой воркера или ошибка в Kafka с последующим перезапуском, первым делом осуществляется сверка с «источником истины» по поводу того, произошло ли событие: этим источником является выходной раздел.

Если сообщение найдено в выходном разделе, но не в RocksDB (и наоборот), то воркер дедупликации сделает необходимую правку, чтобы синхронизировать базу данных и RocksDB. По сути, мы используем выходной раздел одновременно как журнал опережающей записи и конечный источник истины, в то время как RocksDB фиксирует и проверяет его.

В реальной работе


Сейчас наша система дедупликации работает в продакшне уже три месяца, и мы невероятно довольны результатами. Если в цифрах, то у нас:

  • 1,5 ТБ ключей хранится на диске в RocksDB
  • 4-недельное окно дедупликации перед признанием устаревшими старых ключей
  • примерно 60 млрд ключей хранится в инстансах RocksDB
  • 200 млрд сообщений проходят через систему дедупликации

Система в целом работает быстро, эффективно и устойчива к сбоям — и при этом с очень простой архитектурой.

В частности, у второй версии нашей системы есть ряд преимуществ перед старой системой дедупликации.

Раньше мы хранили все ключи в Memcached и использовали атомарный оператор проверки и установки значения записи CAS (check-and-set) для установки несуществующих ключей. Memcached выполняла роль точки фиксации и «атомарности» при публикации ключей.

Хотя такая схема довольно хорошо работала, но требовала большого объёма памяти, чтобы поместились все наши ключи. Более того, нужно было выбирать между принятием случайных сбоев Memcached или удвоением наших затрат на создание прожорливых к памяти отказоусточивых копий.

Схема Kafka/RocksDB даёт почти все преимущества старой системы, но с увеличением надёжности. Подводя итоги, вот главные достижения:

Хранение данных на диске: сохранение всего набора ключей или полная индексация в памяти были недопустимо дороги. Перенеся больше данных на диск и задействуя различные уровни файлов и индексов, мы смогли значительно снизить стоимость. Теперь мы можем переключаться при сбое на «холодное» хранилище (EBS), а не поддерживать работу дополнительных «горячих» инстансов на случай сбоя.

Выделение разделов: конечно, чтобы сузить наше поисковое пространство и избежать загрузки в память слишком большого количества индексов, требуется гарантия, что определённые сообщения направляются правильным воркерам. Выделения разделов в Kafka позволяет последовательно направлять эти сообщения по правильным маршрутам, так что мы можем намного более эффективно кэшировать данные и генерировать запросы.

Точное признание устаревших ключей: в Memcached мы бы установили TTL для каждого ключа, чтобы определять срок его жизни, а затем полагались бы на процесс Memcached для исключения ключей. В случае больших пакетов данных это угрожало бы нехваткой памяти и скачками в использовании CPU из-за большого количества исключений ключей. Поручив клиенту удаление ключей, мы можем изящно избежать проблемы, уменьшив «окно дедупликации».

Kafka как источник истины: чтобы по-настоящему избежать дедупликации при наличии нескольких точек фиксации нам нужно использовать источник истины, общий для всех наших клиентов внизу по потоку данных. Kafka в качестве такого «источника истины» работает на удивление здорово. В случае большинства сбоев (кроме сбоев самой Kafka) сообщения или записываются в Kafka, или не записываются. И использование Kafka гарантирует, что опубликованные сообщения доставляются надлежащим образом и реплицируются по дискам на многочисленных машинах, без необходимости хранить большое количество данных в памяти.

Пакетные чтение и запись: сделав пакетные операции I/O для вызовов к Kafka и RocksDB, мы смогли сильно повысить производительность, используя последовательные чтение и запись. Вместо случайного доступа, который был раньше с Memcached, мы добились гораздо более высокой пропускной способности за счёт повышения производительности дисков и хранения в памяти только индексов.

В целом, мы вполне довольны теми гарантиями, которые даёт созданная нами система дедупликации. Использование Kafka и RocksDB в качестве основы потоковых приложений всё больше и больше становится нормой. И мы с радостью продолжим разработку новых распределённых приложений на этом фундаменте.
Tags:
Hubs:
+21
Comments 16
Comments Comments 16

Articles