当前位置: 首页 > news >正文

【RabbitMq源码阅读】分析RabbitMq发送消息源码

一:基本介绍

        本文通过demo构建测试代码,debug分析的方法查看RabbitMq源码。

rabbit的中文文档: 官方中文文档

二:测试Demo

2.1 引入Springboot整合的RabbitMq依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 手写获取RabbitMq的连接,通道等信息

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** RabbitMq Source Test* @author c* date: 2024-9-26 19:12:27*/
public class RabbitMqSourceTest {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();// 设置rabbitmq的服务器地址connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setPort(AMQP.PROTOCOL.PORT);// 建立连接Connection conn = connectionFactory.newConnection();String exchange = "test-Exchange";String queueName = "test-Queue";String key = "test-Exchange-key";String msg = "测试消息";// 创建一个channelChannel channel = conn.createChannel();// 创建一个直连交换机channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);// 创建一个队列channel.queueDeclare(queueName, true, false, false, null);// 绑定队列channel.queueBind(queueName, exchange, key);// 发送消息channel.basicPublish(exchange, key, null, msg.getBytes());channel.close();conn.close();   }}

上面的基本流程

简单理解为:通过连接,获取通道,数据传输

基本步骤:

  1. 获取连接(Connection)
  2. 获取通道(channel)
  3. 创建交换机(Exchange)
  4. 创建队列(Queue)
  5. 队列通过key绑定交换机(Bind)
  6. 往交换机中的key发送消息
  7. 其他方法

(上面创建消息的也是可以直接创建队列,进行消息的发送,源码中会将没有创建exchange设置成默认的,具体可以自己查看一下)

三:详细分析步骤

3.0 获取连接(Connection)

 public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)throws IOException, TimeoutException {if(this.metricsCollector == null) {this.metricsCollector = new NoOpMetricsCollector();}// make sure we respect the provided thread factoryFrameHandlerFactory fhFactory = createFrameHandlerFactory();ConnectionParams params = params(executor);// set client-provided via a client propertyif (clientProvidedName != null) {Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());properties.put("connection_name", clientProvidedName);params.setClientProperties(properties);}// 如果设置 自动发送为 trueif (isAutomaticRecoveryEnabled()) {// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection// 创建连接            
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);// 初始化conn.init();return conn;} else {// 通过 addrs  获取List<Address> addrs = addressResolver.getAddresses();Exception lastException = null;for (Address addr : addrs) {try {// 创建对应的 FrameHandler FrameHandler handler = fhFactory.create(addr, clientProvidedName);// 创建连接                    AMQConnection conn = createConnection(params, handler, metricsCollector);    conn.start();this.metricsCollector.newConnection(conn);return conn;} catch (IOException e) {lastException = e;} catch (TimeoutException te) {lastException = te;}}if (lastException != null) {if (lastException instanceof IOException) {throw (IOException) lastException;} else if (lastException instanceof TimeoutException) {throw (TimeoutException) lastException;}}throw new IOException("failed to connect");}}

