当前位置: 首页 > news >正文

[Spark] 详解 outputMode

在 Spark Structured Streaming 中,outputMode 用于指定将结果表中的数据写入接收器(Sink)的方式。主要有以下三种模式:

  1. append 模式:这是默认的输出模式。仅将自上次触发以来添加到结果表中的新行输出到接收器。适用于不包含聚合操作的查询,或者包含聚合操作且使用了水印(watermark)的情况。在有聚合操作和水印时,水印用于删除旧的聚合状态,并仅在水印超过窗口结束时间时,输出一次最终结果到结果表并写入接收器。如果是没有聚合的操作(如selectwheremapflatMapfilterjoin等),则直接输出结果表中新增的数据。
  2. update 模式:只将自上次触发以来结果表中更新的行(新增或修改)输出到接收器。如果没有聚合操作,其行为类似于 append 模式,即输出结果表中新增的数据;如果有聚合操作,则输出聚合结果改变的数据。
  3. complete 模式:每次触发后,会将整个结果表输出到接收器。此模式仅适用于包含聚合操作的查询。由于它会输出表的所有内容,因此只有在此模式下可以进行全局排序。但需要注意,使用此模式时,结果表数据会一直存储在内存中,所以要谨慎使用,确保数据量不会过大导致内存溢出。

下面是使用不同 outputMode 的示例代码:

// 无聚合操作,使用 append 模式
val noAggDf = deviceDataDf.select("device").where("signal > 10")
noAggDf.writeStream.format("console") .start() noAggDf.writeStream.parquet("path/to/destination/directory") .start() // 有聚合操作,使用 complete 模式
val aggDf = df.groupBy("device").count()
aggDf.writeStream.outputMode("complete") .format("console") .start() aggDf.writeStream.queryName("aggregates") // 此查询名称将成为表名.outputMode("complete") .format("memory") .start() 
spark.sql("select * from aggregates").show() // 交互查询内存表

在实际应用中,选择哪种 outputMode 取决于具体的业务需求和数据特点。如果数据量较大且不需要全局排序,或者没有聚合操作,通常使用 append 模式以减少输出的数据量。如果需要输出完整的聚合结果或进行全局排序,则使用 complete 模式,但要注意内存限制。而 update 模式适用于只关心数据更新部分的情况。

此外,还需要注意的是,使用某些接收器(如FileSink)时,其支持的 outputMode 可能会受到限制。例如,在 Spark 2.0 中,FileSink 仅支持 Parquet 文件和 append 模式。同时,为了确保数据的一致性和可靠性,还可以设置检查点(checkpoint)来保存一些元数据信息,以便在出现故障时能够恢复。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 创建完整的APP页面
  • cuda编程Debug断点调试
  • web小游戏开发:拼图(二)图片切割
  • SQL必知必会
  • 从零到一:用Go语言构建你的第一个Web服务
  • 《Java初阶数据结构》----8.<java对象的比较总结>
  • springboot自动装配原理(springboot知识点梳理二)
  • Linux系统上安装zookeeper
  • Ubuntu下设置文件和文件夹用户组和权限
  • Vue项目增加路由跳转全局进度条 NProgress.js
  • 【Rust光年纪】深入了解Rust语言的关键库:功能特点与使用场景分析
  • 一文总结代理:代理模式、代理服务器
  • 【Python】基础语法(下)
  • 代码混淆与代码打包---bash脚本
  • Spring笔记(五)——事务
  • Android Volley源码解析
  • Apache的80端口被占用以及访问时报错403
  • CSS 提示工具(Tooltip)
  • Java|序列化异常StreamCorruptedException的解决方法
  • JSDuck 与 AngularJS 融合技巧
  • nfs客户端进程变D,延伸linux的lock
  • PHP面试之三:MySQL数据库
  • vue--为什么data属性必须是一个函数
  • 程序员最讨厌的9句话,你可有补充?
  • 汉诺塔算法
  • 理解在java “”i=i++;”所发生的事情
  • 漂亮刷新控件-iOS
  • 前端面试之CSS3新特性
  • 中国人寿如何基于容器搭建金融PaaS云平台
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • 交换综合实验一
  • 如何在 Intellij IDEA 更高效地将应用部署到容器服务 Kubernetes ...
  • ​Benvista PhotoZoom Pro 9.0.4新功能介绍
  • ​DB-Engines 12月数据库排名: PostgreSQL有望获得「2020年度数据库」荣誉?
  • # wps必须要登录激活才能使用吗?
  • #QT 笔记一
  • (11)工业界推荐系统-小红书推荐场景及内部实践【粗排三塔模型】
  • (ros//EnvironmentVariables)ros环境变量
  • (每日持续更新)jdk api之StringBufferInputStream基础、应用、实战
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (太强大了) - Linux 性能监控、测试、优化工具
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • . Flume面试题
  • .net mvc 获取url中controller和action
  • .NET中分布式服务
  • .net最好用的JSON类Newtonsoft.Json获取多级数据SelectToken
  • .pyc文件是什么?
  • /etc/apt/sources.list 和 /etc/apt/sources.list.d
  • @EventListener注解使用说明
  • @Service注解让spring找到你的Service bean
  • @Tag和@Operation标签失效问题。SpringDoc 2.2.0(OpenApi 3)和Spring Boot 3.1.1集成
  • [ IOS ] iOS-控制器View的创建和生命周期
  • [ 隧道技术 ] 反弹shell的集中常见方式(二)bash反弹shell
  • [52PJ] Java面向对象笔记(转自52 1510988116)
  • [Algorithm][动态规划][路径问题][不同路径][不同路径Ⅱ][珠宝的最高价值]详细讲解