Pull to refresh

Пример использования RabbitMQ Delayed Message Exchange в Java Spring Framework

Reading time 6 min
Views 31K
image В этом посте я хочу показать как использовать отложенные сообщения в RabbitMQ. В качестве примера задачи, где удобно использовать отложенную очередь, возьму механизм постбеков (s2s ping back, s2s pixel).

В двух словах о механизме постбеков:


1. Происходит некое событие
2. Ваше приложение должно уведомить об этом событии сторонний сервис
3. Если сторонний сервис оказался недоступен, то необходимо повторить уведомление еще раз через несколько минут

Для повторного уведомления я буду использовать отложенную очередь.

RabbitMQ по умолчанию не умеет задерживать сообщения, они доставляются сразу после публикации. Функционал отложенной доставки доступен в виде плагина rabbitmq-delayed-message-exchange.

Сразу хочу отметить, что плагин экспериментальный. Не смотря на то, что в целом он достаточно стабилен, использовать в продакшене его нужно на свой страх и риск.

Собираем Docker контейнер с RMQ и плагином


За основу я возьму официальный образ с management plugin, пригодится для тестирования.

Dockerfile:

FROM rabbitmq:3.6-management
RUN apt-get update && apt-get install -y curl
RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

Сборка
docker build --tag=x25/rmq-delayed-message-exchange .

Запуск
docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange

Spring AMQP


Spring Framework полностью поддерживает плагин в проекте spring-rabbit. Начиная с версии 1.6.4 можно пользоваться как xml/bean конфигурациями так и аннотациями.

Я буду использовать Spring Boot Amqp Starter.

Зависимость для Maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Зависимость для Gradle
compile "org.springframework.boot:spring-boot-starter-amqp"

Конфигурация через аннотации


При использовании бутстраппера и аннотаций Spring берет большую часть работы на себя. Достаточно лишь написать:

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
public void onMessage(Message<?> message) {
    //...
}

И при запуске приложения RabbitAdmin автоматически объявит delayed exchange, queue, создаст обработчики событий и привяжет их к аннотированному методу.

Нужно больше потоков для обработки сообщений? Это настраивается через файл внешней конфигурации (свойство spring.rabbitmq.listener.concurrency) или через параметр containerFactory у аннотации:

Пример
//Создаем конфигурацию:
@Configuration
public class RabbitConfiguration {
    @Bean(name = "containerFactory")
    @Autowired
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(10);
        factory.setPrefetchCount(30);
        return factory;
    }
}

//Добавляем параметр:
@RabbitListener(containerFactory = "containerFactory", ...)

Для отправки отложенного сообщения удобно использовать RabbitTemplate:

rabbitTemplate.send(
        DELAY_EXCHANGE_NAME,
        DELAY_QUEUE_NAME,
        MessageBuilder
                .withBody(data)
                .setHeader("x-delay", MESSAGE_DELAY_MS).build()
);

Отправлено оно будет моментально, но доставлено будет с задержкой, указанной в заголовке x-delay. Максимально допустимое время задержки (2^32-1) мс.

Готовый пример приложения:

@SpringBootApplication
public class Application {

    private final Logger log = LoggerFactory.getLogger(Application.class);

    private final static String DELAY_QUEUE_NAME = "delayed.queue";
    private final static String DELAY_EXCHANGE_NAME = "delayed.exchange";

    private final static String DELAY_HEADER = "x-delay";
    private final static String NUM_ATTEMPT_HEADER = "x-num-attempt";
    private final static long   RETRY_BACKOFF = 5000;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
    exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
    public void onMessage(Message<byte[]> message) {

        String endpointUrl = new String(message.getPayload());
        Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L);

        log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt);

        if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) {
            rabbitTemplate.send(
                    DELAY_EXCHANGE_NAME,
                    DELAY_QUEUE_NAME,
                    MessageBuilder
                            .withBody(message.getPayload())
                            .setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF)
                            .setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1)
                            .build()
            );
        }
    }

    private boolean doNotifyEndpoint(String url) {
        try {
            HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
            /* @todo: set up connection timeouts */
            return (connection.getResponseCode() == 200);
        } catch (Exception e) {
            log.error(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      prefetch: 10
      concurrency: 50

build.gradle
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'rmq-delayed-demo'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-amqp")
    testCompile("org.springframework.boot:spring-boot-starter-test")
}

Проверяем отложенную доставку через панель управления (rmq-management), она доступна на порту 15672:

image

Логи:

2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1
2016-12-21 14:27:25.941: Connection refused (Connection refused)
2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2
2016-12-21 14:27:30.951: Connection refused (Connection refused)
2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3

Конфигурация через XML


При использовании XML конфигураций нужно просто установить у exchange-бина свойство delayed в true и RabbitAdmin сделает все остальное за вас.

Пример xml-конфигурации в связке с Integration Framework
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
  http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <int:channel id="to-delayed-rmq" />

    <int-amqp:outbound-channel-adapter channel="to-delayed-rmq"
                                       amqp-template="rabbitTemplate"
                                       exchange-name="delayed.exchange"
                                       routing-key="delayed.binding"
                                       mapped-request-headers="x-delay" />

    <int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue"
                                      queue-names="delayed.queue"
                                      message-converter="amqpMessageConverter"
                                      connection-factory="rabbitConnectionFactory"
                                      concurrent-consumers="10"
                                      prefetch-count="50" />

    <int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage">
        <bean id="postbackServiceActivator" class="PostbackServiceActivator" />
    </int:service-activator>

    <rabbit:queue name="delayed.queue" />

    <rabbit:direct-exchange name="delayed.exchange" delayed="true">
        <rabbit:bindings>
            <rabbit:binding queue="delayed.queue" key="delayed.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

Полезные ссылки


Tags:
Hubs:
+14
Comments 1
Comments Comments 1

Articles