当前位置: 首页 > news >正文

延时队列的几种实现方式(只有原理,并没有源码)

延时队列

需求描述

场景一

在淘宝下了订单,过半个小时未支付就取消订单

场景二

还是淘宝(别问,问就是淘宝资深剁手党),发货后超过15天未确认就自动收货


需求分析

​ 本质上都是超过xxx时间,就异步去做一件事。说到异步那基本上就是搞个定时任务去轮询或者消息队列+轮询。基本上有几种实现方式,挨个看一下。


实现方式

DelayQueue+Delayed

Java的并发包java.util.concurrent下提供了延时队列DelayQueue,它内部维护了一个优先级队列PriorityQueue来维护任务顺序,方便取出到时间的任务。PriorityQueue是个二叉堆,这就意味着它的插入、删除的时间复杂度都是O(logn)。

定时任务Quartz

Quartz是一个任务调度框架,不过它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。如果对超时的时间精度要求没那么高的情况下可以使用。

Redis

Redis的Zset

Redis有种数据类型Zset,它利用score属性为集合内元素维护一个顺序,通过Zset就可以实现延时队列。

做法大概是:

  • 任务插入的时候key是你的单号或者唯一ID,score是时间戳
  • 异步去redis用zrange批量取出超时的单号,然后进行处理

Redis的发布订阅功能

Redis也提供了发布订阅功能,可以修改配置文件redis.conf中的:notify-keyspace-events Ex,监听超时的key,然后编写监听器处理。

RocketMQ

RocketMQ的延迟消息机制

RocketMQ提供的延迟消息机制。如果往RocketMQ发送了一条延迟消息,它不会立刻对消费者可见,而是在指定的时间后再投递给消费者。那么我们可以给RocketMQ投递延迟消息,然后到时间去消费,检查订单是否已经支付,如果未支付就取消订单,如果支付就继续走后面的业务逻辑。

RocketMQ事务消息的反查机制

如果RocketMQ在生成订单的时候用上了事务消息,那么可以用事务消息的状态回查机制来替代定时任务。在下单时,给 Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时 间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

RabbitMQ的TTL+DXL实现延时队列

RabbitMQ可以通过消息存活时间TTL+死信队列的Exchange(DXL)来实现延时队列。

Time To Live(TTL):

TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身:

  • 设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
  • 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。

如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。

Dead Letter Exchanges(DLX):

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange和x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。

x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。

x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

队列出现Dead Letter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)

所以RabbitMQ的原理大概是:

通过设置ttl+dlx,消息到指定时间后,投递到另外一个Exchange中去消费,去检查是否已经支付,未支付就取消,支付了就继续走后序的业务流程。

时间轮

时间轮介绍

时间轮有简单时间轮(Simple Timing Wheel)和分层时间轮(Hierarchical Timing Wheel)两类。两者各有利弊,也都有各自的使用场景。Kafka 采用的是分层时间轮。

分层时间轮

分层时间轮可以简单的认为,一圈时间轮8格,每个格子1s,那么第九秒怎么办?那就再往高抽出层的概念,用第二层+第一格来表示。当然Kafka的分层时间轮比这复杂的多。再举个例子:想想我们生活中的手表。手表由时针、分针和秒针组成,它们各自有独立的刻度,但又彼此相关:秒针转动一圈,分针会向前推进一格;分针转动一圈,时针会向前推进一格。这就是典型的分层时间轮。

和手表不太一样的是,Kafka 自己有专门的术语。

在 Kafka 中,手表中的“一格”叫“一 个桶(Bucket)”,而“推进”对应于 Kafka 中的“滴答”,也就是 tick。

除此之外,每个 Bucket 下也不是白板一块,它实际上是一个双向循环链表(Doubly Linked Cyclic List),里面保存了一组延时请求。

在 Kafka 源码中,时间轮对应 utils.timer 包下的 TimingWheel 类,每个 Bucket 下的链 表对应 TimerTaskList 类,链表元素对应 TimerTaskEntry 类,而每个链表元素里面保存的 延时任务对应 TimerTask。 在这些类中,TimerTaskEntry 与 TimerTask 是 1 对 1 的关系,TimerTaskList 下包含多 个 TimerTaskEntry,TimingWheel 包含多个 TimerTaskList。

Kafka 延时请求实模块现

​ 延迟请求模块属于 Kafka 的冷门组件,Kafka通过延时模块来异步循环操作和管理定时任务。内部是基于时间轮算法实现的。

​ 举个例子:比如配置了 acks=all 的生产者发送的请求可能不会立刻完成,要等 ISR 中的所有副本都要成功才会响应这次写入。只有满足了条件或发生了超时,Kafka 才会把该请求标记为完成状态。

​ 这种请求在Kafka内部是通过TimingWheel类建模时间轮模型,SystemTimer封装了底层的时间轮,由 DelayedOperation 调用, 再通过DelayedOperationPurgatory 管理 DelayedOperation。它们共同实现了 Broker 端对于延迟请求的处理,基本思想就是,能立即完成的请求马上完成,否则就放入到缓冲区,再由DelayedOperationPurgatory 类的方法会自动地处理这些延迟请求。

分层时间轮体系

TimerTask 类:建模 Kafka 延时请求。它是一个 Runnable 类,Kafka 使用一个单独线 程异步添加延时请求到时间轮。

TimerTaskEntry 类:建模时间轮 Bucket 下延时请求链表的元素类型,封装了 TimerTask 对象和定时任务的过期时间戳信息。 TimerTaskList 类:建模时间轮 Bucket 下的延时请求双向循环链表,提供 O(1) 时间复 杂度的请求插入和删除。

