Как стать автором
Обновить
1047.95
OTUS
Цифровые навыки от ведущих экспертов

kotlinx.coroutines 1.4.0: представляем StateFlow и SharedFlow

Время на прочтение5 мин
Количество просмотров9.2K
Автор оригинала: Anton Arhipov

В преддверии старта курса "Kotlin Backend Developer" приглашаем всех желающих записаться на открытый урок по теме "Kotlin multiplatform: front/back на одном языке".


А сейчас предлагаем к прочтению традиционный перевод статьи.


Сегодня мы с радостью объявляем о выходе версии 1.4.0 библиотеки Kotlin Coroutines . Основными новшествами этого релиза стали StateFlow и SharedFlow, которые теперь являются стабильными API-интерфейсами. StateFlow и SharedFlow предназначены для использования в тех случаях, когда требуется управление состоянием в контексте асинхронного выполнения с применением Kotlin Coroutines.

API-интерфейс Flow в Kotlin предназначен для асинхронной обработки потока данных, который выполняется последовательно. По сути, Flow — это последовательность. В Kotlin с помощью Flow можно выполнять такие же операции, как с помощью Sequences: преобразовывать, фильтровать, сопоставлять и т. д. Основное различие между Sequences и Flow в Kotlin заключается в том, что Flow позволяет приостанавливать выполнение.

Во Flow приостановку можно выполнить в любом месте: в функции сборки или в любом из операторов, предоставляемых API-интерфейсом Flow. Приостановка во Flow работает как контроль backpressure, при этом вам не нужно ничего делать — всю работу выполняет компилятор.

val flow: Flow<Int> = flow {
	delay(100)
	for(i in 1..10) {
		emit(i)
	}
}.map {
	delay(100)
	it * it
}

Интерфейс Flow так же прост в использовании, как и Sequences. Однако Flow несет в себе все преимущества реактивного программирования, в котором не требуется управлять backpressure.

Flow является удобным API-интерфейсом, однако он не обеспечивает возможность управлять состоянием, которая требуется в некоторых случаях. Например, у процесса может быть несколько промежуточных и одно конечное состояние. Примером такого процесса является загрузка файла: процесс загрузки длится некоторое время и мы можем определить такие промежуточные состояния процесса, как «Запущен» и «Выполняется», и конечные состояния «Успешно» и «Сбой». В этом случае нам интересны только результаты: успешно была выполнена загрузка или нет.

При реализации описанного выше сценария с помощью API-интерфейса Flow нам нужно публиковать изменения состояния для наблюдателей, которые могут совершать те или иные действия исходя из этих изменений. Ранее мы всегда рекомендовали использовать для этого ConflatedBroadcastChannel. Однако ConflatedBroadcastChannel является слишком сложным для этой задачи. Кроме того, имеются некоторые логические нестыковки, которые возникают при использовании каналов для управления состоянием. Например, канал может быть закрыт или отменен. Это не очень хорошо сочетается с управлением состоянием, поскольку состояние-то нельзя отменить!

Мы решили отказаться от ConflatedBroadcastChannel и вместо этого внедрить пару новых API-интерфейсов — StateFlow и SharedFlow!

StateFlow

StateFlow имеет две разновидности: StateFlow и MutableStateFlow:

public interface StateFlow<out T> : SharedFlow<T> {
   public val value: T
}

public interface MutableStateFlow<out T>: StateFlow<T>, MutableSharedFlow<T> {
   public override var value: T
   public fun compareAndSet(expect: T, update: T): Boolean
}

Состояние представлено значением. Любое изменение значения будет отражено во всех коллекторах потока путем выдачи значения с изменениями состояния.

Давайте посмотрим, как можно реализовать описанный ранее пример с загрузкой файла с помощью нового API-интерфейса.

class DownloadingModel {

   private val _state.value = MutableStateFlow<DownloadStatus>(DownloadStatus.NOT_REQUESTED)
   val state: StateFlow<DownloadStatus> get() = _state

