Pull to refresh
1007.14
OTUS
Цифровые навыки от ведущих экспертов

Микросервисы и RabbitMQ в Docker

Reading time12 min
Views14K
Original author: Usama Ashraf

Микросервисная архитектура предполагает декомпозицию монолитного приложения на несколько полностью независимо развёртываемых и масштабируемых сервисов. За пределами этого базового определения, то, что представляет собой микросервис, может быть несколько субъективным. Хотя есть несколько проверенных в бою практик, принятых такими гигантами, как Netflix и Uber, которые всегда следует принимать во внимание. И я расскажу о некоторых из них. 

В конечном итоге мы хотим разделить приложение на более мелкие приложения, каждое из которых представляет собой отдельную систему и занимается только одним аспектом всего приложения и делает это очень хорошо. Такая декомпозиция является важным шагом и может быть выполнена на основе субдоменов, которые должны быть правильно определены. Более мелкие приложения — более модульные и управляемые, с чётко определёнными границами, могут быть написаны с использованием разных языков/фреймворков. Они выходят из строя изолированно, так что всё приложение не падает (без SPOF). Возьмём пример с продажей билетов в кинотеатр:

Источник: https://codeburst.io/build-a-nodejs-cinema-api-gateway-and-deploying-it-to-docker-part-4-703c2b0dd269

Давайте посмотрим на это глобально:

i) Пользовательское приложение может быть мобильным клиентом, SPA и т. д. или любым клиентом, потребляющим наши бэкэнд-сервисы.

ii) Просить клиентов взаимодействовать с каждым из сервисов отдельно считается плохой практикой по причинам, которые я пытался объяснить в этом посте. Для этого и есть API-шлюзы: получать клиентские запросы, вызывать сервис(ы), возвращать ответ. Таким образом, клиенту приходится общаться только с одним сервером, создавая иллюзию монолита. Для разных типов клиентов (мобильных приложений, планшетов, браузеров и т. д.) может использоваться несколько шлюзов. Они отвечают за ограниченную функциональность: объединение ответов от сервисов, аутентификация, ACL. В больших приложениях, которые масштабируются и перемещаются динамически, шлюзы также должны иметь доступ к реестру сервисов, в котором хранятся местоположения экземпляров микросервисов, баз данных и т. д.

iii) У каждого сервиса собственное хранилище данных — это ключевой момент, обеспечивающий свободную связность. Некоторые запросы потребуют объединения данных, которые принадлежат нескольким сервисам. Чтобы избежать значительного падения производительности, данные можно реплицировать и разделить на шарды. Этот принцип не просто допускается в микросервисах, но и поощряется.

iv) REST-вызовы, сделанные в API-шлюз, передаются сервисам. Они, в свою очередь, общаются с другими сервисами и возвращают результат на шлюз — который компилирует его и отвечает клиенту. При одном клиентском запросе к приложению подобное общение между сервисами не должно происходить. В противном случае мы будем жертвовать производительностью из-за дополнительного HTTP-обмена, ради недавно введённой модульности.

В идеале, один запрос должен вызывать только один сервис для получения ответа. Это означает, что любые синхронные запросы между сервисами должны быть сведены к минимуму — а это не всегда возможно. При необходимости обычно используются такие механизмы, как gRPC, Thrift или даже простой HTTP (как в нашем примере). Как вы уже догадались, это означает, что данные должны реплицироваться между сервисами. Скажем, конечная точка GET /catalog/<<cityId>> также должна возвращать информацию о премьерах в каждом кинотеатре города на данный момент. При нашей новой стратегии премьеры придется хранить в базе данных и для сервиса Cinema Catalog. Отсюда следует пункт iii).

Асинхронное взаимодействие между сервисами

