Kotlin
February 8, 2021

Kotlin Flow. Shared Flow. State Flow

Что такое Flow?

Flow — это новая часть корутин, представленная компанией JetBrains и призванная сделать корутины более реактивным фреймворком, заменив собой Rx и LiveData. Забавно, что когда только представили корутины, одним из преимуществ, помимо легковесности, была практически синхронная работа. То есть вам не требуются колбэки и вы можете вычислять все данные прямо там, где они нужны.

Но!

После долгой работы внезапно выяснилось, что реактивность все-таки иногда нужна, а подходящего решения не было. Итак, что же такое флоу? Это обычная реализация паттерна наблюдатель. У вас есть источник данных и подписчик, и флоу как раз реализует источник данных. Вот сейчас вы можете видеть пример нескольких эмитированных интов.

https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow

Код, который может быть приостановлен располагается внутри блока flow и он может быть suspendable. Что еще раз нас приближает к так называемому подходу DSL, который очень моден в наше время и используется практически повсеместно, например, в очень популярной библиотеке, я бы даже сказал фреймворке - Jetpack Compose.

Видео по Jetpack Compose

Здесь тоже используется этот подход и на самом деле он мне очень нравится, я бы даже сказал, он мне кажется намного более удобным, чем старый подход с простыми функциями. Он ярко выраженный, то есть, если я смотрю код, то могу читать его практически как обычную книгу.

Flow имеет холодный вызов или Flows are cold, если дословно. Что это означает? Ну сами термины холодный и горячий это одно из следствий массовых заимствований в русский язык из английского. Наверняка вы слышали выражения холодный звонок или холодный клиент. В общем-то можно заменять слово холодный на неподготовленный. В случае с Flow холодный означает, что подписка не будет вызвана до тех пор пока не будет назначен подписчик.

Давайте подробнее, возьмем наш пример.

https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow

Как мы видим, внутри функции simple мы выпускаем каждое значение i, пользуясь функцией emit. Учитывая, что я уже сказал, вы наверное уже догадались, что принт Flow started будет напечатан после Calling collect, как раз потому что запуск флоу является холодным, и на самом деле это очень круто, потому что вы таким образом можете регулировать жизненный цикл вашего флоу. Причем как вы видите на экране функция simple уже не является suspend, и это как раз потому что вы возвращаете себе как бы слепок будущих данных, а реальные данные вы начинаете получать уже в момент вызова collect.

Существует несколько различных способов вызова флоу, но в целом я думаю вы итак их легко найдете в интернете, это просто разные способы как нам создать поток данных и это собственно тоже самое как и в Rx. Нам иногда нужны различные подходы к одному и тому же делу.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow

Так же, как и с любым Observable, вы можете использовать и различные операторы трансформации, конкатенации и так далее. Если вы вдруг не понимаете, что за операторы я имею ввиду, то вот ссылка на видео по RxJava

Также, вместо onNext вы можете использовать onEach и это как бы несколько вас подвигает в сторону использования подхода работы не внутри самого subscribe или collect, а именно в аналоге doOnNext. И тоже самое при работе с ошибками.

https://developer.android.com/kotlin/flow

Backpressure

Один из самых частых вопросов чем же отличаются все-таки Observable и Flowable (не путать с Flow) - это проблема backpressure. Если вдруг вы не знали, то вот ссылка на видео по этой теме:

Вкратце, это проблема, когда у вас слишком много данных для обработки. Я люблю приводить в пример приложение, которое служит синтезатором для миди-пианино. Представьте, что вы посадили виртуоза за такое пианино, а вам нужно отрисовывать на экране нотки, которые этот виртуоз играет. Если виртуоз будет играть слишком быстро, а отрисовка слишком сложная, то рано или поздно ваш телефон перестанет справляться с этой задачей и не сможет принимать новые данные, которые генерирует наш гений.

