
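Main startup logic of a ZooKeeper node

1. Preface

QuorumPeer is a thread object whose core method is run. The run method is long and fairly complex because it branches on every possible state of the QuorumPeer. This article looks at what it does while the zk node is in the LOOKING state. That part of the code has two branches, selected by whether the readonlymode.enabled system property is set, and both branches go through the same election logic. When readonlymode.enabled is true, the node additionally starts an asynchronous thread that waits for a grace period and then calls the startup method of ReadOnlyZooKeeperServer if the node is still LOOKING.

2. What QuorumPeer does in the LOOKING state

The following code runs in the LOOKING state: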

    LOG.info("LOOKING");
    ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

    // Check whether read-only mode is enabled for this node
    if (Boolean.getBoolean("readonlymode.enabled")) {
        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

        // Create read-only server but don't start it immediately
        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

        // Instead of starting roZk immediately, wait some grace
        // period before we decide we're partitioned.
        //
        // Thread is used here because otherwise it would require
        // changes in each of election strategy classes which is
        // unnecessary code coupling.
        Thread roZkMgr = new Thread() {
            public void run() {
                try {
                    // lower-bound grace period to 2 secs
                    sleep(Math.max(2000, tickTime));
                    if (ServerState.LOOKING.equals(getPeerState())) {
                        roZk.startup();
                    }
                } catch (InterruptedException e) {
                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                } catch (Exception e) {
                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                }
            }
        };
        try {
            roZkMgr.start();
            reconfigFlagClear();
            if (shuttingDownLE) {
                shuttingDownLE = false;
                startLeaderElection();
            }
            setCurrentVote(makeLEStrategy().lookForLeader());
            checkSuspended();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            setPeerState(ServerState.LOOKING);
        } finally {
            // If the thread is in the grace period, interrupt
            // to come out of waiting.
            roZkMgr.interrupt();
            roZk.shutdown();
        }
    } else {
        try {
            reconfigFlagClear();
            if (shuttingDownLE) {
                shuttingDownLE = false;
                startLeaderElection();
            }
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            setPeerState(ServerState.LOOKING);
        }
    }

Both branches share the following logic:

    // Reset the reconfigFlag field to false.
    // It is not clear to me what the reconfigFlag field is used for.
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        // QuorumPeer#start has already called startLeaderElection.
        // shuttingDownLE defaults to false, so this call is normally skipped.
        // Creates and starts the election algorithm.
        startLeaderElection();
    }
    // Set the current vote.
    // makeLEStrategy().lookForLeader() appears to run the leader election itself.
    setCurrentVote(makeLEStrategy().lookForLeader());
    checkSuspended();
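
The grace-period thread in the read-only branch can be reduced to a small pattern: a helper thread sleeps for the grace period, the main thread runs the election, and the finally block interrupts the helper so that a fast election prevents the read-only server from starting. The following is only a minimal sketch of that pattern. The class GracePeriodSketch, the boolean flag standing in for roZk.startup(), and the Runnable standing in for lookForLeader() are all assumptions made for illustration and are not ZooKeeper code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class GracePeriodSketch {

    /**
     * Runs the election while a helper thread waits graceMs before starting a
     * stand-in "read-only server". Returns true if the stand-in was started.
     */
    static boolean runWithGracePeriod(long graceMs, Runnable election) {
        AtomicBoolean readOnlyStarted = new AtomicBoolean(false);
        Thread starter = new Thread(() -> {
            try {
                Thread.sleep(graceMs);          // grace period
                readOnlyStarted.set(true);      // hypothetical stand-in for roZk.startup()
            } catch (InterruptedException e) {
                // The election finished first, so read-only mode is not started.
            }
        });
        starter.start();
        try {
            election.run();                     // hypothetical stand-in for lookForLeader()
        } finally {
            starter.interrupt();                // cut the grace period short
            try {
                starter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return readOnlyStarted.get();
    }

    /** Sleeps without forcing callers to handle the checked exception. */
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Election returns at once: the helper is interrupted during its sleep.
        System.out.println(runWithGracePeriod(2000, () -> { }));
        // Election outlasts the grace period: the stand-in server gets started.
        System.out.println(runWithGracePeriod(20, () -> sleepQuietly(400)));
    }
}
```

In the real code the helper also re-checks that the peer is still LOOKING before calling startup, and the finally block calls roZk.shutdown() in addition to interrupting the thread.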

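3. Creating the election algorithm

startLeaderElection has already been called in QuorumPeer's start method, so in the LOOKING state it will most likely not be called again here. It is still worth a brief look. The code below is from zookeeper-3.9.1.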

    public synchronized void startLeaderElection() {
        try {
            // If this QuorumPeer is in the LOOKING state, build its initial vote
            if (getPeerState() == ServerState.LOOKING) {
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        // Create the election algorithm that matches the configured election type
        this.electionAlg = createElectionAlgorithm(electionType);
    }

    /**
     * Earlier versions supported several election types, each backed by a
     * different election algorithm. Now only one is supported: FastLeaderElection.
     */
    protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;
        switch (electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        case 3:
            // Build the manager responsible for network communication
            QuorumCnxManager qcm = createCnxnManager();
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }
            // Get the Listener of the network component
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                // Start the Listener, which accepts connections from the other nodes
                listener.start();
                // Build the FastLeaderElection object
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                // Start the election algorithm (this starts the Messenger's sender and receiver threads)
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

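The FastLeaderElection constructor mainly initializes the send queue and the receive queue, and wraps the QuorumPeer's network communication component (the QuorumCnxManager) in a Messenger that is used for network communication later on.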

    // Constructor
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;
        // Initialize the send queue
        sendqueue = new LinkedBlockingQueue<>();
        // Initialize the receive queue
        recvqueue = new LinkedBlockingQueue<>();
        this.messenger = new Messenger(manager);
    }

    // FastLeaderElection#start
    public void start() {
        this.messenger.start();
    }

    // Messenger#start, which starts two threads
    void start() {
        // Start the sender thread
        this.wsThread.start();
        // Start the receiver thread
        this.wrThread.start();
    }
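
The queue-plus-worker-thread arrangement can be illustrated with a small self-contained model. This is a sketch under assumptions, not the ZooKeeper implementation: the class MessengerSketch, the String message type, and the in-memory `wire` queue that loops messages back to the same process are all hypothetical. In ZooKeeper the two workers talk to the QuorumCnxManager instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class MessengerSketch {
    final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    final BlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();
    // Hypothetical loopback "network" that stands in for the connection manager.
    private final BlockingQueue<String> wire = new LinkedBlockingQueue<>();
    private volatile boolean stop = false;

    // One thread drains the send queue, the other fills the receive queue.
    private final Thread wsThread = new Thread(() -> pump(sendqueue, wire), "WorkerSender");
    private final Thread wrThread = new Thread(() -> pump(wire, recvqueue), "WorkerReceiver");

    private void pump(BlockingQueue<String> from, BlockingQueue<String> to) {
        try {
            while (!stop) {
                String message = from.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    to.put(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        wsThread.setDaemon(true);
        wrThread.setDaemon(true);
        wsThread.start();
        wrThread.start();
    }

    void halt() {
        stop = true;
    }

    /** Sends one message through both worker threads and returns what arrives. */
    static String roundTrip(String vote) {
        MessengerSketch messenger = new MessengerSketch();
        messenger.start();
        try {
            messenger.sendqueue.put(vote);
            return messenger.recvqueue.poll(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            messenger.halt();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("vote(leader=1)"));
    }
}
```

The election logic only ever touches the two queues, so it stays independent of how and when bytes actually move over the network.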

4. lookForLeader

As the preceding code shows, org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader starts the election of the leader node. It is called whenever the QuorumPeer is in the LOOKING state.

    public Vote lookForLeader() throws InterruptedException {
        // ... some code omitted
        try {
            // Votes received during the current election
            Map<Long, Vote> recvset = new HashMap<>();
            Map<Long, Vote> outofelection = new HashMap<>();
            int notTimeout = minNotificationInterval;
            synchronized (this) {
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
            LOG.info("New election. My id = {}, proposed zxid=0x{}",
                self.getMyId(),
                Long.toHexString(proposedZxid));
            // Send our vote to the other nodes
            sendNotifications();
            SyncedLearnerTracker voteSet = null;
            // Keep exchanging votes until a leader has been elected
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                // Take a notification from the receive queue
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                // n is null if nothing arrived in time, which is always the case
                // right after the zk server has started
                if (n == null) {
                    // manager.haveDelivered() is used to judge whether any peer is already connected
                    if (manager.haveDelivered()) {
                        // Some peer is connected, so send our vote to all nodes again
                        sendNotifications();
                    } else {
                        // Right after startup no peer is connected yet,
                        // so the first pass always ends up connecting to the other machines
                        manager.connectAll();
                    }
                    /*
                     * Exponential backoff
                     */
                    notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
                    /*
                     * When a leader failure happens on a master, the backup will be supposed to receive the honour from
                     * Oracle and become a leader, but the honour is likely to be delay. We do a re-check once timeout happens
                     *
                     * The leader election algorithm does not provide the ability of electing a leader from a single instance
                     * which is in a configuration of 2 instances.
                     */
                    if (self.getQuorumVerifier() instanceof QuorumOracleMaj
                        && self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    LOG.info("Notification time out: {} ms", notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    // ... a lot of code omitted
                } else {
                    // ... a lot of code omitted
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }
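
To see what the exponential backoff line does to the poll timeout, the update rule `Math.min(notTimeout << 1, maxNotificationInterval)` can be run in isolation. This is a small sketch. The class name is hypothetical, and the bounds of 200 ms and 60000 ms are assumed example values, so check minNotificationInterval and maxNotificationInterval in your ZooKeeper version before relying on them.

```java
import java.util.ArrayList;
import java.util.List;

final class NotificationBackoffSketch {

    /** Returns the poll timeout used in each of the first `rounds` empty polls. */
    static List<Integer> timeouts(int minInterval, int maxInterval, int rounds) {
        List<Integer> result = new ArrayList<>();
        int notTimeout = minInterval;
        for (int i = 0; i < rounds; i++) {
            result.add(notTimeout);
            // Same update rule as in lookForLeader: double, but never exceed the cap.
            notTimeout = Math.min(notTimeout << 1, maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed bounds: 200 ms minimum, 60000 ms maximum.
        System.out.println(timeouts(200, 60000, 10));
        // prints [200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000]
    }
}
```

With these bounds a node that hears nothing retries quickly at first and settles at one attempt per minute after nine doublings.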

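5. Connecting the machines in the cluster

When a zk node first starts, it takes part in a leader election. The election requires tallying votes, and to tally votes the node must receive data from the other nodes in the zk cluster, so it first has to connect to them.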

    // Connect to the other machines in the cluster
    public void connectAll() {
        long sid;
        // Iterate over the known server ids and connect to each one
        for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
            sid = en.nextElement();
            connectOne(sid);
        }
    }

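Stepping through the code, we eventually reach the place where the connection is actually made. ZooKeeper submits the connection attempt to a thread pool so that it runs asynchronously: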

    public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
        if (!inprogressConnections.add(sid)) {
            LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
            return true;
        }
        try {
            // Hand the connection attempt to the thread pool so it runs asynchronously
            connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
            connectionThreadCnt.incrementAndGet();
        } catch (Throwable e) {
            inprogressConnections.remove(sid);
            LOG.error("Exception while submitting quorum connection request", e);
            return false;
        }
        return true;
    }

    // Thread that performs the asynchronous connection request
    private class QuorumConnectionReqThread extends ZooKeeperThread {
        final MultipleAddresses electionAddr;
        final Long sid;

        QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) {
            super("QuorumConnectionReqThread-" + sid);
            this.electionAddr = electionAddr;
            this.sid = sid;
        }

        @Override
        public void run() {
            try {
                // Open the connection
                initiateConnection(electionAddr, sid);
            } finally {
                inprogressConnections.remove(sid);
            }
        }
    }
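
The inprogressConnections set is what keeps repeated calls from piling up duplicate connection attempts to the same server. The sketch below isolates that idea. AsyncConnectSketch, its attempts counter, and the CountDownLatch that simulates a slow connect are all assumptions made for illustration. Only the add-before-submit and remove-in-finally structure mirrors the code above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncConnectSketch {
    private final Set<Long> inprogress = ConcurrentHashMap.newKeySet();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    final AtomicInteger attempts = new AtomicInteger();

    /** Submits a connection attempt unless one for the same sid is already running. */
    boolean initiateAsync(long sid, CountDownLatch connectFinished) {
        if (!inprogress.add(sid)) {
            return true;                        // already in progress, skip this request
        }
        try {
            executor.execute(() -> {
                try {
                    attempts.incrementAndGet();
                    connectFinished.await();    // hypothetical stand-in for a blocking connect
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inprogress.remove(sid);     // allow later retries for this sid
                }
            });
        } catch (RuntimeException e) {
            inprogress.remove(sid);
            return false;
        }
        return true;
    }

    /** Requests sid 2 twice and sid 3 once, then reports how many attempts really ran. */
    static int demo() {
        AsyncConnectSketch sketch = new AsyncConnectSketch();
        CountDownLatch connectFinished = new CountDownLatch(1);
        sketch.initiateAsync(2L, connectFinished);
        sketch.initiateAsync(2L, connectFinished);  // duplicate, skipped
        sketch.initiateAsync(3L, connectFinished);
        connectFinished.countDown();
        sketch.executor.shutdown();
        try {
            sketch.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());             // prints 2
    }
}
```

Because the sid is added on the caller's thread and removed only when the worker finishes, the duplicate request is rejected regardless of how the threads are scheduled.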

The initiateConnection method sets up the connection request: it creates the socket, applies the socket options, and connects.

    // Initiate the connection request
    public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
        Socket sock = null;
        try {
            LOG.debug("Opening channel to server {}", sid);
            // Create a different kind of socket depending on whether SSL is enabled
            if (self.isSslQuorum()) {
                sock = self.getX509Util().createSSLSocket();
            } else {
                sock = SOCKET_FACTORY.get();
            }
            // Set socket options such as tcpNoDelay, soTimeout and keepAlive
            setSockOpts(sock);
            // The actual connect call
            sock.connect(electionAddr.getReachableOrOne(), cnxTO);
            if (sock instanceof SSLSocket) {
                SSLSocket sslSock = (SSLSocket) sock;
                sslSock.startHandshake();
                LOG.info("SSL handshake complete with {} - {} - {}",
                    sslSock.getRemoteSocketAddress(),
                    sslSock.getSession().getProtocol(),
                    sslSock.getSession().getCipherSuite());
            }
            LOG.debug("Connected to server {} using election address: {}:{}",
                sid, sock.getInetAddress(), sock.getPort());
        } catch (X509Exception e) {
            LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
            closeSocket(sock);
            return;
        } catch (UnresolvedAddressException | IOException e) {
            LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
            closeSocket(sock);
            return;
        }
        try {
            // Post-connect handling: wrap the input and output streams,
            // start the read and write threads, and so on
            startConnection(sock, sid);
        } catch (IOException e) {
            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
                sid,
                sock.getRemoteSocketAddress(),
                e);
            closeSocket(sock);
        }
    }

Once the socket is connected, startConnection is called to finish setting up the connection:

    private boolean startConnection(Socket sock, Long sid) throws IOException {
        // Output stream for data, obtained from the socket and wrapped
        DataOutputStream dout = null;
        // Input stream for data, obtained from the socket and wrapped
        DataInputStream din = null;
        LOG.debug("startConnection (myId:{} --> sid:{})", self.getMyId(), sid);
        try {
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            dout = new DataOutputStream(buf);
            // After connecting, send our protocol version, our own node id
            // and our election address(es) to the remote zk node
            long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
            dout.writeLong(protocolVersion);
            dout.writeLong(self.getMyId());
            Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
                ? self.getElectionAddress().getAllAddresses()
                : Arrays.asList(self.getElectionAddress().getOne());
            String addr = addressesToSend.stream()
                .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();
            din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }
        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
        if (qps != null) {
            authLearner.authenticate(sock, qps.hostname);
        }
        // Note: sid here is the id of the remote server, not our own id.
        // If the remote sid is larger than ours, the connection we just opened is dropped;
        // the remote side reads our smaller id and connects back to us instead.
        // It looks as if the initiating side could skip this round trip by checking
        // the sid before connecting, but the attempt is what tells the larger-id
        // server that it should open a connection to us.
        if (sid > self.getMyId()) {
            LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
            closeSocket(sock);
        } else {
            LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
            // Create two worker threads, one for sending and one for receiving
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
            // Start the send and receive threads
            sw.start();
            rw.start();
            return true;
        }
        return false;
    }
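
The bytes that startConnection writes right after connecting follow a simple layout: a long protocol version, a long server id, an int length, and then the address string. The sketch below writes and reads that layout on an in-memory stream. HandshakeSketch, its PROTOCOL_VERSION value and the sample address are illustrative assumptions, and UTF-8 is chosen explicitly here, whereas the code above uses the platform default charset via getBytes().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class HandshakeSketch {
    // Illustrative value only; the real constants live in QuorumCnxManager.
    static final long PROTOCOL_VERSION = -65536L;

    /** Encodes version, sender id and election address in the order used above. */
    static byte[] encode(long myId, String addr) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            byte[] addrBytes = addr.getBytes(StandardCharsets.UTF_8);
            dout.writeLong(PROTOCOL_VERSION);   // 8 bytes
            dout.writeLong(myId);               // 8 bytes
            dout.writeInt(addrBytes.length);    // 4 bytes
            dout.write(addrBytes);              // variable length
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes a frame produced by encode into "version/sid/address". */
    static String decode(byte[] frame) {
        try {
            DataInputStream din = new DataInputStream(new ByteArrayInputStream(frame));
            long version = din.readLong();
            long sid = din.readLong();
            byte[] addrBytes = new byte[din.readInt()];
            din.readFully(addrBytes);
            return version + "/" + sid + "/" + new String(addrBytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode(1L, "127.0.0.1:3888");
        System.out.println(frame.length);       // prints 34
        System.out.println(decode(frame));      // prints -65536/1/127.0.0.1:3888
    }
}
```

The accepting side needs the sender's id from this frame before it can decide whether to keep the connection.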

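Where there is a connecting side, there is also a side being connected to. For example, when the node with sid=1 initiates a connection, how do the other zk nodes accept it, and where is that code? Recall that creating the election algorithm also creates the network connection manager. It contains a QuorumCnxManager.Listener, which starts one ListenerHandler per election address it has to listen on.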

    public void run() {
        if (!shutdown) {
            LOG.debug("Listener thread started, myId: {}", self.getMyId());
            // Collect the election addresses to listen on
            Set<InetSocketAddress> addresses;
            if (self.getQuorumListenOnAllIPs()) {
                addresses = self.getElectionAddress().getWildcardAddresses();
            } else {
                addresses = self.getElectionAddress().getAllAddresses();
            }
            // Create a CountDownLatch sized to the number of addresses
            CountDownLatch latch = new CountDownLatch(addresses.size());
            listenerHandlers = addresses.stream().map(address ->
                new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
                .collect(Collectors.toList());
            // Submit the listenerHandlers so that each runs on its own thread
            final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
            try {
                listenerHandlers.forEach(executor::submit);
            } finally {
                executor.shutdown();
            }
            try {
                // Block here until every ListenerHandler has exited
                // (each handler counts the latch down in its finally block)
                latch.await();
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. Ignoring exception", ie);
            } finally {
                // Close all ListenerHandlers
                for (ListenerHandler handler : listenerHandlers) {
                    try {
                        handler.close();
                    } catch (IOException ie) {
                        LOG.debug("Error closing server socket", ie);
                    }
                }
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}",
                self.getElectionAddress().getAllAddresses().stream()
                    .map(NetUtils::formatInetAddr).collect(Collectors.joining("|")));
            if (socketException.get()) {
                // After leaving listener thread, the host cannot join the quorum anymore,
                // this is a severe error that we cannot recover from, so we need to exit
                socketBindErrorHandler.run();
            }
        }
    }
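
The Listener's use of a CountDownLatch is worth isolating: the latch is sized to the number of handlers, each handler counts down when it exits, and the listener thread blocks in await until all of them are done. A minimal sketch of that coordination follows. ListenerLatchSketch and its counter are hypothetical, and the handler body is a trivial stand-in for acceptConnections().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ListenerLatchSketch {

    /** Runs one handler per address and returns how many handlers have exited. */
    static int runHandlers(List<String> addresses) {
        CountDownLatch latch = new CountDownLatch(addresses.size());
        AtomicInteger exited = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            for (String address : addresses) {
                executor.submit(() -> {
                    try {
                        Thread.currentThread().setName("ListenerHandler-" + address);
                        exited.incrementAndGet();   // hypothetical stand-in for acceptConnections()
                    } finally {
                        latch.countDown();          // always signal, even after a failure
                    }
                });
            }
        } finally {
            executor.shutdown();                    // no new tasks; submitted ones still run
        }
        try {
            latch.await();                          // returns once every handler has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return exited.get();
    }

    public static void main(String[] args) {
        System.out.println(runHandlers(Arrays.asList("10.0.0.1:3888", "10.0.0.2:3888")));
    }
}
```

In ZooKeeper the handlers normally loop until shutdown, so await returning while the peer is still running means that listening has failed, which is why the code after the latch logs an error.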

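Main logic of ListenerHandler: it creates a ServerSocket and calls serverSocket.accept() to accept connections from other sockets. After accepting a connection it also wraps the input and output streams and starts the read and write threads that receive subsequent messages. There is one difference, though. After a connection has been accepted, the handleConnection method reads the sid sent by the connecting side; if that sid is smaller than the local sid, the node closes the connection and then initiates a connection of its own in the opposite direction.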

    public void run() {
        try {
            Thread.currentThread().setName("ListenerHandler-" + address);
            // Accept incoming connection requests
            acceptConnections();
            try {
                close();
            } catch (IOException e) {
                LOG.warn("Exception when shutting down listener: ", e);
            }
        } catch (Exception e) {
            // Output of unexpected exception, should never happen
            LOG.error("Unexpected error ", e);
        } finally {
            // Count down the CountDownLatch that the Listener passed in
            latch.countDown();
        }
    }

    // The acceptConnections method
    private void acceptConnections() {
        int numRetries = 0;
        Socket client = null;
        // Keep waiting for connections while the node is not shut down
        // and the bind retry limit has not been reached
        while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
            try {
                serverSocket = createNewServerSocket();
                LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
                while (!shutdown) {
                    try {
                        client = serverSocket.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
                        if (quorumSaslAuthEnabled) {
                            // Handle the incoming connection asynchronously
                            receiveConnectionAsync(client);
                        } else {
                            // Handle the incoming connection synchronously
                            receiveConnection(client);
                        }
                        numRetries = 0;
                    } catch (SocketTimeoutException e) {
                        LOG.warn("The socket is listening for the election accepted "
                            + "and it timed out unexpectedly, but will retry."
                            + "see ZOOKEEPER-2836");
                    }
                }
            } catch (IOException e) {
                if (shutdown) {
                    break;
                }
                LOG.error("Exception while listening to address {}", address, e);
                if (e instanceof SocketException) {
                    socketException.set(true);
                }
                numRetries++;
                try {
                    close();
                    Thread.sleep(1000);
                } catch (IOException ie) {
                    LOG.error("Error closing server socket", ie);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. Ignoring exception", ie);
                }
                closeSocket(client);
            }
        }
        if (!shutdown) {
            LOG.error(
                "Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.",
                formatInetAddr(address),
                numRetries,
                ELECTION_PORT_BIND_RETRY);
        }
    }
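
Putting the two sides together, the sid comparison in startConnection and the one described for handleConnection act as a tie-break: for any pair of servers, only the connection initiated by the server with the larger sid survives. The following sketch models just that rule. SidTieBreakSketch and its method names are hypothetical, and the incoming rule is taken from the description above rather than from the handleConnection source.

```java
final class SidTieBreakSketch {

    /** Initiating side (startConnection): drop if the remote sid is larger than ours. */
    static boolean keepOutgoing(long myId, long remoteSid) {
        return !(remoteSid > myId);
    }

    /** Accepting side (as described for handleConnection): drop if the remote sid is smaller. */
    static boolean keepIncoming(long myId, long remoteSid) {
        return !(remoteSid < myId);
    }

    /** Returns the sid whose outgoing connection both ends agree to keep, or -1 if none. */
    static long survivingInitiator(long a, long b) {
        if (keepOutgoing(a, b) && keepIncoming(b, a)) {
            return a;
        }
        if (keepOutgoing(b, a) && keepIncoming(a, b)) {
            return b;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(survivingInitiator(1, 3));   // prints 3
        System.out.println(survivingInitiator(3, 1));   // prints 3
    }
}
```

This is why a connection attempt from the smaller-id server is not wasted: it is closed on both ends, but it prompts the larger-id server to connect back, and that second connection is the one both sides keep.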
