当前位置: 首页 > news >正文

使用事件和消息队列实现分布式事务(转+补充)

虽然本文并非笔者原创,但是我们在非强依赖的事务中原理上也是采用这种方式处理的,不过因为没有仔细去总结,最近在整理和总结时看到了,故转载并做部分根据我们实际情况的完善和补充。

不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行多次等待. 有一种方法同样可以解决分布式事务问题, 并且性能较好, 这就是我这篇文章要介绍的使用事件,本地事务以及消息队列来实现分布式事务.

我们从一个简单的实例入手. 基本所有互联网应用都会有用户注册的功能. 在这个例子中, 我们对于用户注册有两步操作: 
1. 注册成功, 保存用户信息.
2. 需要给用户发放一张代金券, 目的是鼓励用户进行消费.
如果是一个单一架构应用, 实现这个功能非常简单: 在一个本地事务里, 往用户表插一条记录, 并且在代金券表里插一条记录, 提交事务就完成了. 但是如果我们的应用是用微服务实现的, 可能用户和代金券是两个独立的服务, 他们有各自的应用和数据库, 那么就没有办法简单的使用本地事务来保证操作的原子性了. 现在来看看如何使用事件机制和消息队列来实现这个需求.(我在这里使用的消息队列是kafka, 原理同样适用于ActiveMQ/RabbitMQ等其他队列)

我们会为用户注册这个操作创建一个事件, 该事件就叫做用户创建事件(USER_CREATED). 用户服务成功保存用户记录后, 会发送用户创建事件到消息队列, 代金券服务会监听用户创建事件, 一旦接收到该事件, 代金券服务就会在自己的数据库中为该用户创建一张代金券. 好了, 这些步骤看起来都相当的简单直观, 但是怎么保证事务的原子性呢? 考虑下面这两个场景:
1. 用户服务在保存用户记录, 还没来得及向消息队列发送消息之前就宕机了. 怎么保证用户创建事件一定发送到消息队列了?
2. 代金券服务接收到用户创建事件, 还没来得及处理事件就宕机了. 重新启动之后如何消费之前的用户创建事件?
这两个问题的本质是: 如何让操作数据库和操作消息队列这两个操作成为一个原子操作. 不考虑2PC, 这里我们可以通过事件表来解决这个问题. 下面是类图. 

EventPublish是记录待发布事件的表. 其中:
id: 每个事件在创建的时候都会生成一个全局唯一ID, 例如UUID.
status: 事件状态, 枚举类型. 现在只有两个状态: 待发布(NEW), 已发布(PUBLISHED).
payload: 事件内容. 这里我们会将事件内容转成json存到这个字段里.
eventType: 事件类型, 枚举类型. 每个事件都会有一个类型, 比如我们之前提到的创建用户USER_CREATED就是一个事件类型.
EventProcess是用来记录待处理的事件. 字段与EventPublish基本相同.

我们首先看看事件的发布过程. 下面是用户服务发布用户创建事件的顺序图. 
1. 用户服务在接收到用户请求后开启事务, 在用户表创建一条用户记录, 并且在EventPublish表创建一条status为NEW的记录, payload记录的是事件内容, 提交事务.
2. 用户服务中的定时器首先开启事务, 然后查询EventPublish是否有status为NEW的记录, 查询到记录之后, 拿到payload信息, 将消息发布到kafka中对应的topic.
发送成功之后, 修改数据库中EventPublish的status为PUBLISHED, 提交事务.

下面是代金券服务处理用户创建事件的顺序图. 
1. 代金券服务接收到kafka传来的用户创建事件(实际上是代金券服务主动拉取的消息, 先忽略消息队列的实现), 在EventProcess表创建一条status为NEW的记录, payload记录的是事件内容, 如果保存成功, 向kafka返回接收成功的消息.
2. 代金券服务中的定时器首先开启事务, 然后查询EventProcess是否有status为NEW的记录, 查询到记录之后, 拿到payload信息, 交给事件回调处理器处理, 这里是直接创建代金券记录. 处理成功之后修改数据库中EventProcess的status为PROCESSED, 最后提交事务.