   suspend fun download() {
       _state.value = DownloadStatus.INITIALIZED
       initializeConnection()
       processAvailableContent {
               partialData: ByteArray,
               downloadedBytes: Long,
               totalBytes: Long
           ->
           storePartialData(partialData)
           _state = DownloadProgress(downloadedBytes.toDouble() / totalBytes)
       }
       _state.value = DownloadStatus.SUCCESS
   }
}

В этом примере клиентам предоставляется неизменяемая версия state, а управление изменяемым состоянием (state) выполняется внутри. В функции загрузки мы сначала обновляем внутреннее значение состояния: state.value = DownloadStatus.INITIALIZED. Затем мы можем обновлять внутреннее состояние, задавая промежуточные числа, которые указывают ход выполнения загрузки. Наконец, state получит конечное значение, указывающее состояние загрузки.

Как видите, никаких API для работы с каналами здесь не используется. Мы не запускаем никаких дополнительных корутин, и нет нужды изучать какие-либо новые концепции. Только простой императивный код, в котором для описания реализации используется переменная, а клиентам предоставляется state как Flow.

SharedFlow

Что, если вместо управления состоянием нам потребуется управлять рядом обновлений состояния, то есть потоком событий? Для таких случаев у нас есть новый API-интерфейс под названием SharedFlow. Этот API-интерфейс подходит для обработки ряда выдаваемых значений, например для вычисления скользящего среднего из потока данных.

public interface SharedFlow<out T> : Flow<T> {
   public val replayCache: List<T>
}

Общий поток — это просто поток, где есть кэш повтора, который можно использовать в качестве атомарного моментального снимка. Каждый новый подписчик сначала получает значения из кэша повтора, а затем получает новые выданные значения.

Вместе с SharedFlow мы также предоставляем MutableSharedFlow.

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
   suspend fun emit(value: T)
   fun tryEmit(value: T): Boolean
   val subscriptionCount: StateFlow<Int>
   fun resetReplayCache()
}

С помощью MutableSharedFlow можно выдавать значения из приостанавливающего и неприостанавливающего контекста. Как можно заключить из имени, кэш повтора MutableSharedFlow можно сбрасывать. Кроме того, он предоставляет количество своих коллекторов как поток.

Реализовать собственный MutableSharedFlow может быть довольно сложно. Поэтому мы предоставляем несколько удобных методов для работы с SharedFlow.

public fun <T> MutableSharedFlow(
   replay: Int,
   extraBufferCapacity: Int = 0,
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

Чтобы инициализировать экземпляр MutableSharedFlow с помощью приведенной выше функции, можно указать количество значений, которые повторяются для новых подписчиков, емкость буфера, а также действия, которые следует выполнять в случае заполнения буфера. Например, если буфер заполнится, можно приостановить поток.

Если у вас уже есть экземпляр потока и вы хотите обеспечить возможность предоставления к нему общего доступа, можно воспользоваться новым оператором Flow.shareIn.

public fun <T> Flow<T>.shareIn(
   scope: CoroutineScope,
   replay: Int,
   started: SharingStarted = SharingStarted.Eagerly
)

Резюме

Новые API-интерфейсы StateFlow и SharedFlow обеспечивают более элегантный способ работы с состоянием в программах на Kotlin с корутинами. Они намного проще и понятнее, чем использование широковещательных каналов для публикации изменений состояния из контекста потока.

Попробуйте новые API, испытайте их на прочность и отправьте нам свой отзыв!

Подробные сведения о нововведениях в Kotlin Coroutines можно узнать, посмотрев выступление Всеволода Толстопятова на конференции Kotlin 1.4 Online Event.


Узнать подробнее о курсе "Kotlin Backend Developer".

Записаться на открытый урок "Kotlin multiplatform: front/back на одном языке".

Теги:
Хабы:
Всего голосов 7: ↑6 и ↓1+5
Комментарии4

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS