当前位置: 首页 > news >正文

RabbitMQ学习系列(四):发布-订阅模型详解

(一)发布-订阅模型(Publish/Subscribe)

发布订阅模型的结构图如下所示:

和前两个的模型结构不同,在发布订阅模型中多了一个X(exchange),exchange是一个交换机,生产者不是直接将消息发送给队列,而是先发送给交换机。消费者可以通过队列去订阅这个交换机,每个消费者对应于自己的一个队列。

这个结构就好像我们订阅微信公众号一样,作者将文章发送到自己的公众号上,只有订阅过该公众号的人才能收到消息。因此这个模型被称为发布-订阅模型。

(二)发布-订阅模型实践

发布订阅模型中多了交换机的存在,而我们在rabbitmq的可视化界面中就见到过exchange

继续通过代码展示该模型:

2.1 工具类

工具类和之前都一样,不做介绍了

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

2.2 生产者

public class Sent {
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,设置类型为fanout
        channel.exchangeDeclare(EXCHANGE,"fanout");
        String msg="hello world";
        channel.basicPublish(EXCHANGE,"",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

生产者在发布订阅模型中不再绑定队列,而是绑定交换机。exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。

2.3 消费者一

public class Receive1 {
    private static final String QUEUE_NAME="ps_queue1";
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列和交换机绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE,"");
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动回复
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

消费者的代码和之前一样,唯一的区别是增加了队列和交换机的绑定channel.queueBind();

2.4 消费者二

public class Receive2 {
    private static final String QUEUE_NAME="ps_queue2";
    private static final String EXCHANGE="ps_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列和交换机绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE,"");
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动回复
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

2.5 运行项目

由于此时rabbitmq中不存在名称为ps_exchange的交换机,因此我们可以手动在rabbitmq的可视化界面中创建,也可以运行一次生产者来创建交换机。接着运行两个消费者和生产者,可以看到生产者发送出去的消息被消费者收到。

观察此时的可视化界面,可以看到该交换机上已经绑定了两个队列:

(三)Exchange类型介绍

exchange的种类有4中,分别是Direct 、Fanout 、Topic、Headers。接下来会做详细介绍。

3.1 Fanout(不处理路由键)

直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。即交换机将消息从生产者获取之后,直接发给订阅的队列。

3.2 Direct(处理路由键)

交换机和队列绑定时会设置路由键(routingkey),当消息从生产者发送给交换机时也会发送一个路由键。只有当这两个路由键相同时,交换机才会把消息发送给队列。

3.3 Topic(可以有通配符)

Topic和Direct类似,只不过Direct要求路由键完全相同,但是Topic可以使用通配符进行匹配,如#,*

3.4 header(根据header匹配)

在发布消息的时候就需要传入header值,其中的header就是binding时的arguments参数

相关文章:

  • Android进阶学习
  • RabbitMQ学习系列(五):routing路由模式和Topic主题模式
  • RabbitMQ学习系列(六):RabbitMQ消息确认机制
  • cisco 交换机自动备份配置
  • 应届毕业生因为疫情休息在家,可以通过哪些途径提高自己?
  • APP产品交互设计分析总结(不断更新中...)
  • 以SpringBoot作为后台实践ajax异步刷新
  • 观察:阿里的VR实验室能解决什么问题?
  • JavaIO详解--JavaIO的整体结构以及File类的使用
  • 学习笔记:对象,原型和继承(1)
  • JavaIO详解--快速学懂字节流与字符流
  • 搭建IM服务 so easy
  • JavaIO详解--尽可能将BIO、NIO、AIO讲得通俗易懂
  • 用jedis获取redis连接(集群和非集群状态下)
  • Mysql的索引调优详解:如何去创建索引以及避免索引失效
  • #Java异常处理
  • [译]前端离线指南(上)
  • 【技术性】Search知识
  • 【刷算法】求1+2+3+...+n
  • E-HPC支持多队列管理和自动伸缩
  • ES10 特性的完整指南
  • gcc介绍及安装
  • Linux CTF 逆向入门
  • Linux快速配置 VIM 实现语法高亮 补全 缩进等功能
  • Linux中的硬链接与软链接
  • Next.js之基础概念(二)
  • Node项目之评分系统(二)- 数据库设计
  • React Native移动开发实战-3-实现页面间的数据传递
  • React-Native - 收藏集 - 掘金
  • ReactNativeweexDeviceOne对比
  • 互联网大裁员:Java程序员失工作,焉知不能进ali?
  • 面试遇到的一些题
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 新书推荐|Windows黑客编程技术详解
  • 最简单的无缝轮播
  • const的用法,特别是用在函数前面与后面的区别
  • ​LeetCode解法汇总2182. 构造限制重复的字符串
  • # Swust 12th acm 邀请赛# [ E ] 01 String [题解]
  • ###51单片机学习(1)-----单片机烧录软件的使用,以及如何建立一个工程项目
  • $.type 怎么精确判断对象类型的 --(源码学习2)
  • (06)金属布线——为半导体注入生命的连接
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (SpringBoot)第二章:Spring创建和使用
  • (编译到47%失败)to be deleted
  • (二刷)代码随想录第15天|层序遍历 226.翻转二叉树 101.对称二叉树2
  • (附源码)计算机毕业设计ssm-Java网名推荐系统
  • (附源码)计算机毕业设计SSM保险客户管理系统
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (牛客腾讯思维编程题)编码编码分组打印下标(java 版本+ C版本)
  • (七)MySQL是如何将LRU链表的使用性能优化到极致的?
  • *1 计算机基础和操作系统基础及几大协议
  • .FileZilla的使用和主动模式被动模式介绍
  • .NET 8 中引入新的 IHostedLifecycleService 接口 实现定时任务
  • .Net CoreRabbitMQ消息存储可靠机制
  • .NET Remoting学习笔记(三)信道