Kotlin 数据流

Flow

在协程中通过 async 或 withContext 挂起函数可以返回单个数据值,数据流(Flow)以协程为基础构建,可以按顺序发出多个值。

基本使用

以从数据库中获取数据为例,假如要取出 5 条数据,使用 Flow 则不需要等待 5 条数据全部取出之后再更新,而是可以实时地接收数据更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class TestActivity : AppCompatActivity() {
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...

lifecycleScope.launch{
// 因为 Flow 是冷流,冷流是在数据被订阅后,发布者才可以执行发射数据流的代码,
// 并且若有多个订阅者,那么每一个订阅者与发布者都是一对一的关系,也就是说,每个订阅者都会收到发布者完整的数据。
// 所以需要使用 collect 方法对 Flow 进行订阅
loadData().collect{
Log.d(TAG,it.toString())
}
}

// 日志打印:
// D/TAG_TestActivity: ----loadData----
// D/TAG_TestActivity: 1
// D/TAG_TestActivity: 2
// D/TAG_TestActivity: 3
// D/TAG_TestActivity: 4
// D/TAG_TestActivity: 5
}

private fun loadData() = flow {
Log.d(TAG,"----loadData----")
// 每隔 1 秒发出一条数据
for (i in 1..5){
// flow 方法内部是一个挂起函数,所以可以调用 delay 函数。
delay(1000)
emit(i)
}
}
}

filter 操作符

filter 操作符提供了对结果添加限制条件的功能。比如要限制仅发送偶数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TestActivity : AppCompatActivity() {
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...

lifecycleScope.launch{
loadData().collect{
Log.d(TAG,it.toString())
}
}

// 日志打印:
// D/TAG_TestActivity: ----loadData----
// D/TAG_TestActivity: 2
// D/TAG_TestActivity: 4
}

private fun loadData() = flow {
Log.d(TAG,"----loadData----")
for (i in 1..5){
delay(1000)
emit(i)
}
}.filter {
it % 2 == 0
}
}

map 操作符

map 操作符则提供了将结果集映射为其他类型的方式。比如将结果集映射为原集合数值的 5 倍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class TestActivity : AppCompatActivity() {
private val TAG = "TAG_TestActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...

lifecycleScope.launch{
loadData().collect{
Log.d(TAG,it.toString())
}
}

// 日志打印:
// D/TAG_TestActivity: ----loadData----
// D/TAG_TestActivity: 5
// D/TAG_TestActivity: 10
// D/TAG_TestActivity: 15
// D/TAG_TestActivity: 20
// D/TAG_TestActivity: 25
}

private fun loadData() = flow {
Log.d(TAG,"----loadData----")
for (i in 1..5){
delay(1000)
emit(i)
}
}.map {
it * 5
}
}

flowOn 操作符

flowOn 操作符在实际开发中算是比较常用的操作符之一。假设要将 Flow 中的代码块执行在 I/O 线程中,以前是加个协程切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class TestActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...

// 程序崩溃并抛出异常:
// java.lang.IllegalStateException: Flow invariant is violated:
// Flow was collected in [StandaloneCoroutine{Active}@869b1be, Dispatchers.Main.immediate],
// but emission happened in [DispatchedCoroutine{Active}@1cc21f, Dispatchers.IO].
// Please refer to 'flow' documentation or use 'flowOn' instead
}

private fun loadData() = flow {
withContext(Dispatchers.IO){
Log.d(TAG,"----loadData----")
for (i in 1..5){
delay(1000)
emit(i)
}
}
}
}

使用 catch 方法捕获异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class TestActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...

// 日志打印:(程序未崩溃)
// D/TAG_TestActivity: ----loadData----
// D/TAG_TestActivity: Flow invariant is violated:
// Flow was collected in [StandaloneCoroutine{Active}@869b1be, Dispatchers.Main.immediate],
// but emission happened in [DispatchedCoroutine{Active}@1cc21f, Dispatchers.IO].
// Please refer to 'flow' documentation or use 'flowOn' instead
}

private fun loadData() = flow {
withContext(Dispatchers.IO){
Log.d(TAG,"----loadData----")
for (i in 1..5){
delay(1000)
emit(i)
}
}
}.catch { catch ->
Log.d(TAG,catch.message.toString())
}
}

