当前位置: 首页 > news >正文

RabbitMQ保证消息被成功发送和消费

一 : 在使用 RabbitMQ 作为消息队列时,保证消息被成功发送和消费是一个非常重要的问题。以下是一些关键点和最佳实践,以确保消息的可靠传输和处理。*
配置方式:

保证消息被成功发送
确认模式(Confirm Mode):生产者可以启用确认模式,确保消息成功到达交换机。
使用 channel.confirmSelect() 启用确认模式。
使用 channel.waitForConfirms() 或 channel.addConfirmListener() 来处理确认消息。
事务模式(Transaction Mode):生产者可以使用事务模式,确保消息成功到达队列。
使用 channel.txSelect() 开启事务,channel.txCommit() 提交事务,channel.txRollback() 回滚事务。
处理发送失败:实现重试机制,可以在发送失败时重试。
使用死信交换机(Dead Letter Exchange, DLX)来存储处理失败的消息。
保证消息被成功消费
手动确认(Manual Acknowledgment):消费者应该使用手动确认模式,确保消息被成功处理后再确认。
使用 channel.basicConsume(queue, false, consumer) 开启手动确认模式。
在消息处理成功后,调用 channel.basicAck(deliveryTag, false) 确认消息。
处理消费失败:实现消费失败的重试机制。
使用死信交换机(DLX)来存储处理失败的消息。
幂等性:确保消费者处理消息的幂等性,避免重复消费导致的问题。

二:通过记录消息到数据库中,采用定时任务轮询方式:

1 这是一个 Spring 组件,用于构建和发布返利消息事件。

//topic 字段从配置文件中获取,表示消息队列的 topic。
//buildEventMessage 方法用于构建 EventMessage 对象,包含随机生成的 ID、当前时间戳和数据。
//topic 方法返回消息队列的 topic。
//RebateMessage 是一个内部类,定义了返利消息的结构@Component
public class SendRebateMessageEvent extends BaseEvent<SendRebateMessageEvent.RebateMessage> {@Value("${spring.rabbitmq.topic.send_rebate}")private String topic;@Overridepublic EventMessage<RebateMessage> buildEventMessage(RebateMessage data) {return EventMessage.<SendRebateMessageEvent.RebateMessage>builder().id(RandomStringUtils.randomNumeric(11)).timestamp(new Date()).data(data).build();}@Overridepublic String topic() {return topic;}@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic static class RebateMessage {private String userId;private String rebateDesc;private String rebateType;private String rebateConfig;private String bizId;}
}@Data
public abstract class BaseEvent<T> {public abstract EventMessage<T> buildEventMessage(T data);public abstract String topic();@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic static class EventMessage<T> {private String id;private Date timestamp;private T data;}}

2 生产者示例

//topic 字段从配置文件中获取,表示消息队列的 topic。
//listener 方法使用 @RabbitListener 注解监听指定队列的消息。
//消息到达后,解析消息内容,根据 rebateType 字段的不同,调用相应的服务方法处理消息。
//异常处理机制确保了消息处理的健壮性。@Component
public class RebateMessageCustomer {@Value("${spring.rabbitmq.topic.send_rebate}")private String topic;@Resourceprivate IRaffleActivityAccountQuotaService raffleActivityAccountQuotaService;@Resourceprivate ICreditAdjustService creditAdjustService;@RabbitListener(queuesToDeclare = @Queue(value = "${spring.rabbitmq.topic.send_rebate}"))public void listener(String message) {try {log.info("监听用户行为返利消息 topic: {} message: {}", topic, message);BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> eventMessage = JSON.parseObject(message, new TypeReference<BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage>>() {}.getType());SendRebateMessageEvent.RebateMessage rebateMessage = eventMessage.getData();switch (rebateMessage.getRebateType()) {case "sku":SkuRechargeEntity skuRechargeEntity = new SkuRechargeEntity();skuRechargeEntity.setUserId(rebateMessage.getUserId());skuRechargeEntity.setSku(Long.valueOf(rebateMessage.getRebateConfig()));skuRechargeEntity.setOutBusinessNo(rebateMessage.getBizId());skuRechargeEntity.setOrderTradeType(OrderTradeTypeVO.rebate_no_pay_trade);raffleActivityAccountQuotaService.createOrder(skuRechargeEntity);break;case "integral":TradeEntity tradeEntity = new TradeEntity();tradeEntity.setUserId(rebateMessage.getUserId());tradeEntity.setTradeName(TradeNameVO.REBATE);tradeEntity.setTradeType(TradeTypeVO.FORWARD);tradeEntity.setAmount(new BigDecimal(rebateMessage.getRebateConfig()));tradeEntity.setOutBusinessNo(rebateMessage.getBizId());creditAdjustService.createOrder(tradeEntity);break;}} catch (AppException e) {if (ResponseCode.INDEX_DUP.getCode().equals(e.getCode())) {log.warn("监听用户行为返利消息,消费重复 topic: {} message: {}", topic, message, e);return;}throw e;} catch (Exception e) {log.error("监听用户行为返利消息,消费失败 topic: {} message: {}", topic, message, e);throw e;}}
}

