当前位置: 首页 > news >正文

Nacos 服务发现(订阅)源码分析(服务端)

前言:

前文我们分析了 Nacos 服务发现(订阅)的流程,从 Nacos Client 端的源码分析了服务发现的过程,服务发现最终还是要调用 Nacos Server 端来获取服务信息,缓存到客户端本地,并且会定时向 Nacos Server 端发送请求,获取服务信息,本篇我们从 Nacos Server 来分析一下服务订阅源码。

Nacos 系列文章传送门:

Nacos 初步认识和 Nacos 部署细节

Nacos 配置管理模型 – 命名空间(Namespace)、配置分组(Group)和配置集ID(Data ID)

Nacos 注册中心和配置中心【实战】

服务启动何时触发 Nacos 的注册流程?

Nacos Client 端服务注册流程源码分析

Nacos Server 端服务注册流程源码分析

Nacos 服务发现(订阅)源码分析(客户端)

InstanceController#list 方法源码解析

前文我们分析到服务的发现(订阅)最终会调用 Nacos Server 端的接口,而这个接口就在 InstanceController 中,根据接口路径我们找到了对应的方法也就是 InstanceController#list 方法,源码解析如下:

//com.alibaba.nacos.naming.controllers.InstanceController#list
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {//获取 namespaceIdString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);//获取 serviceNameString serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);//检查格式NamingUtils.checkServiceNameFormat(serviceName);//agent  java、c、c++、go、nginx、dnsfString agent = WebUtils.getUserAgent(request);//获取集群信息String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);//获取 udp  端口int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));//获取环境信息String env = WebUtils.optional(request, "env", StringUtils.EMPTY);boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));String app = WebUtils.optional(request, "app", StringUtils.EMPTY);String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));//获取服务列表return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
}

InstanceController#list 方法本身没有什么难懂逻辑,只是从 request 中获取一些属性后,继续调用了 InstanceController#doSrvIpxt 方法,我们接着往下看。

InstanceController#doSrvIpxt 方法源码解析

InstanceController#doSrvIpxt 方法的源码比较多,大概拆分一下重要步骤,做了如下事情。

  1. 根据 namespaceId, 和 serviceName 获取服务信息。
  2. 判断是有有客户端订阅了服务,如果由客户端订阅了服务,则加入到 UDP 推送列表中,也就是之前我们分析过的 Nacos Server 是如何通知 Nacos Client 服务下线。
  3. 阀值判断,通过各种判断规则得到服务列表(判断详情请看源码分析)。
  4. 封装结果集返回。
//com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {//创建客户端对象ClientInfo clientInfo = new ClientInfo(agent);//创建 ObjectNodeObjectNode result = JacksonUtils.createEmptyJsonNode();//根据  namespaceId serviceName 获取 serviceService service = serviceManager.getService(namespaceId, serviceName);//缓存时间 默认 10 秒long cacheMillis = switchDomain.getDefaultCacheMillis();// now try to enable the pushtry {// udp 端口大于0 且已经开启推送  只有客户端订阅了  udp  端口才会大于0if (udpPort > 0 && pushService.canEnablePush(agent)) {//添加当前客户端 IP、UDP端口到 PushService 中 会作为可推送的目标客户端添加给推送服务组件pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);//根据服务名 获取缓存时间默认10 秒cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);cacheMillis = switchDomain.getDefaultCacheMillis();}//service 为空判断if (service == null) {if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}//service 为空 构造空对象 返回result.put("name", serviceName);result.put("clusters", clusters);result.put("cacheMillis", cacheMillis);result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}//检查服务是否禁用 默认是 启用的checkIfDisabled(service);//服务实例 ipsList<Instance> srvedIPs;//服务实例 ipssrvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));// filter ips using selector://若选择器不空 则根据选择算法选择可用的intance列表 默认情况下 选择器不做任何过滤if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {srvedIPs = service.getSelector().select(clientIP, srvedIPs);}//serviceIps 为空判断if (CollectionUtils.isEmpty(srvedIPs)) {//为空if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}//客户端类型判断if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}//构造空对象返回result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.set("hosts", JacksonUtils.createEmptyArrayNode());result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;}//存储健康和不健康的实例//key为true的value中存放的是所有健康的instance//key为false的value存放的是所有不健康的instanceMap<Boolean, List<Instance>> ipMap = new HashMap<>(2);ipMap.put(Boolean.TRUE, new ArrayList<>());ipMap.put(Boolean.FALSE, new ArrayList<>());//服务实例遍历 区分健康和不健康的实例for (Instance ip : srvedIPs) {ipMap.get(ip.isHealthy()).add(ip);}//阀值判断 isCheck 客户端请求中如果没有传值 则默认是 fasleif (isCheck) {//false 标识没有达到保护阀值result.put("reachProtectThreshold", false);}//获取保护阀值double threshold = service.getProtectThreshold();//通过健康实例除以所有实例 来判断是否触发阀值if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {//进入这里标识已经出发了保护阀值Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);if (isCheck) {//启动服务保护机制result.put("reachProtectThreshold", true);}//将不健康的实例全部加入到健康的实例中//这样做的好处是可以保证服务不会那么快被打崩溃 即使有部分失败的 但是还是有可用的服务 //不健康的实例存在的目的就是分流 缓解健康服务的压力ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));//清空不健康的实例ipMap.get(Boolean.FALSE).clear();}//阀值判断if (isCheck) {result.put("protectThreshold", service.getProtectThreshold());result.put("reachLocalSiteCallThreshold", false);return JacksonUtils.createEmptyJsonNode();}//能够走到这里 标识没有出发 阀值保护ArrayNode hosts = JacksonUtils.createEmptyArrayNode();//遍历实例for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {List<Instance> ips = entry.getValue();//如果只需要健康的实例 那就跳过不健康的实例if (healthyOnly && !entry.getKey()) {continue;}//遍历服务实例for (Instance instance : ips) {// remove disabled instance://移除禁用的实例if (!instance.isEnabled()) {continue;}//创建空对ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();//构建实例对象ipObj.put("ip", instance.getIp());ipObj.put("port", instance.getPort());// deprecated since nacos 1.0.0:ipObj.put("valid", entry.getKey());ipObj.put("healthy", entry.getKey());ipObj.put("marked", instance.isMarked());ipObj.put("instanceId", instance.getInstanceId());ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));ipObj.put("enabled", instance.isEnabled());ipObj.put("weight", instance.getWeight());ipObj.put("clusterName", instance.getClusterName());if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {ipObj.put("serviceName", instance.getServiceName());} else {ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));}ipObj.put("ephemeral", instance.isEphemeral());hosts.add(ipObj);}}//设置服务实例列表result.replace("hosts", hosts);//客户端类型判断if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}//返回结果result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;
}

