当前位置: 首页 > news >正文

Kotlin 流 Flow

挂起函数可以异步地返回一个值,而对于返回多个值,可以使用流,使用 emit(x) 发射多个值,
collect { } 来收集值。

默认 流是冷的,只有 收集(collect) 时才会执行。

1. 流的创建

  1. flow {} 生成流,emit(x) 来发射值;
  2. xxx.asFlow() 集合转成Flow;
  3. flowOf(1, 2, 3) 生成固定值的流。
1.1 flow {}

flow {} 里的 发射(emissions)默认是可取消的,对应 SafeFlow,继承自 AbstractFlow

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {public final override suspend fun collect(collector: FlowCollector<T>) {val safeCollector = SafeCollector(collector, coroutineContext)try {collectSafely(safeCollector)} finally {safeCollector.releaseIntercepted()}}}

对应 emit() 方法,就是 SafeCollector.emit() 内部 调用了 currentContext.ensureActive() 做 取消检查。

而 其他两种,默认是 不可取消,使用 cancellable() 操作符可取消。

2. 流的操作符

2.1 中间操作符
  1. map
  2. filter
  3. transform
2.2 结束操作符
  1. collect {}
  2. toList()toSet() 收集到集合
  3. first()single()
  4. reduce()fold() 合并值

3. 流的上下文

默认 流 执行在 和 收集者(Collector) 相同的 上下文。

更改流的发射的上下文,必须使用 flowOn,而不是在 flow {} 中使用 withContext()

flow {...
}.flowOn(Dispatchers.Default)

4. 缓冲

当 Flow emit 生产者速度 大于 collector 消费者速度时。

  1. buffer() 并发地执行 发射 和 收集,而不是 顺序执行(发射 收集 再 发射 收集);
  2. conflate() 丢弃中间值,取最新发射的值;
  3. collectLatest { } 收集最新的值,但 如果 发射新值,会 取消 慢的收集。