回过头来看我们之前提出的两个问题:
1. 用户服务在保存用户记录, 还没来得及向消息队列发送消息之前就宕机了. 怎么保证用户创建事件一定发送到消息队列了?
根据事件发布的顺序图, 我们把创建事件和发布事件分成了两步操作. 如果事件创建成功, 但是在发布的时候宕机了. 启动之后定时器会重新对之前没有发布成功的事件进行发布. 如果事件在创建的时候就宕机了, 因为事件创建和业务操作在一个数据库事务里, 所以对应的业务操作也失败了, 数据库状态的一致性得到了保证.
2. 代金券服务接收到用户创建事件, 还没来得及处理事件就宕机了. 重新启动之后如何消费之前的用户创建事件?
根据事件处理的顺序图, 我们把接收事件和处理事件分成了两步操作. 如果事件接收成功, 但是在处理的时候宕机了. 启动之后定时器会重新对之前没有处理成功的事件进行处理. 如果事件在接收的时候就宕机了, kafka会重新将事件发送给对应服务.

通过这种方式, 我们不用2PC, 也保证了多个数据源之间状态的最终一致性.
和2PC/3PC这种同步事务处理的方式相比, 这种异步事务处理方式具有异步系统通常都有的优点:
1. 事务吞吐量大. 因为不需要等待其他数据源响应.
2. 容错性好. A服务在发布事件的时候, B服务甚至可以不在线.
缺点:
1. 编程与调试较复杂.
2. 容易出现较多的中间状态. 比如上面的例子, 在用户服务已经保存了用户并发布了事件, 但是代金券服务还没来得及处理之前, 用户如果登录系统, 会发现自己是没有代金券的. 这种情况可能在有些业务中是能够容忍的, 但是有些业务却不行. 所以开发之前要考虑好.

另外, 上面的流程在实现的过程中还有一些可以改进的地方:
1. 定时器在更新EventPublish状态为PUBLISHED的时候, 可以一次批量更新多个EventProcess的状态.
2. 定时器查询EventProcess并交给事件回调处理器处理的时候, 可以使用线程池异步处理, 加快EventProcess处理周期.
3. 在保存EventPublish和EventProcess的时候同时保存到Redis, 之后的操作可以对Redis中的数据进行, 但是要小心处理缓存和数据库可能状态不一致问题.
4. 针对Kafka, 因为Kafka的特点是可能重发消息, 所以在接收事件并且保存到EventProcess的时候可能报主键冲突的错误(因为重复消息id是相同的), 这个时候可以直接丢弃该消息.

补充点:

1、我们使用的是rabbitmq cluster;

2、有HA要求,就要防止重复SPOF,而涉及到scheduler的时候,就需要防止重复调度,对此,可以使用quartz cluster(可以容忍一定延时)或者分布式事务协调器比如zookeeper,前者相对后者系统结构简单得多,对于这些非强依赖的情况,因为仅在没有等待处理的队列才会到下一个调度间隔,因此进一步使用多线程以及数据库优化之后,延时通常在10ms内全部已经push到对方处理了。

3、对于需要回滚和性能要求极高的业务,上述模式无法直接套用。比如在证券买卖中,延时性是个极为重要的特性,因此不可能在资金冻结之后最长可能超过10ms才发出买单,对于此,建议的设计是这些功能整体VIP化+可信请求处理。而对于需要回滚的操作,其中一种处理方式是,对于每个业务功能,都增加一个对应的对冲逻辑,当对端处理失败的时候,push相应的对冲消息即可。

4、对于某些在短时间内发出大量请求的业务逻辑,比如对于一个账户,一下子发出成千上万请求的业务逻辑,如此通常服务端是通过线程池处理,如果全部请求到到一个线程池中处理,占据了所有的线程和数据库连接,会导致其他用户全部出现hang的情况,这种情况下,服务端需要采用pools of single thread executor,而非通用pools的模式,可能还要进一步针对账号进行二次mod。