В случае с Rx это решается через Backpressure Strategy, а что с Flow? Если взять прям из коробки, то здесь все решается очень просто, ведь мы имеем дело с suspend функцией. Получается, если мы не можем обработать результат пришедший к нам из источника данных и мы имеем холодный запуск, то есть всем управляет именно коллектор, то мы можем просто держать функцию в suspend состоянии ровно до тех пор, пока наш коллектор не сможет обработать следующее значение. И это на самом деле довольно круто.

Но, скажет мне требовательный читатель, а как же другие стратегии? Что если мне нужно, например, сбрасывать значения, которые не смогли обработаться, или накапливать их в буфер и так далее. И вообще, что если моя функция генерирует данные с задержкой, зачем все это делать последовательно? И читатель будет абсолютно прав, ведь делать все это последовательно совершенно не нужно.

Давайте возьмем в пример функцию, которая будет так же генерировать нам инты, но при этом обработчик, то есть коллектор, тоже будет обрабатывать наш поток данных с некоторой задержкой, ну то есть наша ситуация с пианистом и отрисовкой на экране. Между нажатиями на клавиши 100 миллисекунд, и между отрисовками 300 миллисекунд.

Если мы это распечатаем, то увидим, что примерно все это обработается за 1200 миллисекунд, то в общем-то логично 100 + 300 умножить на 3 будет 1200.

Здесь нам поможет отличный оператор под названием buffer. Буфер это оператор для Flow, который позволяет нам запускать коллектор на другой корутине.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html

И это позволяет нам как выполнять параллельно работу коллектора и эмиттера, так и выбирать разные стратегии переполнения буффера. Как вы наверное заметили у функции есть два параметра. Первый - это капасити, который является флагом.

Здесь остановимся подробнее. Раз у нас эмиттер запущен в отдельной корутине и коллектор запущен в отдельной корутине, то нам нужно как-то передавать между ними данные. Для этого используется механизм Channel. Более подробно расскажу про него как-нибудь отдельно.

А второй параметр у нас фактически определяет стратегию, которой мы будем пользоваться при переполнении нашего буфера и здесь все очень похоже на RxJava, поэтому особо разбирать не будем. Но очень интересно взглянуть на саму реализацию буфера внутри.

Buffeer изнутри (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html)

Здесь мы видим более подробно, что capacity работает именно как флаг и что есть интересный флаг под названием Conflated, и на самом деле как мы видим из исходников это просто некая такая иконка на стратегию брать самый последний элемент из потока, то есть, в целом вы можете это реализовать через capacity = Buffered и стратегию Drop_Oldest. Также капасити принимает и любое не нулевое значение, что позволяет четко определить емкость данных для передачи в канале.

То есть, возвращаясь к нашему примеру, мы получим уже не 1200 миллисекунд, а около 1000 (на самом деле в тестах все время разное, но в среднем около 1050 мс). Это получается из-за того, что у нас есть некие сдвиги и иногда все же получаются паузы, когда простаивают и эмиттер и коллектор, но все равно 15% прирост времени просто за счет одного оператора это очень неплохо. Однако, как и в любой асинхронности, как только мы начинаем работать с неким общим ресурсом, а в случае андроида с любым долгоживущим запросом (сеть, чтение из базы данных или из файла) у нас сразу же встает проблема как это все делать на разных потоках.

Dispatchers

Как мы уже знаем из корутин у нас есть различные диспетчеры на которых мы можем выполнять те или иные действия. И так как в видео про корутины, где у нас был такой быстрый беглый взгляд мы особо никак не разбирали как это работает, я думаю будет правильно сделать это здесь. Видео про корутины:

Итак, что вообще такое диспетчеры? Наверняка вы делали в своем коде что-то вроде withContext(Dispatchers.Default), но возможно даже не задумывались над тем, что в этот момент происходит. В отличие от стандартной многопоточности, где мы создаем новый поток каждый раз, когда нам нужно сделать новую задачу, в корутинах мы можем иметь несколько корутин на одном потоке (и причем довольно много). Достигается этот эффект из-за тех самых точек приостановки, которые мы разбирали в видеостатье о корутинах:

