
Dubbo Source Code Analysis: The Service Invocation (Communication) Flow


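Article series

[1. Dubbo source code analysis: a rough overview of the framework]
[2. Dubbo source code analysis: Dubbo configuration parsing]
[3. Dubbo source code analysis: service export and registration]
[4. Dubbo source code analysis: service discovery]
[5. Dubbo source code analysis: the service invocation (communication) flow]
[6. Getting the list of service provider IPs in Dubbo]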

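I. Service Export

In the article on service export and registration, step 4.4.3 (service export) calls DubboProtocol.export() to open a local port that listens for and handles client connection requests.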

public class DubboProtocol extends AbstractProtocol {
	@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
		
		// key = serviceClassName + ":" + port, e.g. com.example.demo.provider.DemoProvider:20880
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.addExportMap(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }

		// Open the server: expose a local port that listens for and handles client connection requests.
        openServer(url);
        // Register the preferred serialization optimizer
        optimizeSerialization(url);

        return exporter;
    }
}

When Netty is used for remote communication, a NettyServer object is eventually created.

1.1 NettyServer

public class NettyServer extends AbstractServer implements RemotingServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    /**
     * the cache for alive worker channel.
     * <ip:port, dubbo channel>
     */
    private Map<String, Channel> channels;
    /**
     * netty server bootstrap.
     */
    private ServerBootstrap bootstrap;
    /**
     * the boss channel that receive connections and dispatch these to worker channel.
     */
	private io.netty.channel.Channel channel;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    	// The parent constructor calls doOpen(), which initializes and starts the Netty server
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }

    /**
     * Initialize and start the Netty server
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }
}

NettyServer mainly initializes and starts a Netty server and then builds a handler chain.

1.2 The ChannelHandler chain

The NettyServer constructor contains the call ChannelHandlers.wrap(handler, url), which returns a ChannelHandler chain.

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    	// Delegate to wrapInternal to wrap the handler
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

	// Wrap the handler
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

The resulting ChannelHandler chain is:

MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> handler

Here handler is a member variable of DubboProtocol.
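
The nesting order of the chain can be pictured as a stack of decorators, each doing its own work and then delegating to the handler it wraps. The following is a minimal sketch under stated assumptions: `SimpleHandler`, `NamedDelegate` and `HandlerChainSketch` are hypothetical names invented for illustration, not Dubbo classes, and each layer only records its name instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Dubbo's ChannelHandler; only one callback is modelled.
interface SimpleHandler {
    void received(String message, List<String> trace);
}

// Hypothetical decorator: records its own name, then delegates to the wrapped handler.
final class NamedDelegate implements SimpleHandler {
    private final String name;
    private final SimpleHandler next;

    NamedDelegate(String name, SimpleHandler next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void received(String message, List<String> trace) {
        trace.add(name);
        next.received(message, trace);
    }
}

final class HandlerChainSketch {
    // Builds the same nesting order as the chain listed in the article.
    static SimpleHandler wrap(SimpleHandler handler) {
        return new NamedDelegate("MultiMessageHandler",
                new NamedDelegate("HeartbeatHandler",
                        new NamedDelegate("AllChannelHandler",
                                new NamedDelegate("DecodeHandler", handler))));
    }

    // Sends one message through the chain and returns the order in which layers saw it.
    static List<String> traceOf(String message) {
        List<String> trace = new ArrayList<>();
        wrap((msg, t) -> t.add("handler")).received(message, trace);
        return trace;
    }
}
```

Calling `HandlerChainSketch.traceOf("ping")` walks the layers from the outermost wrapper to the innermost handler, which is the order in which an inbound message is processed.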

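II. Service Discovery

In the article on service discovery, step 3.3, ReferenceConfig.createProxy(), creates and returns a proxy object (the Invoker that is eventually created is a DubboInvoker).

The structure of the resulting proxy class was shown in a figure in the original post (image not available).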

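III. Service Invocation (Communication) Flow

3.1 The client-side proxy call chain

Setting a breakpoint in DubboInvoker and analysing the call stack gives the following invocation chain: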

// invoke method of the proxy's handler
1. org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke(Object proxy, Method method, Object[] args)

// The invocation
2. result = invoker.invoke(rpcInvocation)
	// Migration (chooses between interface-level and instance-level addresses)
	2.1 MigrationInvoker#invoke(Invocation invocation)
	// Mock
	2.2 MockClusterInvoker#invoke(Invocation invocation)
	// Cluster interceptors: before and after hooks
	2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
		// Run the intercept method
		2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)
		2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)
		// Cluster fault-tolerance mode
		2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
		// Wrapper
		2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)
		// Filters
		2.3.5 FilterNode#invoke(Invocation invocation)
			  2.3.5.1 ConsumerContextFilter
			  2.3.5.2 FutureFilter
			  2.3.5.3 MonitorFilter
		// Listener wrapper
		2.3.6 ListenerInvokerWrapper
		// Async-to-sync adapter
		2.3.7 AsyncToSyncInvoker
	// The Dubbo protocol call
	2.4 DubboInvoker

// Extract the result
3. result.recreate()

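1. InvokerInvocationHandler#invoke

The proxy is generated by JavassistProxyFactory. Note that the Proxy class used below is Dubbo's own Javassist-based org.apache.dubbo.common.bytecode.Proxy, not the JDK's java.lang.reflect.Proxy (JDK dynamic proxies are used by JdkProxyFactory instead). The generation code is: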

public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

As the code shows, InvokerInvocationHandler must implement the InvocationHandler interface, so every method call on the proxy object eventually enters its Object invoke(Object proxy, Method method, Object[] args) method.
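
To illustrate how every interface call funnels into a single invoke() method, here is a minimal sketch that uses java.lang.reflect.Proxy purely for illustration. `GreetingService` and `ProxyDispatchSketch` are hypothetical names, and the handler returns a descriptive string where a real handler would build an RpcInvocation and pass it to an Invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface used only for this illustration.
interface GreetingService {
    String greet(String name);
}

final class ProxyDispatchSketch {
    // Every call on the returned proxy is routed through the handler's invoke().
    static GreetingService newProxy() {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Methods declared on Object are answered locally instead of going remote.
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args);
                }
                // A real handler would build an invocation object and call the invoker here.
                return "invoked " + method.getName() + "(" + args[0] + ")";
            }
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }
}
```

The caller only sees `GreetingService`; the method name and arguments arrive in the handler as reflection data, which is exactly the information an RPC framework needs to describe the call to a remote side.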

public class InvokerInvocationHandler implements InvocationHandler {
	@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    	// Methods declared on Object are executed directly
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }

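		/**
		 * Build the RPC invocation object: the core object that is transmitted during an RPC call
		 * method: the method to be called
		 * invoker.getInterface().getName(): class name of the interface that declares the method,
		 *                                   e.g. com.example.demo.provider.TestProvider
		 * protocolServiceKey: e.g. com.example.demo.provider.TestProvider:dubbo
		 * args: the actual method arguments
		 */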
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
        
        // Target service name, e.g. com.example.demo.provider.TestProvider
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);

        // invoker.getUrl() returns consumer url.
        // Attach the URL to the current context, e.g. dubbo://127.0.0.1/com.example.demo.provider.TestProvider?(other parameters omitted)
        RpcContext.setRpcContext(invoker.getUrl());

        if (consumerModel != null) {
        	// Add attributes
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

		// Invoke: invoker.invoke(rpcInvocation)
		// Extract the result: result.recreate()
        return invoker.invoke(rpcInvocation).recreate();
    }
}

2. result = invoker.invoke(rpcInvocation)

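This is the invocation itself.

2.1 MigrationInvoker#invoke(Invocation invocation)

The migration invoker chooses between interface-level addresses and instance-level (service discovery) addresses, as the debug messages in its invoke() method indicate. It is created in RegistryProtocol.doRefer():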

public class RegistryProtocol implements Protocol {
	protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
        URL consumerUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        // Create and obtain a MigrationInvoker
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        return interceptInvoker(migrationInvoker, url, consumerUrl);
    }
    
    protected <T> ClusterInvoker<T> getMigrationInvoker(RegistryProtocol registryProtocol, Cluster cluster, Registry registry,
                                                        Class<T> type, URL url, URL consumerUrl) {
        return new ServiceDiscoveryMigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
    }
}

MigrationInvoker

public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
	@Override
    public Result invoke(Invocation invocation) throws RpcException {
    	// Check whether serviceDiscoveryInvoker is available; initially it is not
        if (!checkInvokerAvailable(serviceDiscoveryInvoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + (invoker.getDirectory().getAllInvokers() == null ? "is null" : invoker.getDirectory().getAllInvokers().size()));
            }
            // This branch is taken
            return invoker.invoke(invocation);
        }

    	// Check whether invoker is available
        if (!checkInvokerAvailable(invoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + (serviceDiscoveryInvoker.getDirectory().getAllInvokers() == null ? " is null " : serviceDiscoveryInvoker.getDirectory().getAllInvokers().size()));
            }
            return serviceDiscoveryInvoker.invoke(invocation);
        }

        return currentAvailableInvoker.invoke(invocation);
    }
}

2.2 MockClusterInvoker#invoke(Invocation invocation)

This invoker runs the mock logic. It is created in RegistryProtocol.doCreateInvoker():

public class RegistryProtocol implements Protocol {
	protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(urlToRegistry);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(urlToRegistry);
        directory.subscribe(toSubscribeUrl(urlToRegistry));
		
		// Create and return the Invoker
		// cluster is the object obtained from ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap)
		// By default name = Cluster.DEFAULT = failover and wrap = true
        return (ClusterInvoker<T>) cluster.join(directory);
    }
}

The cluster object is therefore MockClusterWrapper(FailoverCluster). When cluster.join(directory) runs, the call first reaches MockClusterWrapper#join(), which wraps and enhances the result.

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    	// Create a MockClusterInvoker
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

MockClusterWrapper creates a MockClusterInvoker:

public class MockClusterInvoker<T> implements ClusterInvoker<T> {
	@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

		// Read the mock parameter from the URL
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            // no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            }
            // force mode: return the mock data directly without a remote call
            result = doMockInvoke(invocation, null);
        } else {
            // fail-mock: run the mock logic only when the call fails
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
}
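
The three branches above boil down to a small decision on the value of the mock parameter. The following is a minimal sketch of that decision only. `MockModeSketch` and its `Mode` enum are hypothetical names, and treating a null value as "false" is an assumption that mirrors the default passed to getMethodParameter in the listing.

```java
final class MockModeSketch {
    enum Mode { NO_MOCK, FORCE, FAIL }

    // Mirrors the branch order of MockClusterInvoker#invoke: no mock, force mock, fail mock.
    static Mode resolve(String mockValue) {
        // Assumption: a missing parameter behaves like the default "false".
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            return Mode.NO_MOCK;   // call the real invoker
        }
        if (value.startsWith("force")) {
            return Mode.FORCE;     // skip the remote call, return mock data
        }
        return Mode.FAIL;          // call remotely, fall back to mock on non-business errors
    }
}
```

Any non-empty value other than "false" that does not start with "force" lands in the fail-mock branch, which is why a plain "true" also enables fail-mock behaviour.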

2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)

Cluster interceptors: these provide before and after hooks.

When MockClusterWrapper#join() creates the MockClusterInvoker, it first obtains an invoker from this.cluster.join(directory):

new MockClusterInvoker<T>(directory, this.cluster.join(directory));

Here this.cluster is the Cluster object injected through Dubbo SPI, which is FailoverCluster by default.

public class FailoverCluster extends AbstractCluster {

    public static final String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }
}

FailoverCluster does not override join(), so calling join(directory) enters AbstractCluster#join(directory):

public abstract class AbstractCluster implements Cluster {
	@Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    	// Build the cluster interceptors
        return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
    }
    
    /**
   	 * Build the cluster interceptors
   	 * doJoin(directory): template method pattern; runs the subclass's doJoin, which returns a FailoverClusterInvoker by default
   	 * directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY): reads the reference.interceptor parameter from the URL
   	 */
    private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
        AbstractClusterInvoker<T> last = clusterInvoker;
        // SPI
        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);

        if (!interceptors.isEmpty()) {
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                final ClusterInterceptor interceptor = interceptors.get(i);
                final AbstractClusterInvoker<T> next = last;
                // Chain of responsibility: wrap the current head in an InterceptorInvokerNode
                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
            }
        }
        return last;
    }
}

InterceptorInvokerNode is an inner class of AbstractCluster, so calling invoke() ends up in AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation):
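
The reverse loop in buildClusterInterceptors is what makes the first interceptor in the list the outermost node of the chain. A minimal sketch of that construction follows, assuming a drastically simplified model in which an invoker is just a function from request to response. `InterceptorChainSketch` is a hypothetical name, and the interceptors here only wrap the response text so that the nesting order becomes visible.

```java
import java.util.List;
import java.util.function.Function;

final class InterceptorChainSketch {
    // Hypothetical simplification: an "invoker" is a function from request to response.
    static Function<String, String> build(Function<String, String> target,
                                          List<String> interceptorNames) {
        Function<String, String> last = target;
        // Iterate from the tail so that the first interceptor ends up outermost.
        for (int i = interceptorNames.size() - 1; i >= 0; i--) {
            final String name = interceptorNames.get(i);
            final Function<String, String> next = last;
            // Each node does its own work around the call to the next node.
            last = request -> name + "(" + next.apply(request) + ")";
        }
        return last;
    }
}
```

With interceptors named "a" and "b", a call enters "a" first, then "b", then the target, so the wrapped response reads from the outside in.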

public abstract class AbstractCluster implements Cluster {
	protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
		@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
            	// Before hook
                interceptor.before(next, invocation);
                // Run the intercept method, which returns an asynchronous result
                asyncResult = interceptor.intercept(next, invocation);
            } catch (Exception e) {
                // onError callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    listener.onError(e, clusterInvoker, invocation);
                }
                throw e;
            } finally {
            	// After hook
                interceptor.after(next, invocation);
            }
            // Asynchronous callback
            return asyncResult.whenCompleteWithContext((r, t) -> {
                // onResponse callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    if (t == null) {
                        listener.onMessage(r, clusterInvoker, invocation);
                    } else {
                        listener.onError(t, clusterInvoker, invocation);
                    }
                }
            });
        }
	}
}
2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)

Runs the intercept method and returns an asynchronous result.

@SPI
public interface ClusterInterceptor {

    void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
    	// Call invoke, which returns an asynchronous result.
        return clusterInvoker.invoke(invocation);
    }

    interface Listener {

        void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

        void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    }
}
2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)

AbstractClusterInvoker#invoke(final Invocation invocation) performs three important steps:

  1. Obtain all invokers
  2. Initialize the load-balancing algorithm
  3. Execute the invocation

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
	@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        // Bind the attachments of the current context to the invocation
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

		// step1: obtain all invokers
        List<Invoker<T>> invokers = list(invocation);

		// step2: initialize the load-balancing algorithm
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        
        // step3: execute the invocation
        return doInvoke(invocation, invokers, loadbalance);
    }
}

step1: obtain all invokers

All callable Invoker objects are fetched from the directory:

  • DynamicDirectory: dynamic
  • StaticDirectory: static

step2: initialize the load-balancing algorithm

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
   if (CollectionUtils.isNotEmpty(invokers)) {
   		// Random by default
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
               .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
   } else {
   		// Random by default
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
   }
}

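Dubbo's built-in load-balancing algorithms are:

  • Random LoadBalance: random selection, with probabilities proportional to weight (the default).
  • RoundRobin LoadBalance: round robin, with the rotation ratio set by the reduced weights.
  • LeastActive LoadBalance: fewest active calls; ties are broken randomly. The active count is the difference between the counters taken before and after a call.
  • ConsistentHash LoadBalance: consistent hashing; requests with the same parameters always go to the same provider.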
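
To make "probabilities proportional to weight" concrete, the following is a minimal sketch of weight-proportional selection. It is not Dubbo's RandomLoadBalance implementation (which also deals with warm-up and other details); `WeightedRandomSketch` is a hypothetical name, and the sketch assumes non-negative weights whose sum is greater than zero.

```java
import java.util.concurrent.ThreadLocalRandom;

final class WeightedRandomSketch {
    // Returns the index whose weight interval contains offset, where 0 <= offset < sum(weights).
    static int pick(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            if (offset < weights[i]) {
                return i;
            }
            // Move past this provider's interval.
            offset -= weights[i];
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    // Draws a uniform offset in [0, total) so each index is chosen in proportion to its weight.
    static int pickRandom(int[] weights) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        return pick(weights, ThreadLocalRandom.current().nextInt(total));
    }
}
```

With weights {1, 3}, offset 0 maps to the first provider and offsets 1 to 3 map to the second, so the second provider receives about three quarters of the calls.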

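step3: execute the invocation

AbstractClusterInvoker uses the template method pattern. Its implementations correspond to Dubbo's cluster fault-tolerance modes:

  • FailoverClusterInvoker: automatic failover; when a call fails, other servers are retried.
  • FailfastClusterInvoker: fail fast; only one call is made and a failure is reported immediately. Typically used for non-idempotent writes such as inserting a record.
  • FailsafeClusterInvoker: fail safe; exceptions are simply ignored. Typically used for operations such as writing audit logs.
  • FailbackClusterInvoker: automatic recovery; failed requests are recorded in the background and resent on a schedule. Typically used for message notifications.
  • ForkingClusterInvoker: calls several servers in parallel and returns as soon as one succeeds. Typically used for reads with strict latency requirements, at the cost of more service resources. The maximum parallelism can be set with forks="2".
  • BroadcastClusterInvoker: broadcasts to all providers, calling them one by one; if any one of them fails, the call fails. Typically used to tell all providers to update local resources such as caches or logs.

The default is FailoverClusterInvoker: automatic failover, retrying other servers when a call fails.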
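
The template method idea can be sketched in a few lines: the base class fixes the common steps and each subclass only decides how a failure is handled. This is a simplified illustration, not Dubbo code. `ClusterTemplateSketch`, `FailfastSketch` and `FailsafeSketch` are hypothetical names, a provider is reduced to a string, the remote call to a function, and returning null for an ignored failure is an assumption standing in for an empty result.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified counterpart of AbstractClusterInvoker.
abstract class ClusterTemplateSketch {
    // The fixed skeleton: validate, then hand over to the strategy-specific step.
    final String invoke(List<String> providers, Function<String, String> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return doInvoke(providers, call);
    }

    protected abstract String doInvoke(List<String> providers, Function<String, String> call);
}

// Fail-fast: a single attempt; the error propagates to the caller.
final class FailfastSketch extends ClusterTemplateSketch {
    @Override
    protected String doInvoke(List<String> providers, Function<String, String> call) {
        return call.apply(providers.get(0));
    }
}

// Fail-safe: a single attempt; errors are swallowed and an empty (null) result is returned.
final class FailsafeSketch extends ClusterTemplateSketch {
    @Override
    protected String doInvoke(List<String> providers, Function<String, String> call) {
        try {
            return call.apply(providers.get(0));
        } catch (RuntimeException e) {
            return null;
        }
    }
}
```

Swapping the subclass changes the failure behaviour without touching the shared steps, which is the same division of labour as between AbstractClusterInvoker#invoke and the doInvoke implementations.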

2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)

FailoverClusterInvoker implements automatic failover: when a call fails, it retries other servers.

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
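	/**
	 * Execute the invocation
	 * @param invocation method invocation metadata
	 * @param invokers all invokers (provider URLs) available for the current method
	 * @param loadbalance the load-balancing algorithm
	 */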
	@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
	
		// Automatic failover: when a call fails, retry other servers
		// len defaults to 3, i.e. one initial call plus at most 2 retries on failure
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            
            // Select one invoker from the invoker list using the load-balancing algorithm
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
            
            	// Perform the call
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

In FailoverClusterInvoker#doInvoke(), the load-balancing algorithm selects one invoker from the invokers list, and that invoker's invoke() method is then called.
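
Stripped of logging and context handling, the retry loop can be reduced to the following sketch. It is an illustration under stated assumptions, not the Dubbo implementation: `FailoverSketch` is a hypothetical name, providers are plain strings, the provider list is assumed to be non-empty, and `select` is a trivial stand-in for load balancing that prefers providers not tried yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FailoverSketch {
    // Makes one initial attempt plus up to `retries` further attempts on other providers.
    static String invoke(List<String> providers, Function<String, String> call, int retries) {
        int attempts = Math.max(retries, 0) + 1;
        RuntimeException last = null;
        List<String> tried = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            String provider = select(providers, tried);
            tried.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next attempt.
                last = e;
            }
        }
        throw new IllegalStateException("Tried " + tried + ", all attempts failed", last);
    }

    // Hypothetical stand-in for load balancing: first untried provider, else the first one.
    static String select(List<String> providers, List<String> tried) {
        for (String p : providers) {
            if (!tried.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```

With three providers and retries set to 2, a call that fails on the first two providers still succeeds on the third; with fewer retries than failing providers, the last error is wrapped and rethrown.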

The selected invoker is RegistryDirectory.InvokerDelegate(FilterNode(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker)))), which is created as follows:

public class RegistryDirectory<T> extends DynamicDirectory<T> {
	private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
		// other code omitted
	
		invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
		
		// other code omitted
	}
}
2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)

A wrapper.

public class RegistryDirectory<T> extends DynamicDirectory<T> {
	private static class InvokerDelegate<T> extends InvokerWrapper<T> {
        private URL providerUrl;

        public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
            super(invoker, url);
            this.providerUrl = providerUrl;
        }

        public URL getProviderUrl() {
            return providerUrl;
        }
    }
}

InvokerDelegate is a private static nested class of RegistryDirectory. It does not override invoke(), so the call goes to the invoke() method of its parent class:

public class InvokerWrapper<T> implements Invoker<T> {
	@Override
    public Result invoke(Invocation invocation) throws RpcException {
    	// invoker  ->   FilterNode
        return invoker.invoke(invocation);
    }
}
2.3.5 FilterNode#invoke(Invocation invocation)

A node in the filter chain.

class FilterNode<T> implements Invoker<T>{
	@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
        	// Run this node's filter, which in turn calls the next node
            asyncResult = filter.invoke(next, invocation);
        } catch (Exception e) {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                try {
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    if (listener != null) {
                        listener.onError(e, invoker, invocation);
                    }
                } finally {
                    listenableFilter.removeListener(invocation);
                }
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                listener.onError(e, invoker, invocation);
            }
            throw e;
        } finally {

        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                Filter.Listener listener = listenableFilter.listener(invocation);
                try {
                    if (listener != null) {
                        if (t == null) {
                            listener.onResponse(r, invoker, invocation);
                        } else {
                            listener.onError(t, invoker, invocation);
                        }
                    }
                } finally {
                    listenableFilter.removeListener(invocation);
                }
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r, invoker, invocation);
                } else {
                    listener.onError(t, invoker, invocation);
                }
            }
        });
    }
}

The filter objects are Dubbo SPI extensions, obtained and assembled in ProtocolFilterWrapper#buildInvokerChain():

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
2.3.5.1 ConsumerContextFilter

The consumer context filter.

@Activate(group = CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext context = RpcContext.getContext();
        context.setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
                .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
                .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        // pass default timeout set by end user (ReferenceConfig)
        Object countDown = context.get(TIME_COUNTDOWN_KEY);
        if (countDown != null) {
            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
            if (timeoutCountDown.isExpired()) {
                return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
                        "No time left for making the following call: " + invocation.getServiceName() + "."
                                + invocation.getMethodName() + ", terminate directly."), invocation);
            }
        }
        return invoker.invoke(invocation);
    }
}
2.3.5.2 FutureFilter

The filter that handles future results and fires the invoke callback.

@Activate(group = CommonConstants.CONSUMER)
public class FutureFilter implements Filter, Filter.Listener {
	@Override
    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
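    	// Fire the invoke callback
        fireInvokeCallback(invoker, invocation);
        // Whether there is a return value must be configured before the call, so the invoker can decide whether a future needs to be returned.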
        return invoker.invoke(invocation);
    }
}
2.3.5.3 MonitorFilter

Monitoring.

@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter implements Filter, Filter.Listener {
	@Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
            invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getContext().getRemoteHost());
            getConcurrent(invoker, invocation).incrementAndGet(); // count up
        }
        return invoker.invoke(invocation); // proceed invocation chain
    }
}
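
Each filter above ends with invoker.invoke(invocation), which hands the call to the next link. A minimal sketch of how such a chain can be assembled follows. SimpleInvoker, SimpleFilter and buildChain are simplified, hypothetical names for illustration and do not reproduce Dubbo's own chain-building code.

```java
import java.util.ArrayList;
import java.util.List;

final class FilterChainSketch {
    // Hypothetical, simplified stand-ins for an invoker and a filter.
    interface SimpleInvoker { String invoke(String invocation); }
    interface SimpleFilter { String invoke(SimpleInvoker next, String invocation); }

    // Wrap the target from the last filter to the first, so the first filter runs outermost.
    static SimpleInvoker buildChain(SimpleInvoker target, List<SimpleFilter> filters) {
        SimpleInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SimpleFilter filter = filters.get(i);
            final SimpleInvoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    // Build a three-filter chain and record the order in which each link is visited.
    static List<String> runDemo() {
        List<String> trace = new ArrayList<>();
        List<SimpleFilter> filters = new ArrayList<>();
        for (String name : new String[] {"context", "future", "monitor"}) {
            filters.add((next, invocation) -> {
                trace.add(name);                 // work done before the call
                return next.invoke(invocation);  // proceed along the chain
            });
        }
        SimpleInvoker chain = buildChain(invocation -> {
            trace.add("target");
            return "result of " + invocation;
        }, filters);
        trace.add(chain.invoke("sayHello"));
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Building the chain back to front is what lets the first filter in the list see the call first.
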
2.3.6 ListenerInvokerWrapper
public class ListenerInvokerWrapper<T> implements Invoker<T> {
	@Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }
}
2.3.7 AsyncToSyncInvoker

Converts an asynchronous call into a synchronous one: in SYNC mode it blocks until the result is available.

public class AsyncToSyncInvoker<T> implements Invoker<T> {
	@Override
    public Result invoke(Invocation invocation) throws RpcException {
        // Calls AbstractInvoker.invoke
        Result asyncResult = invoker.invoke(invocation);

        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                /**
                 * NOTICE!
                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                 */
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else {
                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }
}
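
The blocking step in AsyncToSyncInvoker amounts to waiting on a future and unwrapping the failure cause. A rough sketch using only the JDK is shown below. SyncWaitSketch is a hypothetical name, and java.util.concurrent.TimeoutException is used here as a stand-in for the remoting timeout type that the real code checks for.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncWaitSketch {
    // Block on an asynchronous result and map each failure mode to a short description.
    static String waitFor(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            return "interrupted";
        } catch (ExecutionException e) {
            // The real failure is wrapped, so inspect the cause.
            Throwable cause = e.getCause();
            String kind = (cause instanceof TimeoutException) ? "remote timeout: " : "remote failure: ";
            return kind + cause.getMessage();
        } catch (TimeoutException e) {
            return "local wait timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println(waitFor(CompletableFuture.completedFuture("ok"), 100));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(waitFor(failed, 100));

        System.out.println(waitFor(new CompletableFuture<>(), 10)); // never completed
    }
}
```
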

2.4 DubboInvoker

The invoker for the dubbo protocol. It picks a client connection and performs the actual remote call.

public class DubboInvoker<T> extends AbstractInvoker<T> {
	@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // Pick a client connection
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // Whether the call is one-way, i.e. whether a return value is expected
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // Timeout
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) { // no return value
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // Communicate with the remote side over the client connection
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else { // a return value is expected
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                // Communicate with the remote side over the client connection
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                
                // Save the asynchronous result future in the current request context
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                // Return the result
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
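
The client selection at the top of doInvoke() is a round-robin over the connection array. A minimal sketch of that idea follows. RoundRobinSketch is a hypothetical class, and because the excerpt does not show the type of index, the sketch assumes a plain AtomicInteger and uses Math.floorMod so the array index stays non-negative even if the counter wraps around.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        if (clients.length == 0) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = clients.clone();
    }

    // Return the next client in rotation; safe to call from several threads.
    String next() {
        if (clients.length == 1) {
            return clients[0]; // nothing to rotate
        }
        // floorMod never returns a negative value for a positive divisor.
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch picker = new RoundRobinSketch("conn-0", "conn-1", "conn-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.next());
        }
    }
}
```
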

3. result.recreate()

Obtain the result.

Here result is an AsyncRpcResult object.

public class AsyncRpcResult implements Result {
	@Override
    public Object recreate() throws Throwable {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
            return RpcContext.getContext().getFuture();
        }

        return getAppResponse().recreate();
    }
}

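3.2 Server-side ChannelHandler processing chain

As the analysis of Dubbo service publishing showed, once the server receives a client request, the request passes through the following ChannelHandler chain: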

1. MultiMessageHandler
2. HeartbeatHandler
3. AllChannelHandler
4. DecodeHandler
5. DubboProtocol.handler

3.2.1 MultiMessageHandler

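The handler for multi-messages: it unpacks a MultiMessage and forwards each contained message to the next handler.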

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(MultiMessageHandler.class);

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // Check whether the message is a MultiMessage
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            // Forward each contained message in turn
            for (Object obj : list) {
                try {
                    handler.received(channel, obj);
                } catch (ExecutionException e) {
                    logger.error("MultiMessageHandler received fail.", e);
                    handler.caught(channel, e);
                }
            }
        } else {
            handler.received(channel, message);
        }
    }
}
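
The unpacking logic can be reduced to a few lines. The sketch below is an illustration under simplifying assumptions: Batch is a hypothetical wrapper standing in for MultiMessage, and a java.util.function.Consumer plays the role of the next handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class MultiMessageSketch {
    // Hypothetical batch wrapper holding several messages that arrived together.
    static final class Batch {
        final List<Object> messages;

        Batch(Object... messages) {
            this.messages = Arrays.asList(messages);
        }
    }

    // Deliver a batch element by element; pass anything else through unchanged.
    static void received(Object message, Consumer<Object> next) {
        if (message instanceof Batch) {
            for (Object single : ((Batch) message).messages) {
                next.accept(single);
            }
        } else {
            next.accept(message);
        }
    }

    // Feed one batch and one plain message through, collecting what the next handler sees.
    static List<Object> demo() {
        List<Object> seen = new ArrayList<>();
        received(new Batch("req-1", "req-2"), seen::add);
        received("req-3", seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The downstream handler never needs to know whether a message arrived alone or as part of a batch.
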

3.2.2 HeartbeatHandler

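The heartbeat handler: it answers heartbeat requests, swallows heartbeat responses, and forwards every other message.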

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
	@Override
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        
        // Check whether the message is a heartbeat request
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
            }
            return;
        }
        handler.received(channel, message);
    }
}

3.2.3 AllChannelHandler

Handles client invocation requests by dispatching each received message to a thread pool.

public class AllChannelHandler extends WrappedChannelHandler {
	@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // Execute a ChannelEventRunnable asynchronously
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

ChannelEventRunnable

public class ChannelEventRunnable implements Runnable {
    @Override
    public void run() {
        // Handle the client request
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED: // connection established
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED: // connection closed
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT: // message sent
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}
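
The dispatch in AllChannelHandler.received(), including its handling of a refused task, can be sketched with the JDK alone. DispatchSketch is a hypothetical class. A pool that has been shut down is used here simply as a deterministic way to trigger RejectedExecutionException; in practice a rejection would more typically come from a saturated pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {
    // Hand the message to a worker thread; report back if the pool refuses the task.
    static String dispatch(ExecutorService executor, String message, List<String> handled) {
        try {
            executor.execute(() -> handled.add(message));
            return "dispatched";
        } catch (RejectedExecutionException e) {
            // The caller gets feedback instead of waiting for a reply that will never come.
            return "rejected: " + message;
        }
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> handled = new CopyOnWriteArrayList<>();
        List<String> outcomes = new ArrayList<>();

        outcomes.add(dispatch(pool, "req-1", handled));
        pool.shutdown(); // tasks already submitted still run; new ones are refused
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        outcomes.add(dispatch(pool, "req-2", handled));
        outcomes.add("handled=" + handled);
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```
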

3.2.4 DecodeHandler

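The decoding handler: it decodes the message, or the request data or response result it carries, before passing it on.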

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

    private void decode(Object message) {
        if (message instanceof Decodeable) {
            try {
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode
}
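
DecodeHandler may call decode() on a message and again on the data it carries, so a decodable payload benefits from being safe to decode more than once. The sketch below shows one way to get that property. LazyPayload is a hypothetical class, and the "decode only once" guard is an assumption made for illustration; the excerpt does not show how Dubbo's Decodeable implementations behave.

```java
import java.nio.charset.StandardCharsets;

final class LazyDecodeSketch {
    // Hypothetical payload that stays as raw bytes until decode() is called.
    static final class LazyPayload {
        private final byte[] raw;
        private String decoded;   // null until decoded
        private int decodeCount;  // how many times real decoding work ran

        LazyPayload(byte[] raw) {
            this.raw = raw.clone();
        }

        // Idempotent: repeated calls do the decoding work only once.
        synchronized void decode() {
            if (decoded == null) {
                decoded = new String(raw, StandardCharsets.UTF_8);
                decodeCount++;
            }
        }

        synchronized String value() {
            decode();
            return decoded;
        }

        synchronized int decodeCount() {
            return decodeCount;
        }
    }

    public static void main(String[] args) {
        LazyPayload payload = new LazyPayload("sayHello".getBytes(StandardCharsets.UTF_8));
        payload.decode();
        payload.decode(); // second call is a no-op
        System.out.println(payload.value() + " decoded " + payload.decodeCount() + " time(s)");
    }
}
```
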

3.2.5 DubboProtocol.handler

The handler that actually invokes the server-side method.

public class DubboProtocol extends AbstractProtocol {
	private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        /**
         * Handles a client method-invocation request, executes the server-side method, and returns the result.
         */
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get(METHODS_KEY);
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(COMMA_SEPARATOR)) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(COMMA_SEPARATOR);
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // Invoke the local method
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);

            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isDebugEnabled()) {
                logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }

            RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
            invocation.setAttachment(PATH_KEY, url.getPath());
            invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
            invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
            if (url.getParameter(STUB_EVENT_KEY, false)) {
                invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
            }

            return invocation;
        }
    };
}

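The call ultimately reaches the methods of the AbstractProxyInvoker object generated by JavassistProxyFactory.

3.3 Client-side handling of the server's result

When the NettyClient connection is created, a handler is passed in, as follows: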

public class NettyClient extends AbstractClient {
	@Override
    protected void doOpen() throws Throwable {
        // The handler that processes channel events
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(EVENT_LOOP_GROUP)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(socketChannelClass());

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
                }

                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);

                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }
}

So when the server's response arrives, NettyClientHandler#channelRead() is invoked to process it, and the call eventually reaches HeaderExchangeHandler#received().

3.3.1 HeaderExchangeHandler

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
	@Override
    public void received(Channel channel, Object message) throws RemotingException {
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // Handle the server's response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    }

    // Handle the result returned by the server
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}

3.3.2 DefaultFuture

public class DefaultFuture extends CompletableFuture<Object> {
    // Handle the result returned by the server
	public static void received(Channel channel, Response response) {
        received(channel, response, false);
    }

	public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                // Handle the result returned by the server
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response status is " + response.getStatus()
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) { // the call succeeded
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // timeout
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else { // remote service error
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }
}
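
DefaultFuture matches each response to its request through a map keyed by request id. That correlation pattern can be sketched as follows. PendingRequestsSketch and its methods are hypothetical names, and timeout handling is left out; only the register, remove and complete steps are shown.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Called when sending: register a future under a fresh request id.
    long register(CompletableFuture<Object> future) {
        long id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // Called when a response arrives: remove the matching future and complete it.
    // Returns false when no request is waiting under that id (for example, it was already answered).
    boolean received(long id, boolean ok, Object resultOrError) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future == null) {
            return false;
        }
        if (ok) {
            future.complete(resultOrError);
        } else {
            future.completeExceptionally(new IllegalStateException(String.valueOf(resultOrError)));
        }
        return true;
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = requests.register(future);
        System.out.println(requests.received(id, true, "pong")); // first response completes the future
        System.out.println(future.getNow(null));
        System.out.println(requests.received(id, true, "late")); // nothing is waiting any more
    }
}
```

Removing the entry before completing the future guarantees that a duplicate or late response cannot complete the same request twice.
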

4. Summary

[Figure: summary diagram (image not included)]
