当前位置: 首页 > news >正文

kafka跨地区跨集群同步工具MirrorMaker2 —— 筑梦之路

MM2简介 

KIP-382: MirrorMaker 2.0 - Apache Kafka - Apache Software Foundation

有四种运行MM2的方法:

As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
As a standalone Connect worker.(作为独立的Connect工作者)
In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

MM2集群模式

配置文件示例1:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定义集群别名
clusters = event-center, event-center-new# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

注意:

replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作.

参考资料:

【如何保持topic名称一致】

【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)_kafka mm2同步的目标topic名不一样-CSDN博客

配置文件示例2:

#定义集群别名
clusters = A, B
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .* # 允许同步topic的正则B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .* # 允许同步topic的正则#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置

Standalone模式运行

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties配置文件

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties配置文件

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

 启动命令

./connect-standalone.sh worker.properties connector.properties

参考资料:

kafka 异步双活方案 mirror maker2 深度解析 - 知乎

Apache Kafka MirrorMaker 2.0 指南 - Azure HDInsight | Microsoft Learn

简述 Kafka Mirror Maker v2 – K's Blog

Getting up to Speed with MirrorMaker 2 - Confluent

9.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据 Red Hat AMQ 2021.q3 | Red Hat Customer Portal

https://blog.51cto.com/u_14049791/5703235

KafkaMirrorMaker2.0架构与实战全解《三》 - 墨天轮

2.4. Kafka MirrorMaker 2 集群配置 Red Hat AMQ Streams 2.4 | Red Hat Customer Portal

kafka:MirrorMaker-V1(MM1)到MirrorMaker-V2(MM2)-腾讯云开发者社区-腾讯云

【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程_kafka数据同步到另外一个kafka-CSDN博客

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 03-ArcGIS For JavaScript结合ThreeJS功能
  • vue项目实战 - 如果高效的实现防抖和节流
  • 软考-程序员 知识点与部分真题梳理
  • qt多语言翻译不生效的原因
  • 回溯大法总结
  • microsoft的azure语音,开发环境运行正常,发布到centos7线上服务器之后无法运行解决方案
  • OneAPI接入本地大模型+FastGPT调用本地大模型
  • Python期末复习知识点大合集(期末不挂科版)
  • AWS安全性身份和合规性之Identity and Access Management(IAM)
  • 数据库--数据库基础(一)
  • 多微信如何高效管理?一台电脑就能搞定!
  • Android动态设置淡入淡出动画
  • 百度集团:AI重构,走到哪了?
  • JVM-调优之-高cpu线程问题排查
  • 跨境电商赛道,云手机到底能不能化繁为简?
  • 分享一款快速APP功能测试工具
  • [ 一起学React系列 -- 8 ] React中的文件上传
  • download使用浅析
  • FineReport中如何实现自动滚屏效果
  • Java多态
  • k8s 面向应用开发者的基础命令
  • Mocha测试初探
  • SpringCloud集成分布式事务LCN (一)
  • 计算机在识别图像时“看到”了什么?
  • 看域名解析域名安全对SEO的影响
  • 码农张的Bug人生 - 初来乍到
  • 推荐一个React的管理后台框架
  • 我看到的前端
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • 长三角G60科创走廊智能驾驶产业联盟揭牌成立,近80家企业助力智能驾驶行业发展 ...
  • ​插件化DPI在商用WIFI中的价值
  • ​水经微图Web1.5.0版即将上线
  • #《AI中文版》V3 第 1 章 概述
  • #我与Java虚拟机的故事#连载02:“小蓝”陪伴的日日夜夜
  • (附源码)spring boot车辆管理系统 毕业设计 031034
  • (附源码)spring boot基于Java的电影院售票与管理系统毕业设计 011449
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (附源码)ssm码农论坛 毕业设计 231126
  • (附源码)基于SpringBoot和Vue的厨到家服务平台的设计与实现 毕业设计 063133
  • (亲测有效)解决windows11无法使用1500000波特率的问题
  • (十八)devops持续集成开发——使用docker安装部署jenkins流水线服务
  • (十八)Flink CEP 详解
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (实战)静默dbca安装创建数据库 --参数说明+举例
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • (转) RFS+AutoItLibrary测试web对话框
  • (转)http协议
  • .axf 转化 .bin文件 的方法
  • .net 4.0 A potentially dangerous Request.Form value was detected from the client 的解决方案
  • .net 使用ajax控件后如何调用前端脚本
  • .NET/C#⾯试题汇总系列:集合、异常、泛型、LINQ、委托、EF!(完整版)
  • .NET微信公众号开发-2.0创建自定义菜单
  • .NET学习教程二——.net基础定义+VS常用设置
  • .Net转Java自学之路—SpringMVC框架篇六(异常处理)