当前位置: 首页 > news >正文

dubbo 服务消费原理分析之应用级服务发现

文章目录

  • 前言
  • 一、MigrationRuleListener
    • 1、迁移状态模型
    • 2、Provider 端升级
    • 3、Consumer 端升级
    • 4、服务消费选址
    • 5、MigrationRuleListener.onRefer
    • 6、MigrationRuleHandler.doMigrate
    • 6、MigrationRuleHandler.refreshInvoker
    • 7、MigrationClusterInvoker.migrateToApplicationFirstInvoker
    • 8、MigrationInvoker.refreshInterfaceInvoker
    • 9、MigrationInvoker.refreshServiceDiscoveryInvoker
    • 10、MigrationInvoker.calcPreferredInvoker
    • 11、RegistryProtocol.getServiceDiscoveryInvoker


前言

文章基于3.1.0版本进行分析

		<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>

一、MigrationRuleListener

1、迁移状态模型

在 Dubbo 3 之前地址注册模型是以接口级粒度注册到注册中心的,而 Dubbo 3 全新的应用级注册模型注册到注册中心的粒度是应用级的。从注册中心的实现上来说是几乎不一样的,这导致了对于从接口级注册模型获取到的 invokers 是无法与从应用级注册模型获取到的 invokers 进行合并的。
为了帮助用户从接口级往应用级迁移,Dubbo 3 设计了 Migration 机制,基于三个状态的切换实现实际调用中地址模型的切换

图片3.1

当前共存在三种状态,FORCE_INTERFACE(强制接口级),APPLICATION_FIRST(应用级优先)、FORCE_APPLICATION(强制应用级)。
FORCE_INTERFACE:只启用兼容模式下接口级服务发现的注册中心逻辑,调用流量 100% 走原有流程
APPLICATION_FIRST:开启接口级、应用级双订阅,运行时根据阈值和灰度流量比例动态决定调用流量走向
FORCE_APPLICATION:只启用新模式下应用级服务发现的注册中心逻辑,调用流量 100% 走应用级订阅的地址

2、Provider 端升级

在不改变任何 Dubbo 配置的情况下,可以将一个应用或实例升级到 3.x 版本,升级后的 Dubbo 实例会默保保证与 2.x 版本的兼容性,即会正常注册 2.x 格式的地址到注册中心,因此升级后的实例仍会对整个集群仍保持可见状态。

图片3.2

3、Consumer 端升级

对于双订阅的场景,消费端虽然可同时持有 2.x 地址与 3.x 地址,但选址过程中两份地址是完全隔离的:要么用 2.x 地址,要么用 3.x 地址,不存在两份地址混合调用的情况,这个决策过程是在收到第一次地址通知后就完成了的。
图片3.3

4、服务消费选址

调用发生前,框架在消费端会有一个“选址过程”,注意这里的选址和之前 2.x 版本是有区别的,选址过程包含了两层筛选

  • 先进行地址列表(ClusterInvoker)筛选(接口级地址 or 应用级地址)
  • 再进行实际的 provider 地址(Invoker)筛选

图3.4

双注册带来的资源消耗
双注册不可避免的会带来额外的注册中心存储压力,但考虑到应用级地址发现模型的数据量在存储方面的极大优势,即使对于一些超大规模集群的用户而言,新增的数据量也并不会带来存储问题。总体来说,对于一个普通集群而言,数据增长可控制在之前数据总量的 1/100 ~ 1/1000

以一个中等规模的集群实例来说: 2000 实例、50个应用(500 个 Dubbo 接口,平均每个应用 10 个接口)。
​假设每个接口级 URL 地址平均大小为 5kb,每个应用级 URL 平均大小为 0.5kb
老的接口级地址量:2000 * 500 * 5kb ≈ 4.8G
​新的应用级地址量:2000 * 50 * 0.5kb ≈ 48M
​双注册后仅仅增加了 48M 的数据量。

接下来分析MigrationRuleListener的实际处理逻辑

5、MigrationRuleListener.onRefer

    @Overridepublic void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) {MigrationRuleHandler<?> migrationRuleHandler = handlers.computeIfAbsent((MigrationInvoker<?>) invoker, _key -> {((MigrationInvoker<?>) invoker).setMigrationRuleListener(this);return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);});// 迁移规则执行 rule是封装了迁移的配置规则的信息对应类型MigrationRule类型,在初始化对象的时候进行了配置初始化migrationRuleHandler.doMigrate(rule);}

