当前位置: 首页 > news >正文

RocketMQ的TAG过滤和SQL过滤机制

写作目的

项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息

  • TAG
    如果使用TAG去获取自己感兴趣的消息,那么对于一条学生表变更binlog,最少要插入三条消息,比如TAG=学生表,比如TAG=UPDATE修改操作,比如TAG=学生状态为1,等等。想到的就三种。。。
    所以上面这种方式缺陷还是挺明显的。

  • SQL过滤
    如果使用SQL过滤的方式,我们可以对某些属性进行过滤,自己拼接SQL,灵活性就上来了。

但是我好奇的一点是SQL怎么加到TAG里呢?并且TAG只能支持一个属性值呀。所以接下来从源码和原理的角度进行分析和探讨。

总体来说Tag过滤和SQL过滤如下图所示
在这里插入图片描述

代码展示

本着简单的原则出发

TAG过滤

当producer构建消息时消息时会构造方法里会有TAG的属性,如代码所示,Tag = Creative。

Message msg =
              new Message(
                  "CBeann", // topic
                  "Creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body

当consumer订阅topic时要想监听Tag = creative的就可以如下图所示

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
		//主题,Tag
        consumer.subscribe("CBeann", "Creative");
        consumer.setNamesrvAddr("114.115.208.175:9876");
        consumer.setConsumerGroup("group1");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // wrong time format 2017_0422_221800
    // consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(...)

SQL过滤

与Tag消息不同的是,produccer生产的msg需要放入一些属性,如下代码所示,放入age属性的值为18。

 Message msg =
              new Message(
                  "CBeann", // topic
                  "creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
          msg.putUserProperty("age", String.valueOf(18));
          SendResult sendResult = producer.send(msg);

consumer中则不能根据tag过滤了。需要使用MessageSelector

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        MessageSelector messageSelector = MessageSelector.bySql("age >= 5");
        consumer.subscribe("TopicTest", messageSelector);
        //consumer.setNamesrvAddr("114.115.208.175:9876");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumerGroup("group1Sql");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.registerMessageListener()

TAG过滤机制

过滤图

此处以Tag过滤机制为例,消息过滤存在两个地方,一个是服务器端,另一个是消息者端。
在这里插入图片描述

假设消费者订阅的topic=CBeann,tag=creative,creative的hashCode =9527(假设一下)

而topic=CBeann的消息队列里有3条消息
msg1[tag=feed,tagHashCode= 9000]
msg2[tag=creative,tagHashCode= 9527]
msg3[tag=material,tagHashCode= 9527]

当consumer消费者给broker服务器发送获取topic=CBeann,tag=creative请求时,请求会转化为topic=CBeann,tagHashCode=9527
因此对于上述的3条消息,经过tagHashCode匹配后会把msg2和msg3发送给consumer消息者。
而Consumer消费者会根据tag匹配后留下msg2

源码思路讲解

构建SubscriptionData

首先要了解一点,我们在consumer中设置订阅的topic和tag是什么样的一个数据结构呢?

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("CBeann", "Creative");

其实一直往下跟subcribe方法,最后我们可以定位到FilterAPI#buildSubscriptionData方法。我们传入的topic=CBeann;tag=creative 被封装到SubscriptionData对象里,包括topic、tag、tagHashCode
在这里插入图片描述

brokder过滤逻辑

那么consumer消费端是存储着topic、tag、tagHashCode。而consumer会把topic和tagHashCode发送给Broker服务器。

当consumer消费者向broker服务端请求获取消息时,broker会从ConsumeQueue获取offset之后的所有如下所示的三元组。ConsumeQueue里的数据三元组如下图所示。
在这里插入图片描述

其实三元组是解析出来的,解析的三个属性就是上图中的offsetPy、sizePy和tagCode。下面我们重点关注一下tagCode
在这里插入图片描述
解析出来的tagsCode如果匹配成功,则保留,如果匹配失败,则continue。
在这里插入图片描述

接下来看一下是怎么匹配呢?如果是*,则全匹配,否则就根据tagsCode匹配。此处不是根据tag匹配,所以会有hash冲突的数据也会匹配到
在这里插入图片描述

结论此时我们可以看到,broker服务器端是通过hashcode匹配的,哈希冲突的msg会被认为有效消息发送给consumer端

consumer过滤逻辑

一般这种RPC的都是通过回调实现的,所以看完源码后定位到了一个CallBack方法。该CallBack方法如下所示,拿到Broker发送的消息后在经过processPullResult预处理后才会真正去判断消息是否获取到。
在这里插入图片描述
拿到消息后再经过Tag过滤,如下图所示,则到达我们自定义的处理消息逻辑
在这里插入图片描述
结论此时我们可以看到,consumer消费者端是通过tag匹配的,二次过滤因为哈希导致消息Tag不准确的问题

SQL过滤机制

SQL过滤和Tag过滤的消息有什么区别

结论:没区别,就是多了几个属性。比如下面的代码中的age属性

Message msg =
              new Message(
                  "CBeann", // topic
                  "creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
          msg.putUserProperty("age", String.valueOf(18));

如上面代码所示,msg的tag=creative, 属性age=18。
其实根据Message的构造方法和putUserProperty方法可以发现,最后都是放到Properties里
在这里插入图片描述

构建SubscriptionData

SQL过滤和Tag过滤的consumer端有什么区别?

如下面代码所示,我们构造了一个MessageSelector

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        MessageSelector messageSelector = MessageSelector.bySql("age >= 5");
        consumer.subscribe("TopicTest", messageSelector);

那么subscribe方法同样是把MessageSelector也是构建成SubscriptionData。不过和Tag那种不同的是,SubscriptionData里面放的是SQL即subString属性和expressionType属性SQL92
在这里插入图片描述

源码跟踪

broker过滤逻辑

SQL过滤和tag过滤都是经历下面的三个阶段,下面我们重点跟一下SQL过滤的代码块messageFilter#isMatchedByCommitLog。在这里插入图片描述

debug了一下,如下图所示,从buffer里解析出properties来然后和SQL进行校验,返回校验结果
在这里插入图片描述

consumer过滤逻辑

一般这种RPC的都是通过回调实现的,所以看完源码后定位到了一个CallBack方法。该CallBack方法如下所示,拿到Broker发送的消息后在经过processPullResult预处理后才会真正去判断消息是否获取到。
下面的这个图其实在上面也出现过,这个处理方法里并没有SQL过滤的逻辑,因此在consumer不过滤。
在这里插入图片描述

总结

  • 特殊的分表方式
    tag作为msg的properties,这个其实映射到数据库分库分表中。比如db的一条记录需要新增一个字段,我们完全可以新增一个setting表,存储这个properties属性。阿里这边的很多项目DB设计都是这么做的。
  • SQL过滤比Tag过滤慢的原因:比较慢,解析慢
    Tag过滤是直接等于,而SQL过滤还要通过表达式计算,SQL复杂的计算必然不如直接等于快。
    SQL过滤的时候需要解析properties,本身就是一种资源消耗。

相关文章:

  • 2023年电气,电子与信息工程国际会议(ISEEIE 2023)
  • 【前端开发学习】4.JavaScript
  • 【大数据技术Hadoop+Spark】HBase分布式数据库架构、特点、数据存储方式、寻址机制详解(图文解释)
  • K8s——Service、代理模式演示(二)
  • 哈希表及其与Java类集的关系
  • CSS基础总结(二)
  • 《Python多人游戏项目实战》第三节 在窗口上显示玩家ID以及对话内容
  • SpringBoot【配置文件】
  • 王卫点赞友商?北京快递保卫战,顺丰彰显大格局大气度
  • 95 C语言初阶练习题
  • Class Charset
  • 深度学习目标检测:YOLOv5实现红绿灯检测(含红绿灯数据集+训练代码)
  • SpringBoot+Vue实现前后端分离的小而学在线考试系统
  • Redis常见面试题(二)
  • Servlet应用(Request+response对象)
  • 网络传输文件的问题
  • IE9 : DOM Exception: INVALID_CHARACTER_ERR (5)
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 【108天】Java——《Head First Java》笔记(第1-4章)
  • java2019面试题北京
  • JavaScript对象详解
  • JSONP原理
  • JS正则表达式精简教程(JavaScript RegExp 对象)
  • React 快速上手 - 06 容器组件、展示组件、操作组件
  • RxJS: 简单入门
  • ViewService——一种保证客户端与服务端同步的方法
  • Vue组件定义
  • Yeoman_Bower_Grunt
  • 分布式事物理论与实践
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 新书推荐|Windows黑客编程技术详解
  • 回归生活:清理微信公众号
  • 教程:使用iPhone相机和openCV来完成3D重建(第一部分) ...
  • 组复制官方翻译九、Group Replication Technical Details
  • (11)MATLAB PCA+SVM 人脸识别
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (第一天)包装对象、作用域、创建对象
  • (附源码)php投票系统 毕业设计 121500
  • (附源码)spring boot儿童教育管理系统 毕业设计 281442
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联
  • (算法设计与分析)第一章算法概述-习题
  • (转)关于pipe()的详细解析
  • (转载)利用webkit抓取动态网页和链接
  • .Net 6.0 处理跨域的方式
  • .net core IResultFilter 的 OnResultExecuted和OnResultExecuting的区别
  • .net core 依赖注入的基本用发
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .net core开源商城系统源码,支持可视化布局小程序
  • .NET LINQ 通常分 Syntax Query 和Syntax Method
  • .NET 编写一个可以异步等待循环中任何一个部分的 Awaiter
  • .NET 常见的偏门问题
  • .Net 代码性能 - (1)
  • .net/c# memcached 获取所有缓存键(keys)
  • .net利用SQLBulkCopy进行数据库之间的大批量数据传递
  • @autowired注解作用_Spring Boot进阶教程——注解大全(建议收藏!)