dubbo源码解析之服务调用(通信)流程
@TOC
文章系列
【一、dubbo源码解析之框架粗谈】
【二、dubbo源码解析之dubbo配置解析】
【三、dubbo源码解析之服务发布与注册】
【四、dubbo源码解析之服务发现】
【五、dubbo源码解析之服务调用(通信)流程】
【六、dubbo获取服务提供者IP列表】
一、服务发布
在 dubbo源码解析之服务发布与注册 一文中,存在步骤 4.4.3 服务发布,通过 DubboProtocol.export()
暴露一个本地端口,用于监听并处理客户端连接请求。
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
// key=serviceClassName + : + port,如com.example.demo.provider.DemoProvider:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.addExportMap(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 打开服务器:暴露一个本地端口,用于监听并处理客户端连接请求。
openServer(url);
// 优先采用的序列化算法
optimizeSerialization(url);
return exporter;
}
}
如果采用Netty进行远程通信,最终会通创建一个 NettyServer
对象。
1.1 NettyServer
public class NettyServer extends AbstractServer implements RemotingServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
/**
* the cache for alive worker channel.
* <ip:port, dubbo channel>
*/
private Map<String, Channel> channels;
/**
* netty server bootstrap.
*/
private ServerBootstrap bootstrap;
/**
* the boss channel that receive connections and dispatch these to worker channel.
*/
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 会在父类的构造函数中,调用 doOpen() 方法:初始化并启动 netty 服务器
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
/**
* 初始化并启动 netty 服务器
*/
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
在 NettyServer
中,主要是初始化并启动了一个 netty 服务器,然后构造了一个 Handler 处理链。
1.2 ChannlHandler 处理链
在 NettyServer
构造函数中,存在代码 ChannelHandlers.wrap(handler, url)
,返回一个 ChannlHandler 处理链。
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
// 调用其 wrapInternal 进行包装 handler
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
// 包装 handler
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
最终,ChannelHandler 链路如下:
MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> handler
其中 handler 为 DubboProtocol
中的成员变量。
二、服务发现
在 dubbo源码解析之服务发现 一文中,通过步骤 3.3 Reference.createProxy() 创建了并返回了一个代理对象(最终创建 Invoker 对象为 DubboInvoker)。
最终,创建的代理类结构如下:
三、服务调用(通信)流程
3.1 客户端代理类执行链路
在 DubboInvoker 打一个断点,通过对其执行链路分析,最终调用链路如下:
// 代理对象Invoker方法
1. org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke(Object proxy, Method method, Object[] args)
// 调用
2. result = invoker.invoke(rpcInvocation)
// 故障转移
2.1 MigrationInvoker#invoke(Invocation invocation)
// Mock
2.2 MockClusterInvoker#invoke(Invocation invocation)
// 集群拦截器:包含一些前置、后置拦截器
2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
// 执行拦截方法
2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)
2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)
// 集群容错模式
2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
// 包装
2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)
// 过滤器
2.3.5 FilterNode#invoke(Invocation invocation)
2.3.5.1 ConsumerContextFilter
2.3.5.2 FutureFilter
2.3.5.3 MonitorFilter
// 添加监听
2.3.6 ListenerInvokerWrapper
// 异步、同步处理器
2.3.7 AsyncToSyncInvoker
// dubbo调用
2.4 DubboInvoker
// 获取结果集
3. result.recreate()
1.InvokerInvocationHandler#invoke
通过 JavassistProxyFactory 生成,采用 JDK 动态代理,生成过程如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
可见,InvokerInvocationHandler
一定实现了 InvocationHandler
接口,最终,执行对象方法,都会进入到其 Object invoke(Object proxy, Method method, Object[] args)
中。
public class InvokerInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Object方法直接执行
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
/**
* 构建 rpc 调用对象:核心对象,用于在rpc调用的时候进行传输
* method:需要调用的方法
* invoker.getInterface().getName():调用方法所属接口ClassName,
* 如com.example.demo.provider.TestProvider
* protocolServiceKey:如com.example.demo.provider.TestProvider:dubbo
* args:方法实参
*/
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
// 目标服务名称:如com.example.demo.provider.TestProvider
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
// invoker.getUrl() returns consumer url.
// 为当前上下文添加url参数:如dubbo://127.0.0.1/com.example.demo.provider.TestProvider?省略其他参数...
RpcContext.setRpcContext(invoker.getUrl());
if (consumerModel != null) {
// 添加属性
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
// 调用:invoker.invoke(rpcInvocation)
// 获取结果:result.recreate()
return invoker.invoke(rpcInvocation).recreate();
}
}
2. result = invoker.invoke(rpcInvocation)
调用。
2.1 MigrationInvoker#invoke(Invocation invocation)
故障转移 Invoker。通过在 RegistryProtocol.doRefer()
进行创建,代码如下:
public class RegistryProtocol implements Protocol {
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
URL consumerUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
// 创建并获取一个 MigrationInvoker
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl);
}
protected <T> ClusterInvoker<T> getMigrationInvoker(RegistryProtocol registryProtocol, Cluster cluster, Registry registry,
Class<T> type, URL url, URL consumerUrl) {
return new ServiceDiscoveryMigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
}
}
MigrationInvoker
public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 检查 serviceDiscoveryInvoker 是否可用,初始不可用
if (!checkInvokerAvailable(serviceDiscoveryInvoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + (invoker.getDirectory().getAllInvokers() == null ? "is null" : invoker.getDirectory().getAllInvokers().size()));
}
// 走这里
return invoker.invoke(invocation);
}
// 检查 invoker 是否可用
if (!checkInvokerAvailable(invoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + (serviceDiscoveryInvoker.getDirectory().getAllInvokers() == null ? " is null " : serviceDiscoveryInvoker.getDirectory().getAllInvokers().size()));
}
return serviceDiscoveryInvoker.invoke(invocation);
}
return currentAvailableInvoker.invoke(invocation);
}
}
2.2 MockClusterInvoker#invoke(Invocation invocation)
执行 Mock 逻辑,在 RegistryProtocol.doCreateInvoker()
中进行创建,代码如下:
public class RegistryProtocol implements Protocol {
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
directory.subscribe(toSubscribeUrl(urlToRegistry));
// 创建并返回 Invoker
// cluster 为ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap) 获取的对象
//默认情况下name=Cluster.DEFAULT=failover wrap=true
return (ClusterInvoker<T>) cluster.join(directory);
}
}
所以,cluster 对象为 MockClusterWrapper(FailoverCluster)
,当执行 cluster.join(directory)
逻辑,最终会先到 MockClusterWrapper
的 join()
方法中,进行包装增强。
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 创建一个 MockClusterInvoker
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
在 MockClusterWrapper
中,会创建一个 MockClusterInvoker
。
public class MockClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 获取url中的 mock 参数
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
// no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock
// force 模式:直接返回mock数据
result = doMockInvoke(invocation, null);
} else {
//fail-mock
// 异常执行mock逻辑
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
}
2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
集群拦截器:包含一些前置、后置拦截器。
在 MockClusterWrapper 构造函数中创建 MockClusterInvoker 时,会通过 this.cluster.join(directory)
创建一个 invoker 对象返回,如下:
new MockClusterInvoker<T>(directory, this.cluster.join(directory));
其中,this.cluster 为 dubbo SPI 注入的一个 cluster 对象,默认为 FailoverCluster
。
public class FailoverCluster extends AbstractCluster {
public static final String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}
}
当调用其 join(directory) 方法时,会进入 AbstractCluster#invoke(directory)
中,如下:
public abstract class AbstractCluster implements Cluster {
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 构建集群拦截器
return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}
/**
* 构建集群拦截器
* doJoin(directory):模版方法设计模式,执行子类的doJoin方法,默认情况下返回一个 FailoverClusterInvoker
* directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY):获取url中reference.interceptor参数值
*/
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
AbstractClusterInvoker<T> last = clusterInvoker;
// SPI
List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);
if (!interceptors.isEmpty()) {
for (int i = interceptors.size() - 1; i >= 0; i--) {
final ClusterInterceptor interceptor = interceptors.get(i);
final AbstractClusterInvoker<T> next = last;
// 责任链模式,返回一个 InterceptorInvokerNode
last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
}
}
return last;
}
}
InterceptorInvokerNode 为 AbstractCluster 中的一个内部类,所以最终执行 invoke() 方法时,进入 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
中,如下:
public abstract class AbstractCluster implements Cluster {
protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 前置拦截处理器
interceptor.before(next, invocation);
// 执行拦截方法,返回一个异步结果
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
// onError callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
listener.onError(e, clusterInvoker, invocation);
}
throw e;
} finally {
// 后置拦截处理器
interceptor.after(next, invocation);
}
// 异步 CallBack
return asyncResult.whenCompleteWithContext((r, t) -> {
// onResponse callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
if (t == null) {
listener.onMessage(r, clusterInvoker, invocation);
} else {
listener.onError(t, clusterInvoker, invocation);
}
}
});
}
}
}
2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)
执行拦截方法,返回一个异步结果。
@SPI
public interface ClusterInterceptor {
void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
// 调用invoke,返回一个异步结果。
return clusterInvoker.invoke(invocation);
}
interface Listener {
void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
}
}
2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)
在 AbstractClusterInvoker#invoke(final Invocation invocation)
方法中存在几个重要步骤:
- 获取所有 invoker
- 初始化负载均衡算法
- 执行invoke
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
// 绑定当前上下文中其他参数
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// step1:获取所有 invoker
List<Invoker<T>> invokers = list(invocation);
// step2:初始化负载均衡算法
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// step3:执行invoke
return doInvoke(invocation, invokers, loadbalance);
}
}
step1:获取所有 invoker
从 directory 中获取所有可调用的 Invoker 对象
- DynamicDirectory:动态
- StaticDirectory:静态
step2:初始化负载均衡算法
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
// 默认随机
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
// 默认随机
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}
dubbo 内置负载均衡算法如下:
- Random LoadBalance:随机,按权重设置随机概率(默认随机)。
- RoundRobin LoadBalance:轮询,按公约后的权重设置轮询比率。
- LeastActive LoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
- ConsistentHash LoadBalance:一致性 Hash,相同参数的请求总是发到同一提供者。
step3:执行invoke
AbstractClusterInvoker 采用了抽象模版方法设计模式,实现类有如下(对应dubbo集群容错模式):
- FailoverClusterInvoker:失败自动切换,当出现失败,重试其它服务器。
- FailfastClusterInvoker:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
- FailsafeClusterInvoker:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
- FailbackClusterInvoker:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
- ForkingClusterInvoker:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=“2” 来设置最大并行数。
- BroadcastClusterInvoker:广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。
默认采用 FailoverClusterInvoker 失败自动切换,当出现失败,重试其它服务器。
2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker> invokers, LoadBalance loadbalance)
FailoverClusterInvoker 失败自动切换,当出现失败,重试其它服务器
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
/**
* 执行
* @param invocation:方法调用元数据
* @param Invokers:当前方法对应的所有url
* @param LoadBalance:负载均衡算法
*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 失败自动切换,当出现失败,重试其它服务器
// len 默认为3,也就是说,执行一次,异常最多重试2次
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 根据负载均衡算法、invoker 列表 获取一个invoker 对象
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 执行调用 invoke
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
}
在 FailoverClusterInvoker 的 doInvoke() 方法中,会根据负载均衡算法,从 invokers 列表中选举出一个可执行的 invoke 对象,进而执行其 invoke() 方法。
其中,invoker 对象为 RegistryDirectory.InvokerDelegate(FilterNode(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker)))),创建过程如下:
public class RegistryDirectory<T> extends DynamicDirectory<T> {
private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
// 省略其他代码.....
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
// 省略其他代码.....
}
}
2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)
包装。
public class RegistryDirectory<T> extends DynamicDirectory<T> {
private static class InvokerDelegate<T> extends InvokerWrapper<T> {
private URL providerUrl;
public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
super(invoker, url);
this.providerUrl = providerUrl;
}
public URL getProviderUrl() {
return providerUrl;
}
}
}
InvokerDelegate 为 RegistryDirectory 中的一个私有内部类,执行其 invoke() 方法,最终会调用其父类的 invoke() 方法,如下:
public class InvokerWrapper<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
// invoker -> FilterNode
return invoker.invoke(invocation);
}
}
2.3.5 FilterNode#invoke(Invocation invocation)
过滤器节点。
class FilterNode<T> implements Invoker<T>{
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 执行其过滤链
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
}
其中 filter 对象为 dubbo SPI 扩展点(在 ProtocolFilterWrapper#buildInvokerChain()
方法中进行获取构建。)。
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
2.3.5.1 ConsumerContextFilter
消费者上下文过滤器。
@Activate(group = CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext context = RpcContext.getContext();
context.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
.setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
// pass default timeout set by end user (ReferenceConfig)
Object countDown = context.get(TIME_COUNTDOWN_KEY);
if (countDown != null) {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
}
return invoker.invoke(invocation);
}
}
2.3.5.2 FutureFilter
未来结果集处理Filter。
@Activate(group = CommonConstants.CONSUMER)
public class FutureFilter implements Filter, Filter.Listener {
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
// 触发调用回调
fireInvokeCallback(invoker, invocation);
// 调用前需要配置是否有返回值,以帮助调用者判断是否需要返回future。
return invoker.invoke(invocation);
}
}
2.3.5.3 MonitorFilter
监控。
@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter implements Filter, Filter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getContext().getRemoteHost());
getConcurrent(invoker, invocation).incrementAndGet(); // count up
}
return invoker.invoke(invocation); // proceed invocation chain
}
}
2.3.6 ListenerInvokerWrapper
public class ListenerInvokerWrapper<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
}
2.3.7 AsyncToSyncInvoker
异步、同步处理器
public class AsyncToSyncInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 执行 AbstractInvoker.invoker
Result asyncResult = invoker.invoke(invocation);
try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
/**
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else {
throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
}
2.4 DubboInvoker
dubbo调用。
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 获取客户端连接
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 是否单向:根据有无返回值判断
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 超时时间
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
if (isOneway) { // 无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 通过Client端连接,进行远程通信
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else { // 有返回值
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 通过Client端连接,进行远程通信
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
// 保持异步结果处理对象到当前请求上下文中
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
// 方法
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
3. result.recreate()
获取结果集。
result 为 AsyncRpcResult 对象。
public class AsyncRpcResult implements Result {
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
return RpcContext.getContext().getFuture();
}
return getAppResponse().recreate();
}
}
3.2 服务端 ChannelHandler 处理链路
通过对 dubbo 服务发布的分析,最终,在服务收到客户端请求后,ChannelHandler 处理链路如下:
1. MultiMessageHandler
2. HeartbeatHandler
3. AllChannelHandler
4. DecodeHandler
5. DubboProtocol.handler
3.2.1 MultiMessageHandler
多消息处理Handler。
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(MultiMessageHandler.class);
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 判断是否为 MultiMessage
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
// 循环调用
for (Object obj : list) {
try {
handler.received(channel, obj);
} catch (ExecutionException e) {
logger.error("MultiMessageHandler received fail.", e);
handler.caught(channel, e);
}
}
} else {
handler.received(channel, message);
}
}
}
3.2.2 HeartbeatHandler
心跳处理Handler。
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
// 判断是否为心跳请求
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
return;
}
handler.received(channel, message);
}
}
3.2.3 AllChannelHandler
处理客户端调用请求。
public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
// 异步执行一个 ChannelEventRunnable
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
ChannelEventRunnable
public class ChannelEventRunnable implements Runnable {
@Override
public void run() {
// 处理客户端请求
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED: // 连接
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED: // 断开连接
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT: // 发送
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
}
3.2.4 DecodeHandler
解码Handler。
public class DecodeHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
private void decode(Object message) {
if (message instanceof Decodeable) {
try {
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
}
3.2.5 DubboProtocol.handler
真正调用服务端方法的Handler。
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
/**
* 处理客户端方法调用请求,执行服务端方法,并方法
*/
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get(METHODS_KEY);
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(COMMA_SEPARATOR)) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(COMMA_SEPARATOR);
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 执行本地方法
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isDebugEnabled()) {
logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
}
最终会执行 JavassistProxyFactory 生成的 AbstractProxyInvoker 对象中的方法。
3.3 客户端收到服务端执行结果后处理
在进行 NettyClient 连接创建时,会传入一个 Handler,如下:
public class NettyClient extends AbstractClient {
@Override
protected void doOpen() throws Throwable {
// 处理Handler
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
}
所以当收到服务端响应结果后,会执行 NettyClientHandler#channelRead()
方法,进行处理,最终,会进入 HeaderExchangeHandler#received()
方法中。
3.3.1 HeaderExchangeHandler
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// 处理服务端响应
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}
// 处理服务端响应结果
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
3.3.2 DefaultFuture
public class DefaultFuture extends CompletableFuture<Object> {
// 处理服务端响应结果
public static void received(Channel channel, Response response) {
received(channel, response, false);
}
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
// 处理服务端响应结果
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) { // 是否执行成功
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 超时
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else { // 远程服务异常
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
// the result is returning, but the caller thread may still waiting
// to avoid endless waiting for whatever reason, notify caller thread to return.
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
}