当前位置: 首页 > news >正文

RabbitMQ学习系列(三):工作队列详解

(一)RabbitMQ工作队列模型结构

工作队列的模型相比简单队列增加了消费者的数量。

生产者提供消息到消息队列中,消费者可以去获取队列中的消息。在工作队列中默认采用轮询分发的方式将消息分发给消费者。所谓轮询分发,就是指不管消费者处理消息的速度是快是慢,都按照顺序轮流把消息发给消费者。

(二)工作队列实践(轮询分发)

使用工作队列的代码和简单队列基本一致,只不过多了几个消费者

2.1 创建工具类

创建工具类的代码在前一篇博客中已经讲到了,这里直接贴上代码:

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

2.2 创建生产者

public class Send {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection();
        //2.创建通道
        Channel channel = connection.createChannel();
        //3.创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i=0;i<50;i++){
            String msg="i="+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

2.3 创建消费者一

为了体现消费者处理消息的快慢,我在两个消费者中分别设置线程休眠1s和2s

public class Receive1 {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //创建消费者监听方法
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                try {
                    //设置睡眠实践1s
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

2.4 创建消费者二

public class Receive2 {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                try {
                    //设置睡眠时间2s
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

分别将两个消费者运行起来,然后运行生产者发送50条消息,可以发现虽然两个消费者处理消息的能力有快有慢,但是得到的消息都是25条,下面展示消费者1获取的消息部分截图。

(三)公平分发(Fair dispatch)

在某些场景下轮询分发是不合理的,因此工作队列还有公平分发的方式,所谓公平分发,就是能者多劳,处理消息快的人获得消息多,处理消息慢的人获得消息少。公平分发的实现只需要对代码做一些修改:

3.1 修改生产者

对于生产者,只需要对通道增加一条限制,限制通道发送给同一个消费者不得超过一条消息,也就是只有当消费者处理完一条消息以后才会发第二条消息给它。使用channel.basicQos();方法,设置参数为1表示限制一次不超过1条消息。

public class Send {
    private static final String QUEUE_NAME="work_queue_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //限制通道发送给同一个消费者不得超过一条消息
        int prefenchCount=1;
        channel.basicQos(prefenchCount);
        for (int i=0;i<50;i++){
            String msg="i="+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

3.2 修改消费者

对于消费者,需要修改三处地方,第一处和生产者一样修改通道的限制信息;第二处关闭消费者的自动应答;第三处设置手动回执,即处理完一条消息后手动发送处理完成的指令给队列。

//保证一次只分发一次
channel.basicQos(1);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
//关闭自动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);

以下是修改后的消费者代码

public class Receive1 {
    private static final String QUEUE_NAME="work_queue_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动应答
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

设置完之后工作队列就变成了公平分发方式,测试结果:

3.3 关于自动应答

在前面修改消费者代码的时候,我们关闭了自动应答

boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);

这是basicConsume的第二个参数

autoAck=true时,表示开启自动应答,一旦rabbitmq将队列中的消息发送给消费者,这个消息就会从队列中消失。但是如果此时消费者挂掉了,那么这条消息也就彻底消失了。

autoAck=false时,关闭自动应答,rabbitmq将队列中的消息发送给消费者,只有当消费者返回确认之后,队列中的消息才会被删除。

相关文章:

  • RabbitMQ学习系列(四):发布-订阅模型详解
  • Android进阶学习
  • RabbitMQ学习系列(五):routing路由模式和Topic主题模式
  • RabbitMQ学习系列(六):RabbitMQ消息确认机制
  • cisco 交换机自动备份配置
  • 应届毕业生因为疫情休息在家,可以通过哪些途径提高自己?
  • APP产品交互设计分析总结(不断更新中...)
  • 以SpringBoot作为后台实践ajax异步刷新
  • 观察:阿里的VR实验室能解决什么问题?
  • JavaIO详解--JavaIO的整体结构以及File类的使用
  • 学习笔记:对象,原型和继承(1)
  • JavaIO详解--快速学懂字节流与字符流
  • 搭建IM服务 so easy
  • JavaIO详解--尽可能将BIO、NIO、AIO讲得通俗易懂
  • 用jedis获取redis连接(集群和非集群状态下)
  • 【Leetcode】101. 对称二叉树
  • .pyc 想到的一些问题
  • 【347天】每日项目总结系列085(2018.01.18)
  • Android 控件背景颜色处理
  • Asm.js的简单介绍
  • eclipse(luna)创建web工程
  • Electron入门介绍
  • es6--symbol
  • input的行数自动增减
  • isset在php5.6-和php7.0+的一些差异
  • Redux系列x:源码分析
  • SpringBoot 实战 (三) | 配置文件详解
  • sublime配置文件
  • vue-router的history模式发布配置
  • weex踩坑之旅第一弹 ~ 搭建具有入口文件的weex脚手架
  • Work@Alibaba 阿里巴巴的企业应用构建之路
  • 创建一种深思熟虑的文化
  • 从0到1:PostCSS 插件开发最佳实践
  • 大主子表关联的性能优化方法
  • 读懂package.json -- 依赖管理
  • 力扣(LeetCode)965
  • 前端面试总结(at, md)
  • 微服务入门【系列视频课程】
  • 小李飞刀:SQL题目刷起来!
  • 原生js练习题---第五课
  • 函数计算新功能-----支持C#函数
  • ​LeetCode解法汇总307. 区域和检索 - 数组可修改
  • ​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​
  • #include
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • #QT(智能家居界面-界面切换)
  • #基础#使用Jupyter进行Notebook的转换 .ipynb文件导出为.md文件
  • (aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
  • (Pytorch框架)神经网络输出维度调试,做出我们自己的网络来!!(详细教程~)
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (一)ClickHouse 中的 `MaterializedMySQL` 数据库引擎的使用方法、设置、特性和限制。
  • (一)UDP基本编程步骤
  • (转)关于如何学好游戏3D引擎编程的一些经验
  • (轉貼)《OOD启思录》:61条面向对象设计的经验原则 (OO)
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1