当前位置: 首页 > news >正文

【Redis笔记】基于Redis的Stream结构作为消息队列,实现异步任务

使用redis命令创建消息队列

在redis-cli中执行如下指令

XGROUP CREATE key groupName ID [MKSTREAM]

key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

示例:

XGROUP CREATE streams.orders g1 0 MKSTREAM

编写Lua脚本,向redis消息队列中发送消息

-- lua脚本中其他事项处理部分-- 获取调用的参数列表
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]-- key
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId-- 业务
-- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足return 1
end
-- 判断用户是否已经下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then-- 存在说明重复下单return 2
end
-- 扣库存,下单
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
-- 发消息到队列, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0

业务代码——执行Lua脚本

	private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();// 从resources目录下加载脚本SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));// lua脚本执行返回值SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {// 获取用户Long userId = UserHolder.getUser().getId();// 订单Idlong orderId = redisIdWorker.nextId("order");// 执行lua脚本int result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId)).intValue();// 判断结果为0if (result != 0) {// 不为0,没有购买资格return Result.fail(result == 1 ? "库存不足" : "不能重复下单");}// 获取代理对象(事务)proxy = (IVoucherOrderService) AopContext.currentProxy();// 返回订单信息return Result.ok(orderId);}

业务代码——从消息队列获取消息并处理

	// 线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 注解含义,在Bean被创建完毕后执行@PostConstructprivate void init() {// SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}// 从消息队列中获取消息,异步下单private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 count 1 block 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 判断消息是否获取成功if (list == null || list.isEmpty()) {// 如果获取失败,说明没有消息,继续下一次循环continue;}// 解析消息中的订单消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 如果获取成功,执行下单handleVoucherOrder(voucherOrder);// ACK确认,SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("订单处理异常", e);// 发生异常后去pending-list中处理消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 count 1 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 判断消息是否获取成功if (list == null || list.isEmpty()) {// 如果获取失败,说明pending-list没有异常消息,结束循环break;}// 解析消息中的订单消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 如果获取成功,执行下单handleVoucherOrder(voucherOrder);// ACK确认,SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单处理异常", e);}}}}

相关文章:

  • 宏集案例 | 风电滑动轴承齿轮箱内多点温度采集与处理
  • 【前端】处理一次性十万条数据渲染方案(不考虑后端分页)
  • 【安卓与苹果区别】详细讲解
  • uniapp发行H5获取当前页面query
  • QT UI设计
  • 【LeetCode周赛】第388场周赛
  • C while 循环
  • C++ lambda函数个人理解
  • 【话题】2024年AI辅助研发趋势,有那些应用领域
  • 【STL】string各种函数的应用
  • TinyEMU之RISCV-PK编译
  • SpringCloud-Alibaba-Nacos教程
  • vs2022 错误(活动) E1696 无法打开 源 文件 “bits/stdc++.h“解决办法
  • Github上哪些好用的工具
  • 2022 年河南省中等职业教育技能大赛
  • Docker: 容器互访的三种方式
  • egg(89)--egg之redis的发布和订阅
  • Javascripit类型转换比较那点事儿,双等号(==)
  • JS笔记四:作用域、变量(函数)提升
  • Js基础知识(一) - 变量
  • js正则,这点儿就够用了
  • MD5加密原理解析及OC版原理实现
  • mysql中InnoDB引擎中页的概念
  • Node 版本管理
  • rabbitmq延迟消息示例
  • yii2中session跨域名的问题
  • 多线程事务回滚
  • 融云开发漫谈:你是否了解Go语言并发编程的第一要义?
  • 数组大概知多少
  • 小李飞刀:SQL题目刷起来!
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • ​ubuntu下安装kvm虚拟机
  • #if #elif #endif
  • #Linux(make工具和makefile文件以及makefile语法)
  • %3cli%3e连接html页面,html+canvas实现屏幕截取
  • (+3)1.3敏捷宣言与敏捷过程的特点
  • (13)Hive调优——动态分区导致的小文件问题
  • (4) PIVOT 和 UPIVOT 的使用
  • (NO.00004)iOS实现打砖块游戏(十二):伸缩自如,我是如意金箍棒(上)!
  • (草履虫都可以看懂的)PyQt子窗口向主窗口传递参数,主窗口接收子窗口信号、参数。
  • (动态规划)5. 最长回文子串 java解决
  • (独孤九剑)--文件系统
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (附源码)计算机毕业设计SSM智慧停车系统
  • (转)es进行聚合操作时提示Fielddata is disabled on text fields by default
  • (转)IOS中获取各种文件的目录路径的方法
  • ******之网络***——物理***
  • ... fatal error LINK1120:1个无法解析的外部命令 的解决办法
  • .axf 转化 .bin文件 的方法
  • .NET C#版本和.NET版本以及VS版本的对应关系
  • .net mvc部分视图
  • .net 简单实现MD5
  • .net 重复调用webservice_Java RMI 远程调用详解,优劣势说明
  • .NET/C# 阻止屏幕关闭,阻止系统进入睡眠状态
  • .Net程序猿乐Android发展---(10)框架布局FrameLayout