[Spark] 详解 outputMode
在 Spark Structured Streaming 中,outputMode
用于指定将结果表中的数据写入接收器(Sink)的方式。主要有以下三种模式:
- append 模式:这是默认的输出模式。仅将自上次触发以来添加到结果表中的新行输出到接收器。适用于不包含聚合操作的查询,或者包含聚合操作且使用了水印(watermark)的情况。在有聚合操作和水印时,水印用于删除旧的聚合状态,并仅在水印超过窗口结束时间时,输出一次最终结果到结果表并写入接收器。如果是没有聚合的操作(如
select
、where
、map
、flatMap
、filter
、join
等),则直接输出结果表中新增的数据。 - update 模式:只将自上次触发以来结果表中更新的行(新增或修改)输出到接收器。如果没有聚合操作,其行为类似于 append 模式,即输出结果表中新增的数据;如果有聚合操作,则输出聚合结果改变的数据。
- complete 模式:每次触发后,会将整个结果表输出到接收器。此模式仅适用于包含聚合操作的查询。由于它会输出表的所有内容,因此只有在此模式下可以进行全局排序。但需要注意,使用此模式时,结果表数据会一直存储在内存中,所以要谨慎使用,确保数据量不会过大导致内存溢出。
下面是使用不同 outputMode
的示例代码:
// 无聚合操作,使用 append 模式
val noAggDf = deviceDataDf.select("device").where("signal > 10")
noAggDf.writeStream.format("console") .start() noAggDf.writeStream.parquet("path/to/destination/directory") .start() // 有聚合操作,使用 complete 模式
val aggDf = df.groupBy("device").count()
aggDf.writeStream.outputMode("complete") .format("console") .start() aggDf.writeStream.queryName("aggregates") // 此查询名称将成为表名.outputMode("complete") .format("memory") .start()
spark.sql("select * from aggregates").show() // 交互查询内存表
在实际应用中,选择哪种 outputMode
取决于具体的业务需求和数据特点。如果数据量较大且不需要全局排序,或者没有聚合操作,通常使用 append 模式以减少输出的数据量。如果需要输出完整的聚合结果或进行全局排序,则使用 complete 模式,但要注意内存限制。而 update 模式适用于只关心数据更新部分的情况。
此外,还需要注意的是,使用某些接收器(如FileSink
)时,其支持的 outputMode
可能会受到限制。例如,在 Spark 2.0 中,FileSink
仅支持 Parquet 文件和 append 模式。同时,为了确保数据的一致性和可靠性,还可以设置检查点(checkpoint)来保存一些元数据信息,以便在出现故障时能够恢复。