当前位置: 首页 > news >正文

Java 如何实现一个简单 RabbitMQ 示例

本文建立在你在 Linux 上完成安装 RabbitMQ 的基础上。

1、生产者代码

顾名思义,生产者是用来生产消息供消费者消费

package com.wen.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;public class Product {public final static String QUEUE_NAME = "my-mq-queue";public final static String HOST = "10.106.182.54";public final static String USERNAME = "root";public final static String PASSWORD = "123456";public final static String EXCHANGE_NAME = "";public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();HashMap<String, Object> map = new HashMap<>();map.put("x-message-ttl", 10000);channel.queueDeclare(QUEUE_NAME, false, false, true, map);String message = "Hello, How are you";channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());System.out.println("消息已经发出!");channel.close();connection.close();} catch (IOException e) {throw new RuntimeException(e);}}
}

2、消费者代码

package com.wen.rabbitmq;import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;public class Consumer {public final static String QUEUE_NAME = "my-mq-queue";public final static String HOST = "10.106.182.54";public final static String USERNAME = "root";public final static String PASSWORD = "123456";public final static String EXCHANGE_NAME = "";public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);Connection connection;try {connection = factory.newConnection();Channel channel = connection.createChannel();HashMap<String, Object> map = new HashMap<>();map.put("x-message-ttl", 10000);channel.queueDeclare(QUEUE_NAME, false, false ,true, map);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String s = new String(body, "UTF-8");System.out.println("消费端接收到消息:" + s);}};channel.basicConsume(QUEUE_NAME, true, consumer);} catch (Exception e) {e.printStackTrace();}}
}

3、示例解读

1、首先,我们先创建 QUEUE_NAME 队列为消息的发送做准备,RabbitMQ 的消息是需要存储在消息队列中的,消息队列是测试 QPS 的一个关键点。HOST USERNAME PASSWORD 等没啥好说的,我们要链接到自己的 RabbitMQ 服务上,使用 ConnectionFactory 进行连接。

2、消息生产出来后,需要一个通道(channel)去传输信息,这个通道会和交换器有交集,这里作为一个简单demo就不再多讲。

3、channel.queueDeclare(QUEUE_NAME, false, false ,true, map);

queueDeclare方法是用来创建一个队列,生产者和消费者都能使用这个方法进行创建队列,但如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须要先取消订阅。并且,消费者和生产者创建的队列里面的信息要一致。

  • 参数一:队列名字
  • 参数二:是否设置持久化
  • 参数三:是否设置排他
  • 参数四:是否设置自动删除
  • 参数五:设置队列中的一些参数,这是一个 Map,常见的有过期时间,优先级等。

注:如果生产者设置了过期时间,那么消费者创建的队列中也要有相应的参数。

4、channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());,这串代码是用来发送消息。

  • 参数一:交换器名字
  • 参数二:路由键名字,交换器会根据路由键将消息存储到相应的队列中。
  • 参数三:消息的基本属性集,内包含14个成员,可自行查找。
  • 参数四:消息体,是一个 byte[]

5、消费者这部分也要先建立一个消息队列 my-mq-queue ,和生产者保持一致。消费消息有两种模式,推模式和拉模式。本文示例采用的推模式,推模式使用 basicConsume 方法进行消费。接收消息通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。

6、DefaultConsumer 类中我们要重写 handleDelivery 方法,这个方法是用来处理消息传递的细节。handleDelivery 方法的参数如下:

  • String consumerTag:消费者标签
  • Envelope envelope:消息元数据
  • AMQP.BasicProperties properties:优先级,过期时间等属性
  • byte[] body:消息

7、使用 basicConsume 进行消费。

  • 参数一:队列名字
  • 参数二:是否设置自动确认机制
  • 参数三:消费者的回调函数,用来处理 RabbitMQ 推送过来的消息

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 前端速通面经八股系列(六)—— Vue(下)
  • python的版本如何选择?
  • 【Python 报错已解决】`TypeError: ‘method‘ object is not subscriptable`
  • 如何有效防御区块链中的黑客攻击
  • Elasticsearch 8.13.4 LocalDateTime类型转换问题
  • OpenCV小练习:人脸检测
  • [Linux]如何將A主機的docker image轉移到B主機,並在B主機中重新配置和執行該docker image?
  • C++(this指针/常函数与常对象/拷贝构造函数/赋值函数/静态成员/静态成员函数/单列模式)
  • JAVA中的元注解
  • 【nvm】解决问题: Could not retrieve https://nodejs.org/dist/index.json.
  • 学习 TagUI 踩过的坑
  • 防抖函数 debounce debouncePromise
  • 少走弯路,ESP32 读取Micro SD(TF)播放mp3的坑路历程。
  • ET6框架(七)Excel配置工具
  • 【C++标准模版库】模拟实现容器适配器:stack、queue、priority_queue(优先级队列)
  • php的引用
  • 【刷算法】求1+2+3+...+n
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • docker容器内的网络抓包
  • Java到底能干嘛?
  • learning koa2.x
  • Linux各目录及每个目录的详细介绍
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • rc-form之最单纯情况
  • React组件设计模式(一)
  • SQLServer插入数据
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 程序员最讨厌的9句话,你可有补充?
  • 高度不固定时垂直居中
  • 基于axios的vue插件,让http请求更简单
  • 猫头鹰的深夜翻译:Java 2D Graphics, 简单的仿射变换
  • 深入浅出webpack学习(1)--核心概念
  • ​香农与信息论三大定律
  • #Datawhale X 李宏毅苹果书 AI夏令营#3.13.2局部极小值与鞍点批量和动量
  • #基础#使用Jupyter进行Notebook的转换 .ipynb文件导出为.md文件
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • $con= MySQL有关填空题_2015年计算机二级考试《MySQL》提高练习题(10)
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (C语言)二分查找 超详细
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (ibm)Java 语言的 XPath API
  • (LeetCode C++)盛最多水的容器
  • (附源码)springboot家庭装修管理系统 毕业设计 613205
  • (紀錄)[ASP.NET MVC][jQuery]-2 純手工打造屬於自己的 jQuery GridView (含完整程式碼下載)...
  • (一)u-boot-nand.bin的下载
  • (原)本想说脏话,奈何已放下
  • (转)重识new
  • *p=a是把a的值赋给p,p=a是把a的地址赋给p。
  • .NET MAUI Sqlite程序应用-数据库配置(一)
  • .net下简单快捷的数值高低位切换
  • .NET中两种OCR方式对比
  • .NET中统一的存储过程调用方法(收藏)
  • ::前边啥也没有
  • @DependsOn:解析 Spring 中的依赖关系之艺术