程序崩溃的原因在于使用 Flow 构建器时,提供方不能提供来自不同 CoroutineContext 的 emit 值,所以不能在 Flow 中创建协程作用域并在协程作用域中发送结果,如果需要切换线程操作,则要使用 flowOn 来代替。

1
2
3
4
5
6
7
8
9
10
11
class TestActivity : AppCompatActivity() {
...

private fun loadData() = flow {
Log.d(TAG, "----loadData----")
for (i in 1..5) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.IO)
}

除此之外,Flow 还提供了 buffer、zip、flatMapConcat 等操作。

StateFlow

既然 Flow 是冷流,而 StateFlow 就是一种热流,即无论是否有订阅者,都会执行发射数据流的操作,并且发布者与订阅者是一对多的关系。

StateFlow 的使用场景与 LiveData 是非常接近的。以检测 ViewModel 中数值变化为例:

1
2
3
4
5
6
7
8
class TestViewModel : ViewModel() {
private val _uiState = MutableStateFlow("")
val uiState:StateFlow<String> = _uiState
// 处理字符串
fun buildUp(world:String){
_uiState.value = "Hello $world"
}
}

ViewModel 中在输入的文字前面添加 Hello 字符串,并将监听结果展示在 UI 中。与 LiveData 组件不同的是,这里必须为 MutableStateFlow 指定默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
android:orientation="vertical"
xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent">
<EditText
android:id="@+id/et"
android:layout_width="match_parent"
android:layout_height="wrap_content" />
<TextView
android:id="@+id/tv"
android:layout_width="match_parent"
android:layout_height="wrap_content" />
<Button
android:id="@+id/btn"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="确定" />
</LinearLayout>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class TestActivity : AppCompatActivity() {
private val TAG = "TAG_TestActivity"

private lateinit var binding: ActivityTestBinding
private lateinit var viewModel: TestViewModel

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = ActivityTestBinding.inflate(layoutInflater)
setContentView(binding.root)

initView()
initData()

// 日志打印:
// D/TAG_TestActivity: 打印结果:
// D/TAG_TestActivity: 打印结果:Hello Jetpack
}

private fun initData() {
viewModel = ViewModelProvider(this).get(TestViewModel::class.java)

lifecycleScope.launch {
viewModel.uiState.collect{
binding.tv.append(it)
Log.d(TAG,"打印结果:$it")
}
}
}

private fun initView() {
binding.btn.setOnClickListener {
viewModel.buildUp(binding.et.text.toString())
}
}
}

从日志打印结果来看,即使多次点击按钮,也就是程序为 StateFlow 多次赋值(binding.tv.append(it)),如果值没变,StateFlow 是不会回调 collect 函数的,这一点与 LiveData 组件不同,并且 StateFlow 总会先收到默认值。

SharedFlow

SharedFlow 是 StateFlow 的一种可配置性极高的泛化数据流。

1
2
3
4
5
6
class TestViewModel : ViewModel() {
private val _uiState = MutableSharedFlow<String>()
val uiState: SharedFlow<String> = _uiState


}

MutableSharedFlow 的构造方法如下:

SharedFlow.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* MutableSharedFlow 构造方法的三个参数为:
* [replay] 表示新订阅者重播值的个数默认为 0,即新订阅者默认不会收到之前的值。
* [extraBufferCapacity] 表示减去 replay 的数量之后,MutableSharedFlow 缓存数据的个数默认值也为 0。
* [onBufferOverflow] 表示 Flow 的缓存策略默认为挂起。
*/
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}

StateFlow 本质上是一个 replay 为 1,extraBufferCapacity 为 0 的 SharedFlow,这也是使用 StateFlow 时会先收到设置的默认值的原因。此外:

  • Flow 还提供了 stateIn 方法,用于将任何数据流转化为 StateFlow。
  • Flow 还提供了 shareIn 方法,用于将任何数据流转化为 SharedFlow。

LiveData、StateFlow 与 SharedFlow 都有各自的优势,在业务开发中选择合适的方式便可以提升开发效率。


备注

参考资料

《Android Jetpack开发 原理解析与应用实战》

欢迎关注微信公众号:非也缘也