PushService#addClient 方法源码解析

我们上面在分析 InstanceController#doSrvIpxt 方法时候,提到如果客户端的订阅了该服务,Nacos 服务端会进行通过 UDP 推送给客户端最新的服务信息,而这个操作就是由 PushService 类实现的,PushService#addClient 方法只是把服务相关信息加入到了推送列表中。

//com.alibaba.nacos.naming.push.PushService#addClient(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.net.InetSocketAddress, com.alibaba.nacos.naming.push.DataSource, java.lang.String, java.lang.String)
public void addClient(String namespaceId, String serviceName, String clusters, String agent,InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {//构造 PushClient 对象PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,app);//加入到推送列表中addClient(client);
}//com.alibaba.nacos.naming.push.PushService#addClient(com.alibaba.nacos.naming.push.PushService.PushClient)
public void addClient(PushClient client) {// client is stored by key 'serviceName' because notify event is driven by serviceName change//获取 serviceKeyString serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());//从 客户端 map  中获取 当前 client 对象ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);//为空 判断if (clients == null) {//为空 则加入到 客户端 map 中clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));//加入后获取clients = clientMap.get(serviceKey);}//获取之前的 client 对象PushClient oldClient = clients.get(client.toString());if (oldClient != null) {//为空空 刷新 其实是修改 client 最后一次引用时间 可以理解为更新时间oldClient.refresh();} else {//为空 也就是还没有注册这个推送目标客户端  将 client 加入到 clientsPushClient res = clients.putIfAbsent(client.toString(), client);if (res != null) {Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());}Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());}
}

PushService#onApplicationEvent 方法源码分析

上面我们分析到客户端获取服务信息的时候,服务端会判断是否有客户端订阅了该服务信息,如果有,则会出发推送给客户端,最终会把服务信息封装成一个 PushClient 加入到 clientMap 中,前文我们分析了客户端是如果感知服务下线的,其中也发现了一个 clientMap 的存储结构,而在 PushService#onApplicationEvent 方法会注册一个延时任务并将该 future 放入 futureMap,该延时任务会从 clientMap 获取指定namespaceId、 serviceName 的client 集合,遍历 client 集合,判断 client 是否是 zombie(僵尸) client,如果是的则移除该 client,否则创建 Receiver.AckEntry,然后通过 UDP 的方式推送给 client,执行完毕后会从 futureMap 移除该 future,至此回到了我们前一篇分析的地方,后续就是我们熟悉的流程,不在重复分析了。

//com.alibaba.nacos.naming.push.PushService#onApplicationEvent
public void onApplicationEvent(ServiceChangeEvent event) {//从事件对象中获取到 serviceService service = event.getService();//获取 servicenameString serviceName = service.getName();//获取名称空间idString namespaceId = service.getNamespaceId();//使用延时任务 延时1 秒 通过 UDP 的方式来发送Future future = GlobalExecutor.scheduleUdpSender(() -> {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");//从缓存map中获取当前服务的内层map 内层map中存放着当前服务的所有Nacos Client的//根据 namespaceId 和 serviceName 获取对应的 client 信息ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));//为空判断 如果为空 就没有必要推送了if (MapUtils.isEmpty(clients)) {return;}//创建缓存 mapMap<String, Object> cache = new HashMap<>(16);//获取当前时间的 纳秒long lastRefTime = System.nanoTime();//遍历所有的 client 信息for (PushClient client : clients.values()) {//是否是僵尸客户端if (client.zombie()) {Loggers.PUSH.debug("client is zombie: " + client.toString());//如果是的话 就移除僵尸客户端clients.remove(client.toString());Loggers.PUSH.debug("client is zombie: " + client.toString());continue;}//ACKReceiver.AckEntry ackEntry;Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());//获取推送 keyString key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null;Map<String, Object> data = null;//switchDomain.getDefaultPushCacheMillis() 默认是 10秒 因此不会进入 ifif (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);compressData = (byte[]) (pair.getValue0());data = (Map<String, Object>) pair.getValue1();Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}//封装 ackEntry  将客户端信息封装到 ackEntry if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);} else {//这里初始化了需要推送的 客户端ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));}}Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",client.getServiceName(), client.getAddrStr(), client.getAgent(),(ackEntry == null ? null : ackEntry.key));//通过 udp 协议向 Nacos 客户端推送数据udpPush(ackEntry);}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {//移除 futurefutureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);//任务放入 futureMap  表示已经发送了 udp 到客户端的服务实例futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}

总结:服务发现(订阅)Nacos 服务端的代码还是比较简答的,而且也有一种一通百通的感觉,分析的过程中,又回到了我们前文分析的代码,一下子就知道是怎么回事了,也更加理解了 Nacos 的设计思想。

欢迎提出建议及对错误的地方指出纠正。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 数据仓库事实表
  • 【微服务实战之Docker容器】第六章-复杂安装(Mysql主从Redis集群)
  • 代理伺服器分類詳解
  • ArcGIS Pro SDK (九)几何 10 弧
  • 【数据结构】初识数据结构
  • AI、AGI、AIGC与AIGC、NLP、LLM,ChatGPT区分
  • Nature子刊 | ATAC-seq、RNA-seq和蛋白组联合分析揭示脂质激活转录因子PPARα在肾脏代偿性肥大的作用机制
  • pdf怎么压缩的小一点?PDF压缩变小的6种方法(2024全新)
  • 数学基础【俗说矩阵】:初等矩阵和矩阵的初等行变化关系推导
  • 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【26】【内网穿透】cpolar
  • python内置zip函数详解
  • Linux——多路复用之poll
  • ACM中国图灵大会专题 | 图灵奖得主Manuel Blum教授与仓颉团队交流 | 华为论坛:面向全场景应用编程语言精彩回顾
  • arcgis紧凑型切片缓存(解决大范围切片,文件数量大的问题)
  • 三、初识C语言(3)
  • [case10]使用RSQL实现端到端的动态查询
  • [NodeJS] 关于Buffer
  • 【干货分享】SpringCloud微服务架构分布式组件如何共享session对象
  • C++类中的特殊成员函数
  • conda常用的命令
  • CSS实用技巧干货
  • css选择器
  • ERLANG 网工修炼笔记 ---- UDP
  • JavaScript 一些 DOM 的知识点
  • JAVA之继承和多态
  • mysql innodb 索引使用指南
  • springboot_database项目介绍
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • 爱情 北京女病人
  • 前端面试总结(at, md)
  • 如何正确配置 Ubuntu 14.04 服务器?
  • 入门级的git使用指北
  • 实现简单的正则表达式引擎
  • 使用agvtool更改app version/build
  • -- 数据结构 顺序表 --Java
  • 用 vue 组件自定义 v-model, 实现一个 Tab 组件。
  • 字符串匹配基础上
  • AI又要和人类“对打”,Deepmind宣布《星战Ⅱ》即将开始 ...
  • #QT(一种朴素的计算器实现方法)
  • #在 README.md 中生成项目目录结构
  • %@ page import=%的用法
  • (4)事件处理——(2)在页面加载的时候执行任务(Performing tasks on page load)...
  • (LeetCode C++)盛最多水的容器
  • (Windows环境)FFMPEG编译,包含编译x264以及x265
  • (附源码)springboot“微印象”在线打印预约系统 毕业设计 061642
  • (附源码)springboot家庭装修管理系统 毕业设计 613205
  • (附源码)springboot优课在线教学系统 毕业设计 081251
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (机器学习-深度学习快速入门)第一章第一节:Python环境和数据分析
  • (实战篇)如何缓存数据
  • (转)Groupon前传:从10个月的失败作品修改,1个月找到成功
  • .NET 4 并行(多核)“.NET研究”编程系列之二 从Task开始
  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版
  • .NET Conf 2023 回顾 – 庆祝社区、创新和 .NET 8 的发布
  • .Net Web窗口页属性