TimingWheel 类:建模时间轮类型,统一管理下辖的所有 Bucket 以及定时任务。

分层时间轮的上层组件

包括 Timer 接口及其实现类 SystemTimer、DelayedOperation 类以及 DelayedOperationPurgatory 类:

SystemTimer 类:Kafka 定义的定时器类,封装了底层分层时间轮,实现了时间轮 Bucket 的管理以及时钟向前推进功能。它是实现延迟请求后续被自动处理的基础。

DelayedOperation 类:延迟请求的高阶抽象类,提供了完成请求以及请求完成和过期 后的回调逻辑实现。

DelayedOperationPurgatory 类:Purgatory 实现类,该类定义了 WatcherList 对象 以及对 WatcherList 的操作方法,而 WatcherList 是实现延迟请求后续自动处理的关键 数据结构。

基于kafka实现延迟队列

基于Kafka怎么实现延迟队列?看下网上大神的描述:

  1. 在发送延迟消息时不直接发送到目标topic,而是发送到一个用于处理延迟消息的topic,例如delay-minutes-1
  2. 写一段代码拉取delay-minutes-1中的消息,将满足条件的消息发送到真正的目标主题里。

​ 可以看出,实际上如果非要用Kafka去做延时队列,可以,但是基本上跟自己实现也差不多了。并且自己实现的时候,拉取消息的时候还要去做特殊的处理,不能发现消息没有过期直接不处理。因为如果你没在指定的时间(max.poll.interval.ms参数配置)去处理,kafka会认为你这个消费者挂掉了,然后去做reblance处理把你踢掉。而reblance可以参考之前整理的博客,并不是很友好的操作,跟JVMGC导致的STW差不多。

总结

列举出了几种延时队列的实践方式,各有优劣:(自己总结,如果有不正确的地方请指正)

  • DelayQueue+异步:底层的数据结构是二叉堆,插入、删除的时间复杂度是O(logn),需要定时扫描很庞大的一个订单信息,数据量很大的时候不是特别的友好,而且宕机数据会丢失
  • 定时任务Quartz,分布式情况下可能会有问题
  • Redis的Zset+异步:可以批量处理,不过准确性取决于你异步处理的频率
  • Redis的发布订阅:实际上就是订阅每个key过期的事件,意味着不能批量处理,当数据量过大的时候非常不友好
  • RocketMQ的延迟消息:数据量大的时候能否处理的过来要依赖于RocketMQ的延迟消息处理能力;是否及时处理也要依赖于消费者那边的处理能力;
  • RocketMQ的事务消息反查机制:实际上是一条一条的处理,数据量大的时候不是特别的友好;
  • RabbitMQ的TTL+DXL:如果数据量过大,消费者消费死信队列数据的时候处理不及时导致消息积压,RabbitMQ的性能会急剧下降。这是RabbitMQ本身特性造成的;

​ 当然也可以参考Kafka中的时间轮算法,自己去实现延时队列;个人认为基于时间轮算法实现的延时队列添加、查找时间复杂度都是

O(1),单看添加、查找的时间复杂度是最好的。

相关文章:

  • DDD整理(概念篇)
  • DDD的分层架构设计
  • 面试记录之synchronized的惨败经历
  • 面试复盘整理
  • Go语言基础_数据类型、基本语法篇
  • Go学习笔记_环境搭建
  • Markdown学习
  • Markdown下载客户端
  • JDK,JRE,JVM三者的区别
  • 2020-12-01
  • 2020-12-02
  • 比较三个数字,求出最大值
  • Scanner限制次数猜数字
  • ArrayList,随机抽取6个数字在【1-33】中的随机数,并且遍历
  • 利用ArrayList遍历集合
  • JavaScript-如何实现克隆(clone)函数
  • 【Linux系统编程】快速查找errno错误码信息
  • 【Redis学习笔记】2018-06-28 redis命令源码学习1
  • 【附node操作实例】redis简明入门系列—字符串类型
  • co.js - 让异步代码同步化
  • create-react-app项目添加less配置
  • CSS实用技巧
  • java8-模拟hadoop
  • Javascripit类型转换比较那点事儿,双等号(==)
  • Java到底能干嘛?
  • mysql_config not found
  • 编写符合Python风格的对象
  • 构建二叉树进行数值数组的去重及优化
  • 关于Android中设置闹钟的相对比较完善的解决方案
  • 老板让我十分钟上手nx-admin
  • 每天10道Java面试题,跟我走,offer有!
  • 前端技术周刊 2019-02-11 Serverless
  • 前端路由实现-history
  • 它承受着该等级不该有的简单, leetcode 564 寻找最近的回文数
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 小试R空间处理新库sf
  • 源码安装memcached和php memcache扩展
  • 《码出高效》学习笔记与书中错误记录
  • 大数据全解:定义、价值及挑战
  • ​LeetCode解法汇总518. 零钱兑换 II
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • (+4)2.2UML建模图
  • (C语言)fread与fwrite详解
  • (Python第六天)文件处理
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (二十三)Flask之高频面试点
  • (经验分享)作为一名普通本科计算机专业学生,我大学四年到底走了多少弯路
  • (论文阅读40-45)图像描述1
  • (三)elasticsearch 源码之启动流程分析
  • (算法设计与分析)第一章算法概述-习题
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)可以带来幸福的一本书
  • ***监测系统的构建(chkrootkit )
  • .jks文件(JAVA KeyStore)
  • .NET下的多线程编程—1-线程机制概述