【对比Java学Kotlin】协程-异步流
文章目录
- 异步流定义
- 响应式编程与异步流
- 操作符
- 构建型
- 上下文型
- 协作型
- 转换型
- 数量型
- 启动型
- 组合型
- 取消异步流
- 异常处理
- 监听开始和结束
- 参考文献
异步流定义
我们知道有 RxJava、RxSwift、RxJs,那有 RxKotlin 吗?答案是没有。因为不需要,Kotlin 内置了 「RxKotlin」,天生拥有响应式编程能力,这就是 Kotlin 异步流。
类比 RxJava = 响应式编程+线程,有 Kotlin 异步流 = RxKotlin + 协程。
RxJava 支持切换线程,而Kotlin异步流切换的是协程。
既然是“异步”流,那有“同步”流吗?有的,Kotlin 中的同步流就是 Sequence。
跟 RxJava、Project Reactor 一样,Kotlin Flow 也是响应式编程规范(Reactive Streams Specification)的一种实现。
通过 launch()、async() 这些创建协程的方法可以获取单个异步执行的实例。那如果我们想同时获取多个呢?或者说获取一组异步执行的实例呢?这个时候异步流就派上用场了。
响应式编程与异步流
什么是响应式编程?
抛开具体编程语言,我们看一个赋值语句,涉及 a、b、c 三个变量:
var a = 1
var b = a + 1
a = 2
print(a, b)
试问,a 的值发生改变之后,b 的值会跟着变吗?根据我们的经验,一般 b 是不会变的。其实我们可以设计一门编程语言,让 b 跟着 a 变,这时这个编程语言就是天然支持响应式编程的。
从这个例子我们可以看出,响应式编程的精髓是「响应」,即一方随时「响应」另一方,二者之间可能是依赖关系。
这种响应关系就像水流(flow 或 stream)的上游和下游一样,上游发生变化,下游跟着变化。
操作符
异步流的操作符可以按不同的标准进行分类,比如按出现位置可以分为开头型、中间型和结尾型,比如构建异步流的操作符一般出现在流的开头,而启动型的操作符出现在流的结尾,其他的如转换型的操作符出现在流的中间位置。
构建型
Kotlin 提供了 flow()
、flowOf()
、asFlow()
构造异步流。后面两个方法本质上还是借助 flow()
完成的。Koltin 异步流中最基本的方法是 flow()、collect() 和 emit()。其中 emit()
方法类似 RxJava 种的 onNext(),每个操作符处理完上游传递给自己的数据然后发射/emit出去,就完成了向下游的传递。
- flow(),接收一个 FlowCollector { emit() } 类型的 lambda 表达式,返回一个 Flow { collect() } 的 lambda 表达式;
- collect(),相当于下游水龙头,调用 collect() 时触发上游开始执行;
- emit(),上游产生数据传递给下游;
使用这几个方法,我们就可构造简单的异步流了,举个例子:
fun main() = runBlocking {
launch {
for (i in 1..4) {
println("i am not blocked $i")
delay(100)
}
}
simpleFlow().collect { value ->
println(value)
}
}
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..4) {
delay(1000)
emit(i)
}
}
上述代码输出结果为:
i am not blocked 1
i am not blocked 2
i am not blocked 3
i am not blocked 4
1
2
3
4
需要注意以下几点:
- simpleFlow() 方法没有 suspend 修饰,因为该方法本身并非一个耗时方法,而是直接返回的同步方法;
- 通过 emit() 方法发射数据到下游,作用跟普通方法中的 return 和 Sequence 中的 yield() 类似;
- 只有在调用 collect() 方法时数据流才会启动运行,而且每次调用都会从头执行一遍,否则其前面的其他操作符方法不会执行;
- collect() 方法是 suspend 的,必须在另一个 suspend 方法中被调用;
- 数据流可以看做是一组值,每个值的执行也就是流经操作符的顺序是串行的;
关于最后一点,我们看个例子:
fun main() = runBlocking {
(1..5).asFlow().filter {
println("filtering $it")
it % 2 == 0
}.map {
println("mapping $it")
"string $it"
}.collect {
println("collected $it")
}
}
结果输出为:
filtering 1
filtering 2
mapping 2
collected string 2
filtering 3
filtering 4
mapping 4
collected string 4
filtering 5
从结果可以看出,每个值都依次经过了 filter()、map() 和 collect()。奇数经过 filter() 时返回 false,就会被被拦下从而没有触发 emit() 方法往下一个操作符发射数据,filter() 和 map() 方法本质上是 inline 方法,最后都会被转换成 transform() 方法,当 filter 条件命中返回 true 时才会调用 emit() 方法,map() 则是无脑调用 emit()。
emit() 方法可以被调用多次:
fun main() = runBlocking {
(1..4).asFlow()
.transform { request ->
emit("making req $request")
emit(performReq(request))
}
.collect { response -> println(response) }
}
suspend fun performReq(req: Int): String {
delay(1000)
return "response $req"
}
上述代码输出:
making req 1
response 1
making req 2
response 2
making req 3
response 3
making req 4
response 4
上下文型
flowOn()
异步流的上下文是指,异步流执行在哪个协程。我们可以粗略的将异步流分为上游和下游两部分,那么上下文要关注的是上游和下游分别运行在哪个协程中。
最常见的场景是,上游是数据生产者,是耗时操作,比如网络请求等,而下游是消费者,比如在主线程中更新UI等。
下游代码,即 collect() 对应的代码,如果没有特殊指定,默认运行在调用该方法的协程中。
我们可以使用 flowOn() 来改变上游代码运行所在的协程,一般是在构造协程的时候。
fun /*context_*/main() = runBlocking {
simple().flowOn(Dispatchers.Default).collect { value -> log("Collected $value") }
}
fun simple() = flow {
for (i in 1..3) {
delay(100)
log("Emitting $i")
emit(i)
}
}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
协作型
Flow的上游好比是泵,下游好比是水龙头。上游抽水的速度和水龙头放水的速度可能不一致,如果想让二者协同合作以达到最高效率或实现某种效果,就需要使用协作型操作符了。
buffer
一般情况下,只有下游完成处理后才会告诉上游继续发射下一个数据。但是如果上游的发射速度高于下游,是否可以不让上游等待呢?这时我们可以使用 buffer(),让上游实现并发。
可以简单的认为,有了buffer之后上游就放飞自我了,发射时机不再受下游限制了,反正提前发射的值可以被缓存(buffer)。
比如上游发射一个数据需要100ms,下游处理一个数据需要200ms:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking {
val elapse = measureTime {
simpleFlow()/*.buffer()*/.collect {
println("consuming $it")
delay(200)
println("consuming done")
}
println("all done")
}
println("elapse: $elapse")
}
fun simpleFlow() = flow {
for (i in 1..3) {
println("producing $i")
delay(100)
emit(i)
println("producing done")
}
}
使用buffer前后的耗时情况如下图:
所以未使用buffer时耗时900ms左右,使用buffer之后耗时在700ms左右。
conflate
在上游发射速度快于下游处理速度时,conflate() 可以让我们丢弃中间值,而仅仅处理最新的值。被丢弃的前提是下游还没来得及处理,如果已经开始处理了,则不会被丢弃。当下游的处理耗时是300ms时中间值会被丢弃,而150ms时则会被正常处理:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking {
var delay = 150L // 300L
val elapse = measureTime {
simpleFlow()/*.buffer()*/.conflate().collect {
println("consuming $it")
delay(delay)
println("consuming $it done")
}
println("all done")
}
println("elapse: $elapse")
}
fun simpleFlow() = flow {
for (i in 1..3) {
println("producing $i")
delay(100)
emit(i)
println("producing $i done")
}
}
150 的输出:
producing 1
producing 1 done
producing 2
consuming 1
producing 2 done
producing 3
consuming 1 done
consuming 2
producing 3 done
consuming 2 done
consuming 3
consuming 3 done
all done
elapse: 616.934055ms
Process finished with exit code 0
300的输出:
producing 1
producing 1 done
producing 2
consuming 1
producing 2 done
producing 3
producing 3 done
consuming 1 done
consuming 3
consuming 3 done
all done
elapse: 767.395294ms
如果上游和下游速度都慢,我们可以使用 conflate 来提升处理速度,本质上是丢弃中间值。另一种方式是每当上游发射一个值时,下游都取消当前正在处理的任务,重新启动一个新任务处理最新值,这类操作符统称为 xxLatest。
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking {
var delay = 150L
val elapse = measureTime {
simpleFlow()/*.buffer().conflate()*/.collectLatest {
println("consuming $it")
delay(delay)
println("consuming $it done")
}
println("all done")
}
println("elapse: $elapse")
}
fun simpleFlow() = flow {
for (i in 1..3) {
println("producing $i")
delay(100)
emit(i)
println("producing $i done")
}
}
从输出可以看出,虽然下游处理任务启动了,但是还没有完成的时候就被取消了(没有 consuming 2 done):
producing 1
consuming 1
producing 1 done
producing 2
consuming 2
producing 2 done
producing 3
consuming 3
producing 3 done
consuming 3 done
all done
elapse: 509.858319ms
转换型
- transform
- map
- filter
- take
- reduce
我们可以对流做一些转换操作,把发射出来的值转换成其他类型或值。比如最基本的 map()、filter() 操作符,这些操作符支持 suspend 方法,里面可以是耗时操作:
fun /*transformation_*/main() = runBlocking {
(1..3).asFlow().map { request -> performRequest(request) }
.collect { response -> println(response) }
}
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
输出为:
(delay 1 sec)
response 1
(delay 1 sec)
response 2
(delay 1 sec)
response 3
map()、filter()、transform() 都是 Flow 的扩展方法,map()和filter()本质上是调用 transform() 实现的,transform() 自己又构造了一个 flow:
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
数量型
我们可以使用 take() 操作符只取前几个发射的值,后续的值将会被cancel掉,同时抛出异常:
fun /*sizeLimiting_*/main() = runBlocking {
numbers()
.take(2)
.collect { value -> println(value) }
}
fun numbers() = flow {
try {
emit(1)
emit(2)
println("this line will not be executed")
emit(3)
} catch (e: Throwable) {
println(e.message)
} finally {
println("finally in numbers")
}
}
输出:
1
2
Flow was aborted, no more elements needed
finally in numbers
启动型
collect() 也是一个操作符。除了 collect(),还有哪些操作符能启动一个异步流吗?有的,大概有如下几类:
- 转成集合,比如 toList、toSet;
- 获取第一个发射值以及保证只有一个发射值(无发射值或多于一个则会抛异常)的操作符,如 first()和single();
在这里插入代码片
- 将所有发射值缩减至一个值的操作符,如 reduce()、fold();
- 新启协程运行异步流,比如 launchIn();
这些操作符有如下特点:
- 只能出现在异步流的最后,不能与 collect() 同时出现;
- 都是 suspend 方法;
fun /*trigger_*/main() = runBlocking {
val res = (1..5).asFlow()
.map { it * it }
// .single()
// .first()
.reduce { a, b -> a + b }
// .fold(1) { acc: Int, value: Int -> acc + value }
println(res)
}
输出:55
launchIn() 操作符比较特殊,使用它可以另外启动一个协程用于运行当前异步流。一般 collect() 方法启动的异步流会直接使用当前协程运行,但是就导致 collect() 方法后面的代码需要等待,等待异步流执行完成后才能开始执行。如果我们希望 collect() 方法立即执行,那么可以使用 launchIn()。比如使用 collect() 方法启动异步流时:
fun /*launch_*/main() = runBlocking {
(1..3).asFlow().onEach { delay(100) }.onEach { value -> println(value) }/*.launchIn(this)*/.collect()
println("DONE")
}
输出为:
1
2
3
DONE
显然异步流后面的代码是等待异步流执行完成之后才执行的。如果我们使用 launchIn():
fun /*launch_*/main() = runBlocking {
(1..3).asFlow().onEach { delay(100) }.onEach { value -> println(value) }.launchIn(this)/*.collect()*/
println("DONE")
}
输出为:
DONE
1
2
3
launchIn() 会另外开启一个新的协程运行异步流的代码,与 printlin("DONE")
是同时运行。其效果相当于:
fun /*launch_*/main() = runBlocking {
launch { (1..3).asFlow().onEach { delay(100) }.onEach { value -> println(value) }.collect() }
println("DONE")
}
组合型
- zip
- combine
- flatten
zip() 可以把两个flow组合起来:
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking
fun /*compose_*/main() = runBlocking {
val keys = flowOf("one", "two", "three", "four").onEach { delay(300) }
val values = (1..3).asFlow().onEach { delay(400) }
keys.zip(values) { a, b -> "$a -> $b" }.collect { println(it) }
}
输出:
one -> 1
two -> 2
three -> 3
执行流程示意图:
combine()
combine 的作用是,每当其中一个流有最新值时,该最新值都会与其他流的最新值重新执行一遍处理流程。
fun /*compose_*/main() = runBlocking {
val keys = flowOf("one", "two", "three", "four").onEach { delay(1000) }
val values = (1..3).asFlow().onEach { delay(2000) }
keys/*.zip*/.combine(values) { a, b -> "$a -> $b" }.collect { value -> println("$value") }
}
每隔2000ms输出一行:
one -> 1
two -> 1
three -> 1
three -> 2
four -> 2
four -> 3
执行流程为:
flatten
在实际开发中,有一种场景是一个flow发射的每一个值会使用map()操作符产生一个新的flow,于是就有了一个流组成的流(a flow of flows):
fun /*flatten_*/main() = runBlocking {
(1..3).asFlow().map { requestFlow(it) }.collect { flow ->
flow.collect { println("inner flow: $it") }
println(flow)
}
}
fun requestFlow(i: Int) = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
比如我们有一个获取userid的流,然后每次发射的userid值都会再去请求该userid对应的数据,每个userid对应的网络请求又是一个流,最后我们需要把这些流汇合成一个流进行处理。在这种情况下,我们可以使用 flatten 操作符。
跟xxLatest类似,flatten是一个操作符「家族」,包括 flatMapConcat()、flatMapMerge()、flatMapLatest()。
fun /*flatten_*/main() = runBlocking {
(1..3).asFlow().flatMapConcat/*.flatMapMerge*/ { requestFlow(it) }.collect {
println(it)
}
}
fun requestFlow(i: Int) = flow {
emit("$i: First")
delay(2000)
emit("$i: Second")
}
flatMapConcat() 是以元素流创建的顺序串行的处理:
输出:
1: First
(delay 2 sec)
1: Second
2: First
(delay 2 sec)
2: Second
3: First
(delay 2 sec)
3: Second
flatMapMerge 不是串行,而是并行的处理各个元素流:
输出:
1: First
2: First
3: First
(delay 2 sec)
1: Second
2: Second
3: Second
flatMapLatest
跟xxLatest家族其他成员一样,flatMapLatest会在新发射值时取消正在执行的流任务:
fun /*flatten_*/main() = runBlocking {
(1..3)
.asFlow()
//.flatMapConcat
//.flatMapMerge
.onEach { delay(500) }
.flatMapLatest { requestFlow(it) }
.collect {
println(it)
}
}
fun requestFlow(i: Int) = flow {
emit("$i: First")
delay(2000)
emit("$i: Second")
}
执行过程示意图:
输出:
1: First
(delay 500ms)
2: First
(delay 500ms)
3: First
(delay 2 sec)
3: Second
取消异步流
我们通常在异步流处于 suspend 方法中时取消异步流任务,具体方法是通过 withTimeoutOrNull() 和 cancel() 方法:
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
fun /*cancellation_*/main() = runBlocking {
withTimeoutOrNull(200) {
simpleFlow()/*.cancellable()*/.collect { value ->
// if (value === 2) {
// cancel()
// }
println(value)
}
}
println("Done")
}
输出:
producing 1
1
producing 1 done
producing 2
Done
另一种方法是 cancel()。cancel() 方法实际上是 CoroutineScope 的扩展方法,该方法取消的是整个 scope 内的 Job 以及该 Job 的子孙,具体可以参考该方法的注释:
/**
- Cancels this scope, including its job and all its children with an optional cancellation [cause].
- A cause can be used to specify an error message or to provide other details on
- a cancellation reason for debugging purposes.
- Throws [IllegalStateException] if the scope does not have a job in it.
*/
通过 flow() 构造的异步流,每次 emit() 值的时候都会调用 ensureActive() 方法 check 下是否已经调用了 cancel() 方法,如果是那么就取消当前异步流。但是处于性能的考虑,多数其他的构造方式并未内置 ensureActive() 方法,我们需要通过 .onEach { currentCoroutineContext().ensureActive() } 或者 .cancellable() 方法来实现同样的效果。比如 (1…3).asFlow() 就需要显示调用 .onEach { currentCoroutineContext().ensureActive() } 或者 .cancellable() 方法来达到取消异步流的目的,否则异步流在 cancel() 之后并不会结束。
flow() {} 构造的异步流可以直接响应 cancel() 取消掉异步流:
fun /*cancellation_*/main() = runBlocking {
// withTimeoutOrNull(200) {
simpleFlow().cancellable().collect { value ->
if (value === 2) {
cancel()
}
println(value)
}
// }
println("Done")
}
输出:
producing 1
1
producing 1 done
producing 2
2
producing 2 done
producing 3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@763d9750
IntRange.asFlow() 构造的异步流不能直接调用 cancel() ,否则不起作用,而是要通过 .onEach{ currentCoroutineContext.ensureActive() } 或 cancellable() 的方式调用才有效:
(1..3)
.asFlow()
// .onEach { currentCoroutineContext().ensureActive() }
// .cancellable()
.collect { value ->
if (value === 2)
cancel()
println(value)
}
输出:
1
2
3
Done
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@2aaf7cc2
当我们把上面代码的反注释掉 onEach{} 或 cancellable() 方法时,输出为:
1
2
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@3c5a99da
异常处理
我们可以使用 try-catch 来处理异步流的异常,不论该异常是由哪个操作符抛出来的:
fun /*exception_*/main() = runBlocking {
try {
(1..3).asFlow().onEach { println("Emitting $it") }.map { value ->
check(value <= 1) {
"Crashed on $value"
}
"string $value"
}.collect { value ->
println(value)
// check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
输出:
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
其实,官方推荐的处理异常的方式是通过操作符 .cacth(),catch() 也是一个扩展方法,入参是一个 suspend 类型的 lambda 表达式,我们可以在其中继续发射值,但是需要注意的是 catch() 方法只能捕获其之前操作符抛出来的异常,而无法捕获 collect() 里面的异常:
fun /*catch_*/main() = runBlocking {
(1..4)
.asFlow()
.map { value ->
check(value <= 1) {
"Crashed on $value"
}
"string $value"
}
.catch { e -> emit("Caught $e") }
.collect { value -> println(value) }
}
监听开始和结束
如果我们想监听异步流运行的开始和结束,可以使用 onStart 和 onCompletion 操作符。
除了 onCompletion,我们也可以使用 try-finally,但是这样无法区分异步流是正常结束还是异常结束:
fun /*completion_*/main() = runBlocking {
try {
(1..3).asFlow().onStart { println("Started") }.collect { value -> println(value) }
} finally {
println("DONE")
}
}
输出:
Started
1
2
3
DONE
try-finally 是命令式的写法,我们还可以使用声明式的写法,即 onCompletion 操作符:
fun /*completion_*/main() = runBlocking {
(1..3).asFlow()
.onStart { println("Started") }
.onCompletion { cause ->
println("DONE")
if (cause != null) println("Flow completed exceptionally")
}
.catch { cause -> println("Caught exception, cause: $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
输出:
Started
1
DONE
Flow completed exceptionally
Exception in thread "main" java.lang.IllegalStateException: Collected 2
注意,onCompletion 里面的 cause 是可以为 null 的,null 表示正常结束,否则表示异常结束。不论是正常结束还是异常结束,都会走到 onCompletion 里面。但是如果是异常结束,该操作符并不做拦截,而是继续透传给 catch 操作符,异常可能是 onCompletion 前面的操作符引起的,也可能是其后面的操作符(比如 collect)引起的。
onCompletion 的位置位于 collect 之前,可能会引起混乱,这时因为 collect 返回的 Unit,没法继续链式调用 onCompletion,其实 onCompletion 的位置并不影响其功能,毕竟其执行时机只跟流结束有关,而跟位置无关。
面对命令式和声明式的用法如何取舍,官方的建议是具体情况具体分析,根据自己的实际场景而使用。
参考文献
- kotlinlang.org-flow
- Coroutine
- Python协程与JavaScript协程的对比
- The Discoveries of Continuations
- 图解协程上下文 CoroutineContext 这回彻底搞懂了
- 抽丝剥茧聊协程之深入理解Continuation原理
- 图解 Kotlin Flow 构建数据流“管道”
- KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow by Roman Elizarov
- Combining Kotlin Flows with Select Expressions
- Combining flows: merge, zip, and combine