Pull to refresh
20
0
Alexey Bozhev @zealot_and_frenzy

Java Programmer

Send message
Если хотите бесконечный поток данных (push based), то я не вижу другого выхода. Кто-то ведь должен вызывать emitter.next у FluxSink. С точки зрения реактора можно воспользоваться шедулером.

Вариант проще — pull based подход через fluxSink.onRequest. Тут и нить выполнения можно изменить через subscribeOn. Правда, в этом случае сложно организовать «бесконечную» работу.

Более элегантного варианта я не знаю, благо есть гиттер, где в т.ч. отвечают оперативней.
Не до конца понял вопрос, но попробую ответить. Какого-то универсального рецепта нет, все упирается в данные. Если метод возвращает весь необходимый список объектов единовременно (не потоком) — вероятно это не Flux, а Mono. Hot Stream это скорее про какие-нибудь Event Listener на UI или системные ивенты, вебсокеты. Что-то, вызов чего мы не контролируем.

Да, в плане работы с JDBC — все очень плохо.


Но в случае чего, для работы с блокирующим источником, документация предлагает следующую конструкцию:


Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());

У нас в одном из сервисов есть вот такой метод:


public Mono<Boolean> createUserRelation(UserId objectId, UserId subjectId) {
    return Mono.fromSupplier(() -> {
        try {
            relationRepo.create(new UserRelationResource(objectId, subjectId));
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }).publishOn(elastic);
}

Оба варианта: publishOn() и subscribeOn() позволяют изменять поток выполнения. В случае с Schedulers.elastic(), можно сказать что он похож на стандартный ExecutorService newCachedThreadPool().

Да, отличная статья. Прикрепил ссылку, правда, к выводу.

Перед RxJava 1? Очевидно есть. Перед RxJava 2? Отсутствие поддержки Java 6 + Android 2.3 + наследия RxJava 1, в связи с чем более "чистая" семантика. Лучшая поддержка фишек Java 8. Spring Boot 2, по умолчанию, предлагает использовать Reactor для реактивности, хотя это же Spring, поэтому можно подключить альтернативное решение.


Еще есть вот такой забавный твит, от "project lead of RxJava".

По поводу Flux.range. Там нет ошибки, просто так устроен оператор:


/**
* @param start the first integer to be emit
* @param count the total number of incrementing values to emit, including the first value
* @return a ranged {@link Flux}
*/
public static Flux<Integer> range(int start, int count) {}

Он выводит значения начиная со start и по count-1.

Это из-за особенности устройства Flux (и работы оператора flatMap()). Flux.empty() будет передан дальше по цепочке вызовов, а потом подхватится оператором switchIfEmpty(). Вместо тысячи слов:


Flux<String> emptyFlux = Flux.empty();
String lastValue = emptyFlux
        .flatMap((Function<String, Publisher<String>>) someValue -> {
            System.out.println("flatMap call");
            return Flux.just(someValue + "(is modified)");
        })
        .<String>switchIfEmpty(Flux.just(":("))
        .blockLast();
System.out.println("lastValue = " + lastValue);

Выведет грустный смайлик

Information

Rating
Does not participate
Location
Пенза, Пензенская обл., Россия
Date of birth
Registered
Activity