当前位置: 首页 > news >正文

springcloud rocketmq 新增的消费者组从哪里开始消费

如果新建一个新的消费者组,是否会消费历史消息,导致重复消费?

直接在 console 界面新增消费者组,但是没有办法绑定订阅关系,没有找到入口,在 控制台项目源码 rocketmq-externals 也没有找到可以确定订阅关系的接口,在阿里云的生产控制台也没有绑定的入口。
在这里插入图片描述
在这里插入图片描述
所以只能是消费者启动后再注册订阅关系。
消费者从哪里消费的计算:
RebalancePushImpl.java
在这里插入图片描述

默认走的是:CONSUME_FROM_LAST_OFFSET 规则,按照官方说法,是从最后的消费位点开始继续消费。
关键的获取消费位点的逻辑:readOffset方法:
RemoteBrokerOffsetStore.java
在这里插入图片描述
集群模式下,是从远程获取的偏移量,跟据 fetchConsumeOffsetFromBroker 方法:
在这里插入图片描述
在这里插入图片描述
报错,其实就是服务端没有该消费者组的offset,被catch住默认返回 -1.
又不是重试队列,所以拿最大的偏置,broker-a queue-8 的 brokerOffset 是 25
在这里插入图片描述
出来到了 RebalanceImpl.java 的 updateProcessQueueTableInRebalance 方法。
在这里插入图片描述
然后会被添加到 pullRequestList 通过 this.dispatchPullRequest(pullRequestList)

控制台topic消费进度中已经保存了新的消费者组的消费进度,但 consumeOffset都是 0, 还有 759 个消息没有消费。
在这里插入图片描述

消费者消费了一些比较早前的消息:
在这里插入图片描述
在这里插入图片描述

消费进度也随之更新。
在这里插入图片描述
为什么和官方的说法不一致呢?CONSUME_FROM_LAST_OFFSET 为什么没有起到作用?
参考官方的修复:Fix CONSUME_FROM_LAST_OFFSET mode may pull data from 0L #4909

~~
至于这个console怎么看?参考以下:
在rocketmq的控制台中,选择 topic -> consumer manage,就可以查看一个主题下的消费者组、集群、队列的消费情况。
在这里插入图片描述
其中,10.122.24.41 是本人的内网ip,如果我本地线程卡住了(或者debug中),这个在线状态也会下线的。目前我是分配到了其中的8个集群队列,broker-a(8~15)。
在这里插入图片描述
offsetTable 的内容和我所描述的一致。
此外:
我还有对比组,是之前创建的废弃的消费者组,集群位点 brokerOffset 不变,消费位点 consumerOffset 落后了许多,落后的总量 diffTotal 代表此消费者组还有这么多未消费的消息。而且也没有在线的消费者客户端 consumerClient。
在这里插入图片描述
如果这时,我配置启动消费者去消费此消费者组。预计会消费 delay = 294 个消息。
结果也确实如此,将消息消费完,而且分配到了所有集群的所有队列。
在这里插入图片描述

测试结果:默认策略会从offset = 0 开始消费。
在这里插入图片描述
所以该参数没有用,还是从0开始消费了,这时候只能靠消费者组重置位点操作了。

~~
tags过滤,是服务端过滤,为什么会直接将不需要的消息也丢失掉呢?
这就要涉及到订阅关系一致性。
在这里插入图片描述
在这里插入图片描述

参考:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering

tags过滤会将不匹配的直接跳过(丢失)
我的理解是,现在没有为某个tags有单独记录消费进度的地方,所谓的服务端过滤,也只是说用hashcode快速匹配拉取而已,之后也是直接将offset拉到队列尾的。

参考:https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg?spm=a2c6h.12873639.article-detail.18.3ba035175CHVos

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【开发学习笔记】什么是Springboot?
  • 【AI大模型】Prompt 提示词工程使用详解
  • SSM学习9:SpringBoot简介、创建项目、配置文件、多环节配置
  • docker 构建 mongodb
  • 阿里服务器购买与java环境搭建 实践
  • kafka高性能的底层原理分析
  • 若依ruoyi+AI项目二次开发(智能售货机运营管理系统)
  • 0719_驱动1 arm裸机开发与linux驱动开发区别
  • centos7安装redis数据库步骤
  • 数据库编程中游标 连接 commit 字符集
  • 键盘是如何使用中断机制的?当打印一串字符到显示屏上时发生了什么???
  • 【科大讯飞笔试题汇总】2024-07-27-科大讯飞秋招提前批(研发岗)-三语言题解(Cpp/Java/Python)
  • 数据结构:单链表的实现
  • 大疆创新2025校招内推
  • LeeCode Practice Journal | Day25_Backtracking04
  • 网络传输文件的问题
  • 分享一款快速APP功能测试工具
  • 时间复杂度分析经典问题——最大子序列和
  • 10个确保微服务与容器安全的最佳实践
  • css的样式优先级
  • CSS居中完全指南——构建CSS居中决策树
  • GraphQL学习过程应该是这样的
  • Java,console输出实时的转向GUI textbox
  • Python学习之路16-使用API
  • zookeeper系列(七)实战分布式命名服务
  • 诡异!React stopPropagation失灵
  • 基于遗传算法的优化问题求解
  • 力扣(LeetCode)357
  • 小而合理的前端理论:rscss和rsjs
  • FaaS 的简单实践
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • 教程:使用iPhone相机和openCV来完成3D重建(第一部分) ...
  • ###STL(标准模板库)
  • #宝哥教你#查看jquery绑定的事件函数
  • (+4)2.2UML建模图
  • (AtCoder Beginner Contest 340) -- F - S = 1 -- 题解
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第2节(共同的基类)
  • (M)unity2D敌人的创建、人物属性设置,遇敌掉血
  • (第61天)多租户架构(CDB/PDB)
  • (附源码)spring boot车辆管理系统 毕业设计 031034
  • (六)软件测试分工
  • (四)stm32之通信协议
  • (提供数据集下载)基于大语言模型LangChain与ChatGLM3-6B本地知识库调优:数据集优化、参数调整、Prompt提示词优化实战
  • (限时免费)震惊!流落人间的haproxy宝典被找到了!一切玄妙尽在此处!
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • (转)h264中avc和flv数据的解析
  • (转载)OpenStack Hacker养成指南
  • .[backups@airmail.cc].faust勒索病毒的最新威胁:如何恢复您的数据?
  • .CSS-hover 的解释
  • .Net CF下精确的计时器
  • .NET Remoting Basic(10)-创建不同宿主的客户端与服务器端
  • .NET 编写一个可以异步等待循环中任何一个部分的 Awaiter
  • .NET/C# 中设置当发生某个特定异常时进入断点(不借助 Visual Studio 的纯代码实现)
  • .Net多线程Threading相关详解
  • .net分布式压力测试工具(Beetle.DT)