Comments
Спасибо за статью! Разве у томката нельзя настроить размер тред пула и если он весь исчерпан складывать сообщения в очередь на обработку?
Конечно можно, но если внешние ресурсы деградируют, то это как снежный ком (старые запросы в ожидании, новые порождают еще большую нагрузку на внешние системы). Очередь будет разрастаться и коннекты начнут отпадать по таймауту на уровне клиента/nginx так и не попав в обработку.

Проблема еще кроется в том, что очередь общая и есть запросы, которые не требуют обращения к внешним ресурсам и быстро обрабатываются. Они также начнут падать. Особенно остро это касается health-чеков k8s, которые отваливались находясь в очереди и прибивали поду.
Так это у вас с перфомансом проблемы были, а не с потоками.
Почитайте что-нибудь по теории массового обслуживания.
Очередь может увеличиваться, только если ресурсов меньше, чем требуется.
Надо было открыть профайлер и разобраться почему 40 запросов в секунду, которые не считают параметры чёрных дыр, так нагружают сервера

И правильно, что health-check прибивается — какой же это health, хотя «быстрые» запросы можно на уровне прокси пропускать.

Я уверен, что пока вы свою архитектуру переделывали, кто-то в вашей команде тупо пофиксил перформанс. А вы этого даже и не заметили.

Параллельные запросы просто убили. Это же настоящий антипаттерн — противоположность паттерну DTO
При нормальных условия все работает хорошо, но, к сожалению, мир не идеален и в любой момент что-то может пойти не так, например деградация одного или нескольких сервисов, к которым мы делаем запросы.
Конечно же можно залить это ресурсами, если таковых хватит, но эффективность использования ресурсов при этом упадет ниже плинтуса.

Если последовать вашему же совету и посмотреть информацию о потоках в момент проблемы, то можно увидеть кучу тредов которые просто ждут ответа от проблемного апстрима и, кроме этого, ничего полезного не делают.
Кстати, подобную, пусть и не такую драматическую картину с потоками можно увидеть и в нормальной ситуации, если у вас много внешних вызовов.
При использовании webflux и неблокирующих операций мы эффективнее используем потоки, они не висят в ожидании ответа от сервиса, но начинают обслуживать другие запросы и затуп одного апстрима не повлияет на скорость работы той части системы, которая с ним не связана.

Не понял, что же правильно в том, что рабочее приложение ложится по health check? Мы же тем самым только усугубляем ситуацию.

Что касается множества запросов, то этим мы:
  1. Ускоряем загрузку фронта. Когда мы получаем всю информацию для страницы в одном запросе, то время ответа будет равно времени самого медленного ресурса, к которому мы сходили за данными, в нашем же случае фронт отрисовывается по мере получения порций данных.
  2. Отказоустойчивость. Если один из ресурсов так и не ответил — не беда, мы отрисуем страницу с доступной информацией и покажем клиенту, что вот эти данные сейчас недоступны попробуйте позже.
  3. Проще управлять и развивать.
Ну вы точно также могли бы и на бэкенде подождать ответа и, если ответ не пришёл, то записать null в соответствующее поле DTO. Ваше решение выглядит как перекладывание проблем с бакэнда на UI.

У UI разработчиков и так (как правило) самая сложная работа, так теперь им ещё
надо ломать голову о том, как правильно ошибки обрабатывать от ваших 700 микросервисов.
В общем случае null <> ошибка. Конечно, можно это обойти, но выглядеть это в модели данных будет не очень.

Основной набор данные фронт получает от одного бека, который в свою очередь при необходимости ходит к другим сервисам, поэтому с форматами ошибок проблем нет.

Подождать ответа на беке можно, но вы как-то забыли о клиенте в этой истории, а он будет все это время ждать отрисовки страницы, хотя ему возможно и не нужна та информация, которая предоставляется затупившим сервисом.

Вообще это конечно две крайности (1 эндпоит который тащит все vs 100500 эндпоинтов на каждый чих), а правда я считаю находится где-то посередине и это разумное разделение, при котором фронт может запросить ровно те данные, которые ему нужны в данный момент.
Если сервис затупил, то ничего ждать не надо. Ваш сервис фасад просто пометит его как недоступный и не будет обращаться некоторое время.
Будет вместо актуальных данных какой-нибудь null вставлять, либо старые данные