6、MigrationRuleHandler.doMigrate

从url中读取迁移方式和阈值,用于进行迁移的调用决策

  • 获取迁移方式
  • 获取决策阈值(默认-1.0)
	public synchronized void doMigrate(MigrationRule rule) {if (migrationInvoker instanceof ServiceDiscoveryMigrationInvoker) {refreshInvoker(MigrationStep.FORCE_APPLICATION, 1.0f, rule);return;}// initial step : APPLICATION_FIRST// 迁移方式,MigrationStep 一共有3种枚举情况:FORCE_INTERFACE, APPLICATION_FIRST, FORCE_APPLICATION// 默认 APPLICATION_FIRST 开启接口级、应用级双订阅MigrationStep step = MigrationStep.APPLICATION_FIRST;float threshold = -1f;try {	// URL中获取迁移方式step = rule.getStep(consumerURL);// threshold: 决策阈值(默认-1.0)计算与获取threshold = rule.getThreshold(consumerURL);} catch (Exception e) {logger.error("Failed to get step and threshold info from rule: " + rule, e);}// 刷新调用器对象 来进行决策服务发现模式if (refreshInvoker(step, threshold, rule)) {// refresh success, update rulesetMigrationRule(rule);}}

6、MigrationRuleHandler.refreshInvoker

通过迁移配置和当前提供者注册信息来决定创建什么类型的调用器对象(Invoker)来为后续服务调用做准备

	private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {if (step == null || threshold == null) {throw new IllegalStateException("Step or threshold of migration rule cannot be null");}MigrationStep originStep = currentStep;if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {boolean success = true;switch (step) {// 接口和应用双订阅,默认case APPLICATION_FIRST:migrationInvoker.migrateToApplicationFirstInvoker(newRule);break;// 仅应用case FORCE_APPLICATION:success = migrationInvoker.migrateToForceApplicationInvoker(newRule);break;// 仅接口case FORCE_INTERFACE:default:success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);}if (success) {// 设置迁移模式和阈值setCurrentStepAndThreshold(step, threshold);logger.info("Succeed Migrated to " + step + " mode. Service Name: " + consumerURL.getDisplayServiceKey());report(step, originStep, "true");} else {// migrate failed, do not save new step and rulelogger.warn("Migrate to " + step + " mode failed. Probably not satisfy the threshold you set "+ threshold + ". Please try re-publish configuration if you still after check.");report(step, originStep, "false");}return success;}// ignore if step is same with previous, will continue override rule for MigrationInvokerreturn true;}

7、MigrationClusterInvoker.migrateToApplicationFirstInvoker

默认使用接口级和应用级双订阅来兼容迁移的服务的
对应实现类 MigrationInvoker.migrateToApplicationFirstInvoker

    @Overridepublic void migrateToApplicationFirstInvoker(MigrationRule newRule) {CountDownLatch latch = new CountDownLatch(0);// 刷新接口级InvokerrefreshInterfaceInvoker(latch);// 刷新应用级InvokerrefreshServiceDiscoveryInvoker(latch);// directly calculate preferred invoker, will not wait until address notify// calculation will re-occurred when address notify later//  计算当前使用应用级还是接口级服务发现的Invoker对象calcPreferredInvoker(newRule);}

8、MigrationInvoker.refreshInterfaceInvoker

刷新接口级Invoker

	protected void refreshInterfaceInvoker(CountDownLatch latch) {// 去除旧的监听clearListener(invoker);// 需要更新if (needRefresh(invoker)) {if (logger.isDebugEnabled()) {logger.debug("Re-subscribing interface addresses for interface " + type.getName());}if (invoker != null) {invoker.destroy();}// 创建invokerinvoker = registryProtocol.getInvoker(cluster, registry, type, url);}setListener(invoker, () -> {latch.countDown();if (reportService.hasReporter()) {reportService.reportConsumptionStatus(reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));}if (step == APPLICATION_FIRST) {calcPreferredInvoker(rule);}});}

9、MigrationInvoker.refreshServiceDiscoveryInvoker

刷新应用级Invoker

	protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {clearListener(serviceDiscoveryInvoker);if (needRefresh(serviceDiscoveryInvoker)) {if (logger.isDebugEnabled()) {logger.debug("Re-subscribing instance addresses, current interface " + type.getName());}if (serviceDiscoveryInvoker != null) {serviceDiscoveryInvoker.destroy();}// 获取应用级invokerserviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);}setListener(serviceDiscoveryInvoker, () -> {latch.countDown();if (reportService.hasReporter()) {reportService.reportConsumptionStatus(reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app"));}if (step == APPLICATION_FIRST) {calcPreferredInvoker(rule);}});}

10、MigrationInvoker.calcPreferredInvoker

计算当前使用应用级还是接口级服务发现的Invoker对象

	private synchronized void calcPreferredInvoker(MigrationRule migrationRule) {if (serviceDiscoveryInvoker == null || invoker == null) {return;}// 通过阈值指定invokerSet<MigrationAddressComparator> detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel()).getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();if (CollectionUtils.isNotEmpty(detectors)) {// pick preferred invoker// the real invoker choice in invocation will be affected by promotionif (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule))) {this.currentAvailableInvoker = serviceDiscoveryInvoker;} else {this.currentAvailableInvoker = invoker;}}}

11、RegistryProtocol.getServiceDiscoveryInvoker

这里主要做了两件事

  • 创建注册中心目录
  • 通过代理对象的invoker
    public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {// 创建注册中心目录DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);// 创建invokerreturn doCreateInvoker(directory, cluster, registry, type);}

服务目录(RegistryDirectory),负责框架与注册中心的订阅,并动态更新本地Invoker列表、路由列表、配置信息的逻辑

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • UI(五)常用布局总结
  • RDP最小化之后仍然保持UI渲染的方法
  • php 实现推荐算法
  • 建模导论的最后一个视频笔记
  • 内存序学习笔记(一)——表达式求值顺序
  • 深入提示工程:解锁ChatGPT的无限潜能,掌握AI时代的智能对话技巧
  • iOS——线程安全、线程同步与线程通信
  • 模型训练套路(一)
  • [数据集][目标检测]街道乱堆垃圾检测数据集VOC+YOLO格式94张1类别
  • 【数学建模】2024数学建模国赛B题(word论文+matlab):生产过程中的决策问题
  • C++ STL-List容器概念及应用方法详解
  • 2024年高教社杯数学建模国赛C题超详细解题思路分析
  • Linux 一个简单的中断信号实现
  • 力扣100题——子串
  • 经验笔记:SSL证书
  • [笔记] php常见简单功能及函数
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • CentOS从零开始部署Nodejs项目
  • CODING 缺陷管理功能正式开始公测
  • docker python 配置
  • golang中接口赋值与方法集
  • IP路由与转发
  • JAVA多线程机制解析-volatilesynchronized
  • linux安装openssl、swoole等扩展的具体步骤
  • Linux链接文件
  • 回流、重绘及其优化
  • 老板让我十分钟上手nx-admin
  • 理解IaaS, PaaS, SaaS等云模型 (Cloud Models)
  • 前端性能优化——回流与重绘
  • 浅谈web中前端模板引擎的使用
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 如何选择开源的机器学习框架?
  • 用Node EJS写一个爬虫脚本每天定时给心爱的她发一封暖心邮件
  • 怎样选择前端框架
  • kubernetes资源对象--ingress
  • 函数计算新功能-----支持C#函数
  • # Spring Cloud Alibaba Nacos_配置中心与服务发现(四)
  • # 数仓建模:如何构建主题宽表模型?
  • #java学习笔记(面向对象)----(未完结)
  • $(document).ready(function(){}), $().ready(function(){})和$(function(){})三者区别
  • (35)远程识别(又称无人机识别)(二)
  • (Oracle)SQL优化技巧(一):分页查询
  • (Redis使用系列) Springboot 使用redis实现接口Api限流 十
  • (二)斐波那契Fabonacci函数
  • (附源码)spring boot建达集团公司平台 毕业设计 141538
  • (理论篇)httpmoudle和httphandler一览
  • (力扣记录)1448. 统计二叉树中好节点的数目
  • (十六)串口UART
  • (四)搭建容器云管理平台笔记—安装ETCD(不使用证书)
  • (一) 初入MySQL 【认识和部署】
  • (中等) HDU 4370 0 or 1,建模+Dijkstra。
  • **《Linux/Unix系统编程手册》读书笔记24章**
  • .360、.halo勒索病毒的最新威胁:如何恢复您的数据?
  • .bat批处理出现中文乱码的情况
  • .halo勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复