2ГИС corporate blog
Java
Development for Android
July 2014 1

Реактивное программирование под Android

Отказоустойчивость, отзывчивость, ориентированность на события и масштабируемость — четыре принципа нынче популярного реактивного программирования. Именно следуя им создаётся backend больших систем с одновременной поддержкой десятков тысяч соединений.

Отзывчивость, простота, гибкость и расширяемость кода — принципы, которые можно закрепить за реактивным UI.

Наверняка, если совместить реактивные backend и UI, то можно получить качественный продукт. Именно его мы и попытались сделать, разрабатывая 2GIS Dialer — звонилки, которая работает через API и при этом должна оставаться быстрой и удобной.




Зачем нам реактивное программирование


Рассмотрим пример:

requestDataTask = new AsyncTask<Void, Void, JSONObject>() {
            @Override
            protected JSONObject doInBackground(Void... params) {
                final String requestResult = apiService.getData();
                final JSONObject json = JsonUtils.parse(requestResult);
                lruCache.cacheJson(json);
                return json;
            }
        };

Тут всё просто, мы создаем AsyncTask, в котором:

  1. Делаем запрос к API 2ГИС.
  2. СоздаемJSONObject на основе результата запроса.
  3. Кэшируем JSONObject.
  4. Возвращаем JSONObject.

Подобный код встречается во многих проектах, он понятен, а миллионы леммингов не могут ошибаться. Но давайте копнём чуть глубже:

  1. Что делать, если где-то во время выполнения выпал Exception?
  2. doInBackground(Void...) выполняется в отдельном потоке, как нам сказать пользователю об ошибке в UI? Заводить поля для Exception?
  3. А что возвращать, если не прошел запрос? null?
  4. А если json не валидный?
  5. Что стоит делать, если не удалось кэшировать объект?

И ведь это не самый сложный пример. Представьте, что вам надо сделать ещё один запрос на основе результатов предыдущего. На AsyncTask’ах это будет callback-hell, который, как минимум, будет неустойчив к падениям, ошибкам и т.д.

Вопросов больше, чем ответов. О недостатках AsyncTask’ов можно написать целую статью, серьезно. Но есть ли варианты лучше?

Фреймворк RxJava


Оглядываясь на принципы реактивного программирования мы начали искать решение, которое обеспечит:

  • отсутствие зависаний и тормозов;
  • масштабируемость на ресурсы смартфона;
  • отсутствие крэшей;
  • ориентированность на события.

Таковым стала RxJava от ребят из Netflix — reactive extension, идея (но не реализация) которого перекочевала из reactive extension for c#.

В RxJava всё крутится вокруг Observable. Observable — это как потоки данных (ещё их можно рассматривать как монады), которые могут каким-либо образом получать и отдавать эти самые данные. Над Observable’ами можно применять операции, такие как flatmap, filter, zip, merge, cast и т.д.

Простой пример:

//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
        .subscribeOn(Schedulers.newThread()) //отдаем новый тред для работы в background
        .observeOn(AndroidSchedulers.mainThread()) //говорим, что обсервить хотим в main thread
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                //do something with result
            }
        });

Мы создаем Observable, который поочередно отдает нам числа из Iterable. Указываем, что генерация и передача данных будет происходить в отдельном треде, а обработка результата — в main thread. Подписываемся на него, и в методе подписчика производим любые манипуляции с каждым следующим результатом.

Можно сделать этот пример более интересным:

//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)).
                //оператор фильтрации для отсеивания ненужных результатов
                filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0; //выражение верно только для четных чисел
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        //do something with ONLY EVEN result
                    }
                });

Теперь, указав оператор filter, мы можем обрабатывать только чётные значения.

Как используют RxJava


Вернёмся к нашему первому AsyncTask и посмотрим, как бы мы решили задачу с помощью реактивного программирования.
Для начала создадим Observable с запросом:

//Observable, действия которого основанны на переданной ему Observable.OnSubscribe<String>
Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  //сообщить сабскрайберу о том, что есть новые данные
                  subscriber.onNext(apiService.getData());
                  //А теперь сообщаем о том, что мы закончили и данных больше нет
                  subscriber.onCompleted();
              }
          });

Тут мы создаем Observable и специфицируем его поведение. Делаем запрос и отдаем результат в onNext(...), после чего говорим Subscriber’у, что мы закончили, вызвав onCompleted().

С этим понятно: мы создали Observalble, который отвечает только за получение объекта String с API. SRP в чистом виде.
Что, если запрос не прошёл по каким-то причинам? Тогда мы можем позвать у Observable метод retry(...), который будет повторять этот самый Observable n раз, пока он не завершится успешно (читай, без Exception). Кроме того, мы можем отдать Observable’у другой Observable, если даже retry() не помог. Если backend написан криво, то лучше бы нам закрывать соединение по таймауту. И у нас есть метод timeout(...) на этот случай. Всё вместе это выглядело бы так:

