当前位置: 首页 > news >正文

Springboot整合rabbitmq并实现消息可靠性和持久性

Springboot整合rabbitmq并实现消息可靠性和持久性

  • 1. 环境准备
  • 2. 创建 Spring Boot 项目
  • 3. 配置 RabbitMQ
  • 4. 实现消息的可靠性和持久性
    • 4.1 发送可靠的消息
    • 4.2 接收可靠的消息
  • 5. 运行应用程序
  • 6. 总结

Spring Boot 是一种快速构建应用程序的框架,而 RabbitMQ 是一种消息代理,它提供了可靠的消息传递服务。将 Spring Boot 与 RabbitMQ 结合使用,可以方便地实现消息传递,同时确保消息的可靠性和持久性。

在这里插入图片描述

本文将演示如何使用 Spring Boot 整合 RabbitMQ,并实现消息的可靠性和持久性。

1. 环境准备

在开始之前,您需要安装以下软件:

JDK 1.8 或更高版本
Maven 3.0 或更高版本
RabbitMQ 3.5.0 或更高版本

2. 创建 Spring Boot 项目

首先,我们需要创建一个 Spring Boot 项目。您可以使用 Spring Initializr 快速创建项目。

创建项目时,请确保选择以下依赖项:

Spring Boot DevTools
Spring Web
Spring AMQP

这些依赖项将帮助我们创建一个 Spring Boot 应用程序,并使用 Spring AMQP 库连接到 RabbitMQ。

3. 配置 RabbitMQ

在连接到 RabbitMQ 之前,我们需要配置 RabbitMQ。您可以使用以下命令安装 RabbitMQ:

brew install rabbitmq

安装完成后,启动 RabbitMQ 服务:

brew services start rabbitmq

然后,打开浏览器,访问 http://localhost:15672,您将看到 RabbitMQ 的管理控制台。在这里,您可以创建队列,交换机等。

现在,我们需要创建一个队列和交换机。我们将使用 rabbitTemplate 来发送消息和接收消息,所以我们需要确保队列和交换机已经创建。

您可以使用以下代码创建队列和交换机:

@Configuration
public class RabbitConfig {
    public static final String QUEUE_NAME = "my-queue";
    public static final String EXCHANGE_NAME = "my-exchange";
    public static final String ROUTING_KEY = "my-routing-key";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

这将创建一个名为 my-queue 的队列和一个名为 my-exchange 的交换机,并将它们绑定在一起,使用 my-routing-key 作为路由键。

4. 实现消息的可靠性和持久性

现在,我们已经设置好了 RabbitMQ,并且有了一个队列和一个交换机,我们可以开始实现消息的可靠性和持久性了。

4.1 发送可靠的消息

要发送可靠的消息,我们需要确保消息已经成功到达 RabbitMQ,并且已经被持久化。为此,我们可以使用以下代码:

@Service
public class MessageSender {
    private final RabbitTemplate rabbitTemplate;
	public void sendMessage(String message) {
	    rabbitTemplate.setMandatory(true);
	    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
	        if (!ack) {
	            // 处理消息发送失败的情况
	        }
	    });
		// 构建消息体
	    Message msg = MessageBuilder.withBody(message.getBytes())
	            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
	            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
	            .build();
		// 发送MQ消息
	    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);
	}
}

在这里,我们使用 rabbitTemplate 来发送消息,并启用了 mandatory 选项,这将确保消息被成功发送到 RabbitMQ。

我们还设置了 confirmCallback 回调函数,用于处理消息发送失败的情况。如果消息无法被正确发送,我们可以在这个回调函数中执行处理逻辑。

此外,我们还设置了消息的传输模式为持久模式,这将确保消息在 RabbitMQ 挂掉时不会丢失。

4.2 接收可靠的消息

为了接收可靠的消息,我们需要在 RabbitMQ 中创建一个消费者,并启用手动确认模式。手动确认模式将允许我们在处理消息之后,手动向 RabbitMQ 确认该消息已被处理。

以下是一个简单的消费者示例:

@Service
public class MessageReceiver {
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handleMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + msg);

        try {
            // 处理消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理消息处理失败的情况
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

在这里,我们使用 @RabbitListener 注解来监听队列中的消息。当有消息到达队列时,handleMessage() 方法将被调用。

在处理消息之后,我们使用 channel.basicAck() 方法来确认消息已经被处理。如果消息处理失败,我们可以使用 channel.basicNack() 方法来通知 RabbitMQ,让它重新将该消息发送到队列中。

5. 运行应用程序

现在,我们已经准备好了 RabbitMQ 的配置,并且实现了可靠的消息传递。接下来,我们需要运行应用程序,并测试消息的传递是否成功。

运行应用程序的最简单方法是使用 Maven。您可以使用以下命令来运行应用程序:

mvn spring-boot:run

一旦应用程序运行起来,您可以使用 MessageSender 来发送一些消息,然后使用 MessageReceiver 来接收这些消息。

6. 总结

在本文中,我们演示了如何使用 Spring Boot 整合 RabbitMQ,并实现消息的可靠性和持久性。通过使用 RabbitMQ,我们可以轻松地实现应用程序之间的异步通信,从而提高应用程序的性能和可靠性。

要实现可靠的消息传递,我们需要确保消息发送成功,并且在消息处理失败时能够重新处理消息。为此,我们使用了 RabbitMQ 的确认模式和手动确认模式来实现可靠的消息传递。

另外,我们还使用了 RabbitMQ 的持久化机制来确保消息在 RabbitMQ 挂掉时不会丢失。通过这些技术,我们可以实现高效、可靠、持久化的消息传递。

最后,我们还演示了如何使用 Maven 来运行应用程序,并测试消息传递是否成功。如果您想深入了解 RabbitMQ 和 Spring Boot 的更多内容,建议参考官方文档和教程,深入学习这些技术的应用和实现原理。

相关文章:

  • ChatGPT可以作为一个翻译器吗?
  • 一文学会 Spring MVC 表单标签
  • 【联邦学习(Federated Learning)】- 横向联邦学习与联邦平均FedAvg
  • 免费一键生成原创文章-原创文章批量生成
  • 众人围剿,GPT-5招惹了谁
  • Spring Boot 3.0系列【19】核心特性篇之自定义Starter启动器
  • oracle中sql 正则怎么写?
  • 【5G RRC】NR测量Gap介绍
  • 【T+】登录畅捷通T+软件后提示同一个浏览器中不允许存在用户XX同时在线。
  • pom文件详解
  • JVM 类加载器子系统
  • 半小时内实现Esp32-Cam模型训练和图像识别
  • 关于一个大学生写一个题目写一天
  • 【C#进阶】C# 多线程
  • mlq移动最小二乘方法
  • 4个实用的微服务测试策略
  • CentOS7简单部署NFS
  • iBatis和MyBatis在使用ResultMap对应关系时的区别
  • idea + plantuml 画流程图
  • Laravel 实践之路: 数据库迁移与数据填充
  • LeetCode18.四数之和 JavaScript
  • Less 日常用法
  • SAP云平台里Global Account和Sub Account的关系
  • spark本地环境的搭建到运行第一个spark程序
  • Terraform入门 - 3. 变更基础设施
  • 阿里云容器服务区块链解决方案全新升级 支持Hyperledger Fabric v1.1
  • 基于Vue2全家桶的移动端AppDEMO实现
  • 区块链共识机制优缺点对比都是什么
  • 容器化应用: 在阿里云搭建多节点 Openshift 集群
  • 山寨一个 Promise
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 自制字幕遮挡器
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • # Pytorch 中可以直接调用的Loss Functions总结:
  • # 执行时间 统计mysql_一文说尽 MySQL 优化原理
  • #微信小程序:微信小程序常见的配置传旨
  • (17)Hive ——MR任务的map与reduce个数由什么决定?
  • (C#)Windows Shell 外壳编程系列4 - 上下文菜单(iContextMenu)(二)嵌入菜单和执行命令...
  • (翻译)Quartz官方教程——第一课:Quartz入门
  • (附源码)ssm智慧社区管理系统 毕业设计 101635
  • (简单有案例)前端实现主题切换、动态换肤的两种简单方式
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (五)c52学习之旅-静态数码管
  • (终章)[图像识别]13.OpenCV案例 自定义训练集分类器物体检测
  • (转)微软牛津计划介绍——屌爆了的自然数据处理解决方案(人脸/语音识别,计算机视觉与语言理解)...
  • (转)详解PHP处理密码的几种方式
  • ****** 二十三 ******、软设笔记【数据库】-数据操作-常用关系操作、关系运算
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1
  • .jks文件(JAVA KeyStore)
  • .NET 动态调用WebService + WSE + UsernameToken
  • .net 简单实现MD5
  • .net2005怎么读string形的xml,不是xml文件。
  • .net生成的类,跨工程调用显示注释
  • .NET使用存储过程实现对数据库的增删改查
  • .NET值类型变量“活”在哪?