当前位置: 首页 > news >正文

RabbitMQ学习系列(六):RabbitMQ消息确认机制

(一)概述

rabbitmq在使用过程中会遇到一个问题:生产者将消息发送出去后,消息有没有达到rabbitmq,默认是不知道的。

有两种解决方式:1.AMQP实现事务机制;2.Confirm模式

(二)事务机制

事务机制通过三段代码控制事务的执行:

1 channel.txSelect(); 将当前channel设置成transaction
2 channel.txCommit(); 提交事务
3 channel.txRollback(); 事务回滚

如果生产者因为一些错误没有将事务发送出去,那就会触发事务回滚机制,以达到消息确认的目的。

通过简单队列实现事务机制:

生产者

public class Sent {
    private static final String QUEUE_NAME="tx_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="hello";
        try{
            //1.开启事务
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("消息发送成功");
            //2.提交事务
            channel.txCommit();
        }catch (Exception e){
            //3.事务回滚
            channel.txRollback();
            System.out.println("channel rollback");
        }
    }
}

消费者

public class Receive {
    private static final String QUEUE_NAME="tx_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("receive:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

但是每个消息都创建一个事务很耗时,并且降低rabbit的吞吐量。

(三)Confirm模式

生产者将channel设置成confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

Confirm模式有同步和异步两种,

3.1 同步

channel.confirmSelect(); 开启Confirm模式
单条同步:队列收到消息后会返回 waitForConfirms()
if(channel.waitForConfirms()==true)发送成功 else 发送失败
批量同步:发送批量数据
channel.waitForConfirmsOrDie():有一条发送失败触发IOException

同步是指一条数据发送出去后直到收到回复,下面一条数据才能发送。

当然也可以一批数据一起发送,直到回到回复,后面一批数据才能发送。

通过代码模拟,还是用简单队列,

单条同步生产者:

public class Sent {
    private static final String QUEUE_NAME="confirm_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="hello";
        //开启confirm
        channel.confirmSelect();
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        //判断是否收到waitForConfirms
        if (!channel.waitForConfirms()){
            System.out.println("发送失败");
        }else {
            System.out.println("发送成功");
        }
    }
}

消费者的代码和之前一致就不贴了。

批量同步生产者

public class Sent1 {
    private static final String QUEUE_NAME="confirm_queue1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.confirmSelect();
        for (int i=0;i<10;i++){
            String msg="i:"+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        //只要有一个消息未确认就会IOException
        channel.waitForConfirmsOrDie();
        System.out.println("全部发送成功");
    }
}

3.2 异步

异步是指发送数据之后,不用等待返回消息,而是由异步监听。每条消息发送时都会有一个deliveryTag,由异步监听来确认是否送达

channel.confirmSelect(); 开启Confirm模式
channel.addConfirmListener()异步监听发送方确认模式;

通过代码实践:

public class Sent2 {
    private static final String QUEUE_NAME="confirm_queue2";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //开启confirm模式
        channel.confirmSelect();
        for (int i=0;i<10;i++){
            String msg="i:"+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        //异步监听确认和未确认的消息
       channel.addConfirmListener(new ConfirmListener() {
           //确认的消息
           public void handleAck(long deliveryTag, boolean multiple) throws IOException {
               System.out.println("确认消息Tag:"+deliveryTag+"是否批量确认:"+multiple);
           }
           //未确认的消息
           public void handleNack(long deliveryTag, boolean multiple) throws IOException {
               System.out.println("未确认消息Tag:"+deliveryTag);
           }
       });
    }
}

multiple的意思是是否对消息进行了批量确认,上面一段代码的运行结果:

相关文章:

  • cisco 交换机自动备份配置
  • 应届毕业生因为疫情休息在家,可以通过哪些途径提高自己?
  • APP产品交互设计分析总结(不断更新中...)
  • 以SpringBoot作为后台实践ajax异步刷新
  • 观察:阿里的VR实验室能解决什么问题?
  • JavaIO详解--JavaIO的整体结构以及File类的使用
  • 学习笔记:对象,原型和继承(1)
  • JavaIO详解--快速学懂字节流与字符流
  • 搭建IM服务 so easy
  • JavaIO详解--尽可能将BIO、NIO、AIO讲得通俗易懂
  • 用jedis获取redis连接(集群和非集群状态下)
  • Mysql的索引调优详解:如何去创建索引以及避免索引失效
  • (经验分享)作为一名普通本科计算机专业学生,我大学四年到底走了多少弯路
  • 经验分享:JAVA实习生刚进公司主要做些什么?以及进入职场后我的心理变化
  • oracle迁移mysql数据库注意(转)
  • $translatePartialLoader加载失败及解决方式
  • CSS魔法堂:Absolute Positioning就这个样
  • Debian下无root权限使用Python访问Oracle
  • docker-consul
  • HTTP请求重发
  • Median of Two Sorted Arrays
  • Python 反序列化安全问题(二)
  • Quartz实现数据同步 | 从0开始构建SpringCloud微服务(3)
  • Shadow DOM 内部构造及如何构建独立组件
  • Vue UI框架库开发介绍
  • vue-router 实现分析
  • -- 查询加强-- 使用如何where子句进行筛选,% _ like的使用
  • 工作踩坑系列——https访问遇到“已阻止载入混合活动内容”
  • 简单易用的leetcode开发测试工具(npm)
  • 近期前端发展计划
  • 如何设计一个微型分布式架构?
  • 用jquery写贪吃蛇
  • Linux权限管理(week1_day5)--技术流ken
  • 摩拜创始人胡玮炜也彻底离开了,共享单车行业还有未来吗? ...
  • 选择阿里云数据库HBase版十大理由
  • ​​​​​​​​​​​​​​Γ函数
  • ​​快速排序(四)——挖坑法,前后指针法与非递归
  • ​LeetCode解法汇总1410. HTML 实体解析器
  • #APPINVENTOR学习记录
  • #每天一道面试题# 什么是MySQL的回表查询
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (二)pulsar安装在独立的docker中,python测试
  • (附源码)ssm码农论坛 毕业设计 231126
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联
  • **PyTorch月学习计划 - 第一周;第6-7天: 自动梯度(Autograd)**
  • .dat文件写入byte类型数组_用Python从Abaqus导出txt、dat数据
  • .gitignore文件---让git自动忽略指定文件
  • .md即markdown文件的基本常用编写语法
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .Net 中Partitioner static与dynamic的性能对比
  • .net访问oracle数据库性能问题
  • .NET微信公众号开发-2.0创建自定义菜单
  • .net知识和学习方法系列(二十一)CLR-枚举
  • @angular/cli项目构建--Dynamic.Form