Кто знаком с Pх, вспоминаем все эти бесчисленные subscribeOn и observeOn. И там все очень просто: вы либо стартуете в новом потоке, либо выполняетесь на том потоке, где был создан Рх объект, но как же быть с тем, когда у нас может выполняться несколько корутин? Здесь нам поможет интерфейс под названием CoroutineContext, который, включен в стандартную библиотеку котлина. Ссылка на этот класс: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-coroutine-context/

Это набор различных элементов один из которых это Job, но об этом возможно расскажу как-нибудь отдельно. Самое главное, что нас интересует — это CoroutineDispatcher, который как раз и устанавливает связь между потоком или потоками и корутиной, которая должна выполниться. С помощью этого параметра мы можем запустить корутину на главном потоке, на новом созданном потоке, на каком-то специфичном потоке, который мы хотим указать и так далее.

Создавая корутину, не важно через launch или асинк, вы всегда можете указать параметром на каком диспетчере это будет сделано. А можете и не указывать, потому что этот параметр опциональный.

По коду ниже сразу видно что происходит когда мы указываем или не указываем диспетчер. Особый интерес здесь вызывает именно когда мы не указываем его. Потому что когда указываем тут примерно все понятно, а вот что будет если не указать в явном виде? В таком случае наша корутина унаследует контекст того места откуда была вызвана и если ее вызывать из главного потока, или точнее контекста, который привязан к главному потоку, то логично, что выполнена она будет на главном потоке.

Поэтому будьте аккуратны, вот буквально недавно сталкивался с запросом от одного из людей, что у человека были постоянные ошибки, как раз связанные с тем, что работа UI операций постоянно срабатывала на диспетчере, который был привязан к воркер треду. Причем как и многое другое, связанное с потоками это может давать весьма нетривиальные ошибки. То есть, когда у вас приложение крашится это на самом деле далеко не самый плохой вариант. Вы сразу понимаете ага ну упало, берете, фиксите и дальше живете спокойной жизнью, а вот когда например у вас приложение не падает, но перерасходует батарею на 20% больше, чем могло бы вот это страшно, потому что по моему опыту такое может жить годами, прежде чем это найдут. Вообще тем performance очень недооцененная, особенно для андроид приложений. Если эта тема интересна напишите в комментариях и я сделаю видео.

Итак я думаю более-менее с диспетчерами мы разобрались, но давайте теперь посмотрим как это работает в Flow. На самом деле довольно просто, но как всегда есть нюанс. Может показаться, что раз мы используем внутри нашего flow билдера suspend функции, то и контекст нужно переключать внутри них. Однако, это не так. При таком вот подходе

https://kotlinlang.org/docs/reference/coroutines/flow.html#flows-are-cold

вы получите Exception, который в явном виде вам скажет, что у вас коллектор и эмиттер работают на разных диспетчерах и это мол недопустимо, поэтому такой способ переключения не работает. Правильный способ — использовать оператор flowOn и уже в нем указывать диспетчер, на котором произойдет эмиссия новых значений.

https://kotlinlang.org/docs/reference/coroutines/flow.html#flows-are-cold

Момент, который на мой взгляд заслуживает отдельного внимания это то, что оператор flowOn является обычным цепочечным оператором билдера. К слову о том, что сейчас билдеры уже не используют и все пишут на конструкторах с параметрами. Если не понятно о чём речь, то вот ссылка на видео про Строитель:

Раз оператор является цепочечным, значит логично, что мы можем вызвать его сколько угодно раз для различных других цепочечных операций таких как фильтр, маппинг и так далее. Если вы хотите, чтобы операции по фильтрации у вас проходили например на воркере, а операции по принту всего этого дела на главном потоке, то важно помнить, что оператор flowOn действует только на те операторы, которые предшествуют оператору flowOn.

