Pull to refresh

Грокаем RxJava, часть вторая: Операторы

Reading time 6 min
Views 94K
Original author: Dan Lew
В первой части мы с вами рассмотрели основные строительные блоки RxJava, а также познакомились с оператором map(). Я могу понять тех из вас, кто всё ещё не чувствует желания всё бросить и начать использовать этот фреймворк, так как пока что мы, условно выражаясь, рассмотрели лишь вершину айсберга. Но скоро всё переменится — большая часть всей мощи RxJava скрывается в её операторах, и я как раз подготовил для вас пример, по которому можно изучить некоторую их часть.

Задача


Предположим, что у меня есть такой вот метод:

// Возвращает список url'ов, основываясь на поиске по содержимому веб-страницы
Observable<List<String>> query(String text); 

Я хочу написать систему для поиска и отображения текста. Основываясь на том, что мы изучили в предыдущем уроке, мы можем написать нечто подобное:

query("Hello, world!")
    .subscribe(urls -> {
        for (String url : urls) {
            System.out.println(url);
        }
    });

Это решение никоим образом нас не удовлетворяет потому, что мы теряем возможность трансформировать поток данных. Если у нас возникнет желание модифицировать каждый url, нам придётся делать всё это в Subscriber, оставляя, таким образом, все наши трюки с map() не у дел.
Можно было бы написать map(), который работал бы с одним списком url'ов, и выдавал бы наружу список измененных url'ов, но в таком случае каждый вызов map() содержал бы в себе for-each. Тоже не очень-то и красиво.

Проблеск надежды


Применим метод Observable.from(), который берёт коллекцию и «испускает» один элемент этой коллекции за другим:

Observable.from("url1", "url2", "url3")
    .subscribe(url -> System.out.println(url));

Похоже на то, что нам нужно, давайте попробуем воспользоваться им в нашей задаче:

query("Hello, world!")
    .subscribe(urls -> {
        Observable.from(urls)
            .subscribe(url -> System.out.println(url));
    });

От цикла мы избавились, но, что получилось в итоге, выглядит как полный бардак: вместо цикла мы получили вложенные друг в друга подписки. И плохо не только то, что код выглядит запутанно и потому его скорее всего будет трудно модифицировать; он конфликтует с некоторыми особенностями RxJava, о которых я ещё не упоминал1. Мда.

Есть способ получше


Затаите своё дыхание при виде спасителя: flatMap().
Observable.flatMap() принимает на вход данные, излучаемые одним Observable, и возвращает данные, излучаемые другим Observable, подменяя таким образом один Observable на другой. Неожиданный поворот событий, так сказать: вы думали, что получаете один поток данных, а получаете на самом деле другой. Вот как flatMap() поможет нам решить нашу проблему:

query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));

Я показал полную версию для того, чтобы вам было проще разобраться в происходящем, но, если переписать код с лямбдами, то выглядеть он начинает просто замечательно:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));

Довольно странная штука, если призадуматься. Зачем flatMap() возвращает другой Observable? Ключевой момент тут в том, что новый Observable — это то, что увидит в итоге наш Subscriber. Он не получит List<String>, он получит поток индивидуальных объектов класса String так, как он получил бы от Observable.from().
Между прочим, этот момент показался мне самым сложным, но, как только я его понял и осознал, большая часть того, как работает RxJava, встала в моей голове на свои места.

И можно сделать кое-что более крутое


Подчеркну ещё раз, потому что это важно: flatMap() может вернуть нам любой Observable, какой вы только захотите.
Например, у меня есть второй метод:

// Возвращает заголовок вебсайта, или null, если мы получили ошибку 404
Observable<String> getTitle(String URL);

Вместо того, чтобы печатать url'ы, я теперь хочу печатать заголовок каждого сайта, до которого удалось достучаться. Есть проблемы: мой метод принимает только один url, и он не возвращает String, он возвращает Observable, который возвращает String.
Можно решить обе эти проблемы с flatMap(); сначала мы перейдём от списка url'ов к потоку индивидуальных url'ов, а потом используем getTitle() внутри flatMap() прежде чем передать окончательный результат в Subscriber:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String url) {
            return getTitle(url);
        }
    })
    .subscribe(title -> System.out.println(title));