3.1 创建渠道(Channel)

    @Overridepublic Channel createChannel() throws IOException {// 确定开启状态ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;// 重点可以看下这里: 创建channelChannel channel = cm.createChannel(this);// 通过 metricsCollector 创建新的channelmetricsCollector.newChannel(channel);return channel;}

 createChannel 方法:

        这里主要是通过channelNumberAllocator分配到一个channelNumber,可以理解为一个唯一标识,具体可以自行看一下它的实现

    public ChannelN createChannel(AMQConnection connection) throws IOException {ChannelN ch;synchronized (this.monitor) {// 通过 channelNumberAllocator 获取到一个 channelNumber int channelNumber = channelNumberAllocator.allocate();if (channelNumber == -1) {return null;} else {ch = addNewChannel(connection, channelNumber);}}ch.open(); // now that it's been safely addedreturn ch;}

3.2 创建交换机(Exchange)

    public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {final AMQP.Exchange.DeclareOk ok = delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);RecordedExchange x = new RecordedExchange(this, exchange).type(type).durable(durable).autoDelete(autoDelete).arguments(arguments);// 记录当前的交换机recordExchange(exchange, x);return ok;}

进入delegate.exchangeDeclare方法,可以看到控制台会创建成功exchange:

3.3 创建队列(Queue)

    @Overridepublic AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {// 这里执行完成会创建成功队列final AMQP.Queue.DeclareOk ok = delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);RecordedQueue q = new RecordedQueue(this, ok.getQueue()).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments);if (queue.equals(RecordedQueue.EMPTY_STRING)) {q.serverNamed(true);}recordQueue(ok, q);return ok;}
    @Overridepublic Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,boolean autoDelete, Map<String, Object> arguments)throws IOException{validateQueueNameLength(queue);return (Queue.DeclareOk)// 通过rpc申明 信息exnWrappingRpc(new Queue.Declare.Builder().queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();}

可以看到此时虽然创建了queue,但是并未绑定到exchang上面,需要进行下面的绑定

3.4 绑定队列

    @Overridepublic AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {// 绑定队列AMQP.Queue.BindOk ok = delegate.queueBind(queue, exchange, routingKey, arguments);recordQueueBinding(queue, exchange, routingKey, arguments);return ok;}
    @Overridepublic Queue.BindOk queueBind(String queue, String exchange,String routingKey, Map<String, Object> arguments)throws IOException{validateQueueNameLength(queue);return (Queue.BindOk)    // 通过 rpc 申明绑定信息exnWrappingRpc(new Queue.Bind.Builder().queue(queue).exchange(exchange).routingKey(routingKey).arguments(arguments).build()).getMethod();}

3.5 发送消息

    @Overridepublic void basicPublish(String exchange, String routingKey,boolean mandatory, boolean immediate,BasicProperties props, byte[] body)throws IOException{if (nextPublishSeqNo > 0) {unconfirmedSet.add(getNextPublishSeqNo());nextPublishSeqNo++;}if (props == null) {props = MessageProperties.MINIMAL_BASIC;}// 组装消息AMQCommand command = new AMQCommand(new Basic.Publish.Builder().exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), props, body);try {// 发送消息transmit(command);} catch (IOException e) {metricsCollector.basicPublishFailure(this, e);throw e;}// 推送当前的channel 进行发布metricsCollector.basicPublish(this);}

四:总结

发送消息可以理解为以下步骤:

  1. 通过Channel往Rabbit服务端发送消息
  2. 通过PRC申明交换机,队列,绑定等信息
  3. 通过AMQP协议发送消息

   👍如果对你有帮助,给博主一个免费的点赞以示鼓励
欢迎各位🔎点赞👍评论收藏⭐️

相关文章:

  • Robot Operating System——一组三维空间中的位姿(位置和方向)
  • Flink集群部署
  • kafka下载配置
  • Go 1.19.4 序列化和反序列化-Day 16
  • 速盾:视频开cdn合适还是视频点播合适?
  • 大模型智能体在金融公告理解领域的应用 | OPENAIGC开发者大赛高校组AI创新之星奖
  • 语音音频(wav)声纹识别-技术实现-python
  • 【JavaEE初阶】网络原理
  • 性能优化与资源管理:优化Selenium脚本的执行效率,合理管理浏览器实例和系统资源
  • CSS给一行按钮统一设置间隔
  • DarkLabel2.4版本导入MOT17数据集
  • 如何解决跨境电商税务管理难题
  • Android常用C++特性之lambda表达式
  • 2-107 基于matlab的hsv空间双边滤波去雾图像增强算法
  • Linux 简易shell编写
  • #Java异常处理
  • [nginx文档翻译系列] 控制nginx
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • docker python 配置
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • Java 11 发布计划来了,已确定 3个 新特性!!
  • Javascript设计模式学习之Observer(观察者)模式
  • MQ框架的比较
  • opencv python Meanshift 和 Camshift
  • UMLCHINA 首席专家潘加宇鼎力推荐
  • 大型网站性能监测、分析与优化常见问题QA
  • 欢迎参加第二届中国游戏开发者大会
  • 如何编写一个可升级的智能合约
  • 使用parted解决大于2T的磁盘分区
  • 通过npm或yarn自动生成vue组件
  • 做一名精致的JavaScripter 01:JavaScript简介
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • Spring Batch JSON 支持
  • ​iOS安全加固方法及实现
  • ​数据结构之初始二叉树(3)
  • #绘制圆心_R语言——绘制一个诚意满满的圆 祝你2021圆圆满满
  • (C++哈希表01)
  • (HAL)STM32F103C6T8——软件模拟I2C驱动0.96寸OLED屏幕
  • (第二周)效能测试
  • (动态规划)5. 最长回文子串 java解决
  • (附源码)计算机毕业设计ssm电影分享网站
  • (十二)springboot实战——SSE服务推送事件案例实现
  • (四)Controller接口控制器详解(三)
  • (转)Groupon前传:从10个月的失败作品修改,1个月找到成功
  • (转)VC++中ondraw在什么时候调用的
  • (转)四层和七层负载均衡的区别
  • (自用)gtest单元测试
  • .chm格式文件如何阅读
  • .Mobi域名介绍
  • .NET Core中的时区转换问题
  • .NET Framework、.NET Core 、 .NET 5、.NET 6和.NET 7 和.NET8 简介及区别
  • .NET 反射的使用
  • .Net(C#)自定义WinForm控件之小结篇
  • .NET/C# 将一个命令行参数字符串转换为命令行参数数组 args
  • .net分布式压力测试工具(Beetle.DT)