Pull to refresh

Comments 12

Огромное Вам, человеческое, спасибо! Весьма годная подборка для начинающего. На русском материала по RxJS практически нет. Вы осветили крайне полезные (по крайней мере, для меня) юзкейсы.
Можно же последний снипет так
Observable.concat(updateProduct$, updateTags$, updateCategories$).toArray().subscribe(res => console.log(res))
Ну или в случае с from
Observable.from([updateProduct$, updateTags$, updateCategories$]).concatAll().toArray().subscribe(res => console.log(res))
Ну или с of
Observable.of(updateProduct$, updateTags$, updateCategories$).concatAll().toArray().subscribe(res => console.log(res))
Вопрос такой — а есть ли аналогично takeUntil оператор что то вроде restartWhen?

Задача такая — UI получает observable из метода getSmth, соответственно хотелось бы в методе saveSmth вызвать next и чтобы все подписки сходили еще раз на бакенд и получили новые данные
Я пока что реализовал примерно то что мне надо руками, но вдруг уже есть готовое решение.

Ужасы какие :-) Давайте я вам лучше покажу, как то же самое реализуется с использованием ОРП...


При открытии страницы example.com/#/users/42, по userId получить данные пользователя.

ФРП:


ngOnInit() {
  this.route.params
    .pluck('userId') // получаем userId из параметров
    .switchMap(userId => this.userService.getData(userId))
    .subscribe(user => this.user = user);
}

ОРП:


// example.com/#user=123
user() {
    return this.user_serice().data( $mol_state_arg.value( 'user' ) )
}

Можно поддержать и формат урлов из задачи, но суть не поменяется.


При открытии страницы example.com/#/users/42?regionId=13 нужно выполнить функцию load(userId, regionId). Где userId мы получаем из роутера, а regionId — из параметров запроса.

ФРП:


ngOnInit() {
  Observable.combineLatest(this.route.params, this.route.queryParams)
    .subscribe(([params, queryParams]) => { // полученный массив деструктурируем
      const userId = params['userId'];
      const regionId = queryParams['regionId'];
      this.load(userId, regionId);
    });
}

ОРП:


// example.com/#user=123/region=456
data() {
    return this.load( $mol_state_arg.value( 'user' ) , $mol_state_arg.value( 'region' ) )
}

Можно поддержать и формат урлов из задачи, но опять же суть не поменяется.


Показать значок загрузки после начала сохранения данных и скрыть его, когда данные сохранятся или произойдет ошибка.

ФРП:


save() {
  this.loading = true;
  this.userService.save(params)
    .finally(() => this.loading = false)
    .subscribe(user => {
      // Успешно сохранили
    }, error => {
      // Ошибка сохранения
    });
}

ОРП:


@ $mol_mem
save() {
    return this.user_service().save( params )
}

Да, реально, больше ничего делать не надо — анимация сама начнётся, когда начнётся запрос, закончится, когда запрос завершится, и будет нарисована ошибка в случае ошибки.


Создать переменную lang$ в configService, на которую другие компоненты будут подписываться и реагировать, когда язык будет меняться.

ФРП:


lang$: BehaviorSubject<Language> = new BehaviorSubject<Language>(DEFAULT_LANG);
setLang(lang: Language) {
  this.lang$.next(this.currentLang); // тут мы поставим
}

private subscriptions: Subscription[] = [];
ngOnInit() {
  const langSub = this.configService.lang$
    .subscribe(() => {
      // ...
    });
  this.subscriptions.push(langSub);
}
ngOnDestroy() {
  this.subscriptions
    .forEach(s => s.unsubscribe());
}

ОПР:


@ $mol_mem
lang( next = 'en' ) { return next }

title() {
    return this.language_service().text( this.lnag() , 'title' )
}

Да-да, подписки/отписки — не наша забота, всё будет работать как надо.


Показывать предложения страниц при вводе данных на форме

ФРП:


ngOnInit() {
  this.form.valueChanges
    .takeUntil(this.ngUnsubscribe)      // отписаться после разрушения
    .map(form => form['search-input'])  // данные инпута
    .distinctUntilChanged()             // брать измененные данные
    .debounceTime(300)                  // реагировать не сразу
    .switchMap(this.wikipediaSearch)    // переключить Observable на запрос в Вики
    .subscribe(data => console.log(data));
}

wikipediaSearch = (text: string) => {
  return Observable
    .ajax('https://api.github.com/search/repositories?q=' + text)
    .map(e => e.response);
}

ОРП:


suggests() {
    return $mol_http.resource( 'https://api.github.com/search/repositories?q=' + this.query() ) ).json().items
}

О подписках/отписках заботиться не надо, актуальное значение всегда есть, при изменении запроса возвращаться будут только данные для актуального запроса, а предыдущие запросы будут автоматически прибиваться. Задержки разве что тут нет, но она реализуется не тут, а компоненте поля ввода.


Необходимо закешировать Observable запрос

