目录

一. 源码环境搭建

(一)源码下载

1. 官方下载网址:

2. 百度网盘下载地址

(二)配置ANT

 (三)编译zookeeper源码

1. 修改配置文件

2. 编译

 (四)将编译后的源码直接open方式导入idea

(五)version.Info类缺失问题处理

二. 服务端与客户端的启动配置

(一)服务端启动

(二)客户端启动

三.  源码分析之单机模式服务端启动

四. 源码分析之Leader选举总体框架

五. 源码分析之Leader选举FastLeaderElection

六. 源码分析之集群模式服务端

(一)执⾏流程图

 (二)源码分析


一. 源码环境搭建

(一)源码下载

1. 官方下载网址:

zookeeper-release-3.5.4下载地址:
:https://github.com/apache/zookeeper/tree/release-3.5.4
apache-ant-1.10.6-bin下载地址:
https://ant.apache.org/bindownload.cgi

 

2. 百度网盘下载地址

zookeeper-release-3.5.4下载地址:zookeeper-release-3.5.4

apache-ant-1.10.6-bin下载地址:apache-ant-1.10.6-bin

(二)配置ANT

1. apache-ant-1.10.6-bin.zip 解压,解压是存放的磁盘路径不要有中文

2. 配置系统环境变量,如下图:

 

 3. 测试ANT是否安装成功

 (三)编译zookeeper源码

1. 修改配置文件

将zookeeper-release-3.5.4.zip解压,解压存放磁盘路径中含中文,修改zookeeper-release-3.5.4目录下的build.xml配置文件,将此配置文件中第1809行的:

https://downloads.sourceforge.net/project/ant-eclipse/ant-eclipse/1.0/ant-eclipse-1.0.bin.tar.bz2

替换成:

http://ufpr.dl.sourceforge.net/project/ant-eclipse/ant-eclipse/1.0/ant-eclipse-1.0.bin.tar.bz2

2. 编译

在zookeeper-release-3.5.4目录下打开CMD,输入ant eclipse命令 开始编译,如图:

由于网速因素,如编译过程较长,请耐心等待,编译完成,显示结果如下图:

 (四)将编译后的源码直接open方式导入idea

(五)version.Info类缺失问题处理

源码导入idea中以后会发现:src/java/main/org/apache/zookeeper/version/Info类缺失,而导致整个源码项目编译失败。下面介绍idea2020.3版本导入zookeeper源码时如何解决这一问题:

1. 找到src/java/main/org/apache/zookeeper/version/util/VerGen.java 这个类,将该类的main方法上添加三个运行参数并删除自动编译:

这三个运行参数为:版本号 1 你自己的编译时间(三个参数之间用空格分隔),如下图:

 2. 选中该类,使用快捷键ctrl+shift+F9进行手动编译

3.执行该类的main方法:

4. 执行完毕后,你会发现Info类文件出现在了

org.apache.zookeeper.version目录下,将其拖到src/java/main/org/apache/zookeeper/version目录下。

二. 服务端与客户端的启动配置

(一)服务端启动

运⾏主类 org.apache.zookeeper.server.QuorumPeerMain ,将 zoo.cfg 的完整路径配置在 Program
arguments

 

VM options 配置,即指定到 conf ⽬录下的 log4j.properties
-Dlog4j.configuration=file:/Users/ericsun/Documents/zookeeper-release-
3.5.4/conf/log4j.properties
(以上改成自己的log4j.properties文件的路径)
运⾏输出⽇志如下:

可以得知单机版启动成功,单机版服务端地址为127.0.0.1:2182 

(二)客户端启动

三.  源码分析之单机模式服务端启动

 zookeeper源码分析之单机模式服务端启动_舞鹤白沙编码日志-CSDN博客

四. 源码分析之Leader选举总体框架

分析Zookeeper中⼀个核⼼的模块,Leader选举。

总体框架图

对于Leader选举,其总体框架图如下图所示

 

AuthFastLeaderElection,LeaderElection其在3.4.0之后的版本中已经不建议使⽤。

说明:
  选举的⽗接⼝为 Election ,其定义了 lookForLeader shutdown 两个⽅法, lookForLeader 表示寻找 Leader shutdown 则表示关闭,如关闭服务端之间的连接。

五. 源码分析之Leader选举FastLeaderElection

刚刚介绍了 Leader 选举的总体框架,接着来学习 Zookeeper 中默认的选举策略, FastLeaderElection
FastLeaderElection 源码分析
类的继承关系

public class FastLeaderElection implements Election {}
说明: FastLeaderElection 实现了 Election 接⼝,重写了接⼝中定义的 lookForLeader ⽅法和 shutdown⽅法
在源码分析之前,我们⾸先介绍⼏个概念 :
  • 外部投票:特指其他服务器发来的投票。
  • 内部投票:服务器⾃身当前的投票。
  • 选举轮次:ZooKeeper服务器Leader选举的轮次,即logical clock(逻辑时钟)。
  • PK:指对内部投票和外部投票进⾏⼀个对⽐来确定是否需要变更内部投票。选票管理
  • sendqueue:选票发送队列,⽤于保存待发送的选票。
  • recvqueue:选票接收队列,⽤于保存接收到的外部投票。

  (1)FastLeaderElection第一个内部类Notification

 (2)FastLeaderElection第二个内部类ToSend

 

 (3)FastLeaderElection第三个内部类Messenger

 

 

