当前位置: 首页 > news >正文

RabbitMQ消息的可靠传输和防止消息丢失

在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

  1. 消息持久化:确保消息在RabbitMQ中持久化。
  2. 队列持久化:确保队列是持久化的。
  3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
  4. 消费者确认:确保消费者正确地确认消息。
  5. 重试机制:在消息消费失败时,设置重试机制。

下面详细介绍如何实现这些措施:

1. 添加依赖

确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 配置RabbitMQ

application.ymlapplication.properties文件中配置RabbitMQ:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: true

3. 定义配置类

创建一个配置类来配置队列、交换机和绑定:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "myQueue";public static final String EXCHANGE_NAME = "myExchange";public static final String ROUTING_KEY = "myRoutingKey";@Beanpublic Queue myQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic DirectExchange myExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Binding myBinding(Queue myQueue, DirectExchange myExchange) {return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);}
}

4. 配置消息生产者

确保消息生产者配置了发布确认和消息持久化:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 设置发布确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("Message delivered successfully: " + correlationData);} else {System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);}}});// 设置消息返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("Returned Message: " + new String(message.getBody()) +", replyCode: " + replyCode + ", replyText: " + replyText +", exchange: " + exchange + ", routingKey: " + routingKey);});}public void sendMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);}
}

5. 配置消息消费者

确保消息消费者配置了消息确认机制:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void handleMessage(String message, Channel channel, Message message) throws Exception {try {// 处理消息System.out.println("Received Message: " + message);// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 消费失败,重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

6. 启用重试机制

在Spring Cloud Stream中启用重试机制:

spring:cloud:stream:bindings:input:destination: myQueueconsumer:retry:max-attempts: 5backOffPolicy:initialInterval: 1000multiplier: 2.0maxInterval: 10000

7. 测试

测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

总结

通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

  1. 消息和队列的持久化:确保消息和队列都是持久化的。
  2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
  3. 消费者确认:确保消费者正确地确认消息。
  4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。

相关文章:

  • vscode 调试
  • Redis之线程IO模型
  • Redisson实现分布式锁
  • C语言 -- 宏的变长参数定义
  • kotlin lambda 表达式的原理、语法和详细用法
  • 专业纸箱厂:品质之选
  • JS 实现Date日期格式的本地化
  • Halcon 多相机统一坐标系
  • 2024年6月四六级考试复盘
  • 【Python】PySide6使用入门和注意事项
  • springboot整合sentinel接口熔断
  • 在线兴趣教学类线上学习APP应用开发部署程序组建研发团队需要准备什么?
  • js如何添加新元素到数组中
  • pytest中失败用例重跑
  • A5M2报错【列 pd.adsrc 不存在】
  • [数据结构]链表的实现在PHP中
  • “Material Design”设计规范在 ComponentOne For WinForm 的全新尝试!
  • 【Redis学习笔记】2018-06-28 redis命令源码学习1
  • Create React App 使用
  • LintCode 31. partitionArray 数组划分
  • python docx文档转html页面
  • -- 查询加强-- 使用如何where子句进行筛选,% _ like的使用
  • 后端_MYSQL
  • 通过几道题目学习二叉搜索树
  • 小程序开发之路(一)
  • 在weex里面使用chart图表
  • 字符串匹配基础上
  • raise 与 raise ... from 的区别
  • 不要一棍子打翻所有黑盒模型,其实可以让它们发挥作用 ...
  • 组复制官方翻译九、Group Replication Technical Details
  • ​queue --- 一个同步的队列类​
  • ​批处理文件中的errorlevel用法
  • ​中南建设2022年半年报“韧”字当头,经营性现金流持续为正​
  • # SpringBoot 如何让指定的Bean先加载
  • #if 1...#endif
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • $GOPATH/go.mod exists but should not goland
  • (2)STM32单片机上位机
  • (4)事件处理——(7)简单事件(Simple events)
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (四) 虚拟摄像头vivi体验
  • (原創) X61用戶,小心你的上蓋!! (NB) (ThinkPad) (X61)
  • (转)linux自定义开机启动服务和chkconfig使用方法
  • (转)socket Aio demo
  • (转)大道至简,职场上做人做事做管理
  • ******之网络***——物理***
  • **python多态
  • .bat批处理(二):%0 %1——给批处理脚本传递参数
  • .NET 4.0网络开发入门之旅-- 我在“网” 中央(下)
  • .Net core 6.0 升8.0
  • .net core 微服务_.NET Core 3.0中用 Code-First 方式创建 gRPC 服务与客户端
  • .NET MVC之AOP
  • .NET 动态调用WebService + WSE + UsernameToken
  • .NET/C# 编译期能确定的字符串会在字符串暂存池中不会被 GC 垃圾回收掉
  • .net流程开发平台的一些难点(1)