Альфа-Банк corporate blog
JavaScript
Programming
ReactJS
July 16

ReactiveX Redux

Все, кто работает с Redux, рано или поздно сталкиваются с проблемой асинхронных действий. Но современное приложение разработать без них невозможно. Это и http-запросы к бэкенду, и всевозможные таймеры/задержки. Сами создатели Redux говорят однозначно — по умолчанию поддерживается только синхронный data-flow, все асинхронные действия необходимо размещать в middleware.

Конечно, это слишком многословно и неудобно, поэтому тяжело найти разработчика, который пользуется одними только “нативными” middleware. На помощь всегда приходят библиотеки и фреймворки, такие как Thunk, Saga и им подобные.

Для большинства задач их вполне хватает. Но что если нужна чуть более сложная логика, чем отправить один запрос или сделать один таймер? Вот небольшой пример:

async dispatch => {
   setTimeout(() => {
      try {
         await Promise
            .all([fetchOne, fetchTwo])
            .then(([respOne, respTwo]) => {
                dispatch({ type: 'SUCCESS', respOne, respTwo });
             });
      } catch (error) {
          dispatch({ type: 'FAILED', error });
      }   
   }, 2000);
}

На такой код больно даже смотреть, а поддерживать и расширять просто невозможно. Что делать, когда нужна более сложная обработка ошибки? А вдруг понадобится повтор запроса? А если я захочу переиспользовать эту функцию?

Меня зовут Дмитрий Самохвалов, и в этом посте я расскажу, что такое концепция Observable и как применять её на практике в связке с Redux, а еще сравню всё это с возможностями Redux-Saga.

Как правило, в таких случаях берут redux-saga. ОК, перепишем на саги:

try {
    yield call(delay, 2000);
    const [respOne, respTwo] = yield [
       call(fetchOne),
       call(fetchTwo)
    ];
    yield put({ type: 'SUCCESS', respOne, respTwo });
} catch (error) {
    yield put({ type: 'FAILED', error });    
}

Стало заметно лучше — код почти линейный, лучше выглядит и читается. Но расширять и переиспользовать по-прежнему трудно, потому что сага такой же императивный инструмент, как и thunk.

Есть и другой подход. Это именно подход, а не просто очередная библиотека для написания асинхронного кода. Он называется Rx (они же Observables, Reactive Streams и т.п.). Воспользуемся им и перепишем пример на Observable:

action$
  .delay(2000)
  .switchMap(() => 
     Observable.merge(fetchOne, fetchTwo)
       .map(([respOne, respTwo]) => ({ type: 'SUCCESS', respOne, respTwo }))
       .catch(error => ({ type: 'FAILED', error }))

Код не просто стал плоским и уменьшился в объеме, изменился сам принцип описания асинхронных действий. Теперь мы не работаем непосредственно с запросами, а выполняем операции над специальными объектами под названием Observable.

Observable удобно представлять как функцию, которая отдает поток (последовательность) значений. У Observable есть три основных состояния — next (“отдай следующее значение”), error (“произошла ошибка”) и complete (“значения закончились, отдавать больше нечего”). В этом плане он немного напоминает Promise, но отличается тем, что по этим значениям можно итерироваться (и в этом одна из суперспособностей Observable). Обернуть в Observable можно все что угодно — таймауты, http-запросы, DOM-события, просто js объекты.



Второй суперсилой Observable являются операторы. Оператор — это функция, которая принимает и возвращает Observable, но производит какие-то действия над потоком значений. Ближайшая аналогия — map и filter из javascript (кстати, такие операторы есть в Rx).



Наиболее полезными лично для меня были операторы zip, forkJoin и flatMap. На их примере легче всего объяснить работу операторов.

Оператор zip работает очень просто — он принимает на вход несколько Observable (не более 9) и возвращает в виде массива значения, которые они испускают.

const first = fromEvent("mousedown");
const second = fromEvent("mouseup");


zip(first, second)
    .subscribe(e => 
       console.log(`${e[0].x} ${e[1].x}`));



//output
[119,120]
[120,233]
…

В общем виде работу zip можно представить схемой:



Zip используется, если у вас есть несколько Observable и вам необходимо согласованно получать от них значения (при том, что они могут испускаться с разными интервалами, синхронно или нет). Он очень полезен при работе с DOM-событиями.

Оператор forkJoin похож на zip за одним исключением — он возвращает только последние значения от каждого Observable.



Соответственно, его разумно использовать, когда нужны только конечные значения из потока.
Немного сложнее оператор flatMap. Он принимает на вход Observable и возвращает новый Observable, и мапит значения из него в новый Observable, используя либо функцию-селектор, либо другой Observable. Звучит запутанно, но на схеме все довольно просто:



Еще нагляднее в коде:

const observable = of("Hello");

const promise = value => 
new Promise(resolve => resolve(`${value} World`);

observable
  .flatMap(value => promise(value))
  .subscribe(result => console.log(result));

//output
"Hello World"

Наиболее часто flatMap используется в запросах к бэкенду, наряду со switchMap и concatMap.
Каким же образом можно использовать Rx в Redux? Для этого есть замечательная библиотека redux-observable. Ее архитектура выглядит так:



Все Observable, операторы и действия над ними оформляются в виде специального middleware, который называется epic. Каждый epic принимает на вход action, оборачивает его в Observable и должен вернуть action, также в виде Observable. Возвращать обычный action нельзя, это создает бесконечный цикл. Напишем небольшой epic, который делает запрос к апи.

const fetchEpic = action$ => 
    action$
      .ofType('FETCH_INFO')
      .map(() => ({ type: 'FETCH_START' }))
      .flatMap(() => 
        Observable
          .from(apiRequest)
          .map(data => ({ type: 'FETCH_SUCCESS', data }))
          .catch(error => ({ type: 'FETCH_ERROR', error }))
      )

Невозможно обойтись без сравнения redux-observable и redux-saga. Многим кажется, что они близки по функциональности и возможностям, но это совсем не так. Саги — целиком императивный инструмент, по сути набор методов для работы с сайд-эффектами. Observable это принципиально другой стиль написания асинхронного кода, если хотите, другая философия.

Я написал несколько примеров для иллюстрации возможностей и подхода к решению задач.

Допустим, нам нужно реализовать таймер, который будет останавливаться по действию. Вот как это выглядит на сагах:

while(true) {
   const timer = yield race({
     stopped: take('STOP'),
     tick: call(wait, 1000)
   })

   if (!timer.stopped) {
      yield put(actions.tick())
   } else {
      break
   }
}

Теперь используем Rx:

interval(1000)
   .takeUntil(action$.ofType('STOP'))


Допустим, есть задача реализовать запрос с отменой на сагах:

function* fetchSaga() {
  yield call(fetchUser);
}

while (yield take('FETCH')) {
  const fetchSaga = yield fork(fetchSaga);
  yield take('FETCH_CANCEL');        
  yield cancel(fetchSaga);
}

На Rx все проще:

switchMap(() => fetchUser())
  .takeUntil(action$.ofType('FETCH_CANCEL'))

Напоследок мое любимое. Реализовать запрос к апи, в случае неудачи сделать не более 5 повторных запросов с задержкой в 2 секунды. Вот что имеем на сагах:

for (let i = 0; i < 5; i++) {
    try {
      const apiResponse = yield call(apiRequest);
      return apiResponse;
    } catch (err) {
      if(i < 4) {
        yield delay(2000);
      }
    }
  }
  throw new Error(); 
}

Что получится на Rx:

.retryWhen(errors => 
     errors
       .delay(1000)
       .take(5))

Если суммировать плюсы и минусы саги, получится такая картина:



Саги просты в освоении и очень популярны, поэтому в комьюнити можно найти рецепты почти на все случаи жизни. К сожалению, императивный стиль мешает использовать саги по-настоящему гибко.

Совсем другая ситуация у Rx:



Может показаться, что Rx это волшебный молоток и серебряная пуля. К сожалению, это не так. Порог входа в Rx заметно выше, поэтому тяжелее вводить нового человека в проект, активно использующий Rx.

Кроме того, при работе с Observable особенно важно быть внимательным и всегда хорошо понимать, что происходит. Иначе можно наткнуться на неочевидные ошибки или неопределенное поведение.

action$
  .ofType('DELETE')
  .switchMap(() => 
     Observable
       .fromPromise(deleteRequest)
       .map(() => ({ type: 'DELETE_SUCCESS'})))

Однажды я написал epic, который делал довольно простую работу — при каждом action с типом ‘DELETE’ вызывался метод API, который производил удаление элемента. Однако при тестировании возникли проблемы. Тестировщик жаловался на странное поведение — иногда при нажатии на кнопку удаления не происходило ничего. Оказалось, что оператор switchMap поддерживает выполнение только одного Observable в момент времени, своего рода защита от race condition.

В качестве итога приведу несколько рекомендаций, которым следую сам и призываю следовать всем, кто начинает работу с Rx:

  • Будьте внимательны.
  • Изучайте документацию.
  • Проверяйте в sandbox.
  • Пишите тесты.
  • Не стреляйте из пушки по воробьям.

+20
3.1k 35
Comments 31