final Subscription subscription =
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext(apiService.getData());
                  subscriber.onCompleted();
              }
          })
          .timeout(5, TimeUnit.SECONDS) //указываем таймаут операции в секундах
          .retry(3) // делаем 3 попытки запроса
          //назначаем обработчик в случае, если все таки мы не спасли положение
          .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
              @Override
              public Observable<? extends String> call(Throwable throwable) {
                  //return new observable here, that can rescure us from error
              }
          });

И немного рефакторинга:

final Subscription subscription =
          createApiRequestObservable() //создали Observable с запросом
          .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
          .retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
          .onErrorResumeNext(createRequestErrorHandler()); // назначили обработчик ошибки

Теперь займемся созданием json. Для этого результат нашего первого Observable (а там String) надо преобразовать. Используем map(...), и, если что-то вдруг пойдет не так, вернем другой, нужный нам в случае неудачи, json с помощью onErrorReturn(...).
Вот так:

final Subscription subscription =
          createApiRequestObservable()
          .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
          .retry(RETRY_COUNT_FOR_REQUEST)
          .onErrorResumeNext(createRequestErrorHandler())
          //модифицируем Observable, чтобы тот преобразовывал String в JSONObject
          .map(new Func1<String, JSONObject>() {
              @Override
              public JSONObject call(String s) {
                  return JsonUtils.parse(s);
              }
          })
          //снова ставим обработчик ошибки
          //и вернем предустановленный "ошибочный" json
          .onErrorReturn(new Func1<Throwable, JSONObject>() {
              @Override
              public JSONObject call(Throwable throwable) {
                  return jsonObjectForErrors;
              }
          });

Ок, с json разобрались. Осталось кэширование. Кэширование: это не преобразование результата, а действие над ним. Для этого у Observable есть методы doOnNext(...), doOnEach(...) и т.д. Получается как-то так:

final Subscription subscription =
          createApiRequestObservable()
          .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
          .retry(RETRY_COUNT_FOR_REQUEST)
          .onErrorResumeNext(createRequestErrorHandler())
          //модифицируем Observable, чтобы тот преобразовывал String в JSONObject
          .map(new Func1<String, JSONObject>() {
              @Override
              public JSONObject call(String s) {
                  return JsonUtils.parse(s);
              }
          })
          //снова ставим обработчик ошибки
          //и вернем предустановленный "ошибочный" json
          .onErrorReturn(new Func1<Throwable, JSONObject>() {
              @Override
              public JSONObject call(Throwable throwable) {
                  return jsonObjectForErrors;
              }
          })
          //процедура, вызывающаяся при каждом onNext(..) от Observable
          .doOnNext(new Action1<JSONObject>() {
              @Override
              public void call(JSONObject jsonObject) {
                  lruCache.cacheJson(jsonObject);
              }
          });

Снова немного отрефакторим код:

final Subscription subscription =
          createApiRequestObservable() //создали Observable с запросом
          .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
          .retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
          .onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
          .map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
          .onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
          .doOnNext(createCacheOperation()); //кэшируем JSONObject

Мы почти закончили. Как в самом первом примере с RxJava, добавим обработчик результата и укажем треды, в которых надо исполняться.
Финальная версия:

final Subscription subscription =
          createApiRequestObservable() //создали Observable с запросом
          .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
          .retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
          .onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
          .map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
          .onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
          .doOnNext(createCacheOperation()); //кэшируем JSONObject
          .subscribeOn(Schedulers.newThread()) //делаем запрос, преобразование, кэширование в отдельном потоке
          .observeOn(AndroidSchedulers.mainThread()) // обработка результата - в main thread
          .subscribe(subscriber); //обработчик результата

Давайте посмотрим, чего мы тут добились:

  1. Принцип отказоустойчивости в действии: результат выполнения всех операций всегда предсказуем. Мы знаем обо всех ошибках и потенциально проблемных местах, которые могут возникнуть в коде, и уже обработали их. Никаких исключений не будет.
  2. Принцип отзывчивости в действии: соединение с базой или сервером не зависнет благодаря таймауту, попытается сам восстановиться при ошибке и, что тоже важно, вернет результат сразу, до кэширования. А кэширование в doOnNext выполнится параллельно обработке результата.
  3. Принцип ориентированности на события в действии: по ходу выполнения запроса и парсинга, мы всегда реагируем на события — события успешного/неуспешного завершения запроса, событие окончания парсинга json (2 реакции: обработка в UI и обработка в бэкграунд трэде для кэширования) и т.д. Кроме того, можно несколько раз подписываться на один Observable и держать в консистентном состоянии всю систему.
  4. Код легко расширяем и почти не требует изменений. Если нам необходимо сделать логирование ошибки или сохранение стэктрейс, можно добавить метод doOnError(Throwable thr). Хотите отфильтровать результаты — добавьте оператор filter и реализуйте его поведение.

Как и недостатки AsyncTask’ов, преимущества этого подхода, на мой взгляд, можно перечислять очень долго. Последний из принципов реактивного программирования, принцип масштабируемости, продемонстрируем ниже.

