当前位置: 首页 > news >正文

springcloud RocketMQ 客户端是怎么走到消费业务逻辑的 - debug step by step

springcloud RocketMQ ,一个mq消息发送后,客户端是怎么一步步拿到消息去消费的?我们要从代码层面探究这个问题。

找的流程图,有待考究。
在这里插入图片描述

以下我们开始debug:


拉取数据的线程:
PullMessageService.java 本质是一个线程类

public class PullMessageService extends ServiceThread {private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();// ..
}

执行逻辑,一直循环拿取阻塞队列的内容,阻塞队列的内容由负载均衡服务提供。(阻塞队列中保存了目前客户端占有的 brokder - queue 信息)
在这里插入图片描述
然后进入 DefaultMQPushConsumerImpl.java 的 pullMessage(关键)
这里面有个关键的方法,this.pullAPIWrapper.pullKernelImpl(…) 这里传入了成功回调 pullCallback。
在这里插入图片描述
一直执行到 pullMessageAsync 是异步拉取消息,成功后会执行回调。
这里是成功消费后的回调。
成功后的回调逻辑:
在这里插入图片描述

ConsumeMessageConcurrentlyService.java 的 submitConsumeRequest 方法,将任务下发给消费者线程池 consumeExecutor (ThreadPoolExecutor 类型)去执行。(日志显示就是这里执行的消费业务)
在这里插入图片描述
~~
ok,我们看看开启的这个线程做了什么。
首先,单独一个线程是无法debug跨线程的,所以我们继续在 ConsumeMessageConcurrentlyService.ConsumeRequest 消费者请求线程中debug run方法,看看是怎么执行到我们的业务逻辑的。
发现是 监听器 listener 的消费逻辑
在这里插入图片描述
这个 listener 是一个接口,而且这个接口没有找到代码impl,也就是可能是匿名的视线
我们debug直接跳到了 RocketMQInboundChannelAdapter.java 的监听器,当时就是从这里把监听器注册进来的。
在这里插入图片描述
匿名方法执行了 RocketMQInboundChannelAdapter.this.consumeMessage
在这里插入图片描述
执行了一段 retry 逻辑(spring的重试框架),里面执行了发送消息逻辑。
在这里插入图片描述
发现底层用的是 spring 的 integration 消息通信框架!
debug进去send逻辑,会发送到一个 channel 中去
2
channel 里就有我们的处理方法的代理对象,是转发 dispatcher 的目标处理器 handlers 之一。
在这里插入图片描述
后面不出所料,就是通过反射去执行这个方法。
在这里插入图片描述
然后就跑到了我们的逻辑:
在这里插入图片描述

创作不易,希望点赞、收藏、关注支持~

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 19.延迟队列优化
  • 高性能响应式UI部件DevExtreme v24.1.4全新发布
  • TCP程序设计
  • Linux基础操作(下)
  • 基于Flink SQL CDC的实时数据同步
  • wire和reg的区别
  • 使用eclipse在新建的java项目中编辑xml文件时Unhandled event loop exception No more handles
  • 力扣 二分查找
  • Android Studio run App 不更新代码
  • 谷粒商城实战笔记-63-商品服务-API-品牌管理-OSS获取服务端签名
  • GO发票真伪批量查验方法、数电票查验接口
  • 系统移植(七)u-boot移植 ④ trusted版本
  • Flume安装部署
  • 先用先发!小样本故障诊断新思路!Transformer-SVM组合模型多特征分类预测/故障诊断(Matlab)
  • Unity横板动作游戏 -为什么我又开始学习Unity,而不是Godot。
  • 【译】JS基础算法脚本:字符串结尾
  • 分享一款快速APP功能测试工具
  • 【笔记】你不知道的JS读书笔记——Promise
  • Angular 响应式表单之下拉框
  • CoolViewPager:即刻刷新,自定义边缘效果颜色,双向自动循环,内置垂直切换效果,想要的都在这里...
  • HTML中设置input等文本框为不可操作
  • k8s如何管理Pod
  • LeetCode541. Reverse String II -- 按步长反转字符串
  • Making An Indicator With Pure CSS
  • PAT A1120
  • sublime配置文件
  • vue学习系列(二)vue-cli
  • 闭包--闭包作用之保存(一)
  • 不用申请服务号就可以开发微信支付/支付宝/QQ钱包支付!附:直接可用的代码+demo...
  • 番外篇1:在Windows环境下安装JDK
  • 讲清楚之javascript作用域
  • 利用阿里云 OSS 搭建私有 Docker 仓库
  • 名企6年Java程序员的工作总结,写给在迷茫中的你!
  • 如何利用MongoDB打造TOP榜小程序
  • 如何使用 OAuth 2.0 将 LinkedIn 集成入 iOS 应用
  • 微服务框架lagom
  • ​sqlite3 --- SQLite 数据库 DB-API 2.0 接口模块​
  • "无招胜有招"nbsp;史上最全的互…
  • #include
  • #Linux(make工具和makefile文件以及makefile语法)
  • #LLM入门|Prompt#1.8_聊天机器人_Chatbot
  • #Z2294. 打印树的直径
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • $(function(){})与(function($){....})(jQuery)的区别
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (5)STL算法之复制
  • (bean配置类的注解开发)学习Spring的第十三天
  • (C#)Windows Shell 外壳编程系列4 - 上下文菜单(iContextMenu)(二)嵌入菜单和执行命令...
  • (cljs/run-at (JSVM. :browser) 搭建刚好可用的开发环境!)
  • (k8s)Kubernetes本地存储接入
  • (Matalb回归预测)PSO-BP粒子群算法优化BP神经网络的多维回归预测
  • (MTK)java文件添加简单接口并配置相应的SELinux avc 权限笔记2
  • (Redis使用系列) Springboot 使用Redis+Session实现Session共享 ,简单的单点登录 五
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (八)Flink Join 连接