Допустим, премьеры меняются в результате какой-то CRUD-операции на сервисе Movies. Чтобы данные синхронизировались, это событие обновления должно быть вызвано и применено к сервису Cinema Catalog. Попробуйте представить себе микросервисы как кластер машин состояний, где обновления состояний можно передать по всему кластеру для достижения конечной согласованности. Конечно, не нужно ожидать, что конечным пользователям придётся ждать завершения запросов и жертвовать своим временем ради модульности в нашу пользу. Таким образом, все эти коммуникации должны быть неблокирующими. И здесь на помощь приходит RabbitMQ.

RabbitMQ — это мощный брокер сообщений, который реализует протокол обмена сообщениями AMQP. Если вкратце: сначала вы устанавливаете экземпляр сервера RabbitMQ (брокер) в системе. Затем программа Publisher / Producer (издатель / продюсер) подключается к этому серверу и отправляет сообщение. RabbitMQ ставит это сообщение в очередь и передает его одному или нескольким Subscriber / Consumer (подписчик / консьюмер), которые прослушивают сервер RabbitMQ.

Прежде чем перейти к сути статьи, я хочу сказать, что микросервисы гораздо сложнее; и мы не будем в этой статье подробно освещать такие важные темы, как:

Также мы не будем здесь подробно останавливаться на том, как решить, подойдут вам микросервисы или нет.

Принцип работы RabbitMQ

Если вкратце, сообщения публикуются (published) в обменник (exchange) внутри брокера RabbitMQ. Затем обменник распределяет копии сообщений по очередям (queues) на основе определенных разработчиком правил, называемых привязками (bindings). Эта часть пути сообщений называется маршрутизацией (routing). И эта непрямая маршрутизация обеспечивает неблокируемую передачу сообщений. Потребители (consumers), прослушивающие те очереди, которые получили сообщение, получат его. Довольно просто, верно?

Не совсем. Существует четыре различных типа обменников, и каждый из них, наряду с привязками, определяет алгоритм маршрутизации. «Алгоритм маршрутизации» означает то, как сообщения распределяются между очередями. Подробное описание каждого типа будет излишним, поэтому я остановлюсь на том, который мы будем использовать: topic exchange:

Чтобы обменник мог отправить сообщение в очередь, эта очередь должна быть привязана к обменнику. Мы можем создать несколько обменников с уникальными именами в явном виде. Однако, когда вы развёртываете RabbitMQ, он поставляется с обменником без имени по умолчанию. Каждая созданная очередь будет автоматически привязана к этому обменнику. Для наглядности я буду создавать именованный обменник вручную, а затем привязывать к нему очередь. Эта привязка определяется ключом привязки (binding key). То, как именно работает ключ привязки, зависит от типа обменника. Вот как это работает с topic exchange:

  • Очередь привязывается к обменнику с помощью строкового шаблона (ключа привязки).

  • Опубликованное сообщение доставляется в обменник вместе с ключом маршрутизации.

  • Обменник проверяет, какие очереди соответствуют ключу маршрутизации, основываясь на шаблоне ключа привязки, определённом ранее.

* может заменить ровно одно слово. # может заменять ноль или более слов.

Источник: https://www.rabbitmq.com/tutorials/tutorial-five-python.html

Любое сообщение с ключом маршрутизации "quick.orange.rabbit" будет доставлено в обе очереди. Однако сообщения с ключом "lazy.brown.fox" дойдут только до Q2. Сообщения с ключом маршрутизации, не совпадающим ни с одним шаблоном, будут потеряны.

Рассмотрим ещё два типа обмена:

  • Fanout-обмен: сообщения, отправленные в этот тип обмена, будут отправлены во ВСЕ очереди, связанные с ним. Ключ маршрутизации, если он был указан, будет полностью проигнорирован. Это может быть использовано, например, для транслирования обновлений глобальной конфигурации в распределённой системе.

  • Прямой обмен (самый простой): отправляет сообщение в очередь, ключ привязки которой в точности равен заданному ключу маршрутизации. Если очередь прослушивают несколько потребителей, то сообщения будут распределены между ними, поэтому она обычно используется для распределения задач между несколькими worker-ами по кругу.