Как вы видите выше, мы выполняем флоу и мап на потоке ввода вывода, потому что они идут перед диспетчер ИО. Ну и видно как выполняются следующие операции.

Это очень важная штука для понимания, потому как я много раз на практике сталкивался с бездумно написанными сабскрайбами, обсервами и т.д., авторы которых не понимают как именно это работает под капотом. Я, конечно, понимаю, что в мобильной разработке у вас вряд ли может возникнуть подобная ситуация, однако, может когда-нибудь и вам это пригодится.

Здесь может возникнуть интересный вопрос — что будет если написать несколько раз подряд flowOn с разными диспатчерами. Отвечаю: операция выполнится в итоге на том диспетчере, который был первым в этом списке.

Combining

Так как Flow позиционируется как полноценная замена Rx (как и Лайвдата кстати тоже), то естественно у флоу есть аналоги операторов комбинирования нескольких флоу, так как без этого трудно представить ни одно серьезное приложение - разные источники данных, микросервисные архитектуры, сервисы всякие левые, все это порождает необходимость женить потоки данных между собой.

Здесь у тех, кто знаком с Рх все будет предельно узнаваемо и знакомо. Оператор зип перекочевал в своем первозданном виде и сюда.

Как видите, все до боли знакомо. Хотя не совсем. Всё-таки сам вызов отличается и это на самом деле больно, потому что если вам надо соединить три, четыре или более флоу, возникнет проблема. Однако, это как правило не нужно, так как в мобильных приложениях мы больше оперируем синглами и это можно сделать через обычные корутины. Тем не менее, я нашел вполне себе нормальное решение как сделать подобную функцию самому. Надеюсь, JetBrains добавят такую реализацию к себе, ведь она может быть полезной.

Не буду задерживаться и на других реализациях операторов комбинации, их вы сможете найти по ссылке, там можно посмотреть различные combine, flatten и так далее операторы: https://kotlinlang.org/docs/reference/coroutines/flow.html#flows-are-cold

А теперь давайте поговорим про совершенно новые штуки, которые нам относительно недавно представили JetBrains, а так же попробуем все это на практике.

SharedFlow. StateFlow

Чтобы разобраться с этими понятиями нам нужно будет вернуться в историю. Если хотите разобраться более подробно, я рекомендую прочитать статью на хабре: https://habr.com/ru/post/529944/

Сказанное мной далее будет частично пересекаться с этой статьей, потому как это перевод статьи автора всего этого добра Романа Елизарова.

Если не вдаваться в детали, то для общения между корутинами ранее использовалась штука под названием Channel (уже второй раз встречается) и она была достаточно дорогостоящим решением с точки зрения производительности. В целом можете себе представить ситуацию, когда у вас есть здоровенный такой цех и общение между отдельными операторами станков происходит через письма отправляемые по трубкам. А вам иногда очень важно, чтоб ваши операторы работали, где-то синхронно, где-то асинхронно. Думаю можете представить себе масштаб бедствия.

Как мы помним из ранее сказанного, наши флоу холодные, то есть пока не появится подписчик данные не начнут эмиттиться. Но что делать с данными, которые эмиттятся вне зависимости от того подписан на них кто-то или нет. Ну, например, датчик геолокации. Ваш телефон перемещается в пространстве постоянно и соответственно датчик генерит новые данные.

Тут нам помогает сущность под названием Shared Flow. Это достаточно интересная штука в плане реализации, как мне показалось. Дело в том, что Shared Flow хранит в себе некий кэш, из которого достаются данные когда на них будет подписан коллектор.

https://habr.com/ru/post/529944/

И только после того как будет воспроизведен кэш вы начнете получать новые данные. То есть как бы получается, пока нет коллекторов SharedFlow просто как бы подписан сам на себя и просто пишет себе данные в кэш. Очень изящно и круто сделано, как по-мне. Это может быть нам полезно в случае так называемых горячих данных (датчики, события операционной системы и так далее).