lookForLeader函数

 

(1)首先进入FastLeaderElection 类中的lookForLeader方法

/**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                //1.更新逻辑时钟,每开始一次新一轮选举的时候,都需要更新逻辑时钟
                logicalclock.incrementAndGet();
                //2. 初始化选票
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            //3. 发送初始化选票
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             * 在循环中交换通知,直到找到一个领导者
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                //4. 接收外部投票:每台服务器会不断的从recvqueue中去获取外部投票
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    //判断自己是否和集群断开了连接
                    if(manager.haveDelivered()){
                        //没有断开连接:发送自己本身的投票信息
                        sendNotifications();
                    } else {
                        //断开:马上重新连接
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                //5. 处理外部投票(判断选票轮次)
                else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        //*外部投票的选举轮次大于内部投票
                        if (n.electionEpoch > logicalclock.get()) {
                            //更新选举轮次
                            logicalclock.set(n.electionEpoch);
                            //清空所有已经收到的投票
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            //再次发送自己本身的投票
                            sendNotifications();
                            //*如果外部投票的选举轮次小于内部投票
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            //直接忽略
                            break;
                            //* 外部投票的选举轮次和内部投票一致,也是绝大多数情况
                            //6. 进行投票PK
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            //更新自己本身的轮次
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            //7. 变更选票,重新发送选票信息
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        //8. 选票归档:将收到的外部投票放进选票集合recvset中
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //9.判断当前节点收到的票数是否可以结束选举
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                //设置服务器状态
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                //最终的选票
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                //清空recvqueue队列的选票
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock.get()){
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         * Only peer epoch is used to check that the votes come
                         * from the same ensemble. This is because there is at
                         * least one corner case in which the ensemble can be
                         * created with inconsistent zxid and election epoch
                         * info. However, given that only one ensemble can be
                         * running at a single point in time and that each 
                         * epoch is used only once, using only the epoch to 
                         * compare the votes is sufficient.
                         * 
                         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                         */
                        outofelection.put(n.sid, new Vote(n.leader, 
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                        if (termPredicate(outofelection, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                              + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }

选举描述:

ZooKeeper 服务器检测到当前服务器状态变成 LOOKING 时,就会触发 Leader 选举,即调⽤
lookForLeader ⽅法来进⾏ Leader 选举。
之后每台服务器会不断地从 recvqueue 队列中获取外部选票。如果服务器发现⽆法获取到任何外部投票,就⽴即确认⾃⼰是否和集群中其他服务器保持着有效的连接,如果没有连接,则⻢上建⽴连接,如 果已经建⽴了连接,则再次发送⾃⼰当前的内部投票
在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进⾏不同的处理。 
         · 外部投票的选举轮次⼤于内部投票 。若服务器⾃身的选举轮次落后于该外部投票对应服务器 的选举轮次,那么就会⽴即更新⾃⼰的选举轮次 (logicalclock) ,并且清空所有已经收到的投票,然后使 ⽤初始化的投票来进⾏ PK 以确定是否变更内部投票。最终再将内部投票发送出去。
     · 外部投票的选举轮次⼩于内部投票 。若服务器接收的外选票的选举轮次落后于⾃身的选举轮次,那么 Zookeeper 就会直接忽略该外部投票,不做任何处理。
     · 外部投票的选举轮次等于内部投票 。此时可以开始进⾏选票 PK ,如果消息中的选票更优,则需要更新本服务器内部选票,再发送给其他服务器。

 

之后再对选票进⾏归档操作,⽆论是否变更了投票,都会将刚刚收到的那份外部投票放⼊选票集合
recvset 中进⾏归档,其中 recvset ⽤于记录当前服务器在本轮次的 Leader 选举中收到的所有外部投票,然后开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果 确定已经有过半服务器认可了该投票,然后再进⾏最后⼀次确认,判断是否⼜有更优的选票产⽣,若 ⽆,则终⽌投票,然后最终的选票
1. ⾃增选举轮次。 在 FastLeaderElection 实现中,有⼀个 logicalclock 属性,⽤于标识当前 Leader 的选举轮次, ZooKeeper 规定了所有有效的投票都必须在同⼀轮次中。 ZooKeeper 在开始新⼀轮的投票 时,会⾸先对 logicalclock 进⾏⾃增操作。
2. 初始化选票。 在开始进⾏新⼀轮的投票之前,每个服务器都会⾸先初始化⾃⼰的选票。在图 7-33 中我们已经讲解了 Vote 数据结构,初始化选票也就是对 Vote 属性的初始化。在初始化阶段,每台服务器都 会将⾃⼰推举为 Leader ,表 7-10 展示了⼀个初始化的选票。
3. 发送初始化选票。 在完成选票的初始化后,服务器就会发起第⼀次投票。 ZooKeeper 会将刚刚初始化好的选票放⼊ sendqueue 队列中,由发送器 WorkerSender
4. 接收外部投票。 每台服务器都会不断地从 recvqueue 队列中获取外部投票。如果服务器发现⽆法获取到任何的外部投票,那么就会⽴即确认⾃⼰是否和集群中其他服务器保持着有效连接。如果发现没有 建⽴连接,那么就会⻢上建⽴连接。如果已经建⽴了连接,那么就再次发送⾃⼰当前的内部投票。
5. 判断选举轮次。 当发送完初始化选票之后,接下来就要开始处理外部投票了。在处理外部投票的时 候,会根据选举轮次来进⾏不同的处理。 · 外部投票的选举轮次⼤于内部投票。如果服务器发现⾃⼰的 选举轮次已经落后于该外部投票对应服务器的选举轮次,那么就会⽴即更新⾃⼰的选举轮次 logicalclock ),并且清空所有已经收到的投票,然后使⽤初始化的投票来进⾏ PK 以确定是否变更内 部投票(关于 P K 的逻辑会在步骤 6 中统⼀讲解),最终再将内部投票发送出去。 · 外部投票的选举轮次 ⼩于内部投票。 如果接收到的选票的选举轮次落后于服务器⾃身的,那么 ZooKeeper 就会直接忽略该外 部投票,不做任何处理,并返回步骤 4
· 外部投票的选举轮次和内部投票⼀致。 这也是绝⼤多数投票的场景,如外部投票的选举轮次和内部投票⼀致的话,那么就开始进⾏选票 PK 。 总的来说,只有在同⼀个选举轮次的投票才是有效的投票。
6. 选票 PK 。 在步骤 5 中提到,在收到来⾃其他服务器有效的外部投票后,就要进⾏选票 PK —— 也就是 FastLeaderElection.totalOrderPredicate ⽅法的核⼼逻辑。选票 PK 的⽬的是为了确定当前服务器是否 需要变更投票,主要从选举轮次、 ZXID SID 三个因素来考虑,具体条件如下:在选票 PK 的时候依次 判断,符合任意⼀个条件就需要进⾏投票变更。 · 如果外部投票中被推举的 Leader 服务器的选举轮次⼤ 于内部投票,那么就需要进⾏投票变更。 · 如果选举轮次⼀致的话,那么就对⽐两者的 ZXID 。如果外部 投票的 ZXID ⼤于内部投票,那么就需要进⾏投票变更。 · 如果两者的 ZXID ⼀致,那么就对⽐两者的 SID 。如果外部投票的 SID ⼤于内部投票,那么就需要进⾏投票变更。
7. 变更投票。 通过选票 PK 后,如 果确定了外部投票优于内部投票(所谓的 优于 ,是指外部投票所推举的服务器更适合成为 Leader ), 那么就进⾏投票变更 —— 使⽤外部投票的选票信息来覆盖内部投票。变更完成后,再次将这个变更后的 内部投票发送出去。
8. 选票归档。 ⽆论是否进⾏了投票变更,都会将刚刚收到的那份外部投票放⼊ 选票集合 ”recvset 中进⾏ 归档。 recvset ⽤于记录当前服务器在本轮次的 Leader 选举中收到的所有外部投票 —— 按照服务器对应 SID 来区分,例如, {
1 vote1 ),(
2 vote2 ), …} 9. 统计投票。 完成了选票归档之后,就可
以开始统计投票了。统计投票的过程就是为了统计集群中是否已经有过半的服务器认可了当前的内部投 票。如果确定已经有过半的服务器认可了该内部投票,则终⽌投票。否则返回步骤 4 10. 更新服务器状 态。 统计投票后,如果已经确定可以终⽌投票,那么就开始更新服务器状态。服务器会⾸先判断当前被 过半服务器认可的投票所对应的 Leader 服务器是否是⾃⼰,如果是⾃⼰的话,那么就会将⾃⼰的服务器 状态更新为 LEADING 。如果⾃⼰不是被选举产⽣的 Leader 的话,那么就会根据具体情况来确定⾃⼰是 FOLLOWING 或是 OBSERVING 。 以上 10 个步骤,就是 FastLeaderElection 选举算法的核⼼步骤,其 中步骤 4 9 会经过⼏轮循环,直到 Leader 选举产⽣。另外还有⼀个细节需要注意,就是在完成步骤 9 之后,如果统计投票发现已经有过半的服务器认可了当前的选票,这个时候, ZooKeeper 并不会⽴即进 ⼊步骤 10 来更新服务器状态,⽽是会等待⼀段时间(默认是 200 毫秒)来确定是否有新的更优的投票。

附:发送选票信息源码图:

附:判断当前节点收到的票数是否可以结束选举

六. 源码分析之集群模式服务端

(一)执⾏流程图

 (二)源码分析

zookeeper源码分析之集群模式服务端_舞鹤白沙编码日志-CSDN博客

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