Моя иллюстрация будет очень простой: приложение на Python Flask с единственной конечной точкой POST, которая при вызове будет якобы обновлять информацию о пользователе, отправлять сообщение брокеру RabbitMQ (разумеется, неблокируемое) и возвращать 201. Отдельный Go-сервис будет слушать сообщение от брокера и иметь возможность обновить свои данные. Все три сервиса будут размещены в отдельных контейнерах.

Настройка контейнеризованных микросервисов и брокера с помощью Docker Compose

Предоставление кучи контейнеров и всего, что с ними связано, может быть непростой задачей, поэтому я всегда полагаюсь на Docker Compose.

Вот весь код. Мы объявляем три сервиса, которые будут использоваться для трех контейнеров. Два volumes нужны для размещения кода внутри контейнеров:

# docker-compose.yml

version: "3.2"
services:
    rabbitmq-server:
        build: ./rabbitmq-server

    python-service:
        build: ./python-service
        # 'rabbitmq-server' will be available as a network reference inside this service 
        # and this service will start only after the RabbitMQ service does.
        depends_on:
            - rabbitmq-server
        # Keep it running.  
        tty: true
        # Map port 3000 on the host machine to port 3000 of the container.
        # This will be used to receive HTTP requests made to the service.
        ports:
            - "3000:3000"
        volumes:
            - './python-service:/python-service'

    go-service:
        build: ./go-service
        depends_on:
            - rabbitmq-server
        tty: true
        volumes:
            - './go-service:/go-service'

# Host volumes used to store code.
volumes:
    python-service:
    go-service:

Dockerfiles — это практически стандартные файлы из Docker Hub, к которым я добавил:

  • рабочую директорию /go-service в контейнере сервиса Go;

  • рабочую директорию /python-service в контейнере сервиса Python;

  • клиентскую библиотеку RabbitMQ для Go под названием amqp;

  • клиентскую библиотеку RabbitMQ для Python, Pika & Flask.

Наше приложение Flask имеет только одну конечную точку, которая получает user_id и full_name — они будут использоваться для обновления профиля пользователя. Затем сообщение об этом обновлении будет отправлено брокеру RabbitMQ.

# main.py

from flask import Flask
from flask import request
from flask import jsonify
from services.user_event_handler import emit_user_profile_update

app = Flask(__name__)

@app.route('/users/<int:user_id>', methods=['POST'])
def update(user_id):
    new_name = request.form['full_name']

    # Update the user in the datastore using a local transaction...

    emit_user_profile_update(user_id, {'full_name': new_name})

    return jsonify({'full_name': new_name}), 201

Логика передачи событий другим сервисам всегда должна быть отделена от остальной части приложения, поэтому я вынес её в модуль. Обменник должен явно проверяться и создаваться как издателем, так и подписчиком, потому что мы не можем знать, какой сервис запускается первым. Это хорошая практика, которую большинство клиентских библиотек RabbitMQ легко поддерживают:

# services/user_event_handler.py

import pika
import json

def emit_user_profile_update(user_id, new_data):
    # 'rabbitmq-server' is the network reference we have to the broker, 
    # thanks to Docker Compose.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq-server'))
    channel    = connection.channel()

    exchange_name = 'user_updates'
    routing_key   = 'user.profile.update'

    # This will create the exchange if it doesn't already exist.
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)

    new_data['id'] = user_id

    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=json.dumps(new_data),
                          # Delivery mode 2 makes the broker save the message to disk.
                          # This will ensure that the message be restored on reboot even  
                          # if RabbitMQ crashes before having forwarded the message.
                          properties=pika.BasicProperties(
                            delivery_mode = 2,
                        ))

    print("%r sent to exchange %r with data: %r" % (routing_key, exchange_name, new_data))
    connection.close()

Пусть вас не сбивает с толку channel (канал). Канал — это просто виртуальное, лёгкое соединение внутри TCP-соединения, предназначенное для предотвращения открытия нескольких дорогостоящих TCP-соединений. Особенно в многопоточных средах.