在一个系统中,有可能这三种方式都涉及么,如果确实需要分布式系统的规模,那么大部分情况下会需要两种结构并存,某些行业和场景中,三种结构都需要。

对于分布式系统的架构,一定要先按照子系统独立拆分(这是为了减少升级、开发、运维复杂性和成本),然后按照业务数据分库分表拆分(这是为了尽可能晚的引入分布式事务,分布式事务一旦引入,很可能使得开发、测试成本剧增,而且在绝大部分情况下很多系统是拆分过度、架构不合理,而非没有采用分布式的原因),分布式系统绝对不是、也不能因为数据量、并发数很大就构造分布式系统,而是因为业务子系统太多、太复杂、又不得不进行大量的子系统间交互才衍生而来。各独立运行系统间的交互是不能有太多功能的,否则业务架构的设计上就存在疑虑,为了分布式而分布式会使得系统的维护成本极为高昂,同时分布式系统大量依赖于MQ和RPC框架,但是不是使用了RPC和MQ就一定要将系统复杂到分布式系统。

相关文章:

  • JFinal极速开发框架使用笔记(三) 分析Model和ActiveRecord
  • 3138 栈练习2
  • innerHTML、html('')与empty在IE上不同的区别
  • 配置tomcat监听80端口、配置tomcat虚拟机、tomcat日志
  • 关于Docker的一些常识
  • linux下tar、zip 压缩文件不带文件路径
  • 【Amaple教程】5. 插件
  • 数值的整数次方
  • 编写高质量iOS与OS X代码的52个有效方法(二)
  • LAMP下安装zabbix流水
  • Redis进阶实践之六Redis Desktop Manager连接Windows和Linux系统上的Redis服务
  • 自适应滤波:最小均方误差滤波器(LMS、NLMS)
  • JVM笔记
  • 游戏化思维:从工具到玩具
  • Python之递归函数
  • 【399天】跃迁之路——程序员高效学习方法论探索系列(实验阶段156-2018.03.11)...
  • 2018一半小结一波
  • docker容器内的网络抓包
  • gitlab-ci配置详解(一)
  • JavaScript DOM 10 - 滚动
  • php ci框架整合银盛支付
  • spark本地环境的搭建到运行第一个spark程序
  • underscore源码剖析之整体架构
  • ViewService——一种保证客户端与服务端同步的方法
  • 测试开发系类之接口自动化测试
  • 飞驰在Mesos的涡轮引擎上
  • 紧急通知:《观止-微软》请在经管柜购买!
  • 漂亮刷新控件-iOS
  • 让你的分享飞起来——极光推出社会化分享组件
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • 一些关于Rust在2019年的思考
  • 在Mac OS X上安装 Ruby运行环境
  • puppet连载22:define用法
  • #Js篇:单线程模式同步任务异步任务任务队列事件循环setTimeout() setInterval()
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • (16)UiBot:智能化软件机器人(以头歌抓取课程数据为例)
  • (2/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (C++17) optional的使用
  • (ZT)一个美国文科博士的YardLife
  • (附源码)spring boot网络空间安全实验教学示范中心网站 毕业设计 111454
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (九)信息融合方式简介
  • (三) diretfbrc详解
  • (三十五)大数据实战——Superset可视化平台搭建
  • (十六)串口UART
  • (转)关于如何学好游戏3D引擎编程的一些经验
  • .NET Conf 2023 回顾 – 庆祝社区、创新和 .NET 8 的发布
  • .NET CORE 3.1 集成JWT鉴权和授权2
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法
  • .net framework 4.0中如何 输出 form 的name属性。
  • .NET处理HTTP请求
  • :O)修改linux硬件时间
  • @ 代码随想录算法训练营第8周(C语言)|Day57(动态规划)
  • @RequestParam,@RequestBody和@PathVariable 区别