当前位置: 首页 > news >正文

RabbitMQ的Direct Exchange模式实现的消息发布案例

Producer生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducer {private final static String EXCHANGE_NAME = "direct_message_exchange";private final static String EXCHANGE_TYPE = "direct";public static void main(String[] args) {// 1. 创建连接工厂,设置连接参数ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672); // RabbitMQ默认端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");try (Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 2. 声明交换机 (direct类型,持久化)channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);// 3. 声明队列 (持久化,非独占,连接断开时不自动删除)channel.queueDeclare("queue5", true, false, false, null);channel.queueDeclare("queue6", true, false, false, null);channel.queueDeclare("queue7", true, false, false, null);// 4. 绑定队列到交换机,设置路由键channel.queueBind("queue5", EXCHANGE_NAME, "order");channel.queueBind("queue6", EXCHANGE_NAME, "order");channel.queueBind("queue7", EXCHANGE_NAME, "course");// 5. 准备要发送的消息String message = "你好,学相伴:www.kuangstudy.com";// 6. 向交换机发送消息,使用路由键 "course"channel.basicPublish(EXCHANGE_NAME, "course", null, message.getBytes("UTF-8"));System.out.println("消息发送成功!");} catch (Exception ex) {// 捕获异常并打印堆栈信息ex.printStackTrace();System.out.println("消息发送出现异常...");} finally {// 在try-with-resources中,不再需要显式关闭连接和通道// 会自动关闭连接和通道}}
}

功能点:

  1. 声明了一个Direct类型的交换机,并绑定了三个队列(queue5queue6queue7)。其中queue5queue6都绑定到order路由键,而queue7绑定到course路由键。
  2. 发送了一条消息到course路由键绑定的队列中(即queue7)。

Consumer消费者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumer {private final static String QUEUE_NAME = "queue7"; // 与生产者的绑定一致private final static String EXCHANGE_NAME = "direct_message_exchange";private final static String EXCHANGE_TYPE = "direct";public static void main(String[] args) {// 1. 创建连接工厂,设置连接参数ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672); // RabbitMQ默认端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");try (Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 2. 声明交换机和队列,与生产者保持一致channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 3. 绑定队列到交换机,路由键为"course"channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "course");System.out.println(" [*] 等待接收消息...");// 4. 定义接收消息的回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] 接收到的消息: '" + message + "'");// 这里可以添加进一步的消息处理逻辑};// 5. 开始消费消息 (自动应答)channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });} catch (Exception ex) {// 捕获异常并打印堆栈信息ex.printStackTrace();System.out.println("消费者运行中出现异常...");}}
}

功能点: 

   1.  与生产者保持一致:消费者的队列名称、交换机名称和路由键与生产者保持一致,即监听queue7队列,并接收路由键为course的消息。

   2. 回调函数处理消息:使用DeliverCallback来定义收到消息后的处理逻辑。在回调函数中,delivery.getBody()获取消息内容,随后可以对消息进行处理、存储或其他业务逻辑操作。

   3 自动应答basicConsume中的true表示自动应答(auto-acknowledge),即消息处理完毕后,RabbitMQ会自动确认消息已成功处理。如果需要手动应答,可以将true替换为false,并在处理完成后调用channel.basicAck()来手动确认消息。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • GameGen-O:大模型颠覆3A游戏开发
  • chrome浏览器如何设置自动播放音视频
  • 动态代理相关知识点
  • 如何从github中克隆指定文件夹
  • 打包部署若依(RuoYi)SpringBoot后端和Vue前端图文教程
  • Gitlab 中几种不同的认证机制(Access Tokens,SSH Keys,Deploy Tokens,Deploy Keys)
  • 209.长度最小的子数组(滑动窗口类)
  • Proteus 仿真设计:开启电子工程创新之门
  • 基于深度学习的信号滤波:创新技术与应用挑战
  • 计算机专业毕业设计选题指南:避开这些坑,让你轻松毕业-附选题推荐(精选题目汇总大全)
  • AutoSar AP通信的事件订阅
  • 原生 JavaScript 封装 JSONP 跨域请求
  • 【系统架构设计师-2021年真题】案例分析-答案及详解
  • [羊城杯 2020]Blackcat1
  • Cross-Encoder实现文本匹配(重排序模型)
  • Git同步原始仓库到Fork仓库中
  • Linux下的乱码问题
  • mysql_config not found
  • Odoo domain写法及运用
  • scala基础语法(二)
  • Synchronized 关键字使用、底层原理、JDK1.6 之后的底层优化以及 和ReenTrantLock 的对比...
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • vue-cli3搭建项目
  • 百度小程序遇到的问题
  • 多线程 start 和 run 方法到底有什么区别?
  • 浅谈Golang中select的用法
  • 深度学习在携程攻略社区的应用
  • 问题之ssh中Host key verification failed的解决
  • 新手搭建网站的主要流程
  • 昨天1024程序员节,我故意写了个死循环~
  • #### golang中【堆】的使用及底层 ####
  • #70结构体案例1(导师,学生,成绩)
  • #pragma pack(1)
  • #Z2294. 打印树的直径
  • (+4)2.2UML建模图
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (二十三)Flask之高频面试点
  • (三) diretfbrc详解
  • (译)2019年前端性能优化清单 — 下篇
  • (原创) cocos2dx使用Curl连接网络(客户端)
  • *算法训练(leetcode)第四十天 | 647. 回文子串、516. 最长回文子序列
  • .NET Conf 2023 回顾 – 庆祝社区、创新和 .NET 8 的发布
  • .net core 控制台应用程序读取配置文件app.config
  • .NET IoC 容器(三)Autofac
  • .net 受管制代码
  • .NET框架类在ASP.NET中的使用(2) ——QA
  • .NET业务框架的构建
  • ?
  • ??如何把JavaScript脚本中的参数传到java代码段中
  • @CacheInvalidate(name = “xxx“, key = “#results.![a+b]“,multi = true)是什么意思
  • @manytomany 保存后数据被删除_[Windows] 数据恢复软件RStudio v8.14.179675 便携特别版...
  • [2010-8-30]
  • [20150707]外部表与rowid.txt
  • [AIGC] CompletableFuture的重要方法有哪些?
  • [AIGC] Nacos:一个简单 yet powerful 的配置中心和服务注册中心