Параметр durable гарантирует, что обменник будет сохранён на диске и может быть восстановлен, если брокер по какой-либо причине упадёт или отключится от интернета. Издатель (Python сервис) создаёт обменник с именем user_updates и отправляет ему обновлённые данные пользователя с user.profile.update в качестве ключа маршрутизации. Он будет сопоставлен с ключом привязки user.profile.*, который определит наш сервис Go:

// main.go

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        // Exit the program.
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    // 'rabbitmq-server' is the network reference we have to the broker, 
    // thanks to Docker Compose.
    conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-server:5672/")
    failOnError(err, "Error connecting to the broker")
    // Make sure we close the connection whenever the program is about to exit.
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    // Make sure we close the channel whenever the program is about to exit.
    defer ch.Close()

    exchangeName := "user_updates"
    bindingKey   := "user.profile.*"

    // Create the exchange if it doesn't already exist.
    err = ch.ExchangeDeclare(
            exchangeName,   // name
            "topic",        // type
            true,           // durable
            false,
            false,
            false,
            nil,
    )
    failOnError(err, "Error creating the exchange")

    // Create the queue if it doesn't already exist.
    // This does not need to be done in the publisher because the
    // queue is only relevant to the consumer, which subscribes to it.
    // Like the exchange, let's make it durable (saved to disk) too.
    q, err := ch.QueueDeclare(
            "",    // name - empty means a random, unique name will be assigned
            true,  // durable
            false, // delete when the last consumer unsubscribes
            false, 
            false, 
            nil,   
    )
    failOnError(err, "Error creating the queue")

    // Bind the queue to the exchange based on a string pattern (binding key).
    err = ch.QueueBind(
            q.Name,       // queue name
            bindingKey,   // binding key
            exchangeName, // exchange
            false,
            nil,
    )
    failOnError(err, "Error binding the queue")

    // Subscribe to the queue.
    msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer id - empty means a random, unique id will be assigned
            false,  // auto acknowledgement of message delivery
            false,  
            false,  
            false,  
            nil,
    )
    failOnError(err, "Failed to register as a consumer")


    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body)

            // Update the user's data on the service's 
            // associated datastore using a local transaction...

            // The 'false' indicates the success of a single delivery, 'true' would
            // mean that this delivery and all prior unacknowledged deliveries on this
            // channel will be acknowledged, which I find no reason for in this example.
            d.Ack(false)
        }
    }()

    fmt.Println("Service listening for events...")

    // Block until 'forever' receives a value, which will never happen.
    <-forever
}

RabbitMQ использует порт 5672 по умолчанию для соединений без TLS и "guest" в качестве имени пользователя и пароля. Вы можете изучить множество доступных опций конфигурации и как их использовать с Pika и Go amqp.

Вам может быть интересно, для чего нужна эта строка: d.Ack(false)

Она сообщает брокеру, что сообщение было доставлено, успешно обработано и может быть удалено. По умолчанию эти подтверждения происходят автоматически. Но мы указали обратное, когда подписывались на очередь: ch.Consume.

Теперь, если сервис на Go выйдет из строя (по любой непредвиденной причине), подтверждение отправлено не будет, и это заставит брокера вернуть сообщение в очередь, чтобы оно могло получить ещё один шанс на обработку.

Запуск микросервисов

Итак, давайте запустим их:

docker-compose up

Когда все три сервиса будут созданы (в первый раз это займёт не менее нескольких минут), проверьте их имена с помощью команды docker ps:

Откройте два новых терминала, войдите по SSH в контейнеры Python и Go, используя соответствующие имена контейнеров, и запустите серверы:

docker exec -it microservicesusingrabbitmq_python-service_1 bashFLASK_APP=main.py python -m flask run --port 3000 --host 0.0.0.0

docker exec -it microservicesusingrabbitmq_go-service_1 bash
go run main.go

Откройте третий терминал, чтобы отправить POST-запрос. Я буду использовать Curl:

