当前位置: 首页 > news >正文

RabbitMQ 消息队列:生产者与消费者实现详解

在分布式系统中,消息队列(Message Queue, MQ)是一种重要的组件,用于解耦系统、异步处理任务以及实现系统间的通信。RabbitMQ 是一个流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在本文中,我们将通过 Java 示例来演示如何使用 RabbitMQ 的生产者和消费者模型。

1. 环境准备

首先,确保你已经安装了 RabbitMQ 服务器,并且 Java 环境已经配置妥当。你还需要在项目中引入 RabbitMQ 的 Java 客户端库。通常,这可以通过 Maven 或 Gradle 等构建工具来实现。

2. 生产者(Producer)

生产者负责发送消息到队列。以下是生产者的 Java 实现代码及其注释:

package com.qcby.rabbitmq.mq1;  import com.qcby.rabbitmq.connection.RabbitMQConnection; // 自定义的连接管理类  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  // 生产者类  
public class Producer {  private static final String QUEUE_NAME = "boyatopMember"; // 队列名称  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 创建连接  // 通过自定义的 RabbitMQConnection 类获取连接  Connection connection = RabbitMQConnection.getConnection("/boyavirtualHosts"); // 假设这是连接到特定虚拟主机的连接  // 2. 设置通道  // 通道是大多数 RabbitMQ API 调用的入口点  Channel channel = connection.createChannel();  // 3. 设置消息  String msg = "hello world"; // 待发送的消息  System.out.println("msg:" + msg); // 在控制台输出消息内容  // 4. 发送消息到队列  // 第一个参数是交换机名称,这里使用空字符串表示默认交换机(direct类型)  // 第二个参数是队列名称  // 第三个参数是消息的其他属性,这里传null表示默认  // 第四个参数是消息体,需要是字节数组形式  channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());  // 5. 关闭通道和连接  channel.close();  connection.close();  }  
}
3. 消费者(Consumer)

消费者负责从队列中接收并处理消息。以下是消费者的 Java 实现代码及其注释:

package com.qcby.rabbitmq.mq1;  import com.qcby.rabbitmq.connection.RabbitMQConnection; // 自定义的连接管理类  
import com.rabbitmq.client.*;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  // 消费者类  
public class Consumer {  private static final String QUEUE_NAME = "boyatopMember"; // 队列名称  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 创建连接  Connection connection = RabbitMQConnection.getConnection("/boyavirtualHosts"); // 连接到特定虚拟主机  // 2. 设置通道  Channel channel = connection.createChannel();  // 3. 定义消费者  // 使用 DefaultConsumer 类并覆盖 handleDelivery 方法来处理接收到的消息  DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String msg = new String(body, "UTF-8"); // 将字节数组转换为字符串  System.out.println("消费者获取消息:" + msg); // 在控制台输出消息内容  }  };  // 4. 监听队列  // 第一个参数是队列名称  // 第二个参数是是否自动确认消息,true表示自动确认,false表示需要手动确认  // 第三个参数是消费者实例  channel.basicConsume(QUEUE_NAME, true, defaultConsumer);  // 注意:这里的代码实际上会阻塞等待消息的到来。  // 在实际应用中,你可能需要保持这个消费者程序持续运行,直到你显式地停止它。  }  
}

4. 自定义连接(connection)
 

package com.qcby.rabbitmq.connection;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class RabbitMQConnection {/*** 获取连接** @return*/public static Connection getConnection(String virtualHost) throws IOException, TimeoutException, TimeoutException {// 1.创建连接ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置连接地址connectionFactory.setHost("127.0.0.1");// 3.设置端口号:connectionFactory.setPort(5672);// 4.设置账号和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 5.设置VirtualHostconnectionFactory.setVirtualHost(virtualHost);return connectionFactory.newConnection();}
}
5. 运行与测试
  1. 首先,启动 RabbitMQ 服务器。
  2. 运行 Producer 类的 main 方法发送消息。
  3. 运行 Consumer 类的 main 方法接收并处理消息。

如果一切设置正确,你将在 Consumer 的控制台输出中看到 "消费者获取消息:hello world" 的信息,表明消息已成功从生产者发送到消费者。

6. 结论

通过本文,我们学习了如何在 Java 中使用 RabbitMQ 实现基本的生产者和消费者模型。这仅仅是 RabbitMQ 强大功能的冰山一角,RabbitMQ 还支持多种交换机类型、消息确认机制、消息持久化等高级特性,可以帮助你构建更加健壮和灵活的分布式系统。

相关文章:

  • how to remove the text shadow under app icon on Windows
  • Java 编码系列:反射详解与面试题解析
  • 更新系统提示“系统备份失败”
  • 工厂模式与建造者模式的区别
  • 【js】Node.js的fs的使用方法
  • Spring源码学习:SpringMVC(3)mvcannotation-driven标签解析【RequestMappingHandlerMapping生成】
  • 技术成神之路:设计模式(十六)代理模式
  • Python库matplotlib之五
  • 【RabbitMq源码阅读】分析RabbitMq发送消息源码
  • Robot Operating System——一组三维空间中的位姿(位置和方向)
  • Flink集群部署
  • kafka下载配置
  • Go 1.19.4 序列化和反序列化-Day 16
  • 速盾:视频开cdn合适还是视频点播合适?
  • 大模型智能体在金融公告理解领域的应用 | OPENAIGC开发者大赛高校组AI创新之星奖
  • 【5+】跨webview多页面 触发事件(二)
  • 2017-08-04 前端日报
  • C++回声服务器_9-epoll边缘触发模式版本服务器
  • Centos6.8 使用rpm安装mysql5.7
  • C语言笔记(第一章:C语言编程)
  • JavaScript设计模式与开发实践系列之策略模式
  • JS创建对象模式及其对象原型链探究(一):Object模式
  • Less 日常用法
  • Netty 4.1 源代码学习:线程模型
  • node和express搭建代理服务器(源码)
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • React 快速上手 - 06 容器组件、展示组件、操作组件
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 从输入URL到页面加载发生了什么
  • 翻译--Thinking in React
  • 关于springcloud Gateway中的限流
  • 批量截取pdf文件
  • 数据结构java版之冒泡排序及优化
  • 一个JAVA程序员成长之路分享
  • 移动端唤起键盘时取消position:fixed定位
  • Spring第一个helloWorld
  • 如何在 Intellij IDEA 更高效地将应用部署到容器服务 Kubernetes ...
  • ‌分布式计算技术与复杂算法优化:‌现代数据处理的基石
  • #Linux(权限管理)
  • (2024)docker-compose实战 (8)部署LAMP项目(最终版)
  • (SpringBoot)第七章:SpringBoot日志文件
  • (动手学习深度学习)第13章 计算机视觉---微调
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (十七)、Mac 安装k8s
  • (一)Thymeleaf用法——Thymeleaf简介
  • (源码分析)springsecurity认证授权
  • (转) RFS+AutoItLibrary测试web对话框
  • (转)德国人的记事本
  • (转)甲方乙方——赵民谈找工作
  • .Net Remoting常用部署结构
  • .NET Standard / dotnet-core / net472 —— .NET 究竟应该如何大小写?
  • .net 反编译_.net反编译的相关问题
  • .NET 反射的使用
  • .Net 垃圾回收机制原理(二)
  • .NET 项目中发送电子邮件异步处理和错误机制的解决方案