ФРП:


private tagsCache$ = this.getTags()
  .publishReplay(1, 2000) // кешируем одно значение на 2 секунды
  .refCount()             // считаем ссылки
  .take(1);               // берем 1 значение

getCachedTags() {
  return tagsCache$;
}

ОРП:


@ $mol_mem
tags() {
    const resource = $mol_http.resource( '/tags' )

    // сбрасываем кеш через 2 секунды
    setTimeout( ()=> resource.json( undefined , $mol_atom_force ) , 2000 )

    return resource.json()
}

Так как кеширование происходит по умолчанию, то задача своидится к противоположной — сбросить кэш через 2 секунды.


Критическая ситуация на сервере! Backend команда сообщила, что для корректного обновления продукта нужно выполнять строго последовательно: Обновление данных продукта (заголовок и описание); Обновление списка тегов продукта; Обновление списка категорий продукта.

ФРП:


const updateProduct$ = this.productService.update(product);
const updateTags$ = this.productService.updateTags(productId, tagList);
const updateCategories$ = this.productService.updateCategories(productId, categoryList);

Observable
  .from([updateProduct$, updateTags$, updateCategories$])
  .concatMap(a => a)  // выполняем обновление последовательно
  .toArray()          // Возвращает массив из последовательности
  .subscribe(res => console.log(res)); // res содержит массив результатов запросов

ОРП:


update_product() { return this.product_service().update( this.product() ) }
update_tags() { return this.product_service().update_tags( this.product() , this.tags() ) }

update_categories() { return this.product_service().update_categories( this.product() , this.categories() ) }

@ $mol_mem
updating() {
    console.log(
        this.update_product().valueOf() ,
        this.update_tags().valueOf() ,
        this.update_categories().valueOf() ,
    )
}

Без valueOf() все запросы пошли бы параллельно. Поэтому мы немедленно требуем результат, чтобы следующий запрос не начался до получения результата предыдущего.


Загадка на посошок

Ну и у меня для вас задачка:


У меня есть список игрушек, у каждой игрушки есть свойства. Свойства могут меняться. Есть функция фильтрации, которая может быть сложной и тяжёлой и которая тоже может меняться динамически. Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит. Я так понимаю, каждое свойство должно быть стримом и надо как-то подписаться на заданные свойства всех игрушек. Как это лучше всего сделать?


У меня пока получилось следующее:


const ToysSource = new Rx.BehaviorSubject( [] )
const Toys = ToysSource.distinctUntilChanged().debounce( 0 )

const FilterSource = new Rx.BehaviorSubject( toy => toy.count > 0  )
const Filter = FilterSource.distinctUntilChanged().debounce( 0 )

const ToysFiltered = Filter
.select( filter => {
  if( !filter ) return Toys
  return Toys.map( toys => toys.filter( filter ) )
} )
.switch()
.distinctUntilChanged()
.debounce( 0 )

Но тут, очевидно, при любом изменении игрушек будет происходить повторная фильтрация. Например: фильтруем по числу остатков, а меняется цена — происходит повторная фильтрация, что не хорошо.

> Хотелось бы, чтобы перефильтрация происходила лишь тогда, когда меняются свойства, от которых результат фильтрации реально зависит.

А ничего, что это алгоритмически неразрешимая задача? filter toy = if arithmetic_is_consistent then test(toy.property1) else test(toy.property2)

Всмысле неразрешимая? ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене. Уверен такое можно сделать и на Rx через хитрую комбинацию операторов.

> Всмысле неразрешимая?

Всмысле, нет никакого способа узнать, какие поля требуются для фильтра, не запустив сам фильтр. Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?

> ОРП же позволяет выстроить потоки данных так, что изменение цены не будет приводить к перефильтрации, пока мы не фильтруем по цене.

Либо не позволяет, либо я не понимаю, что вы имеете в виду.
Смотрите выше пример — надо ли перефильтровывать при изменении property1? а property2?

Допустим у фильтра есть метаданные, в которых указано какие свойства он проверяет, тогда можно заранее сказать какие при изменении каких свойств нужно провести перефильтрацию.


Либо не позволяет, либо я не понимаю, что вы имеете в виду.

Во время предыдущей фильтрации отслеживается к каким свойствам было обращение. Изменение этих свойств или фильтра приведёт к перефильтрации. Изменение любых других — не приведёт.

> Допустим у фильтра есть метаданные

Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?

> Во время предыдущей фильтрации отслеживается к каким свойствам было обращение.

А если при том же фильтре в разных случаях обращение к разным полям?
Рассчитать программно эти метаданные нельзя, то есть их добавляет к фильтру программист?

Агась.


А если при том же фильтре в разных случаях обращение к разным полям?

Если там не Math.random(), а зависит от какого-либо реактивного свойства, то всё будет ок — при изменении этого свойства будет перефильтрация.

Only those users with full accounts are able to leave comments. Log in, please.