Отдача кэшированных данных для нас огромная проблема ибо каналов информирования клиента об изменении ключевых данных очень много и клиент, что логично, пытается зайти и увидеть эти данные. Если мы начнем после уведомления показывать старые данные — это проблема.

По circuit breaker.
Это все отлично работает при параллельных запросах, а не последовательных, логика формирования ответа, который зависит от нескольких источников очень сильно усложняется в ситуациях, когда вам нужно поддерживать более 1 версии API с разной логикой и форматом.
Почему на сервлетах >3.1 программа начинает пухнуть, захлебываясь в переключениях контекста, а rx этой проблемы лишен? Кажется, при большом количестве подключений сервис также должен начать деградировать, причем на первый взгляд кажется, что порядок запросов должен быть сравним
С приходом спецификации Servlet 3.1 мы получили NIO при обработки запросов, но проблема в том, что в WebMVC API до сих пор много блокирующего кода, стандартные модули Spring'а и драйвера в них также блокирующие. По своей сути перехода на контейнеры Servlet 3.1 вообще ничего не меняют, у нас также куча блокировок, но зато теперь мы можем возвращать CompletableFuture в контроллерах…

не пользуйте R2DBC в продакшене, он медленный и еще не стабильный.
для постгреса наилучший вариант https://vertx.io/docs/vertx-pg-client/java/


Как вы делаете привязку различных логов к одному запросу? Тк запрос может скакать по потокам то MDC к примеру не работает. У нас я сделал в лоб через


Hooks.onEachOperator(...);

Оно нормально работает, но меня напрягает что логи пишутся в некоторых случаях, а оборачиваем в свою логику по определению и выставлению MDC мы для кадого Mono/Flux

Насколько мне известно, это пока единственный вариант поддержки MDC, его мы и используем.
Для варианта с корутинами есть kotlinx-coroutines-slf4j. Пока мы его не пробовали, но что-то мне подсказывает, что там не все так гладко с интеграцией между CoroutineContext и ReactorContext.

Спасибо за отзыв по R2DBC, у нас есть мотивация его протестировать, вероятно будет материал для статьи по этому поводу.

Это не особо удобно, сравните


public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
    return signal -> {
        if (!signal.isOnNext()) return; 
        Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY"); 

        toPutInMdc.ifPresentOrElse(tpim -> {
            try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { 
                logStatement.accept(signal.get()); 
            }
        },
        () -> logStatement.accept(signal.get())); 
    };
}

с просто log.info('some'):
так же зачастую надо логировать не только на onNext но и внутри flatMapи тогда приходиться закручивать все в


Mono.subscriberContext()
                .flatMap(context -> {
                    val mdcValues = signal.getContext().getOrEmpty("MDC_KEY");
                    MDC.setContextMap(mdcValues);
                    ...
                });

или map тогда надо брать и переделывать ее на flatMap
вообщем вместо простой вещи получается лапша за которой прячется бизнес логика.

Это не особо удобно, сравните

Это один раз засовывается в методы в какой-нибудь util-либе и везде потом используется. У нас это выглядит примерно так:
.doOnEach(mdcNext(() -> log.info("in")))
//какие-то действия
.doOnEach(mdcComplete(() -> log.info("out")))
.doOnEach(mdcError(e -> log.error("thrown {}", e.getMessage())))

так же зачастую надо логировать не только на onNext но и внутри flatMap

Тут согласен, в таких ситуациях приходится оборачивать в Mono.subscriberContext(), но такое встречается нечасто и несильно напрягает.
Основной плюс такого подхода — минимум оверхеда.

У нас в коде наоборот, onNext не особо надо логировать, а вот в местах бизнес логики такого много.


Тут согласен, в таких ситуациях приходится оборачивать в Mono.subscriberContext(), но такое встречается нечасто и несильно напрягает.
А можно чуть подробнее про R2DBC? Вопрос для нас действительно актуальный. Насколько проседает производительность, есть ли бенчмарки по R2DBC?
И что с ним в плане стабильности? Недавно вышел spring boot 2.3 в него как раз вошел r2dbc, по всей видимости ребята из pivotal посчитали его готовым к использованию в продакшене.

Не впервые слышу про медлительность R2DBC, но пока не видел тому подтверждений, при все этом его использование много приятнее альтернатив учитывает интеграцию со spring data.

Вот в подобной теме ссылки дали
https://habr.com/ru/post/500446/#comment_21678412


или вот еще пример https://github.com/r2dbc/r2dbc-postgresql/issues/138


