
HDFS RPC Communication Framework Explained

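This article analyzes the HDFS RPC communication framework, walking through the source of its implementation classes: Listener, Reader, Handler, and Responder. The Hadoop version used is 3.1.1.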

Preface

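RPC necessarily relies on socket communication, and Hadoop's implementation uses Java NIO. Readers should ideally have some familiarity with NIO, because this article does not cover it in depth. For background, see: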

https://blog.csdn.net/yhl_jxy/article/details/79332092

Also note that most of the code quoted here has been tidied up by the author, so it differs slightly from the actual Server source.

RPC framework architecture

[Figure: RPC framework architecture diagram]

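As the architecture diagram shows, the processing of data arriving on a single socket connection is split across several modules, each handling one specific concern. On the one hand this lets calls be processed concurrently; on the other it keeps the code extensible.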

Listener

The Listener is the listening thread. What does it listen for? Incoming socket connections, which the code represents as Connection objects.

Listener.run and doAccept

```java
public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  Server.connectionManager.startIdleScan();
  while (Server.running) {
    SelectionKey key = null;
    try {
      getSelector().select();
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      Server.connectionManager.closeIdle(true);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  LOG.info("Stopping " + Thread.currentThread().getName());

  synchronized (this) {
    try {
      acceptChannel.close();
      selector.close();
    } catch (IOException e) { }

    selector = null;
    acceptChannel = null;

    // close all connections
    Server.connectionManager.stopIdleScan();
    Server.connectionManager.closeAll();
  }
}

void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);

    Reader reader = getReader();
    Connection c = Server.connectionManager.register(channel);
    // If the connectionManager can't take it, close the connection.
    if (c == null) {
      if (channel.isOpen()) {
        IOUtils.cleanup(null, channel);
      }
      Server.connectionManager.droppedConnections.getAndIncrement();
      continue;
    }
    key.attach(c);  // so closeCurrentConnection can get the object
    reader.addConnection(c);
  }
}
```

In short, the Listener accepts each channel, wraps it in a Connection, and hands it over to a Reader.
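To make the accept-and-hand-off step concrete, here is a minimal, self-contained Java NIO sketch; it is not the Hadoop code. `MiniListener`, its per-reader queues of raw `SocketChannel`s (used instead of `Connection` objects) and the `demo` helper are hypothetical. The reader is chosen round-robin, which is how I understand `getReader()` to behave, but treat that as an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified Listener: accepts channels and hands each one to a
// reader's pending queue, choosing the reader round-robin.
class MiniListener {
  private final List<BlockingQueue<SocketChannel>> readerQueues = new ArrayList<>();
  private int currentReader = -1;

  MiniListener(int numReaders) {
    for (int i = 0; i < numReaders; i++) {
      readerQueues.add(new LinkedBlockingQueue<>());
    }
  }

  // Round-robin choice of the next reader (assumed to match getReader()).
  int nextReaderIndex() {
    currentReader = (currentReader + 1) % readerQueues.size();
    return currentReader;
  }

  // Like doAccept: drain every connection that is ready on this key.
  int doAccept(SelectionKey key) throws IOException {
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel channel;
    int accepted = 0;
    while ((channel = server.accept()) != null) {
      channel.configureBlocking(false);
      channel.socket().setTcpNoDelay(true);
      channel.socket().setKeepAlive(true);
      readerQueues.get(nextReaderIndex()).add(channel);
      accepted++;
    }
    return accepted;
  }

  // Loopback demo: connect `clients` sockets and report how many each reader got.
  static int[] demo(int clients, int readers) {
    MiniListener listener = new MiniListener(readers);
    List<SocketChannel> opened = new ArrayList<>();
    try (Selector selector = Selector.open();
         ServerSocketChannel acceptChannel = ServerSocketChannel.open()) {
      acceptChannel.bind(new InetSocketAddress("127.0.0.1", 0));
      acceptChannel.configureBlocking(false);
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      for (int i = 0; i < clients; i++) {
        opened.add(SocketChannel.open(acceptChannel.getLocalAddress()));
      }
      int total = 0;
      long deadline = System.currentTimeMillis() + 5_000;
      while (total < clients && System.currentTimeMillis() < deadline) {
        selector.select(200);
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();
          if (key.isValid() && key.isAcceptable()) {
            total += listener.doAccept(key);
          }
        }
      }
      int[] sizes = new int[readers];
      for (int i = 0; i < readers; i++) {
        sizes[i] = listener.readerQueues.get(i).size();
        opened.addAll(listener.readerQueues.get(i)); // close server-side ends too
      }
      return sizes;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      for (SocketChannel c : opened) {
        try { c.close(); } catch (IOException ignored) { }
      }
    }
  }
}
```

`MiniListener.demo(3, 2)` connects three loopback clients to two readers; with round-robin selection the first reader receives two channels and the second receives one.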

Reader

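The Reader plays a pivotal role in the RPC framework. Everything that happens before processOneRpc, as described in the companion article HDFS RPC Protocol Explained (HDFSRPC协议详解), is done by the Reader. In summary: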

  1. Checking the first 7 bytes of the RPC connection.
  2. The SASL handshake and authentication.
  3. Reading the IpcConnectionContext.
  4. Preparation for processOneRpc, including parsing the RequestHeaderProto.
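Step 1 can be illustrated with a small parser for the connection preamble. This is a sketch under assumptions: the layout (a 4-byte `hrpc` magic, then one byte each for version, service class and auth protocol), the version value 9, and the auth codes 0 (NONE) and -33 (SASL) come from my reading of Hadoop's `RpcConstants` and `AuthProtocol`, and `ConnectionPreamble` is a hypothetical class, not a Hadoop API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the 7-byte connection preamble check. Assumed layout:
// "hrpc" magic (4 bytes), version, service class, auth protocol (1 byte each).
class ConnectionPreamble {
  static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);
  static final byte CURRENT_VERSION = 9;   // assumption: value used by Hadoop 2.x/3.x
  static final byte AUTH_NONE = 0;         // assumption: AuthProtocol.NONE
  static final byte AUTH_SASL = -33;       // assumption: AuthProtocol.SASL

  final byte version;
  final byte serviceClass;
  final byte authProtocol;

  private ConnectionPreamble(byte version, byte serviceClass, byte authProtocol) {
    this.version = version;
    this.serviceClass = serviceClass;
    this.authProtocol = authProtocol;
  }

  // Reads the 7 preamble bytes from buf; throws if they are not a valid preamble.
  static ConnectionPreamble parse(ByteBuffer buf) {
    if (buf.remaining() < 7) {
      throw new IllegalArgumentException("need 7 bytes, got " + buf.remaining());
    }
    for (byte expected : MAGIC) {
      if (buf.get() != expected) {
        throw new IllegalArgumentException("bad magic: not an hrpc connection");
      }
    }
    byte version = buf.get();
    if (version != CURRENT_VERSION) {
      throw new IllegalArgumentException("unsupported RPC version " + version);
    }
    byte serviceClass = buf.get();
    byte auth = buf.get();
    if (auth != AUTH_NONE && auth != AUTH_SASL) {
      throw new IllegalArgumentException("unknown auth protocol " + auth);
    }
    return new ConnectionPreamble(version, serviceClass, auth);
  }

  // The SASL handshake (step 2) only follows when the client asked for it.
  boolean needsSasl() {
    return authProtocol == AUTH_SASL;
  }
}
```

An HTTP request sent to the RPC port, for example, fails the magic check on its first bytes.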

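Note also that all of this work is done by the Reader itself rather than being handed off in stages. As soon as the Reader has produced a call, it immediately moves on to reading the next one; the calls themselves are executed concurrently by the Handlers.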

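The Reader's source is actually simple: at its core it repeatedly calls connection.readAndProcess(). This article does not go into readAndProcess in detail; if you are interested, see HDFS RPC Protocol Explained.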

```java
@Override
public void run() {
  LOG.info("Starting " + Thread.currentThread().getName());
  try {
    doRunLoop();
  } finally {
    try {
      readSelector.close();
    } catch (IOException ioe) {
      LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
    }
  }
}

private synchronized void doRunLoop() {
  while (Server.running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i = size; i > 0; i--) {
        Connection conn = pendingConnections.take();
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select();

      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isReadable()) {
            doRead(key);
          }
        } catch (CancelledKeyException cke) {
          // something else closed the connection, ex. responder or
          // the listener doing an idle scan.  ignore it and let them
          // clean up.
          LOG.info(Thread.currentThread().getName() +
              ": connection aborted from " + key.attachment());
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (Server.running) {                      // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    } catch (Throwable re) {
      LOG.error("Bug in read selector!", re);
      //ExitUtil.terminate(1, "Bug in read selector!");
    }
  }
}

// from Listener.doRead
void doRead(SelectionKey key) throws InterruptedException {
  int count;
  Connection c = (Connection) key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(Time.now());

  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
    LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
    throw ieo;
  } catch (Exception e) {
    // Any exceptions that reach here are fatal unexpected internal errors
    // that could not be sent to the client.
    LOG.info(Thread.currentThread().getName() +
        ": readAndProcess from client " + c +
        " threw exception [" + e + "]", e);
    count = -1; //so that the (count < 0) block is executed
  }
  // setupResponse will signal the connection should be closed when a
  // fatal response is sent.
  if (count < 0 || c.shouldClose()) {
    Server.closeConnection(c);
    c = null;
  } else {
    c.setLastContact(Time.now());
  }
}
```
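The shape of doRunLoop (drain only the connections queued right now, register them for `OP_READ`, select, then read each ready key and close the connection on a negative count) can be tried out without a network. In this hypothetical sketch a `java.nio.channels.Pipe` stands in for the client socket, and `MiniConn.readAndProcess` merely buffers bytes instead of parsing RPC frames; `MiniReader`, `MiniConn` and `demo` are illustrative names only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified Reader loop. MiniConn is a hypothetical stand-in for Connection.
class MiniReader {
  static class MiniConn {
    final Pipe.SourceChannel channel;
    final StringBuilder received = new StringBuilder();
    boolean closed;

    MiniConn(Pipe.SourceChannel channel) { this.channel = channel; }

    // Returns bytes read, or -1 at end-of-stream (like readAndProcess).
    // Decodes each chunk separately, so it assumes ASCII payloads.
    int readAndProcess() throws IOException {
      ByteBuffer buf = ByteBuffer.allocate(64);
      int n = channel.read(buf);
      if (n > 0) {
        buf.flip();
        received.append(StandardCharsets.UTF_8.decode(buf));
      }
      return n;
    }
  }

  final BlockingQueue<MiniConn> pendingConnections = new LinkedBlockingQueue<>();
  final Selector readSelector;

  MiniReader() throws IOException { readSelector = Selector.open(); }

  // One iteration of doRunLoop: register queued connections, select, read.
  void runOnce(long timeoutMs) throws IOException, InterruptedException {
    int size = pendingConnections.size();        // only what is queued right now
    for (int i = size; i > 0; i--) {
      MiniConn conn = pendingConnections.take();
      conn.channel.configureBlocking(false);
      conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
    }
    readSelector.select(timeoutMs);
    Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
    while (iter.hasNext()) {
      SelectionKey key = iter.next();
      iter.remove();
      if (key.isValid() && key.isReadable()) {
        MiniConn c = (MiniConn) key.attachment();
        if (c.readAndProcess() < 0) {            // peer closed: drop the connection
          key.cancel();
          c.channel.close();
          c.closed = true;
        }
      }
    }
  }

  // Demo: send a message, close the writer, and loop until the reader sees EOF.
  static MiniConn demo(String message) {
    try {
      MiniReader reader = new MiniReader();
      Pipe pipe = Pipe.open();
      MiniConn conn = new MiniConn(pipe.source());
      reader.pendingConnections.add(conn);       // what the Listener would do
      pipe.sink().write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
      pipe.sink().close();
      for (int i = 0; i < 50 && !conn.closed; i++) {
        reader.runOnce(100);
      }
      reader.readSelector.close();
      return conn;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

Taking only the `size` snapshot of pending connections per iteration mirrors the comment in doRunLoop: a burst of new connections cannot keep the loop from reaching `select()`.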

CallQueue

The callQueue holds the queued calls. Because the call queue in HDFS is fairly complex, it will be covered in a separate article later.

Handler

The Handler thread is also fairly simple: in essence it just executes call.run().

```java
@Override
public void run() {
  LOG.debug(Thread.currentThread().getName() + ": starting");
  while (Server.running) {
    try {
      final Call call = Server.callQueue.take(); // pop the queue; maybe blocked here
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": " + call);
      }
      CurCall.set(call);
      /* TODO
      UserGroupInformation remoteUser = call.getRemoteUser();
      if (remoteUser != null) {
        remoteUser.doAs(call);
      } else {
        call.run();
      }
      */
      call.run();
    } catch (InterruptedException e) {
      if (Server.running) {                          // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (Exception e) {
      LOG.info(Thread.currentThread().getName() + " caught an exception", e);
    } finally {
      CurCall.set(null);
    }
  }
  LOG.debug(Thread.currentThread().getName() + ": exiting");
}
```
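The Reader-to-Handler hand-off is a classic producer/consumer arrangement on a blocking queue. The sketch below assumes a plain bounded `ArrayBlockingQueue` in place of HDFS's more elaborate call queue and a `Runnable` `MiniCall` in place of `Call`; the `ThreadLocal` plays the role of `CurCall`. `MiniHandlers`, `MiniCall` and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Sketch of Reader -> callQueue -> Handler. MiniCall is a hypothetical stand-in
// for Call; the real call also carries the request and its response buffer.
class MiniHandlers {
  static final ThreadLocal<String> CUR_CALL = new ThreadLocal<>();  // like CurCall

  static final class MiniCall implements Runnable {
    final String id;
    final List<String> log;
    MiniCall(String id, List<String> log) { this.id = id; this.log = log; }
    @Override public void run() {
      // Code running inside a call can find "its" call through the ThreadLocal.
      log.add(CUR_CALL.get() + " ran on " + Thread.currentThread().getName());
    }
  }

  // Starts `handlers` threads, enqueues `calls` calls, and returns what ran.
  static List<String> demo(int handlers, int calls) {
    BlockingQueue<MiniCall> callQueue = new ArrayBlockingQueue<>(16);
    List<String> log = new CopyOnWriteArrayList<>();
    List<Thread> threads = new ArrayList<>();
    for (int h = 0; h < handlers; h++) {
      Thread t = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            MiniCall call = callQueue.take();   // blocks until a Reader enqueues
            CUR_CALL.set(call.id);
            call.run();
          } catch (InterruptedException e) {
            return;                             // shutdown
          } finally {
            CUR_CALL.set(null);
          }
        }
      }, "Handler-" + h);
      t.start();
      threads.add(t);
    }
    try {
      for (int i = 0; i < calls; i++) {
        callQueue.put(new MiniCall("call-" + i, log));  // what the Reader does
      }
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (log.size() < calls && System.nanoTime() < deadline) {
        Thread.sleep(10);
      }
      for (Thread t : threads) t.interrupt();
      for (Thread t : threads) t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return log;
  }
}
```

Because the queue is bounded, a Reader that outpaces the Handlers blocks in `put`, which is one simple way to apply back-pressure.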

The main difficulty is how call.run() actually executes, and to understand call.run() we first need to understand protocols.

Protocols

Each server has its own set of protocols, and protocols are first grouped by rpcKind:

```protobuf
enum RpcKindProto {
  RPC_BUILTIN          = 0;  // Used for built in calls by tests
  RPC_WRITABLE         = 1;  // Use WritableRpcEngine
  RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
}
```

In 3.x the protocols use RPC_PROTOCOL_BUFFER as their rpcKind, so that is the one followed here.

Protocols of kind RPC_PROTOCOL_BUFFER are stored in a HashMap:

```java
Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray =
    new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
```

The key is a ProtoNameVer; pay attention to how its hashCode is implemented:

```java
static class ProtoNameVer {
  final String protocol;
  final long   version;

  ProtoNameVer(String protocol, long ver) {
    this.protocol = protocol;
    this.version = ver;
  }

  @Override
  public boolean equals(Object o) {
    if (o == null)
      return false;
    if (this == o)
      return true;
    if (! (o instanceof ProtoNameVer))
      return false;
    ProtoNameVer pv = (ProtoNameVer) o;
    return ((pv.protocol.equals(this.protocol)) &&
        (pv.version == this.version));
  }

  @Override
  public int hashCode() {
    return protocol.hashCode() * 37 + (int) version;
  }
}
```
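As a worked example of that hashCode: for protocol "a" and version 1 it is `"a".hashCode() * 37 + 1 = 97 * 37 + 1 = 3590`. The snippet below copies ProtoNameVer into a runnable class with a hypothetical `String`-valued registry standing in for ProtoClassProtoImpl, so you can check that a freshly built key finds an existing entry. Note that `(int) version` truncates the long: versions that differ by a multiple of 2^32 share a hash code, but `equals` still tells them apart.

```java
import java.util.HashMap;
import java.util.Map;

// ProtoNameVer as in the excerpt above, plus a hypothetical lookup demo.
class ProtoNameVerDemo {
  static class ProtoNameVer {
    final String protocol;
    final long version;

    ProtoNameVer(String protocol, long ver) {
      this.protocol = protocol;
      this.version = ver;
    }

    @Override public boolean equals(Object o) {
      if (o == null) return false;
      if (this == o) return true;
      if (!(o instanceof ProtoNameVer)) return false;
      ProtoNameVer pv = (ProtoNameVer) o;
      return pv.protocol.equals(this.protocol) && pv.version == this.version;
    }

    @Override public int hashCode() {
      return protocol.hashCode() * 37 + (int) version;  // long truncated to int
    }
  }

  // Hypothetical registry: values are labels instead of ProtoClassProtoImpl.
  static final Map<ProtoNameVer, String> protocolImplMap = new HashMap<>(10);

  public static void main(String[] args) {
    protocolImplMap.put(new ProtoNameVer("demo.Proto", 1), "impl-v1");
    // A fresh key with the same name and version finds the entry ...
    System.out.println(protocolImplMap.get(new ProtoNameVer("demo.Proto", 1)));  // impl-v1
    // ... while a different version does not.
    System.out.println(protocolImplMap.get(new ProtoNameVer("demo.Proto", 2)));  // null
  }
}
```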

So every protocol must have a protocol name and a version, which is what the ProtocolInfo annotation provides:

```java
@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
  String protocolName();  // the name of the protocol (i.e. rpc service)
  long protocolVersion() default -1; // default means not defined use old way
}
```

A protocol interface looks something like this:

```java
@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, protocolVersion = 1)
/**
 * Protocol that a clients use to communicate with the NameNode.
 *
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
public interface ClientNamenodeProtocolPB extends ClientNamenodeProtocol.BlockingInterface {
}
```
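The name and version in the annotation are read at runtime through reflection, which is why ProtocolInfo has `RUNTIME` retention. Below is a hedged sketch of deriving the registry key from an interface. `ProtocolInfo` here is a local copy of the annotation above; `DemoProtocolPB`, `UnannotatedProtocol` and `ProtocolKeys` are hypothetical, and the fallback for unannotated interfaces (class name, version -1) is this sketch's simplification. The "old way" mentioned in the annotation's comment is not modeled.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.AbstractMap;
import java.util.Map;

// Local copy of the annotation shown above.
@Retention(RetentionPolicy.RUNTIME)
@interface ProtocolInfo {
  String protocolName();
  long protocolVersion() default -1;
}

// Hypothetical protocol interfaces standing in for ClientNamenodeProtocolPB.
@ProtocolInfo(protocolName = "demo.DemoProtocol", protocolVersion = 1)
interface DemoProtocolPB { }

interface UnannotatedProtocol { }

class ProtocolKeys {
  // Returns (protocolName, protocolVersion) read from the annotation.
  // Unannotated interfaces fall back to (class name, -1) in this sketch.
  static Map.Entry<String, Long> keyOf(Class<?> protocol) {
    ProtocolInfo info = protocol.getAnnotation(ProtocolInfo.class);
    if (info == null) {
      return new AbstractMap.SimpleImmutableEntry<>(protocol.getName(), -1L);
    }
    return new AbstractMap.SimpleImmutableEntry<>(info.protocolName(), info.protocolVersion());
  }
}
```

`ProtocolKeys.keyOf(DemoProtocolPB.class)` yields `demo.DemoProtocol=1`, the same pair a ProtoNameVer key is built from.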

Where do the methods invoked by reflection come from? ClientNamenodeProtocol.BlockingInterface is actually generated by the protobuf compiler; look at the service definition at the end of ClientNamenodeProtocol.proto:

```protobuf
service ClientNamenodeProtocol {
  rpc getBlockLocations(GetBlockLocationsRequestProto)
      returns(GetBlockLocationsResponseProto);
  rpc getServerDefaults(GetServerDefaultsRequestProto)
      returns(GetServerDefaultsResponseProto);
  rpc create(CreateRequestProto) returns(CreateResponseProto);
  rpc append(AppendRequestProto) returns(AppendResponseProto);
  rpc setReplication(SetReplicationRequestProto)
      returns(SetReplicationResponseProto);
  rpc setStoragePolicy(SetStoragePolicyRequestProto)...
}
```

Compiling it produces ClientNamenodeProtocol.BlockingInterface, which is essentially that list of methods.

Our own implementation class only needs to implement ClientNamenodeProtocolPB, as ClientNamenodeProtocolServerSideTranslatorPB does, and is then registered with the server:

```java
// add protocols
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);
```

When call.run() executes, it uses the RequestHeaderProto to find the corresponding implementation:

```protobuf
message RequestHeaderProto {
  /** Name of the RPC method */
  required string methodName = 1;

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get metainfo
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto
   */
  required string declaringClassProtocolName = 2;

  /** protocol version of class declaring the called method */
  required uint64 clientProtocolVersion = 3;
}
```

It then invokes the implementation's method via reflection:

```java
Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {
  RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
  RequestHeaderProto rpcRequest = request.getRequestHeader();
  String methodName = rpcRequest.getMethodName();

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get info
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto.
   */
  String declaringClassProtoName = rpcRequest.getDeclaringClassProtocolName();
  long clientVersion = rpcRequest.getClientProtocolVersion();
  //LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);
  ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);
  BlockingService service = (BlockingService) protocolImpl.protocolImpl;
  MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
  if (methodDescriptor == null) {
    String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
    LOG.warn(msg);
    throw new RpcNoSuchMethodException(msg);
  }
  Message prototype = service.getRequestPrototype(methodDescriptor);
  Message param = request.getValue(prototype);

  Message result = null;
  long startTime = Time.now();
  int qTime = (int) (startTime - receiveTime);
  Exception exception = null;
  boolean isDeferred = false;
  try {
    //server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    result = service.callBlockingMethod(methodDescriptor, null, param);
    // Check if this needs to be a deferred response,
    // by checking the ThreadLocal callback being set
  } catch (ServiceException e) {
    exception = (Exception) e.getCause();
    throw (Exception) e.getCause();
  } catch (Exception e) {
    exception = e;
    throw e;
  } finally {
    int processingTime = (int) (Time.now() - startTime);
    //if (LOG.isDebugEnabled()) {
    String msg = "Served: " + methodName + (isDeferred ? ", deferred" : "") +
        ", queueTime= " + qTime +
        " procesingTime= " + processingTime;
    if (exception != null) {
      msg += " exception= " + exception.getClass().getSimpleName();
    }
    //LOG.debug(msg);
    LOG.info(msg);
    //LOG.info("params:" + param.toString());
    //LOG.info("result:" + result.toString());
    //}
    String detailedMetricsName = (exception == null) ?
        methodName :
        exception.getClass().getSimpleName();
    //server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);
  }
  return RpcWritable.wrap(result);
}
```
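Stripped of protobuf and metrics, call() does three things: look up the implementation by (declaringClassProtocolName, clientProtocolVersion), resolve the method by name (failing with an "Unknown method" error), and invoke it, rethrowing the implementation's own exception rather than the wrapper. The sketch below reproduces that flow with plain `java.lang.reflect` in place of protobuf's `findMethodByName`/`callBlockingMethod`; `MiniDispatcher`, `Header`, `NoSuchRpcMethodException`, `EchoService` and the `"name:version"` string key are all hypothetical.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Dispatch sketch: java.lang.reflect stands in for protobuf's BlockingService.
class MiniDispatcher {
  // Mirrors the three RequestHeaderProto fields used for routing.
  static final class Header {
    final String methodName;
    final String declaringClassProtocolName;
    final long clientProtocolVersion;
    Header(String methodName, String protocolName, long version) {
      this.methodName = methodName;
      this.declaringClassProtocolName = protocolName;
      this.clientProtocolVersion = version;
    }
  }

  static class NoSuchRpcMethodException extends Exception {
    NoSuchRpcMethodException(String msg) { super(msg); }
  }

  private final Map<String, Object> impls = new HashMap<>();  // key: "name:version"

  void addProtocol(String name, long version, Object impl) {
    impls.put(name + ":" + version, impl);
  }

  Object call(Header h, Object param) throws Exception {
    Object impl = impls.get(h.declaringClassProtocolName + ":" + h.clientProtocolVersion);
    if (impl == null) {
      throw new NoSuchRpcMethodException("Unknown protocol " + h.declaringClassProtocolName);
    }
    Method method;
    try {
      method = impl.getClass().getMethod(h.methodName, param.getClass());
    } catch (NoSuchMethodException e) {
      throw new NoSuchRpcMethodException("Unknown method " + h.methodName
          + " called on " + h.declaringClassProtocolName + " protocol.");
    }
    try {
      return method.invoke(impl, param);
    } catch (InvocationTargetException e) {
      // Like the ServiceException branch: surface the implementation's exception.
      Throwable cause = e.getCause();
      if (cause instanceof Exception) throw (Exception) cause;
      throw e;
    }
  }

  // Hypothetical service implementation.
  public static class EchoService {
    public String echo(String request) { return "echo:" + request; }
    public String fail(Integer request) { throw new IllegalStateException("boom " + request); }
  }
}
```

Calling `echo` with `"hi"` returns `"echo:hi"`, an unknown method name raises `NoSuchRpcMethodException`, and `fail` surfaces its own `IllegalStateException` instead of the reflective wrapper.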

When the call completes, the returned Message, if any, is placed in rpcCall.rpcResponse, and the call is then added to the responseQueue.

ResponseQueue

Each Connection has a responseQueue that holds the rpcCalls whose processing has finished.

Responder

The Responder thread is responsible for sending call results back to the client.

```java
private boolean processResponse(LinkedList<RpcCall> responseQueue,
                                boolean inHandler) throws IOException {
  boolean error = true;
  boolean done = false;       // there is more data for this channel.
  int numElements = 0;
  RpcCall call = null;
  try {
    synchronized (responseQueue) {
      //
      // If there are no items for this channel, then we are done
      //
      numElements = responseQueue.size();
      if (numElements == 0) {
        error = false;
        return true;              // no more data for this channel.
      }
      //
      // Extract the first call
      //
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
      }
      //
      // Send as much data as we can in the non-blocking fashion
      //
      int numBytes = call.connection.channelWrite(channel, call.rpcResponse);
      if (numBytes < 0) {
        return true;
      }
      if (!call.rpcResponse.hasRemaining()) {
        //Clear out the response buffer so it can be collected
        call.rpcResponse = null;
        call.connection.decRpcCount();
        if (numElements == 1) {    // last call fully processes.
          done = true;             // no more data for this channel.
        } else {
          done = false;            // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // insert in Selector queue.
        //
        call.connection.responseQueue.addFirst(call);

        if (inHandler) {
          // set the serve time when the response has to be sent later
          call.timestamp = Time.now();

          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
          } catch (ClosedChannelException e) {
            //Its ok. channel might be closed else where.
            done = true;
          } finally {
            decPending();
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote partial " + numBytes + " bytes.");
        }
      }
      error = false;              // everything went off well
    }
  } finally {
    if (error && call != null) {
      LOG.warn(Thread.currentThread().getName() + ", call " + call + ": output error");
      done = true;               // error. no more data for this channel.
      Server.closeConnection(call.connection);
    }
  }
  return done;
}
```
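The key idea in processResponse is that a non-blocking write may send only part of a response; the remainder is pushed back to the head of the queue, so that response finishes before the next one starts. In this hedged sketch `SlowChannel` (a hypothetical channel that accepts at most N bytes per write) forces partial writes, responses are bare `ByteBuffer`s instead of `RpcCall`s, and the `writeSelector` registration and connection bookkeeping are omitted. `MiniResponder` and `drain` are illustrative names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Sketch of processResponse's partial-write handling.
class MiniResponder {
  // Hypothetical channel that accepts at most maxPerWrite bytes per write().
  static final class SlowChannel implements WritableByteChannel {
    final int maxPerWrite;
    final List<Byte> written = new ArrayList<>();
    SlowChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }
    @Override public int write(ByteBuffer src) {
      int n = Math.min(maxPerWrite, src.remaining());
      for (int i = 0; i < n; i++) written.add(src.get());
      return n;
    }
    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
  }

  // Returns true when nothing is left to send ("done" in the original).
  static boolean processResponse(LinkedList<ByteBuffer> responseQueue, WritableByteChannel ch) {
    synchronized (responseQueue) {
      if (responseQueue.isEmpty()) return true;
      int numElements = responseQueue.size();
      ByteBuffer response = responseQueue.removeFirst();
      try {
        ch.write(response);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      if (!response.hasRemaining()) {
        return numElements == 1;          // was that the last response?
      }
      responseQueue.addFirst(response);   // partial write: resume this one first
      return false;
    }
  }

  // Calls processResponse until done; returns how many calls it took.
  static int drain(LinkedList<ByteBuffer> queue, WritableByteChannel ch) {
    int steps = 1;
    while (!processResponse(queue, ch)) steps++;
    return steps;
  }
}
```

With responses of 5 and 3 bytes and at most 2 bytes per write, `drain` needs five calls (three for the first response, two for the second), and the bytes reach the channel in order.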