И ещё раз упростим всё с помощью лямбд:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .subscribe(title -> System.out.println(title));

Здорово, да? Мы объединяем вместе несколько не зависящих друг от друга методов, которые возвращают нам Observables.
Обратите внимание на то, каким образом я объединил вместе два вызова API в одну цепочку. То же самое можно проделать для любого количества обращений к API. Вы наверняка знаете, насколько сложно порой бывает скоординировать работу нескольких вызовов API для того, чтобы получить в итоге некоторый нужный нам результат: сделали запрос, получили результат в функции обратного вызова, уже изнутри неё сделали новый запрос… Брр. А здесь мы взяли и обошли весь этот ад стороной, уложив всю ту же самую логику в один короткий реактивный вызов2.

Изобилие операторов


Пока что мы взглянули лишь на два оператора, но их в RxJava на самом деле гораздо больше. Как ещё можно улучшить наш код?
getTitle() возвращает null, если мы получили ошибку 404. Мы не хотим выводить на экран "null", и мы можем отфильтровать ненужные нам значения:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .subscribe(title -> System.out.println(title));

filter() «испускает» тот же самый элемент потока данных, который он получил, но только если он проходит проверку.
А теперь мы хотим показать только 5 результатов, не больше:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .subscribe(title -> System.out.println(title));

take() возвращает не больше заданного количества элементов (если в нашем случае получилось меньше 5 элементов, take() просто-напросто завершит свою работу раньше.
Знаете, а давайте-ка будем ещё и сохранять каждый полученный нами заголовок на диск:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(title -> saveTitle(title))
    .subscribe(title -> System.out.println(title));

doOnNext()позволяет нам добавить некоторое дополнительное действие, происходящее всякий раз, как мы получаем новый элемент данных, в данном случае этим действием будет сохранение заголовка.
Взгляните на то, как легко нам манипулировать потоком данных. Можно добавлять всё новые и новые ингридиенты к вашему рецепту, и не получить в итоге неудобоваримую бурду.
RxJava поставляется с вагоном и маленькой тележкой разнообразных операторов. Такой огромный список может и напугать, но его стоит просмотреть хотя бы для того, чтобы иметь представление о том, что есть в наличии. У вас уйдёт некоторое время для того, чтобы запомнить доступные вам операторы, но, как только вы это сделаете, вы обретёте истинную силу на кончиках ваших пальцев.
Да, кстати, вы также можете писать свои собственные операторы! Эта тема выходит за рамки данного цикла статей, но, скажем так: если вы придумаете свой собственный оператор, вы почти наверняка сможете реализовать его3.

И что дальше?


Хорошо, понял, вы скептик и убедить вас опять не получилось. Зачем вам вообще беспокоиться обо всех этих операторах?

Идея №3: Операторы позволяют вам делать с потоком данных всё, что угодно


Единственное ограничение находится в вас самих.
Можно написать довольно сложную логику манипулирования данными, не используя ничего, кроме цепочек простых операторов. Это и есть функциональное реактивное программирование. Чем чаще вы им пользуетесь, тем сильнее изменяется ваше представление о том, как должен выглядеть программный код.
Также, подумайте о том, как легко было представить наши данные конечному потребителю после того, как мы трансформировали их. Под конец нашего примера мы делали два запроса к API, обрабатывали данные, и заодно сохраняли их на диск. Но наш конечный Subscriber не имеет об этом ни малейшего представления, он всё так же работает с обычным Observable<String>. Инкапсуляция делает код более простым!
В третьей части мы пройдёмся по другим крутым особенностям RxJava, которые связаны с манипуляцией данными в меньшей степени: обработка ошибок и параллелизм.

Перейти к третьей части.


1 Так, например, обработка ошибок, многопоточность и отмена подписок в RxJava сочетаются с этим кодом чуть менее чем никак. Я затрону эти темы в третьей части.
2 А вот тут вы, возможно, задумались о другой стороне callback hell: обработка ошибок. Я рассмотрю это в третьей части.
3 Если вы хотите написать свои собственные операторы, вам стоит посмотреть вот сюда. Некоторые детали их имплементации, правда, будут довольно сложны для понимания, до прочтения третьей части статьи.
Tags:
Hubs:
+17
Comments 8
Comments Comments 8

Articles