当前位置: 首页 > news >正文

【Flink读写外部系统】Flink自定义kafka分区并输出

目录

  • 1 kafka中的消息写入与自定义分区器
  • 2 Flink的分区数定义多少个合适?
  • 3 FlinkkafkaProducer下的消费保障
  • 4 代码

1 kafka中的消息写入与自定义分区器

  • 在我们将消息写入kafka的topic时,我们可以通过FlinkkafkaPartitioner指定写入topic的哪个分区。在不指定的情况下,默认的分区器会将每个数据任务映射到一个单独的kafka分区中,即单个任务的所有记录都会发往同一分区。
  • 如果任务数多余分区数,则每个分区可能会包含多个任务发来的记录。
  • 而如果任务数小于分区数,则默认配置会导致有些分区收不到数据。
  • 若此时恰好有使用事件时间的Flink应用消费了该Topic,那么可能会导致问题;
    (理由:在上面我们讲到分配器会在每个分区上定义水位线,然后再对各个分区之间的水位线进行合并)

2 Flink的分区数定义多少个合适?

  • Flink kafka 连接器会以并行的方式获取事件流。每个并行的数据源任务都可以从一个或多个分区中读取数据
    因此当我的程序并发数比较多的情况下,kafka分区数我也设置多点,是不是就意味着我程序处理kafka中topic的数据就会比较高效?

    分区设置的多点好还是不好?
    分区设置的多点既有好处也有坏处,虽然我的分区设置多点,集群的吞吐量会变大,但每个分区也有自己的开销;

    … …

3 FlinkkafkaProducer下的消费保障

注意:
  这里是通过FlinkkafkaProducer将数据发送到kafka;跟下面的检查点是不一样的
FlinkkafkaProducer下的消费保障总共分为3级别
val kafkaSink = new FlinkKafkaProducer[ResultDt]("topicName", kafkaPro, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

  1. at most once:对于一条message,kafka的receiver最多收到一次(0次或1次)
    (sender把message发送给receiver后,无论receiver是否收到message,sender都不再重新发送message)
  2. at least once(这是默认选项):对于一条message,Kafka的Receiver最少收到一次(1次及以上)
    (sender把message发送给receiver后,当receiver在规定的时间内没有恢复或恢复了error信息,那么sender就会重发,知道sender收到receiver的成功信息)
  3. exactly once:对于一条message,receiver确保只收到一次

4 代码

以下样例代码采用的是flink1.7版本,并依托于数据中的key做分区。

class MySchema extends KeyedSerializationSchema[(String, Int)] {
  override def serializeKey(element: (String, Int)): Array[Byte] = {
    element._2.toString.getBytes() //序列胡key
  }

  override def serializeValue(element: (String, Int)): Array[Byte] = {
    element._1.getBytes() //序列化value
  }

  override def getTargetTopic(element: (String, Int)): String = {
    null //这里返回要发送的topic名字,没什么用,可以不做处理
  }
}

object addSink_kafka_并自定义序列化和分区 extends App {

  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val streamLocal = StreamExecutionEnvironment.createLocalEnvironment(3)

  import org.apache.flink.api.scala._ //如果数据是有限的(静态数据集)可以引入这个包
  val dataStream: DataStream[(String, Int)] = streamLocal.fromElements(("flink_er", 3), ("f", 1), ("c", 2), ("c", 1), ("d", 5))

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "master:9092")
  val flinkKafkaProducer = new FlinkKafkaProducer(
    "ceshi01",
    new MySchema(),
    properties,
    Optional.of(new FlinkKafkaPartitioner[(String, Int)] {
      /**
        * @param record      正常的记录
        * @param key         KeyedSerializationSchema中配置的key
        * @param value       KeyedSerializationSchema中配置的value
        * @param targetTopic targetTopic
        * @param partitions  partition列表[0, 1, 2, 3, 4]
        * @return partition
        */
      override def partition(record: (String, Int), key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
        Math.abs(new String(key).hashCode() % partitions.length)
      }
    })
  )
  dataStream.addSink(flinkKafkaProducer)

  streamLocal.execute()
}

相关文章:

  • 【云原生】学习K8s的扩展技能(CRD)
  • Chapter05 修炼python基本功:条件语句和循环
  • 彻底掌握Makeifle(三)
  • 手机抓取蓝牙日志btsnoop的方法汇总(Android一直补充中)
  • 【Vue 开发实战】实战篇 # 30:实现一个可动态改变的页面布局
  • [单片机框架][drivers层][cw2015/ADC] fuelgauge 硬件电量计和软件电量计(一)
  • 【iVX 开发 - 入门】开发环境、应用对象树介绍(含操作演示)
  • CTFshow 代码审计
  • 19-Django REST framework-DRF工程搭建
  • CSP-S信息学奥赛考试大纲(提高级)
  • 电源硬件设计----降压-升压(Buck-Boost)变换器基础
  • C语言循环的嵌套、比较、break语句,continue语句
  • 【数据挖掘算法与应用】——数据挖掘导论
  • Java语言高级特性——泛型
  • 混合模拟退火和教与学的鸽群优化算法-附代码
  • Docker容器管理
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • fetch 从初识到应用
  • JS题目及答案整理
  • web标准化(下)
  • Work@Alibaba 阿里巴巴的企业应用构建之路
  • 阿里云前端周刊 - 第 26 期
  • 爱情 北京女病人
  • 二维平面内的碰撞检测【一】
  • 分布式熔断降级平台aegis
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • 看完九篇字体系列的文章,你还觉得我是在说字体?
  • 日剧·日综资源集合(建议收藏)
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • 云栖大讲堂Java基础入门(三)- 阿里巴巴Java开发手册介绍
  • 在Unity中实现一个简单的消息管理器
  • Java总结 - String - 这篇请使劲喷我
  • 阿里云ACE认证之理解CDN技术
  • #【QT 5 调试软件后,发布相关:软件生成exe文件 + 文件打包】
  • $redis-setphp_redis Set命令,php操作Redis Set函数介绍
  • (13):Silverlight 2 数据与通信之WebRequest
  • (C语言)fgets与fputs函数详解
  • (笔试题)合法字符串
  • (七)MySQL是如何将LRU链表的使用性能优化到极致的?
  • (转)IOS中获取各种文件的目录路径的方法
  • (转)LINQ之路
  • (转)shell中括号的特殊用法 linux if多条件判断
  • (转载)Google Chrome调试JS
  • .dat文件写入byte类型数组_用Python从Abaqus导出txt、dat数据
  • .net的socket示例
  • .NET与java的MVC模式(2):struts2核心工作流程与原理
  • @CacheInvalidate(name = “xxx“, key = “#results.![a+b]“,multi = true)是什么意思
  • @staticmethod和@classmethod的作用与区别
  • [100天算法】-二叉树剪枝(day 48)
  • [4.9福建四校联考]
  • [ACM] hdu 1201 18岁生日
  • [ASP]青辰网络考试管理系统NES X3.5
  • [BZOJ1060][ZJOI2007]时态同步 树形dp
  • [BZOJ4010]菜肴制作
  • [C#]OpenCvSharp使用帧差法或者三帧差法检测移动物体