simple().collectLatest { value -> // cancel & restart on the latest valueprintln("Collecting $value") delay(300) // pretend we are processing it for 300 msprintln("Done $value") } 

说明:会执行所有 Collecting,但是 因为慢处理,会被取消。

结果:

Collecting 1
Collecting 2
Collecting 3
Done 3

https://kotlinlang.org/docs/flow.html#processing-the-latest-value

5. 流的组合

  1. zip 1对1的 组合
  2. combine 每次上游更新,就会重新计算。两个流 生产速度不一样时,就会 不同的对应组合。更新值即组合

举例:

zip 组合值: 1->one, 2->two 3->three,
而 combine 组合,则可能 1->one 2->one 这样只有一方流 发射值,就会调用计算。

6. flatten 展平

对于 Flow 又对应 Flow<T> 任务,这时候对于 Flow<Flow<T>> 需要展开Flow<T>。场景就是 一序列 对应 请求任务。

fun requestFlow(i: Int): Flow<String> = flow {emit("$i: First")delay(500) // wait 500 msemit("$i: Second")
}(1..3).asFlow().map { requestFlow(it) }
  1. flatMapConcat 按顺序,流完成后,才接着下一个流
  2. flatMapMerge 支持并发地处理,流 则 出现并发交错地收集值,concurrency 设置并发数
  3. flatMapLatest 处理最新的流,当新的流发射值时,取消之前的流

7. 异常

  1. try { flow.collect {} } catch (e: Exception) { } 处理异常,包含收集器里代码异常;
  2. flow {}.catch { }.collect { } 处理 上游异常,但不会处理 下游 异常;
  3. flow {}.onEach { }.collect() 处理上下游异常。
7.1. try/catch 全部捕获
try {simple().collect {println("value: $it")}
} catch (e: Exception) {// 捕获了 flow发射代码块、中间操作符 和 结束操作符 的所有异常
}
7.2. catch 操作符
simple().catch { e ->// catch 捕获上面的异常,但 不处理 下游 和 结束操作符 的 异常println("exception: $e") }.collect {// 如果 这里异常,则不会被捕捉println("value: $it")}
7.3. 声明式捕获

如果想 捕获 结束操作符的异常,需要 声明式地捕捉。把 collect 的代码部分 上移到 onEach 中,使用无参的 collect() 收集:

simple().onEach {check(it < 2)println("value: $it")}.catch { e -> println("exception: $e")}.collect()

8. 完成

  1. try/finnaly 在结束后处理
  2. flow {}.onCompletion { cause -> } 处理
flow {}.onCompletion { cause ->// 完成回调,cause 是空 表示 正常完成if (cause == null) {println("success")}
}

9. 取消

  1. onEach 时检测
  2. cancel() 在收集时,调用取消
  3. flow {}.cancellable() 设置flow可取消
// 不加 cancellable() 不会 做取消检查,导致完成收集后 才 报异常
// cancellable() 则会 及时取消
// flowOf(1, 2, 3)
listOf(1, 2, 3).asFlow()
//  .cancellable().collect {if (it > 1) {cancel()}println("value: $it")}

cancellable()的实现:

  • CancellableFlowImpl
public fun <T> Flow<T>.cancellable(): Flow<T> =when (this) {is CancellableFlow<*> -> this // Fast-path, already cancellableelse -> CancellableFlowImpl(this)}private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {override suspend fun collect(collector: FlowCollector<T>) {flow.collect {currentCoroutineContext().ensureActive()collector.emit(it)}}
}

文档

  • Flow
  • 异步流
  • Kotlin Flow:掌握基本,征服应用,避开开发陷阱!

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 安装win7鼠标键盘不能动原因分析及解决办法
  • Unity3D在2D游戏中获取触屏物体的方法
  • Spring Boot-自定义banner
  • ASP.NET Core 入门教学十七 GraphQL入门指南
  • 触想内嵌式工业一体机应用于智能检票机改善旅游体验
  • How can I provide a RGBA png file to OpenAI PHP library
  • 【LeetCode】07.整数反转
  • 系统架构师考试学习笔记第三篇——架构设计高级知识(18)面向服务架构设计理论与实践
  • DFS算法专题(一)——二叉树中的深搜【回溯与剪枝的初步注入】
  • 分类预测|基于蜣螂优化极限梯度提升决策树的数据分类预测Matlab程序DBO-Xgboost 多特征输入单输出 含基础模型
  • MySQL 默认事务隔离级别及原因
  • 数组与贪心算法——409、621(1中1简)
  • 游卡,三七互娱,得物,顺丰,快手,oppo,康冠科技,途游游戏,埃科光电25秋招内推
  • notepad++将换行替换成空
  • c++一个数因子和(快速求解)
  • IE9 : DOM Exception: INVALID_CHARACTER_ERR (5)
  • #Java异常处理
  • EOS是什么
  • git 常用命令
  • Java 23种设计模式 之单例模式 7种实现方式
  • leetcode388. Longest Absolute File Path
  • Rancher-k8s加速安装文档
  • SpiderData 2019年2月23日 DApp数据排行榜
  • SSH 免密登录
  • vuex 笔记整理
  • vue学习系列(二)vue-cli
  • 当SetTimeout遇到了字符串
  • 二维平面内的碰撞检测【一】
  • 欢迎参加第二届中国游戏开发者大会
  • 类orAPI - 收藏集 - 掘金
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 爬虫进阶 -- 神级程序员:让你的爬虫就像人类的用户行为!
  • 入门到放弃node系列之Hello Word篇
  • 山寨一个 Promise
  • 想晋级高级工程师只知道表面是不够的!Git内部原理介绍
  • 智能网联汽车信息安全
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • (1)虚拟机的安装与使用,linux系统安装
  • (19)夹钳(用于送货)
  • (3)(3.2) MAVLink2数据包签名(安全)
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (附源码)springboot建达集团公司平台 毕业设计 141538
  • (介绍与使用)物联网NodeMCUESP8266(ESP-12F)连接新版onenet mqtt协议实现上传数据(温湿度)和下发指令(控制LED灯)
  • (企业 / 公司项目)前端使用pingyin-pro将汉字转成拼音
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • (续)使用Django搭建一个完整的项目(Centos7+Nginx)
  • (一)UDP基本编程步骤
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • .ai域名是什么后缀?
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .NET : 在VS2008中计算代码度量值
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .net 调用海康SDK以及常见的坑解释
  • .NET多线程执行函数