当前位置: 首页 > news >正文

nacos 服务发现获取列表源码分析

nacos 服务发现获取列表源码是注册中心最重要的技术点之一,其获取服务列表理论上是在首次接口调用时获取,有时候配置饥饿加载,即服务启动时就获取服务列表:今天我们从一个入口解析获取配置列表;

一、客户端源码

1、自动装配:

commons 中

 nacos 中

 

2、以nacos包为例 com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnAvailableEndpoint//初始化点击此类
	public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosDiscoveryEndpoint(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnEnabledHealthIndicator("nacos-discovery")
	public HealthIndicator nacosDiscoveryHealthIndicator(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
		return new NacosDiscoveryHealthIndicator(
				nacosServiceManager.getNamingService(nacosProperties));
	}

}

3、点击 com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpoint

    @ReadOperation
	public Map<String, Object> nacosDiscovery() {
		Map<String, Object> result = new HashMap<>();
		result.put("NacosDiscoveryProperties", nacosDiscoveryProperties);

		NamingService namingService = nacosServiceManager
				.getNamingService(nacosDiscoveryProperties.getNacosProperties());
		List<ServiceInfo> subscribe = Collections.emptyList();

		try {
			subscribe = namingService.getSubscribeServices();
			for (ServiceInfo serviceInfo : subscribe) {
                //获取应用实例,点击
				List<Instance> instances = namingService.getAllInstances(
						serviceInfo.getName(), serviceInfo.getGroupName());
				serviceInfo.setHosts(instances);
			}
		}
		catch (Exception e) {
			log.error("get subscribe services from nacos fail,", e);
		}
		result.put("subscribe", subscribe);
		return result;
	}

4、点击namingService.getAllInstances方法进入

com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String)

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException {//点击进入
        return getAllInstances(serviceName, groupName, new ArrayList<String>());
    }

来到

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters) throws NacosException {
        //点击
        return getAllInstances(serviceName, groupName, clusters, true);
    }

来到

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) {
            //点击获取实例
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }

 5、进入com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }

        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);//点击进入
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {

            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //延时执行的定时任务,更新客户端缓存
        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

来到 updateServiceNow(serviceName, clusters) 方法

com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow

    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            //正式获取服务列表,点击
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

 6、来到com.alibaba.nacos.client.naming.net.NamingProxy#queryList

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        //此处开始去nacos服务端获取列表
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }

7、点击 scheduleUpdateIfAbsent(serviceName, clusters);

 public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //定时获取服务端最新数据,并更新到本地的服务,点击UpdateTask
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

 8、点击UpdateTask 来到

com.alibaba.nacos.client.naming.core.HostReactor.UpdateTask#run

        @Override
        public void run() {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);//获取更新
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }

                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);//获取更新
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }

                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }

        }

二、服务端源码

com.alibaba.nacos.naming.controllers.InstanceController#list

1、入口

    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {

        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        //核心入口
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
    }

点击

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {

                pushService
                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                                pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }

        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }

        checkIfDisabled(service);

        List<Instance> srvedIPs;
        //获取服务的所有ip
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        // filter ips using selector:
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }

        if (CollectionUtils.isEmpty(srvedIPs)) {

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }

            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }

            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }

        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        //循环放入服务ip
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }

        if (isCheck) {
            result.put("reachProtectThreshold", false);
        }

        double threshold = service.getProtectThreshold();

        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                result.put("reachProtectThreshold", true);
            }

            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }

        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);

            return JacksonUtils.createEmptyJsonNode();
        }

        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
        //循环封装数据
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();

            if (healthyOnly && !entry.getKey()) {
                continue;
            }

            for (Instance instance : ips) {

                // remove disabled instance:
                if (!instance.isEnabled()) {
                    continue;
                }

                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }

                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);

            }
        }
        //放到返回值里面
        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

 

到此,服务发现的源码流程分析完毕,下篇我们nacos配置中心源码,敬请期待!

相关文章:

  • 【MySQL】过年没有回老家,在出租屋里整理了一些思维导图
  • 《流浪地球 2》 Deepfake 小试牛刀,45+ 吴京「被」年轻,变身 21 岁小鲜肉
  • C++工程实践必备技能
  • GitHub访问问题与FastGithub下载及使用(详细篇)
  • <使用Python自定义生成简易二维码>——《Python项目实战》
  • Spring Boot 热部署(热加载)
  • 又一个开源工具搞完了,工作效率直接翻倍
  • 入职-环境安装篇
  • 自动驾驶感知——毫米波雷达
  • MySQL运维(二)MySQL分库分表概念及实战、读取分离详解
  • K8s简介之什么是K8s
  • webgl绘制图形API——drawArrays、drawElements
  • 不平衡数据集的建模的技巧和策略
  • 每天一道大厂SQL题【Day01】
  • 【JAVA核心知识】46:什么是零拷贝Zero-copy
  • 0基础学习移动端适配
  • 11111111
  • 4. 路由到控制器 - Laravel从零开始教程
  • ES6之路之模块详解
  • fetch 从初识到应用
  • JavaScript的使用你知道几种?(上)
  • Kibana配置logstash,报表一体化
  • magento 货币换算
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • vue-cli在webpack的配置文件探究
  • Vue--数据传输
  • XForms - 更强大的Form
  • 初识 webpack
  • 当SetTimeout遇到了字符串
  • 关于extract.autodesk.io的一些说明
  • 基于 Ueditor 的现代化编辑器 Neditor 1.5.4 发布
  • 今年的LC3大会没了?
  • 开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
  • 爬虫模拟登陆 SegmentFault
  • 如何进阶一名有竞争力的程序员?
  • 数据仓库的几种建模方法
  • 项目管理碎碎念系列之一:干系人管理
  • 京东物流联手山西图灵打造智能供应链,让阅读更有趣 ...
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​ 无限可能性的探索:Amazon Lightsail轻量应用服务器引领数字化时代创新发展
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • ​业务双活的数据切换思路设计(下)
  • ​总结MySQL 的一些知识点:MySQL 选择数据库​
  • #常见电池型号介绍 常见电池尺寸是多少【详解】
  • #微信小程序:微信小程序常见的配置传值
  • $.ajax,axios,fetch三种ajax请求的区别
  • (2)STM32单片机上位机
  • (2022 CVPR) Unbiased Teacher v2
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (顺序)容器的好伴侣 --- 容器适配器
  • (算法设计与分析)第一章算法概述-习题
  • (原創) 如何讓IE7按第二次Ctrl + Tab時,回到原來的索引標籤? (Web) (IE) (OS) (Windows)...
  • (转)Scala的“=”符号简介
  • (转)Sublime Text3配置Lua运行环境