3 消费者示例


//这个方法用于保存用户返利记录,并在事务中插入用户行为返利订单和任务对象。
//在事务外,同步发送 MQ 消息。
//发送消息时,调用 eventPublisher.publish 方法发布消息到指定的 topic。public void saveUserRebateRecord(String userId, List<BehaviorRebateAggregate> behaviorRebateAggregates) {try {dbRouter.doRouter(userId);transactionTemplate.execute(status -> {try {for (BehaviorRebateAggregate behaviorRebateAggregate : behaviorRebateAggregates) {BehaviorRebateOrderEntity behaviorRebateOrderEntity = behaviorRebateAggregate.getBehaviorRebateOrderEntity();UserBehaviorRebateOrder userBehaviorRebateOrder = new UserBehaviorRebateOrder();userBehaviorRebateOrder.setUserId(behaviorRebateOrderEntity.getUserId());userBehaviorRebateOrder.setOrderId(behaviorRebateOrderEntity.getOrderId());userBehaviorRebateOrder.setBehaviorType(behaviorRebateOrderEntity.getBehaviorType());userBehaviorRebateOrder.setRebateDesc(behaviorRebateOrderEntity.getRebateDesc());userBehaviorRebateOrder.setRebateType(behaviorRebateOrderEntity.getRebateType());userBehaviorRebateOrder.setRebateConfig(behaviorRebateOrderEntity.getRebateConfig());userBehaviorRebateOrder.setOutBusinessNo(behaviorRebateOrderEntity.getOutBusinessNo());userBehaviorRebateOrder.setBizId(behaviorRebateOrderEntity.getBizId());userBehaviorRebateOrderDao.insert(userBehaviorRebateOrder);TaskEntity taskEntity = behaviorRebateAggregate.getTaskEntity();Task task = new Task();task.setUserId(taskEntity.getUserId());task.setTopic(taskEntity.getTopic());task.setMessageId(taskEntity.getMessageId());task.setMessage(JSON.toJSONString(taskEntity.getMessage()));task.setState(taskEntity.getState().getCode());taskDao.insert(task);}return 1;} catch (DuplicateKeyException e) {status.setRollbackOnly();log.error("写入返利记录,唯一索引冲突 userId: {}", userId, e);throw new AppException(ResponseCode.INDEX_DUP.getCode(), ResponseCode.INDEX_DUP.getInfo());}});} finally {dbRouter.clear();}for (BehaviorRebateAggregate behaviorRebateAggregate : behaviorRebateAggregates) {TaskEntity taskEntity = behaviorRebateAggregate.getTaskEntity();Task task = new Task();task.setUserId(taskEntity.getUserId());task.setMessageId(taskEntity.getMessageId());try {eventPublisher.publish(taskEntity.getTopic(), taskEntity.getMessage());taskDao.updateTaskSendMessageCompleted(task);} catch (Exception e) {log.error("写入返利记录,发送MQ消息失败 userId: {} topic: {}", userId, task.getTopic());taskDao.updateTaskSendMessageFail(task);}}
}public List<String> createOrder(BehaviorEntity behaviorEntity) {// 1. 查询返利配置List<DailyBehaviorRebateVO> dailyBehaviorRebateVOS = behaviorRebateRepository.queryDailyBehaviorRebateConfig(behaviorEntity.getBehaviorTypeVO());if (null == dailyBehaviorRebateVOS || dailyBehaviorRebateVOS.isEmpty()) return new ArrayList<>();// 2. 构建聚合对象List<String> orderIds = new ArrayList<>();List<BehaviorRebateAggregate> behaviorRebateAggregates = new ArrayList<>();for (DailyBehaviorRebateVO dailyBehaviorRebateVO : dailyBehaviorRebateVOS) {// 拼装业务ID;用户ID_返利类型_外部透彻业务IDString bizId = behaviorEntity.getUserId() + Constants.UNDERLINE + dailyBehaviorRebateVO.getRebateType() + Constants.UNDERLINE + behaviorEntity.getOutBusinessNo();BehaviorRebateOrderEntity behaviorRebateOrderEntity = BehaviorRebateOrderEntity.builder().userId(behaviorEntity.getUserId()).orderId(RandomStringUtils.randomNumeric(12)).behaviorType(dailyBehaviorRebateVO.getBehaviorType()).rebateDesc(dailyBehaviorRebateVO.getRebateDesc()).rebateType(dailyBehaviorRebateVO.getRebateType()).rebateConfig(dailyBehaviorRebateVO.getRebateConfig()).outBusinessNo(behaviorEntity.getOutBusinessNo()).bizId(bizId).build();orderIds.add(behaviorRebateOrderEntity.getOrderId());// MQ 消息对象SendRebateMessageEvent.RebateMessage rebateMessage = SendRebateMessageEvent.RebateMessage.builder().userId(behaviorEntity.getUserId()).rebateType(dailyBehaviorRebateVO.getRebateType()).rebateConfig(dailyBehaviorRebateVO.getRebateConfig()).bizId(bizId).build();// 构建事件消息BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> rebateMessageEventMessage = sendRebateMessageEvent.buildEventMessage(rebateMessage);// 组装任务对象TaskEntity taskEntity = new TaskEntity();taskEntity.setUserId(behaviorEntity.getUserId());taskEntity.setTopic(sendRebateMessageEvent.topic());taskEntity.setMessageId(rebateMessageEventMessage.getId());taskEntity.setMessage(rebateMessageEventMessage);taskEntity.setState(TaskStateVO.create);BehaviorRebateAggregate behaviorRebateAggregate = BehaviorRebateAggregate.builder().userId(behaviorEntity.getUserId()).behaviorRebateOrderEntity(behaviorRebateOrderEntity).taskEntity(taskEntity).build();behaviorRebateAggregates.add(behaviorRebateAggregate);}// 3. 存储聚合对象数据behaviorRebateRepository.saveUserRebateRecord(behaviorEntity.getUserId(), behaviorRebateAggregates);// 4. 返回订单ID集合return orderIds;}@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TaskEntity {/** 活动ID */private String userId;/** 消息主题 */private String topic;/** 消息编号 */private String messageId;/** 消息主体 */private BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> message;/** 任务状态;create-创建、completed-完成、fail-失败 */private TaskStateVO state;}@Data
public class Task {/** 自增ID */private String id;/** 活动ID */private String userId;/** 消息主题 */private String topic;/** 消息编号 */private String messageId;/** 消息主体 */private String message;/** 任务状态;create-创建、completed-完成、fail-失败 */private String state;/** 创建时间 */private Date createTime;/** 更新时间 */private Date updateTime;}

4 定时任务示例

// @Scheduled(cron = "0/5 * * * * ?")
public void exec_db01() {try {// 设置库表dbRouter.setDBKey(1);dbRouter.setTBKey(0);// 查询未发送的任务List<TaskEntity> taskEntities = taskService.queryNoSendMessageTaskList();if (taskEntities.isEmpty()) return;// 发送MQ消息for (TaskEntity taskEntity : taskEntities) {try {taskService.sendMessage(taskEntity);taskService.updateTaskSendMessageCompleted(taskEntity.getUserId(), taskEntity.getMessageId());} catch (Exception e) {log.error("定时任务,发送MQ消息失败 userId: {} topic: {}", taskEntity.getUserId(), taskEntity.getTopic());taskService.updateTaskSendMessageFail(taskEntity.getUserId(), taskEntity.getMessageId());}}} catch (Exception e) {log.error("定时任务,扫描MQ任务表发送消息失败。", e);} finally {dbRouter.clear();}
}@Data
public class TaskEntity {/** 活动ID */private String userId;/** 消息主题 */private String topic;/** 消息编号 */private String messageId;/** 消息主体 */private String message;}

三: 串联流程

生产消息:saveUserRebateRecord 方法在事务中插入用户行为返利订单和任
务对象。
在事务外,调用 eventPublisher.publish 方法发布消息到指定的 
topic。
消息构建和发布:SendRebateMessageEvent 类构建 EventMessage 对象,包含随机
生成的 ID、当前时间戳和数据,并返回消息队列的 topic。
消费消息:RebateMessageCustomer 类监听指定队列的消息,解析消息内容,
根据 rebateType 字段的不同,调用相应的服务方法处理消息。
定时任务补偿:SendMessageTaskJob 类定时扫描数据库中的任务表,发送未发送的
消息到 MQ 队列,并更新任务状态。如果发送失败,记录错误日志并
更新任务状态为发送失败。

四:配置文件

spring:rabbitmq:addresses: ****port: ***username: **password: **listener:simple:prefetch: 1 # 每次投递n个消息,消费完在投递n个topic:send_rebate: send_rebate

五: 消费失败

消息发送:
生产者在发送消息时,会将消息的相关信息(如消息内容、发送状态等)记录到 task 表中。
如果消息发送成功,则更新 task 表中的状态为“已发送”。
如果消息发送失败,则更新 task 表中的状态为“发送失败”。
定时任务会扫描 task 表,查找状态为“发送失败”的消息,并重试发送。消息消费:
消费者在处理消息时,也会将消息的相关信息(如消息内容、处理状态等)记录到 task 表中。
如果消息处理成功,则更新 task 表中的状态为“已处理”。
如果消息处理失败,则更新 task 表中的状态为“处理失败”。
定时任务会扫描 task 表,查找状态为“处理失败”的消息,并重试处理。
通过这种方式,可以确保即使消息在发送或消费过程中出现失败,也能够通过重试机制最终成功发送或处理。

示例:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate TaskRepository taskRepository;@Transactionalpublic void sendMessage(String message) {try {rabbitTemplate.convertAndSend("send_rebate", message);TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("SENT");taskRepository.save(task);} catch (Exception e) {TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("SEND_FAILED");taskRepository.save(task);}}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageConsumer {@Autowiredprivate TaskRepository taskRepository;@RabbitListener(queues = "${spring.rabbitmq.topic.send_rebate}")public void handleMessage(String message) {try {// 处理消息的逻辑System.out.println("Processing message: " + message);// 假设处理成功TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("PROCESSED");taskRepository.save(task);} catch (Exception e) {// 处理失败的逻辑TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("PROCESS_FAILED");taskRepository.save(task);}}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.List;@Component
public class RetryTaskJob {@Autowiredprivate TaskRepository taskRepository;@Autowiredprivate MessageService messageService;@Autowiredprivate MessageConsumer messageConsumer;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void retryFailedMessages() {// 重试发送失败的消息List<TaskEntity> sendFailedTasks = taskRepository.findByStatus("SEND_FAILED");for (TaskEntity task : sendFailedTasks) {try {messageService.sendMessage(task.getMessage());task.setStatus("SENT");taskRepository.save(task);} catch (Exception e) {// 记录日志或进行其他处理}}// 重试处理失败的消息List<TaskEntity> processFailedTasks = taskRepository.findByStatus("PROCESS_FAILED");for (TaskEntity task : processFailedTasks) {try {messageConsumer.handleMessage(task.getMessage());task.setStatus("PROCESSED");taskRepository.save(task);} catch (Exception e) {// 记录日志或进行其他处理}}}
}

六: 防止重复消费

保证消息消费的幂等性是确保消息系统可靠性的重要一环。幂等性意
味着无论消息被处理一次还是多次,结果都是相同的。以下是一些常见的策略来保证消息消费的幂等性:唯一标识符:为每条消息生成一个唯一的标识符(如 UUID),并在
处理消息时检查该标识符是否已经被处理过。如果已经处理过,则忽略该消息。状态检查:在处理消息之前,检查系统的状态,确保该消息对应的操
作尚未执行。例如,如果消息是要更新某个资源,可以先检查该资源的状态,确保更新操作尚未执行。数据库唯一约束:在数据库中为消息处理结果创建唯一约束,确保相
同的消息不会被重复处理。幂等API设计:设计幂等的API,确保相同的请求多次执行不会产生不
同的结果。例如,使用“PUT”方法更新资源,而不是“POST”方法。

使用 Redis 来实现消息消费的幂等性是一个非常有效的方法。Redis 是一个高性能的内存数据库,适合用于存储临时状态信息。
1 消费者处理消息并记录到 Redis

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
public class MessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@RabbitListener(queues = "${spring.rabbitmq.topic.send_rebate}")public void handleMessage(String message) {// 假设消息中包含唯一标识符String messageId = extractMessageId(message);// 检查消息是否已经处理过或正在处理if (redisTemplate.hasKey(messageId)) {System.out.println("Message already processed or being processed: " + messageId);return;}// 将消息ID存入Redis,设置过期时间redisTemplate.opsForValue().set(messageId, "PROCESSING", 60, TimeUnit.SECONDS);try {// 处理消息的逻辑System.out.println("Processing message: " + message);// 假设处理成功redisTemplate.opsForValue().set(messageId, "PROCESSED", 60, TimeUnit.SECONDS);} catch (Exception e) {// 处理失败的逻辑redisTemplate.delete(messageId); // 删除Redis中的记录,以便可以重试}}private String extractMessageId(String message) {// 假设消息中包含唯一标识符,例如 JSON 格式中的 "id" 字段// 这里只是一个示例,实际实现可能需要解析消息内容return message.substring(0, 36); // 假设 UUID 长度为 36}
}
spring:redis:host: localhostport: 6379
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

通过上述方法和配置,您可以确保消息消费的幂等性。消费者在处理消息之前会检查消息的唯一标识符是否已经存在于 Redis 中,如果存在,则忽略该消息,从而避免重复处理。同时,通过设置过期时间,可以确保在处理过程中出现异常时,Redis 中的记录会被删除,从而允许消息重试。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 网络安全筑基篇——XSS、XML、XXE
  • 全新TTT架构:挑战Transformer和Mamba的霸主地位
  • 从Helm到 Operator:Kubernetes应用管理的进化
  • 二叉苹果树
  • 如何用Vue3和Plotly.js创建交互式表格?
  • 【亲测有效】Linux/Ubuntu远程服务器使用plt.show()没有反应,vscode ssh 远程ubuntu,plt.show不显示图片问题
  • 泰勒公式中拉格朗日余项和佩亚诺余项的区别及具体的应用场景案例
  • Vue3 根据相对路径加载vue组件
  • PostgreSQL 中如何处理数据的并发更新冲突解决?
  • Git 操作总结
  • 移动应用:商城购物类,是最常见的,想出彩或许就差灵犀一指
  • 插入排序算法(C语言版)
  • HTML5使用<progress>进度条、<meter>刻度条
  • 如何用 Python 绕过 cloudflare(5秒盾) 抓取数据:也不是很难嘛!
  • 红日靶场----(三)漏洞利用
  • 【附node操作实例】redis简明入门系列—字符串类型
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • ES2017异步函数现已正式可用
  • javascript面向对象之创建对象
  • js算法-归并排序(merge_sort)
  • linux学习笔记
  • maven工程打包jar以及java jar命令的classpath使用
  • scala基础语法(二)
  • supervisor 永不挂掉的进程 安装以及使用
  • windows下如何用phpstorm同步测试服务器
  • 浮动相关
  • 工作手记之html2canvas使用概述
  • 记录一下第一次使用npm
  • 看域名解析域名安全对SEO的影响
  • 那些被忽略的 JavaScript 数组方法细节
  • 设计模式走一遍---观察者模式
  • 腾讯视频格式如何转换成mp4 将下载的qlv文件转换成mp4的方法
  • 小程序 setData 学问多
  • TPG领衔财团投资轻奢珠宝品牌APM Monaco
  • 大数据全解:定义、价值及挑战
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • ​io --- 处理流的核心工具​
  • ()、[]、{}、(())、[[]]命令替换
  • (1)(1.19) TeraRanger One/EVO测距仪
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (Redis使用系列) Springboot 使用redis实现接口幂等性拦截 十一
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (二)Linux——Linux常用指令
  • (二刷)代码随想录第15天|层序遍历 226.翻转二叉树 101.对称二叉树2
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (附源码)springboot优课在线教学系统 毕业设计 081251
  • (三)SvelteKit教程:layout 文件
  • (十)Flink Table API 和 SQL 基本概念
  • (十三)MipMap
  • (转)母版页和相对路径
  • .apk 成为历史!
  • .bat批处理(一):@echo off
  • .NET 4.0中使用内存映射文件实现进程通讯
  • .NET Core 中插件式开发实现
  • .net web项目 调用webService