Но также вы можете реализовать и стандартную шину событий через это, то есть что-то такое, что будет источником данных во всем приложении. Ну, например, если вам надо реализовать стандартную штуку, связанную с отображением некоторых служебных сообщений для пользователя когда он вернулся в аппку. Представьте, вы находитесь в приложении по подбору неких авто на продажу. Пока пользователь свернул приложение у вас сгенерировалось несколько новых вариантов, а когда он вернется в аппку вы сможете отобразить ему сообщения.

https://habr.com/ru/post/529944/

Причем, используя параметры, вы можете играться с тем что именно увидит пользователь: все ли события отсеивать, дропать первые или последние и так далее. Если shared flow переполнится, то он приостановит эмиттеры до тех пор, пока их не обработает, но при этом он не дожидается обработки их коллекторами. То есть у вас получается очень быстрая шина данных внутри вашего приложения.

Также вместе с SharedFlow ребята из JetBrains сделали MutableSharedFlow, который позволяет нам, например, получать данные одновременно из suspend источников и из non-suspend источников. Ещё он позволяет нам сбросить кэш при необходимости.

Вообще мне это всё напомнило добрые старые стратегии мокси. Фактически, теперь вы можете играясь с replayCache у SharedFlow реализовывать те самые стратегии, только применяя их например к viewState. Более подробно мы рассмотрим это в примере. Но подумав о стратегиях, я не могу не вспомнить одну из самых популярных стратегий SingleStateStrategy. Кстати, если вдруг кто не видел видео про мокси:

Так вот, чтобы реализовать эту стратегию, а она заключается в демонстрации последнего полученного значения из источника данных, был сделан специальный интерфейс под названием StateFlow. Всю механику работы StateFlow можно увидеть на следующем изображении.

https://habr.com/ru/company/redmadrobot/blog/325816/

Здесь в примере можно увидеть что будет, если реализовать передачу данных через StateFlow. То есть к нам приходило три события 1, 2, и 3, событие 1 было последним, соответственно, при подписке на StateFlow мы получим событие 1, потому что он было последним. Вот такая вот простая механика.

Новые подходы SharedFlow и StateFlow обеспечивают более удобную механику взаимодействия с потоками, а также серьезно экономят ресурсы процессора, поэтому старые механики через ConflateBroadcastChannel и BroadcastChannel были объявлены устаревшими. Однако, сама механика Channel оставлена, так как многое в работе флоу и shared flow по прежнему полагается на этот механизм.

Я очень люблю инструменты, которые позволяют мне работать сразу с кучей функционала и очень не люблю, когда мы берем одно из котлина, другое из jetpack (намекаю тут на LiveData), третье из груви, четвертое из хмл, пятое еще откуда-нибудь и так далее. Котлин этим меня и привлекает. Он позволяет работать с какими-то механиками, которые друг друга усиливают. По этой же причине нативная разработка никогда не будет полностью вытеснена любыми не нативными решениями. Любое нативное решение будет быстрее и качественнее и лучше совместимо. Поэтому я для себя решил, что короткое время, когда я использовал livedata закончилось и я лучше буду использовать полностью инструменты языка. По этой же причине я топлю за Jetpack Compose, по этой же причине я топлю на kotlin kts и так далее. Просто потому, что это удобно и проще пишется. В нашем мире бесконечно развивающихся технологий и меняющихся подходов пытаться уследить вообще за всем — самоубийство. Вы просто взорвёте себе мозг. Потому очень аккуратно нужно подходить к тому, что вы изучаете. Хорошо, что здесь на канале вы всегда найдете отобранные и отфильтрованные мной подходы ))

Всем огромное спасибо и увидимся в следующих выпусках!

Практическую часть вы можете увидеть здесь:

Я в Youtube - https://youtube.com/c/MobileDeveloper

Я в Telegram - https://t.me/mobiledevnews

Я в Instagram - https://instagram.com/nplau