当前位置: 首页 > news >正文

Flink Kafka Sink 源码分析

Sink部分Flink根据Kafka分为了2个部分

  • 0.11之前
  • 0.11之后

0.11之前因为没有kafka的事务相关 所以没法做到 消息的exactly_once

0.11之后是可以实现的

FlinkKafkakProducer 的创建有多个重载构造方法,当我们没有指定相关的 流checkpoint 语义,

那么默认 是at_least_once

FlinkKafkaProducerBase

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {

继承 sinkFunction 基于实现checkpointFunction

对于SinkFunction相关的处理逻辑是在invoke,snapshot里面,但我们首先看下其一些前置准备工作 open(), init()

Open

open方法做了一些前置工作的准备,主要是一下几个

  1. 序列化方式
  2. 创建KafkaProducer
  3. metrics
  4. checkpoint配置

Invoke

invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)

如果 transcation 如果是 EXACTLY_ONCE 每次会在每次 checkpoint的时候变化

做一些消息转发的动作, 如果是 exactly_once模式 这时候是不会真正的提交,只有在checkpoint的时候才会

将本次事务的消息进行提交,然后再开启下一个事务

Snapshot checkpoint 阶段 (重点)

  1. FlinkKafkaProducer.snapshot()
    1. super.snapshotState(context) 实际调用 TwoPhaseCommitSinkFunction
  2. TwoPhaseCommitSinkFunction.snapshotState
    1. 获取checkpointId
    2. preCommit 将当前事务的数据进行提交
      1. kafkaProducer: 将exactly_once 和 at_least_once 的数据进行 producer.flush

        3. currentTransactionHolder = beginTransactionInternal() 开启一个新的事务

  •                 这里只有对 exactly_once 模式进行了事务创建
  •                 at_least_once && none 默认复用之前的

相关文章:

  • 高斯消元法(2)——保姆级笔记
  • R语言因子分析全流程
  • Nginx简单配置 - 基础安全
  • 基于sdrpi的openwifi实践2:生成BOOT.BIN
  • 七、安装Centos7+8系统+超级优化
  • Kali-登录暴力破解器工具-medusa使用
  • 除砷树脂HP-776
  • 矿产行业供应链协同系统解决方案:构建数智化平台,保障矿产资源安全供应
  • 申请专利流程及费用。
  • HTML+CSS期末网页设计前端作品(大三)
  • java-php-python-ssm-民航售票管理系统-计算机毕业设计
  • 学生台灯用led灯好还是荧光灯好?推荐几款高品质的LED灯
  • RabbitMQ安装
  • 为啥3次握手4次挥手
  • 前端promise理解
  • Angular6错误 Service: No provider for Renderer2
  • Brief introduction of how to 'Call, Apply and Bind'
  • css的样式优先级
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • Kibana配置logstash,报表一体化
  • mongo索引构建
  • Sass Day-01
  • springMvc学习笔记(2)
  • 阿里云前端周刊 - 第 26 期
  • 从零开始学习部署
  • 判断客户端类型,Android,iOS,PC
  • 我的业余项目总结
  • 学习笔记:对象,原型和继承(1)
  • 在Unity中实现一个简单的消息管理器
  • #QT项目实战(天气预报)
  • #每日一题合集#牛客JZ23-JZ33
  • #我与Java虚拟机的故事#连载16:打开Java世界大门的钥匙
  • (floyd+补集) poj 3275
  • (Matlab)遗传算法优化的BP神经网络实现回归预测
  • (rabbitmq的高级特性)消息可靠性
  • (ZT)北大教授朱青生给学生的一封信:大学,更是一个科学的保证
  • (分享)一个图片添加水印的小demo的页面,可自定义样式
  • (附源码)spring boot儿童教育管理系统 毕业设计 281442
  • (附源码)ssm本科教学合格评估管理系统 毕业设计 180916
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (淘宝无限适配)手机端rem布局详解(转载非原创)
  • (原創) 物件導向與老子思想 (OO)
  • .NET 2.0中新增的一些TryGet,TryParse等方法
  • .net core Swagger 过滤部分Api
  • .NET WebClient 类下载部分文件会错误?可能是解压缩的锅
  • .netcore 如何获取系统中所有session_ASP.NET Core如何解决分布式Session一致性问题
  • .NET中的十进制浮点类型,徐汇区网站设计
  • /使用匿名内部类来复写Handler当中的handlerMessage()方法
  • @Data注解的作用
  • @GlobalLock注解作用与原理解析
  • @Responsebody与@RequestBody
  • @SuppressWarnings注解
  • @value 静态变量_Python彻底搞懂:变量、对象、赋值、引用、拷贝
  • [8-23]知识梳理:文件系统、Bash基础特性、目录管理、文件管理、文本查看编辑处理...
  • [AMQP Connection 127.0.0.1:5672] An unexpected connection driver error occured