curl -d "full_name=usama" -X POST http://localhost:3000/users/1

И вы увидите передачу:

В любой момент вы также можете войти по SSH в контейнер RabbitMQ и посмотреть:

  • rabbitmqctl list_exchanges (список всех обменников на этом узле брокера)

  • rabbitmqctl list_queues (список всех очередей на этом узле брокера)

  • rabbitmqctl list_bindings (список всех привязок на этом узле брокера)

  • rabbitmqctl list_queues name messages_ready messages_unacknowledged (список всех очередей с указанием количества сообщений, которые готовы к доставке клиентам, но еще не доставлены, и доставленных, но еще не подтверждённых)

Как я уже говорил вначале, это ни в коем случае не глубокое погружение в микросервисы. Есть много вопросов, и я попытаюсь ответить на один из них: как сделать эту коммуникацию транзакционной? Что произойдёт, если наш Go-сервис (подписчик) выбросит исключение при обновлении состояния на своей стороне? Нам нужно убедиться, что событие обновления откатится по всем сервисам, которые были затронуты этим событием. Представьте, насколько сложной может оказаться эта задача, если у нас несколько микросервисов и тысячи таких «событий обновления». По сути, нам нужно будет включить отдельные события, выполняющие откат.

В нашем случае, если сервис Go выбросит исключение при обновлении данных, он должен будет отправить сообщение обратно сервису Python, чтобы тот откатил обновление. Также важно отметить, что в случае подобных ошибок доставка сообщения должна быть подтверждена (даже если обработка не была успешной), чтобы сообщение не было повторно поставлено в очередь брокером. При написании нашего подписчика мы должны будем решить, какие ошибки означают, что сообщение должно быть повторно поставлено в очередь (повторная попытка), а какие означают, что сообщение не должно быть повторно поставлено в очередь и просто откатывается назад.

Но как указать, какое событие обновления нужно откатить и как именно будет происходить откат? Для обеспечения такой согласованности данных широко используется паттерн Saga вместе с паттерном event sourcing.

Несколько слов о проектировании брокера

Рассмотрим две вещи: типы используемых обменов и способ группировки обменников.

Если вам нужно транслировать определённые типы сообщений всем сервисам в системе, обратите внимание на тип обмена fanout. Тогда одним из способов группировки обменников может быть группировка на основе событий, например — три fanout-обменника с именами user.profile.updated, user.profile.deleted, user.profile.added. Возможно, это не совсем то, что вам нужно, поскольку в итоге у вас получится слишком много обменников и вы не сможете фильтровать сообщения для конкретных потребителей без создания нового обменника, очереди и привязки.

Другим способом может быть создание topic exchanges в терминах сущностей в системе. Так, в нашем первом примере user, movie, cinema и т.д. могут быть сущностями, и очереди, привязанные к user, могут использовать такие ключи привязки, как:

  • user.created — получить сообщение, когда пользователь создан;

  • user.login — получить сообщение, когда пользователь вошёл в систему;

  • user.roles.grant — получить сообщение о том, что пользователю была предоставлена роль авторизации;

  • user.notify — отправить пользователю уведомление и так далее.

Всегда используйте маршрутизацию для фильтрации и доставки сообщений определённым потребителям. Писать код, отбрасывающий одни входящие сообщения и принимающий другие, — это антипаттерн. Подписчики должны получать только те сообщения, которые им необходимы.

Наконец, если ваши потребности сложны и требуют отфильтровать сообщения для определённых потребителей на основе множества свойств, используйте обмен заголовками.


В заключение приглашаем всех желающих на открытый урок, на котором продолжим говорить о RabbitMQ, теперь в сравнении с Kafka. Обсудим, как эти брокеры устроены, что могут; их слабые, сильные стороны и технические ограничения. Записаться на урок можно на странице онлайн-курса «RabbitMQ для разработчиков и администраторов».

Tags:
Hubs:
Total votes 17: ↑16 and ↓1+15
Comments15

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS