当前位置: 首页 > news >正文

基于MQ的分布式事务实现方案

1. 前言

在与不同业务系统之间进行交互时,怎么保证发送的消息对方一定能收到,可能有人说RocketMQ就能做到,如果贵公司用到的消息队列是kafka、rabbitmq、activemq怎么保证不丢?

这里分享一下基于消息的分布式事务解决方案,此种方案是最终一致性的解决方案,不挑MQ,但是前提MQ本身要支持接收到的消息不能丢失。

2. MQ的配置建议

如果要保证MQ接收到的消息不丢,就要配置相关的同步策略或者刷盘策略

主从同步策略

建议主从同步建议设置为主从同步策略为主从同步完再响应,这样单个节点如果挂了,另一个节点的数据还会存在

刷盘策略

消息中间件为了提高效率,默认接收到消息不会立即刷盘,如果要主从同步策略是主节点接收到消息以后立即响应,这会正好主节点宕机,就会导致消息丢失,所以要特别注意下,虽然可以设置成同步刷盘,但是效率就会降低,所以还是建议设置主从同步策略

3. 生产方设计

生产者的职责是必须要保证本地事务提交成功消息一定要发送出去,或者业务处理失败就不发送。

3.1 消息持久化

生产方方案如下,首先需要在业务库中创建一张表,字段大致为:

  • 消息id
  • 业务id
  • 业务方名称 (如果一个库是多个子系统在用就需要这个字段)
  • topic (发送消息主题)
  • 分区
  • 消息体
  • 状态 (0未发送 1已发送)
  • 是否失败重试( 0不重试 1重试)

与本地业务表使用同一个事务,提交则一起提交,回滚则一起回滚,因为使用的同一个事务所以是强一致的,再事务提交以后进行消息数据的发送,发送成功以后则更改消息状态为已发送,具体流程请查看 图1

图1 消息正常发送流程

这里可能还有一点还要考虑,就是在图1的第2步、第3步、第4步会出现失败,具体描述如下:

  1. 如插入本地库成功,但是发送MQ失败
  2. 消息发送成功,但是响应失败,比如超时,其实这会MQ已经接收发送方的消息了,但是发送方不清楚
  3. 消息发送响应都成功了,但是更改本地表状态为已发送失败了。

持久化相关代码

图3 消息数据保存

图4,集成Spring的事务管理器,重写事务提交后发送消息

​ 图4 消息数据发送

3.2 消息补偿设计

以上这三个问题就需要引入补偿任务来处理了,具体查看 图5 ,补偿任务会根据发送状态查询对应的数据,然后进行发送,这里有一点特别注意, 消费方要必须做幂等处理 ,因为图1的第3步、第4步消息都已经发送到MQ了,只是发送方不清楚,所以还会重复发送,另外99.9%的场景是能立即发送成功的,只有很小部分需要做补偿,

​ 图5 消息补偿流程

补偿代码

查询待发送的数据,这里为1分钟之前的,定时任务用的是elastic-job,用其他定时任务也可以

至此整个发送方设计就完成了,下面看看部分

4. 消费方设计

消费方相对比较简单,主要有两点要求

  1. 保证消息不会重复消费
  2. 记录消息便于消息对账,对账主要是极端情况下,那些消息没收到,便于重新投递

以下是消费表的设计

  • 消息id
  • 业务id
  • Topic
  • 分区
  • 偏移量
  • 消息体
  • 状态( 0未消费成功 1消费成功 2消费失败 )
  • 异常信息 (消费失败会记录异常信息)
  • 业务方名称 (如果一个库是多个子系统在用就需要这个字段)

此表也要与业务表处于同一个事务,如果不是一个事务,会出现业务表操作成功、消息表插入失败,如果出现消息重复发送就会出现重复消费的问题,具体查看 图6

图6 消费方处理流程图

消费方代码

这里是kafka的消费代码,通过动态代理,封装KafkaListener类,在处理前进行消息重复判断,在处理后进行消费表的插入,这里需要特别注意一点,业务处理不能把异常自己吃掉,否则上层捕获不到,会认为业务处理成功,从而插入脏数据

图7 消费方部分核心代码

5. 历史数据清理

通过前面介绍,我们创建了2张表,分别为消息发送表、消息消费表,这两张表要特别注意下,如果业务量比较大,数据量会快速增长,所以需要删除已经处理成功的数据,通过配置两个定时任务,保留一定的时间数据,其他时间的数据就可以删除了,代码如下

图8 发送方清理数据代码

图9 消费方清理数据代码

相关文章:

  • 基于PaddleOCR开发easy click文字识别插件
  • 建立对象模型— 如何确定类与对象?
  • VMware Workstation Pro16 的下载与安装
  • java计算机毕业设计基于安卓Android的电子废弃物回收利用APP
  • PostgreSQL LISTEN 与NOTIFY命令
  • 基于ssm的养老智慧服务平台毕业设计源码071526
  • MySQL 安装详细步骤
  • 关于 SAP ABAP CL_HTTP_CLIENT API 中的 SSL_ID 参数
  • JavaFX、贷款服务器
  • Powershell历史执行记录
  • elementui中表格组件的高度修改没效果
  • 难受啊,早饭忘记吃了
  • 标签上有什么defer和async属性?<script>
  • 放大器的稳定性分析举例
  • ipsec vxn详解
  • 10个最佳ES6特性 ES7与ES8的特性
  • C++11: atomic 头文件
  • CSS盒模型深入
  • JavaScript设计模式系列一:工厂模式
  • JSONP原理
  • SegmentFault 2015 Top Rank
  • spring boot下thymeleaf全局静态变量配置
  • vagrant 添加本地 box 安装 laravel homestead
  • Vue.js源码(2):初探List Rendering
  • Web标准制定过程
  • yii2权限控制rbac之rule详细讲解
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 一份游戏开发学习路线
  • 一个项目push到多个远程Git仓库
  • 智能合约开发环境搭建及Hello World合约
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • # 睡眠3秒_床上这样睡觉的人,睡眠质量多半不好
  • #define,static,const,三种常量的区别
  • #我与Java虚拟机的故事#连载14:挑战高薪面试必看
  • (1)(1.13) SiK无线电高级配置(五)
  • (iPhone/iPad开发)在UIWebView中自定义菜单栏
  • (Matalb回归预测)PSO-BP粒子群算法优化BP神经网络的多维回归预测
  • (二)Linux——Linux常用指令
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (算法二)滑动窗口
  • (原創) 如何解决make kernel时『clock skew detected』的warning? (OS) (Linux)
  • (转) Face-Resources
  • (转)Scala的“=”符号简介
  • .cn根服务器被攻击之后
  • .NET 8 中引入新的 IHostedLifecycleService 接口 实现定时任务
  • .net 设置默认首页
  • .NET 使用 ILMerge 合并多个程序集,避免引入额外的依赖
  • /etc/X11/xorg.conf 文件被误改后进不了图形化界面
  • /usr/local/nginx/logs/nginx.pid failed (2: No such file or directory)
  • @LoadBalanced 和 @RefreshScope 同时使用,负载均衡失效分析
  • [BZOJ4566][HAOI2016]找相同字符(SAM)
  • [C++]Leetcode17电话号码的字母组合