Я пытался к проду прикрутить где то пару месяцев назад и вылазили странные ошибки. Например возникала ошибка что данных для Моно есть несколько а не одно как должно быть Ошибки поправили, но мне не хотелось выкатывать такое нестабильное поведение на прод.
Поэтому просто сделал прослойку которая заворачивает vert.x постгрес клиент в Mono/Flux и живу горя не зная.

Мы не являемся высоконагруженным сервисом и обслуживаем в довольно спокойном темпе около 30-40 запросов в секунду на трёх активных подах (по 1 процессору и 2 Гб ОЗУ).

Тоесть вы запускали многопоточное приложение на 1 процессоре и, удивительно, оно работало не очень. Конечно, подход с event loop работает лучше в таких условиях. Только вот это уже не Kotlin, а не пойми что
    .onErrorContinue { exception, obj ->
        handleException(obj, message, exception)
    }
    .onErrorResume {
        handleException(null, message, it)
        Flux.empty()
    }

Например, нет exception-ов. Чем отличается onErrorContinue от onErrorResume неясно. Ну тоесть вы добровольно отказались от возможностей языка и пишете программы на языке фреймворка.
Только вот это уже не Kotlin, а не пойми что
Например, нет exception-ов.

Тут я вас не понял, это все тот же Kotlin. Exception-ы есть, но также и дополнительный механизм работы с ним. Если вы, к примеру, используете Java Streams — это тоже не Java?

Тоесть вы запускали многопоточное приложение на 1 процессоре и, удивительно, оно работало не очень.

Тут вопрос не в том сколько процессоров, а в том что его утилизация была в районе плинтуса. Мы можем налить туда процессоров, памяти, поднять стоимость эксплуатации, но зачем если можно гораздо эффективнее утилизировать ресурсы.
Если вы, к примеру, используете Java Streams — это тоже не Java?

Ну Java Streams прямо скажем такая себе, очень спорная Java
Exception-ы есть, но также и дополнительный механизм работы с ним.

ну а зачем они нужны, если есть уже exception-ы, стандартный механизм. Вот смотрите, ваш фреймворк вводит новые понятия для программиста onErrorContinue, onErrorResume, в дополнение к exception-ам, про которые программисту надо думать и где он может допустить ошибку. Поэтоу я и говорю, у вас программа получается не на котлине, а на языке фреймворка, который навязывает другую модель обработки ошибок (по-другому и быть собственно не может раз он весь из себя неблокируюший и коллбэчный).
Тут вопрос не в том сколько процессоров, а в том что его утилизация была в районе плинтуса.

Ну тоесть проблема возможно была в какой-то блокировке где-то, раз утилизация процессора была низкой. Вам было лень разбираться (что неудивительно совсем, фик там разберешь где там в spring-е чего блокируется) и вы решили переписать в неблокирующем виде. И вдруг повезло в процессе переписывания блокировка ушла, ну повезло, это гут

Расскажу за свой пример никак не связанный с авторским.
Есть сервис который должен отсылать сотни пуш нотификаций на мобильные устройства в секунду. и есть внешний провайдер через который это надо делать, с ним подписан контракт и на его мобильный сдк завязано приложение и с него не спрыгнуть. И для этого провайдера вполне нормальная ситуация когда он вместо ~100ms на запрос делает по 6-20 сек. И пофиг сколько я процессоров накидаю в микросервис, все просто будет отжирать память и ждать ответа. Так же забиваются хттп потоки для веб сервера и он просто не сможет обрабатывать новые входящие сообщения. И простое добавление реактивщины избавляет от этих проблем — тк надо всего пару потоков на асинхронный прием запроса, обработки и потом асинхронный запрос к провайдеру.


Наш сервис может обслуживать кучу клиентов, памяти жрем в разы меньше, инстансов нужно меньше.

Нам очень полезно логировать входящие/исходящие запросы, так как это существенно облегчает коммуникацию со внешними системами при выяснении, из-за чего возникла та или иная ошибка. Насколько я понимаю, в реактивном подходе это означает блокировку.
Логируете ли вы запросы? Я имею ввиду тело запроса.

Да, действительно, есть проблема с логированием тела запроса к другому ресурсу, но в нашем случаем мы вполне обходимся логированием входного DTO на уровне клиента (передаваемый как аргумент). Заголовки можно достать через фильтр.


Для входящих запросов мы обходимся логами на UI. Приходится идти на компромиссы.

Only those users with full accounts are able to leave comments. Log in, please.