RxJava в 2GIS Dialer


Живой пример:

//создаем новый Observable путем комбинирования четырех других
final Observable<AggregatedContactData> obs = Observable.combineLatest(
                  createContactsObservable(context), //Observable для получения контактов из базы
                  createPhonesObservable(context), //Observable для получения всех телефонов контактов
                  createAccountsObservable(context), //Observable для полуения аккаунтов и контактов по ним
                  createJobsObservable(context), //Observable для получения мест работы контактов
                  contactsInfoCombinator() //функция комбинироваия результатов всех Observable выше
          ).onErrorReturn(createEmptyObservable()).cache() //обработчик ошибки и оператор кэширования
          .subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor())) //для выполнения такой задачи потребуется тред пул
          .observeOn(AndroidSchedulers.mainThread()); // обработка данных как всегда - в main thread


  1. Тут происходит сразу много интересного и посложнее описанного выше:
    Первое, что бросается в глаза, это Observable.combineLatest(...). Этот оператор ждет onNext(...) от всех переданных ему Observable’ов и применяет функцию комбинирования сразу ко всем результатам. Может показаться сложным, но картинка из вики RxJava сделает всё понятнее. Самое важное тут, что каждый из Observable, переданных в Observable.combineLatest(...) — это CursorObservable, который передает в свой onNext(...) новый курсор, как только он меняется в базе данных. Таким образом, на любое обновление любого из четырех курсоров выполняется функция комбинирования, что позволяет всегда поставлять самые свежие данные. Это и есть принцип ориентированности на события.
  2. Если что-то пошло не так, то мы исходя из своих нужд возвращаем требуемое. В данном случае Collections.emptyList();
  3. Оператор cache() очень полезен, если на этот Observable могут быть подписаны сразу несколько Subscribers. Если этот оператор применен к Observable, то новый его подписчик мгновенно получает данные, при условии, что эти данные уже были посчитаны для подписавшихся ранее. Таким образом, все желающие имеет актуальные одинаковые данные.
  4. А вот тут видно принцип масштабируемости: в subscribeOn(...) я отдаю тред пул на 4 треда, чтобы каждый из 4х моих Observable выполнялся в отдельном треде с целью максимизации скорости, всю остальную заботу берет на себя RxJava. То есть задействованы будут все 4 процессора, при наличие оных.

Как видите, потенциал у реактивного программирования огромный, а фукнционал RxJava реализует его в достаточной мере.

Проблемы, с которыми мы столкнулись


Всё, продемонстрированное выше и намного больше в том или ином виде используется у нас в дайлере. И вот с какими проблемами мы столкнулись:

  • Проблема OOM. Наивно полагать, что Android может дать много тредов для многопоточной работы. При количестве тредов больше 15, даже топовые смартфоны начинали “задумываться”, а их мелкие собратья и вовсе падали с OutOfMemoryError. Решение было простым. Ввести CachedThreadPool для этих дел и проблема решена.
  • Кэширование запросов. Речь не про оператор cache() из примера выше. Хотелось бы, чтобы следующий запрос на тот же самый url сразу брался из кэша. В RxJava такого нет. В принципе это правильно, потому что реактивность и кэш — две разные вещи. Поэтому мы написали свой кэш.

Что еще?


Мы увидели, как классно реактивно работать с многопоточностью и запросами в Android. Но это далеко не всё. Например, можно подписываться на изменение Checkable или EditText (это из коробки идет в RxJava для Android). Тут всё просто, но ужасно удобно.
Кстати, одной RxJava реактивное программирование под Java не ограничивается. Существуют и другие библиотеки, например, Bolts-Android.
Кроме этого, сейчас активно разрабатывается Reactive-Streams, который призван унифицировать работу с разными реактивными провайдерами в java.

Вывод


Понравилось ли нам? Однозначно. Реактивные приложения действительно гораздо устойчивее к багам и падениям, код становится понятным и гибким (были бы лямбды — был бы еще и красивым). Много рутинной работы перекладывается на библиотеку, которая выполняет свою работу лучше, чем нативные Android-компоненты. Это позволяет сосредоточиться на реализации вещей, которые действительно стоит обдумать.

Реактивное программирование — это немного другое мышление по сравнению с традиционной разработкой под Android. Потоки данных, функциональные операторы — эти сложные, на первый взгляд, вещи оказываются намного проще, если разобраться. Стоит немного поработать с реактивным программированием, и мозг начинает перестраиваться с объектов и состояний на монады и операторы над ними. Это такая большая, добрая, мощная частичка ФП в ООП, которая делает жизнь и код проще, а приложение лучше. Попробуйте, не пожалеете.

Ссылки, которые помогут вам разобраться с реактивным программированием или просто могут оказаться интересными:


Небольшое отступление. Если вы разделяете наши взгляды на программирование и создание продуктов, то приходите — будем рады вас видеть в команде 2GIS Dialer.
+46
86.8k 398
Comments 59
Top of the day