当前位置: 首页 > news >正文

Flink和Kafka连接时的精确一次保证

Flink写入Kafka两阶段提交

端到端的 exactly-once(精准一次)

kafka -> Flink -> kafka

1)输入端

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

2)Flink内部

Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

3)输出端

两阶段提交(2PC)

写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

必须的配置

1)必须启用检查点

2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

4)事务超时配置

【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

代码实战

kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.写出到 Kafka精准一次 写入 Kafka,需要满足以下条件,【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。

相关文章:

  • 文字的力量
  • C++项目案例圆和点的关系 (涉及知识点:头文件定义类,cpp文件实现类,类和作用域,linux编译运行c++项目)
  • html实现图片裁剪处理(附源码)
  • 通过bat命令启动jar后缀软件
  • C++继承(公有继承,保护继承,私有继承)
  • 企业APP软件定制开发的关键步骤|网站小程序搭建
  • asp.net在线考试系统+sqlserver数据库
  • Ubuntu22.04 部署Mqtt服务器
  • 最长单调上升子序列问题
  • Centos(Linux)服务器安装Dotnet8 及 常见问题解决
  • 21. 深度学习 - 拓朴排序的原理和实现
  • 使用webhook发送企业微信消息
  • 【C++】类和对象(7)--友元, static成员
  • Android 12 客制化修改初探-Launcher/Settings/Bootanimation
  • 斯坦福机器学习 Lecture1 (机器学习,监督学习、回归问题、分类问题定义)
  • android图片蒙层
  • Computed property XXX was assigned to but it has no setter
  • crontab执行失败的多种原因
  • ESLint简单操作
  • ES学习笔记(12)--Symbol
  • java取消线程实例
  • jdbc就是这么简单
  • js继承的实现方法
  • Promise面试题,控制异步流程
  • Quartz初级教程
  • React-redux的原理以及使用
  • SpringCloud(第 039 篇)链接Mysql数据库,通过JpaRepository编写数据库访问
  • 聊一聊前端的监控
  • 推荐一款sublime text 3 支持JSX和es201x 代码格式化的插件
  • 我从编程教室毕业
  • 学习使用ExpressJS 4.0中的新Router
  • 一个JAVA程序员成长之路分享
  • ​DB-Engines 11月数据库排名:PostgreSQL坐稳同期涨幅榜冠军宝座
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • # Pytorch 中可以直接调用的Loss Functions总结:
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (第二周)效能测试
  • (数位dp) 算法竞赛入门到进阶 书本题集
  • (转载)从 Java 代码到 Java 堆
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1
  • .NET Core 2.1路线图
  • .NET Framework Client Profile - a Subset of the .NET Framework Redistribution
  • .NET Windows:删除文件夹后立即判断,有可能依然存在
  • .net 后台导出excel ,word
  • .net 生成二级域名
  • .NET性能优化(文摘)
  • // an array of int
  • /dev/VolGroup00/LogVol00:unexpected inconsistency;run fsck manually
  • /etc/sudoer文件配置简析
  • :O)修改linux硬件时间
  • @RequestParam详解
  • [ 环境搭建篇 ] 安装 java 环境并配置环境变量(附 JDK1.8 安装包)
  • [04]Web前端进阶—JS伪数组
  • [2013AAA]On a fractional nonlinear hyperbolic equation arising from relative theory
  • [acm算法学习] 后缀数组SA