接上文 手把手带你撸zookeeper源码-zookeeper启动(五)leader选举投票归档-确认当前zk服务的角色

上篇文章主要分析了leader选举的最终阶段,根据zk集群的相互投票之后,进行投票归档,然后判断某个zk获得投票数是否大于集群数量的一半,如果未超过一半,则继续下一轮的投票选举,如果有某个zk有超过一半的选票,则leader确定,然后其他zk服务则为follower或者observer

不知道大家有没有搭建过三台zk的集群,如果你是按照myid从小到大依次启动三台服务器,一般情况下都是第二台服务器是会成为leader的,相信大家看了之前的代码分析应该能知道为什么了吧?

接下来还会有很多篇文章来剖析zookeeper集群之间的数据如何基于2pc的方式来进行数据同步,客户端和服务端的会话如何创建和维护、zookeeper内部的数据结构是如何保存的?如何创建、删除临时节点、持久节点、顺序节点,zookeeper的监听回调通知如何实现的,以及zookeeper的故障恢复是如何做的?东西还有很多,我们一一去剖析,一步一个脚印的学习

今天这篇文章我们来分析一下,既然leader已经选举出来了,接下来会做什么呢? 

上篇文章我们分析完了FastLeaderElection#lookForLeader这个方法,它返回了当前一个Vote对象,确定了leader、follower、observer的角色,此方法也就结束了。

                            if (n == null) {
                                //如果当前服务器是的sid和投票为leader的sid一样
                                //则设置peerState状态为LEADING,否则要么为FOLLOWING或者OBSERVING
                               self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock.get(),
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }

 

我们接着看调用这行代码的地方,在QuorumPeer中的run方法中,把上面返回的Vote对象赋值给currentVote,接着会进行下一轮的循环,此时zk的角色都已经确定了,然后会进入到响应的分支当中

我们一个个看,如果peerState == LEADING,则进入如下分支

                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;

我们今天主要分析一下看看leader角色zookeeper主要做了什么,我们一步步来分析

setLeader(makeLeader(logFactory));

这一行代码主要是创建一个Leader对象,并且把创建出来的leader对象赋值给当前leader变量,下面会调用leader.lead(),这个方法很重要,等会分析,我们看看创建Leader对象都干了啥

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        return new Leader(this, new LeaderZooKeeperServer(logFactory,
                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }

logFactory这个参数就是解析完zoo.cfg文件之后,把属性封装到了QuorumPeerConfig对象中,这个对象中肯定有我们配置的日志目录,和数据目录,然后根据这两个目录会创建一个FileTxnSnapLog对象传递给QuorumPeer对象,大家可以看一下之前的代码

然后创建了LeaderZookeeperServer对象,在这个方法里面会有一系列的调用链来处理客户端发送过来的请求, 如2PC阶段如何处理的

new ZooKeeperServer.BasicDataTreeBuilder()


static public class BasicDataTreeBuilder implements DataTreeBuilder {
        public DataTree build() {
            return new DataTree();
        }
    }

猜想一下,BasicDataTreeBuilder对象提供了一个build()方法,返回了一个DataTree对象,这个DataTree肯定就是zookeeper内纯的数据结构对象,然后会在某个地方会调用这个build方法来创建一个DataTree内纯对象, 然后客户端发送增删改查节点的时候肯定就是操作的这个数据结构,是不是这样的,我们之后再具体分析

最后创建一个Leader对象

Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        this.proposalStats = new ProposalStats();
            if (self.getQuorumListenOnAllIPs()) {
                ss = new ServerSocket(self.getQuorumAddress().getPort());
            } else {
                ss = new ServerSocket();
            }
            ss.setReuseAddress(true);
            if (!self.getQuorumListenOnAllIPs()) {
                ss.bind(self.getQuorumAddress());
            }
        this.zk=zk;
    }

在我们的配置文件中有server.x=zk01:2888:3888这样的配置,在这段代码中,就是对当前服务器的2888端口进行监听,等待其他follower和当前leader进行连接

创建完Leader对象之后,最终会调用leader.lead()方法

void lead() throws IOException, InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        //记住leader选举的时间差
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken);
        self.start_fle = 0;
        self.end_fle = 0;
        //JMX监控
        zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

        try {
            self.tick.set(0);
            //加载硬盘上的数据到内存中
            zk.loadData();
            
            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from 
            // new followers.
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();
            
            readyToStart = true;
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
            
            synchronized(this){
                lastProposed = zk.getZxid();
            }
            
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);


            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of "
                        + Long.toHexString(newLeaderProposal.packet.getZxid()));
            }
            
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);

            try {
                waitForNewLeaderAck(self.getId(), zk.getZxid());
            } catch (InterruptedException e) {
                shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                        + getSidSetString(newLeaderProposal.ackSet) + " ]");
                HashSet<Long> followerSet = new HashSet<Long>();
                for (LearnerHandler f : learners)
                    followerSet.add(f.getSid());
                    
                if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                    LOG.warn("Enough followers present. "
                            + "Perhaps the initTicks need to be increased.");
                }
                Thread.sleep(self.tickTime);
                self.tick.incrementAndGet();
                return;
            }
            
            startZkServer();
            
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }
            
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }
            boolean tickSkip = true;
    
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }
                HashSet<Long> syncedSet = new HashSet<Long>();

                // lock on the followers when we use it.
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }

                // check leader running status
                if (!this.isRunning()) {
                    shutdown("Unexpected internal error");
                    return;
                }

              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                
                    return;
              } 
              tickSkip = !tickSkip;
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }

先分析第一个关键点

cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

LeanerCnxAcceptor是一个线程,这里开启一个线程,用来监听其他follower来和当前leader进行连接的,我们看一下LeanerCnxAcceptor.run方法

@Override
        public void run() {
            try {
                while (!stop) {
                    try{
                        Socket s = ss.accept();
                        s.setSoTimeout(self.tickTime * self.initLimit);
                        s.setTcpNoDelay(nodelay);

                        BufferedInputStream is = new BufferedInputStream(
                                s.getInputStream());
                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                        fh.start();
                    } catch (SocketException e) {
                        if (stop) {
                            stop = true;
                        } else {
                            throw e;
                        }
                    } catch (SaslException e){
                        LOG.error("Exception while connecting to quorum learner", e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception while accepting follower", e);
            }
        }

这里通过ServerSocket.accept()进行阻塞等待其他follower进行连接,如果有连接进来之后则会创建一个LearnerHandler对象,它也是一个线程,然后把socket交给LearnerHandler,由它来处理和连接进来的socket进行通信处理,这里也是用的阻塞bio,每当有一个follower或者observer进来连接时,都会创建一个单独线程去处理连接以及发送的数据

我们可以进入到LearnerHandler.run方法中去看看,现在有某个follower连接进来了,然后其对应的socket交给了LearnerHandler,接下来就是读取socket中的数据

先截取一部分代码看一下


            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");//读取follower发送过来的注册数据包
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
            	LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }

这块代码中用到了jute序列化协议,从输入流中读取数据,然后对数据进行反序列化,Jute序列化协议是zookeeper内部封装的一个序列化协议,然后通过特定的格式进行数据传输,从而解决传输过程当中可能出现的粘包拆包问题

 

OK,做个总结,今天大概梳理了一下,当前zk如果是leader时,会创建一个Leader对象,在创建Leader对象时会创建一个LeaderZookeeperServer,这个对象里面封装了一系列的RequestProcessor调用链,还有一个BasicDataTreeBuilder,其中的build方法返回一个DataTree对象。最后创建Leader对象时会创建ServerSocket然后监听端口等待其他Follower或者Observer来进行连接。所有的连接都会交给一个线程LearnerHandler进行处理,然后读取follower发送过来的数据,通过jute进行序列化反序列化

 

每篇文章不用太长,大家把一个点学会学透,每天进步一点点

 

下篇文章先讲一下Jute序列化协议是如何进行序列化反序列化的,以及它的格式是什么样的,怎么解决粘包拆包的问题的

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