简介

概述

Zookeeper原本是由Yahoo!开发的后来贡献给了Apache的一套开源的、用于进行分布式协调和管理的工具。Zookeeper的官网是:zookeeper.apache.org。图标如下:

图-1 Zookeeper图标

Zookeeper是根据Google的论文<The Chubby Lock Service for Loosely Couple Distribution System>来实现的,原本是Apache Hadoop的子项目之一,后来被独立出来成为了一个顶级工程。

Zookeeper提供了中心化的服务,包括:统一命名,统一配置,分布式锁和分布式组服务等。实际过程中,往往将Zookeeper充当注册中心来使用。从设计模式角度来看,Zookeeper是一个基于观察者模式来实现的分布式框架。即Zookeeper会存储和管理所有节点都关心的数据,然后接受观察者(即节点)的注册,一旦Zookeeper中存储的数据发生变化,注册到Zookeeper上的节点将都能够观察到这个变化,并且根据变化来做出对应的改变。

应用场景

统一命名。在分布式环境中,经常需要对应用服务进行统一命名以便于识别。在传统的互联网行业中,往往是利用其他框架来实现的,例如Nginx等。

图-2 统一命名

统一配置。在分布式环境中,往往会存在大量的配置文件。此时可以通过Zookeeper来实现对配置的统一管理。

图-3 统一配置

动态监控。当集群启动的时候,所有的节点可以注册到Zookeeper上,通过Zookeeper来进行监控。如果集群中某一个节点宕机,则Zookeeper对应的节点消失。

特点

1)数据一致性:客户端不论连接到哪个Zookeeper节点上,展示给它都是同一个视图,即查询的数据都是一样的。这是Zookeeper最重要的性能。

2)原子性:对于事务决议的更新,只能是成功或者失败两种可能,没有中间状态。 要么都更新成功,要么都不更新。即,要么整个集群中所有机器都成功应用了某一事务,要么都没有应用,一定不会出现集群中部分机器应用了该事务,另外一部分没有应用的情况。

3)可靠性:一旦Zookeeper服务端成功的应用了一个事务,并完成对客户端的响应,那么该事务所引起的服务端状态变更将会一直保留下来,除非有另一个事务又对其进行了改变。

4)实时性:Zookeeper保证客户端将在非常短的时间间隔范围内获得服务器的更新信息,或者服务器失效的信息,或者指定监听事件的变化信息。(前提条件是:网络状况良好)。

5)顺序性:如果在一台服务器上消息a在消息b前发布,则在所有服务器上消息a都将在消息b前被发布。客户端在发起请求时,都会跟一个递增的命令号,根据这个机制,Zookeeper会确保客户端执行的顺序性。底层指的是Zxid。

6)过半性:Zookeeper集群必须有半数以上的机器存活才能正常工作。因为只有满足过半性,才能满足选举机制选出leader。因为只有过半,在做事务决议时,事务才能更新。所以一般来说,Zookeeper集群的数量最好是奇数个。

数据结构

1) Zookeeper数据模型类似于Linux文件系统,本身是一个树状结构,每一个节点都称之为Znode节点,因此这棵树也称之为Znode树,根节点为/:

图-4 树状结构

2)不同于Linux的点是,在Zookeeper中,没有相对路径,所有节点路径都是从根节点开始计算。

3)当Zookeeper启动的时候,会自动生成一个`/zookeeper`节点,该节点用于保存Zookeeper集群本身的配置信息。

4)在Zookeeper中,每一个节点都必须存储数据,这个数据可以是对节点的描述信息或者是集群的统一配置信息。默认情况下,每一个节点存储数据的上限为1MB。

5)每一个持久节点都可以挂载子节点,但是临时节点不能挂载子节点。

6)每一个节点的路径都是唯一的,所以基于这一个特点,可以做集群的统一命名服务。

7)Zookeeper的树状结构是维系在内存中的,即每一个节点中的数据也是维系在内存中,这样做的目的是方便快速查找;同时Zookeeper的树状结构也会以快照的形式维系在磁盘上,在磁盘上的存储位置由dataDir属性来决定。 8)Zookeeper数据在磁盘上的存储位置由dataLogDir属性来决定,如果不指定,那么dataLogDir属性的值默认和dataDir属性的值一致。

10)在Zookeeper中,会将每一次的写操作看作是一个事务,会为每一个事务分配一个全局递增的编号,这个编号称之为事务id,简写为Zxid。

节点类型

表-1 节点类型

类型

解释

PERSISTENT

持久节点

EPHEMERAL

临时节点

PERSISTENT_SEQUENTIAL

持久顺序节点

EPHEMERAL_SEQUENTIAL

临时顺序节点

节点信息属性

表-2 节点信息

属性

解释

cZxid

节点被创建的事务id

ctime

节点被创建的时间

mZxid

节点被修改的事务id

mtime

节点被修改的时间

pZxid

子节点个数变化的事务id

cversion

子节点个数的变化次数

dataVersion

节点数据的变化次数

aclVersion

 节点的权限变化次数

ephemeralOwner

如果是持久节点,则此项值为0;如果是临时节点,则此项值为sessionid

dataLength

节点数据的字节个数

numChildren

子节点个数

编译和安装

编译

此处以Zookeeper截止到目前(2023年6月30日)的最新版本3.8.1为基础进行编译。如果需要编译Zookeeper,那么要求虚拟机或者云主机中需要提前安装好JDK以及Maven环境。

1)从官网下载Zookeeper的安装包或者上传已经下载好的安装包。

# 进入软件的预装目录

cd /opt/presoftware

# 下载Zookeeper的安装包(官网下载地址)

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1.tar.gz

2)解压Zookeeper的源码包:

tar -xvf apache-zookeeper-3.8.1.tar.gz -C /opt/source/

3)进入源码目录:

cd /opt/source/apache-zookeeper-3.8.1

4)执行编译命令:

mvn clean package -Pdist,nativeN,docs -DskipTests -Dtar -Dmaven.skip.test=true -Dmaven.javadoc.skip=true -Denforcer.skip=true

5)编译好之后,安装包在zookeeper-assembly/target目录下。

完全分布式安装

安装Zookeeper之前需要先准备好Java环境,建议安装JDK8版本。实际过程中,一般是先在一台主机上配置好之后再远程分发给其他的主机。

1)所有主机关闭防火墙。

systemctl stop firewalld
systemctl disable firewalld

2)上传或者下载Zookeeper的安装包,此处可以直接使用之前编译好的安装包。

3)解压:

tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz -C /opt/module/

4)由于目录的原名比较长,所以实际过程中都会给目录重命名。习惯上,命名会保留框架名+版本。例如:

cd /opt/module/

mv apache-zookeeper-3.8.1-bin/ zookeeper-3.8.1

5)进入Zookeeper的配置目录:

cd zookeeper-3.8.1/conf/

6)Zookeeper需要使用的配置文件默认是zoo.cfg,所以需要复制配置文件:

cp zoo_sample.cfg zoo.cfg

7)编辑配置文件:

vim zoo.cfg

8)修改属性:

# 这个属性用于指定Zookeeper的数据存储目录,默认是放在/tmp目录下

dataDir=/opt/module/zookeeper-3.8.1/data

9)在文件末尾添加:

# 格式为:server.编号=IP或者主机名:原子广播端口:选举端口号

server.1=192.168.52.6:2888:3888

server.2=192.168.52.7:2888:3888

server.3=192.168.52.8:2888:3888

10)构建dataDir属性指定的数据目录并进入:

mkdir /opt/module/zookeeper-3.8.1/data

cd /opt/module/zookeeper-3.8.1/data

11)编辑文件,文件名固定为myid:

vim myid

在文件中添加当前主机的编号。例如192.168.52.6主机对应的编号为1,那么就在这个文件中添加数字1;192.168.52.7主机对应的编号为2,那么就在这个文件中添加数字2。

12)远程分发给其他的主机:

cd /opt/module/

scp -r zookeeper-3.8.1/ root@192.168.52.7:$PWD

scp -r zookeeper-3.8.1/ root@192.168.52.8:$PWD

13)修改对应主机上的myid。

14)所有主机配置环境变量。

# 编辑文件

vim /etc/profile.d/myenv.sh

# 在文件中添加

export ZOOKEEPER_HOME=/opt/module/zookeeper-3.8.1

export PATH=$PATH:$ZOOKEEPER_HOME/bin

# 保存退出,生效

source /etc/profile.d/myenv.sh

15)启动Zookeeper:

zkServer.sh start

16)查看Zookeeper的进程状态:

zkServer.sh status

配置信息

表-3 配置信息

参数

默认值

说明

clientPort

2181

客户端连接服务器端的端口,即Zookeeper的对外服务端口

dataDir

/tmp/zookeeper

数据目录,即存储快照文件的目录

dataLogDir

/tmp/zookeeper

事务日志输出目录,用于存储Zookeeper的写操作。在不指定的情况下,和dataDir一致

tickTime

2000

Zookeeper中的时间单元。Zookeeper中所有时间都是以这个时间单元为基础,进行整数倍配置,单位是毫秒

initLimit

10*tickTime

follower在启动过程中,会从leader同步所有最新数据,然后确定自己能够对外服务的起始状态。leader允许follower在initLimit 时间内完成这个工作,即follower需要在initLimit规定的时间内完成所有数据的同步。如果Zookeeper集群中的数据量较大,follower在启动的时候,从leader上同步数据的时间也会相应变长,因此在这种情况下,可以考虑适当调大这个参数

syncLimit

5*tickTime

表示leader与follower 之间发送消息,请求和应答时间长度。如果follower在设置的时间内不能与leader进行通信,那么此follower将被丢弃。当集群网络环境不好时,可以适当调大

minSessionTimeout

maxSessionTimeout

2*tickTime~20*tickTime

Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间

server.x=hostname:端口号1:端口号2

配置集群中的主机,其中:x是主机编号,即myid;端口号1是原子广播端口,用于leader和follower之间的通信;端口号2是选举端口,用于选举过程中的通信

jute.maxbuffer

1M

用于控制每一个节点的最大存储的数据量

globalOutstandingLimit

1000

最大请求堆积数。当请求过多leader暂时处理不了的时候,会将请求临时存储到本地队列中,这个过程称之为请求堆积

preAllocSize

64M

预先开辟磁盘空间,用于后续写入事务日志

leaderServers

yes

默认情况下,leader也可以接受客户端连接,并提供正常的读写服务。但是,如果需要leader专注于集群中机器的事务协调(原子广播),那么可以将这个参数设置为no,这样一来,会提高整个zk集群性能

maxClientCnxns

60

控制的每一台zk服务器能处理的客户端并发请求数

指令及API操作

指令操作

启动指令

表-4 启动指令

命令

解释

zkServer.sh start

启动Zookeeper

zkServer.sh status

查看Zookeeper的启动状态

zkServer.sh stop

停止Zookeeper

zkCli.sh

启动Zookeeper的命令行客户端

zkCli.sh -server 192.168.52.7:2181

连接指定节点的Zookeeper

操作指令

表-5 操作指令

命令

解释

create /node01 'hello'

在根节点下创建节点node01,并且数据为hello

create -e /node02 ''

在根节点下创建临时节点node02,并且数据为空。该节点在客户端关闭之后会删除

create -s /node03 'hi'

在根节点下创建顺序节点/node03000000X,并且数据为hi

create -e -s /node04 ''

在根节点下创建临时顺序节点/node040000000X,并且数据为空。该节点在客户端关闭之后会删除

get /node01

获取/node01的数据

get -w /node01

监控/node01节点的数据变化

delete /node01

删除/node01。注意,如果/node01有子节点,那么不能删除

rmr /node01

删除/node01及其子节点。该方法已过时

deleteall /node01

删除/node01及其子节点

set /node01 'test'

将/node01中的数据更新为test

ls /

查看/下的所有的子节点

ls -w /

监控/节点下的子结点个数变化

ls /

查看/下的所有的子节点及/节点的信息

quit

退出客户端

connect hadoop02:2181

连接指定节点上的Zookeeper

stat /

查看/节点的信息

removewatches /

移除监控

history

查看历史命令

setquota -n 2 /node01

限制/node01的子节点个数不超过2个。注意,如果子节点个数超过指定数量,Zookeeper不会报错,而是会在日志文件中进行警告

setquota -b 100 /node01

限制/node01的数据不能超过100个字节

listquota /node01

查看/node01的限制

delquota /node01

删除/node01的限制

redo 10

再次执行id为10的命令

sync /

所有follower同步/下的数据

version

查看版本

whoami

查看当前用户

getEphemerals /node01

获取/node01的临时子节点

getAllChildrenNumber /

获取所有的子节点数量

API操作

Zookeeper原生API

导入POM文件依赖:

<dependencies>

<!--单元测试-->

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.13.2</version>

</dependency>

<!--日志打印-->

<dependency>

<groupId>org.apache.logging.log4j</groupId>

<artifactId>log4j-core</artifactId>

<version>2.17.2</version>

</dependency>

<!--Zookeeper-->

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.8.1</version>

</dependency>

</dependencies>

Zookeeper的基本操作:

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;



import java.io.IOException;

import java.util.List;

import java.util.concurrent.CountDownLatch;



public class TestZookeeper1 {



    ZooKeeper zk;



    // 发起连接

    @Before

    public void connect() throws IOException, InterruptedException {



        // 闭锁,用于产生阻塞。使一个或多个线程一直等待,直到其它线程的操作执行完后再执行。

        CountDownLatch cdl = new CountDownLatch(1);



        // 构建Zookeeper对象,发起连接

        // String connectString - 连接地址,包含IP/主机名+端口号

        // int sessionTimeout - 连接超时时间,单位是毫秒

        // Watcher watcher - 监控者,用于监控连接是否建立

        zk = new ZooKeeper("192.168.52.6:2181",

                5000,// 单位是毫秒,默认范围是4000~40000

                // 需要注意的是,Zookeeper底层采用的是Netty来完成连接

                // Netty本身是一个异步非阻塞的框架,那么就意味着无论是否连接上

                // 程序都会继续往下执行,所以需要阻塞

                event -> {

                    // 判断是否建立连接

                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected)

                        System.out.println("connected");

                    // 建立连接,那么减少计数,放开阻塞

                    cdl.countDown();

                }

        );

        // 连接上之前,程序需要阻塞

        cdl.await();

    }



    // 创建节点

    @Test

    public void createNode() throws InterruptedException, KeeperException {

        // String path - 节点路径

        // byte[] data - 节点数据

        // List<ACL> acl - 节点权限

        // CreateMode createMode - 创建模式,实际上就是节点类型

        // 返回值是节点名称

        String path = zk.create("/test", "testing".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        System.out.println(path);

    }



    // 修改节点数据

    @Test

    public void setData() throws InterruptedException, KeeperException {

        // String path - 节点路径

        // byte[] data - 节点数据

        // int version - 数据版本,需要和节点的dataVersion属性一致

        // 如果不一致,会出现BadVersionException

        // 如果需要忽略版本校验,那么可以指定值为-1

        // 返回值是节点信息

        Stat stat = zk.setData("/test", "test".getBytes(), -1);

        System.out.println(stat);

    }



    // 获取节点数据

    @Test

    public void getData() throws InterruptedException, KeeperException {

        // 如果需要节点状态,那么构建一个Stat对象传入

        // Stat s = new Stat();

        // byte[] data = zk.getData("/test", null, s);

        // 如果不需要节点状态,那么传入null即可

        byte[] data = zk.getData("/test", null, null);

        System.out.println(new String(data));

    }



    // 获取子节点

    @Test

    public void getChildren() throws InterruptedException, KeeperException {

        // 会将所有子节点的名字放入list中返回

        List<String> children = zk.getChildren("/", null);

        // 遍历

        for (String child : children) {

            System.out.println(child);

        }

    }



    // 判断节点是否存在

    @Test

    public void exists() throws InterruptedException, KeeperException {

        // 如果节点存在,则返回节点信息

        // 如果节点不存在,则返回null

        Stat stat = zk.exists("/test", null);

        System.out.println(stat != null);

    }



    // 删除节点

    @Test

    public void deleteNode() throws InterruptedException, KeeperException {

        zk.delete("/test", -1);

    }



}

除了基本的节点操作和数据操作以外,还可以对节点进行监控:

package com.gerry.zk;



import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import org.junit.Before;

import org.junit.Test;



import java.io.IOException;

import java.util.concurrent.CountDownLatch;



public class TestZookeeper2 {



    ZooKeeper zk;



    // 连接

    @Before

    public void connect() throws IOException, InterruptedException {

        CountDownLatch cdl = new CountDownLatch(1);

        zk = new ZooKeeper("192.168.52.6:2181", 5000,

                event -> {

                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected)

                        System.out.println("connected");

                    cdl.countDown();

                });

        cdl.await();

    }



    // 监控节点数据是否发生变化

    @Test

    public void dataChanged() throws InterruptedException, KeeperException {

        CountDownLatch cdl = new CountDownLatch(1);

        zk.getData("/test",

                event -> {

                    if (event.getType() == Watcher.Event.EventType.NodeDataChanged)

                        System.out.println("节点数据发生变化!");

                    cdl.countDown();

                }, null);

        cdl.await();

    }



    // 监控子节点个数是否发生变化

    @Test

    public void childrenChanged() throws InterruptedException, KeeperException {

        CountDownLatch cdl = new CountDownLatch(1);

        zk.getChildren("/test",

                event -> {

                    if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged)

                        System.out.println("子节点个数发生变化!");

                    cdl.countDown();

                });

        cdl.await();

    }



    // 监控节点被创建还是删除

    @Test

    public void nodeChanged() throws InterruptedException, KeeperException {

        CountDownLatch cdl = new CountDownLatch(1);

        zk.exists("/test",

                event -> {

                    if (event.getType() == Watcher.Event.EventType.NodeCreated)

                        System.out.println("节点被创建");

                    else if (event.getType() == Watcher.Event.EventType.NodeDeleted)

                        System.out.println("节点被删除!");

                    cdl.countDown();

                });

        cdl.await();

    }

}

Curator操作

Curator是Google开发的后来贡献给了Apache的用于操作Zookeeper的框架,Curator中提供了更简单、更便捷的方式用于操作Zookeeper。

导入POM文件:

<dependencies>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.13.2</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>apache-curator</artifactId>

<version>5.5.0</version>

<type>pom</type>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-client</artifactId>

<version>5.5.0</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-framework</artifactId>

<version>5.5.0</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>5.5.0</version>

</dependency>

</dependencies>

Curator的代码操作:

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.cache.CuratorCache;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;

import org.junit.After;



import java.util.List;



public class TestCurator {



    // 客户端

    CuratorFramework client;



    // 开启客户端

    @Before

    public void startClient() {

        // 重试策略

        // int baseSleepTimeMs - 间隔时间

        // int maxRetries - 重试次数

        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);

        // 初始化客户端

        client = CuratorFrameworkFactory.newClient("192.168.52.6:2181", rp);

        // 开启客户端

        client.start();

    }



    // 创建节点

    @Test

    public void createNode() throws Exception {

        // 返回值是节点路径

        String path = client.create() // 获取CreateBuilder对象

                .creatingParentsIfNeeded() // 递归创建

                .withMode(CreateMode.PERSISTENT) // 节点类型

                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 指定权限

                .forPath("/test/t1", "test curator".getBytes());// 指定节点和数据

        System.out.println(path);

    }

// 修改数据

    @Test

    public void setData() throws Exception {

        // 返回值表示节点信息

        Stat stat = client.setData() // 获取SetDataBuilder对象

                .withVersion(-1) // 数据版本,位置必须在前

                .forPath("/test/t1", "testing".getBytes());

        System.out.println(stat);

    }



    // 获取节点数据

    @Test

    public void getData() throws Exception {

        byte[] data = client.getData(). // 获取GetDataBuilder对象

                forPath("/test/t1");

        System.out.println(new String(data));

    }



    // 获取子节点

    @Test

    public void getChildren() throws Exception {

        // 将子节点名称放入list中返回

        List<String> children = client.getChildren() // 获取GetChildrenBuilder对象

                .forPath("/");

        for (String child : children) {

            System.out.println(child);

        }

    }



    // 判断节点是否存在

    @Test

    public void exists() throws Exception {

        // 如果对象存在,则返回对象的属性信息

        // 如果对象不存在,则返回null

        Stat stat = client.checkExists() // 获取ExistsBuilder对象

                .forPath("/test");

        System.out.println(stat != null);

    }



    // 删除节点

    @Test

    public void deleteNode() throws Exception {

        client.delete() // 获取DeleteBuilder对象

                .deletingChildrenIfNeeded() // 递归删除

                .withVersion(-1) // 数据版本

                .forPath("/test");

    }



    // 监控

    @Test

    public void watch() throws Exception {

        // 指定监听路径,Curator会将这个节点及其子节点全部监听

        CuratorCache cache = CuratorCache.build(client, "/test");

        cache.listenable() // 获取监听

                .addListener((type, oldData, data) -> {

                    // 判断事件类型

                    switch (type) {

                        case NODE_CREATED:

                            System.out.println("节点被创建");

                            break;

                        case NODE_DELETED:

                            System.out.println("节点被删除");

                            break;

                        case NODE_CHANGED:

                            System.out.println("节点数据发生变化");

                            break;

                        default:

                            System.out.println("其他操作");

                            break;

                    }

                }); // 添加监听器

        // 开始监听

        cache.start();

        // 持续监听

        while (true) ;

    }



    // 关闭客户端

    @After

    public void closeClient() {

        if (client != null) client.close();

    }



}

集群操作

步骤如下:

1)卸载Centos7中自带的nmap(开源免费的针对大型网络的端口扫描工具):

rpm -qa | grep -i nmap | xargs rpm -ev --nodeps

2)下载nc安装包或者通过yum安装:

yum -y install nc

3)修改Zookeeper的配置信息:

vim /opt/module/zookeeper-3.8.1/conf/zoo.cfg

在文件中添加:

4lw.commands.whitelist=*

4)重新启动Zookeeper。

命令如下表:

表-6 集群命令

命令

解释

echo conf | nc hadoop101 2181

查看hadoop101的配置信息

echo cons | nc hadoop101 2181

获取hadoop101上连接的客户端信息,包括每个客户端的客户端IP、会话ID和最后一次与服务器交互的操作类型等

echo crst | nc hadoop101 2181

重置hadoop101上连接的客户端信息

echo dump | nc hadoop101 2181

获取当前集群的所有会话信息,包括这些会话的会话ID,以及每个会话创建的临时节点等信息。

echo envi | nc hadoop101 2181

获取hadoop101服务器的环境信息

echo ruok | nc hadoop101 2181

判断hadoop101的Zookeeper是否运行,如果返回imok表示在运行

echo stat | nc hadoop101 2181

获取hadoop101上的集群状态。/zookeeper/leader /zookeeper/proc-id在新版本中不显示

echo srvr | nc hadoop101 2181

和stat功能一致,但是不会输出客户端连接信息

echo srst | nc hadoop101 2181

重置hadoop101的服务器信息

选举机制

选举过程

在Zookeeper集群中,当没有leader的时候,集群会自动进入选举状态,注意,此时所有的节点都会进入选举状态参与选举。如果集群中已经有了leader,那么此时Zookeeper集群会自动的给这个选举出来的leader赋予一个全局递增的编号,这个编号称之为epochid。之后,leader会将自己的epochid发送给每一个follower,follower收到之后会存储到本地的acceptedEpoch文件中。

集群进入选举状态之后,所有的节点都会参与选举,并且将自己的选举信息发送给其他的节点。选举信息包含:

1)leader编号,即epochid。如果epochid较小,说明操作已经过时。

2)当前节点的最大事务id(Zxid)。事务id越大,说明这个节点接受到的写操作越多。

3)节点的选举编号,即myid。这个由用户在zoo.cfg以及myid文件中指定。

节点在接收到其他节点的选举信息之后会进行比较:

1)先比较epochid,哪一个的epochid较大则哪一个节点胜出。注意,因为epochid是递增的,所以epochid较小的节点说明操作已经过时!

2)如果epochid一致,那么此时比较Zxid,谁大谁赢。Zxid越大说明写操作越多,可以理解为活干得比较多。

3)如果Zxid也一致,那么此时比较myid,同样,谁大谁赢。因此,要求各个节点的myid必须不同。

4)如果一个节点在比较过程中失败,那么会转而接受胜出节点的信息,相当于胜出节点获得一票支持。如果一个节点能够胜过一半及以上的节点(即获得一半及以上节点的支持),那么这个节点就会最终胜出,成为leader。这个特性称之为过半性

此处需要注意一个问题,判断一个节点是否成为leader,是看这个节点是否过半,而不是只看大小。例如有7个节点参与选举,那么此时6号节点抢先过半,那么就是6号节点成为leader!

而如果一个Zookeeper集群中已经存在了leader,那么此时新添一个或者多个节点,新添的节点的信息无论是什么样子,都只能成为follower。

如果Zookeeper集群中leader丢失,需要注意的是,此时集群只对外提供读服务不提供写服务,同时集群会自动进入选举状态,重新选举出来一个新的leader。

同样,如果在一个集群中出现了2个及以上的leader,这种现象称之为脑裂。Zookeeper集群中产生脑裂的原因:

1)集群产生了分裂。集群分裂可能是因为网络、设备等硬件原因,而硬件存在一定的故障率,所以集群分裂这个问题无法避免。

2)分裂之后进行了选举。选举是由Zookeeper自己来完成的,不是外部原因控制,所以可以从这个条件入手来避免或者解决Zookeeper的脑裂问题。

Zookeeper为了避免脑裂问题,提供了一个方案:如果集群中存活(注意,此处的存活指的是能够相互通信)的节点个数不足一半,那么此时无论是否存在leader,那么此时集群中所有的节点停止服务(对外停止接收请求,对内停止选举和原子广播)。这个特性也是过半性的体现。

因为Zookeeper存在过半性这一特性,所以Zookeeper集群中的节点个数一般是奇数个(偶数可能存在对半现象)。

在Zookeeper集群中,会存在的节点状态:

表-7 节点状态

状态

解释

looking/voting

选举状态

follower

追随者,跟随者

leader

领导者

observer

观察者

其中,observer状态不会发生改变,即一个节点如果设置为observer,那么这个节点从始至终都是observer状态。

需要注意的是,Zookeeper中虽然存在leader和follower,但是这是两种状态,即任意一个节点都可以是leader,也都可能是follower,随时可能发生切换,所以Zookeeper不是严格意义上的主从结构!

observer

在Zookeeper中,随着集群规模的扩大,写效率会越来越低:原子广播需要至少半数的节点才能完成,集群规模扩大,那么需要参与投票的节点也变多,效率也将会变低,因此在Zookeeper中引入了一类特殊的节点:observer。

observer需要在zoo.cfg文件中指定。如果一个节点被指定为observer,那么这个节点将不再参与任何决策,即observer不参与选举不参与原子广播,但是observer会监听选举和原子广播的结果,根据结果来执行对应的操作,可以理解为observer是没有决策权的follower。

因为observer没有决策权,所以在计算过半的时候不需要考虑observer,即Zookeeper集群中的过半性是针对有决策权的节点(leader和follower)而言的。例如一个集群中有21个节点,其中包含1个leader,6个follower以及14个observer。即使14个observer全部宕机,当前集群依然会正常对外提供服务。但是如果有4个follower宕机,那么即使observer全部存活,集群叶不会对外提供服务。

如果需要将哪一个节点设置为observer,那么只需要修改这个节点对应的zoo.cfg即可。

图-5 observer设置

ZAB协议

概述

ZAB(Zookeeper Atomic Broadcast)是专门为Zookeeper设计的一套协议。ZAB协议本身是基于了2PC算法来进行的设计,利用了PAXOS算法来进行了改进。

ZAB协议主要是为了完成原子广播,同时也支持崩溃恢复。

扩展:2PC算法

2PC(Two Phase Commit,二阶段提交)算法,顾名思义,是将一个请求的提交过程拆分成了2个阶段。在2PC算法中,节点可以分为两种角色:协调者(Negotiator)和参与者(Participant)。

阶段一:请求阶段。当集群任意一个节点收到请求之后,需要将请求提交给协调者,然后协调者会将这个请求分发给每一个参与者,并且要求参与者在规定时间内返回一个信号表示是否同意执行这个请求。

图-6 请求阶段

阶段二:提交阶段。如果在规定的时间内所有的参与者都返回了yes,那么协调者会命令所有的参与者执行这个请求。

阶段二:中止阶段。如果没有在规定时间内收到所有参与者的yes(即有的参与者返回了no,或者有的参与者返回超时),那么此时协调者就会放弃执行这个请求,并且命令所有的参与者放弃这个请求。

对于2PC算法而言,要么执行请求-提交,要么执行请求中止。

我们会发现,2PC的核心思想可以总结为:"一票否决",即有一个节点出现问题,那么所有的节点都不执行。

2PC算法的优势非常明显,就是2PC算法的理解和实现过程都相对简单,但是2PC算法劣势也同样非常明显:会非常受外界环境影响,例如网络产生波动、网络阻塞、硬件故障等,都会导致2PC过程的失败。

扩展:PAXOS算法

PAXOS算法是由莱斯利.兰伯特(Leslie Lamport)在1998年发表的论文<The Part-Time Parliament>首次提出,后Lamport于2001年发表<Paxos Made Simple>论文系统的证明了该算法,并且于2013年,Lamport凭借该算法获得了图灵奖。

故事背景:在一个小岛上生活着一群居民,这群居民由议会来统领,平时会通过议会来决定是否通过某项决策。但是议会的议员都是兼职的,那么就意味着议员并不一定每时每刻都会待在议会中。那么在这种情况下,需要一套什么样的规则或者协议,才能保证在随时有议员加入或者退出的前提下,能够持续、快速且一致的通过某项决策呢?

PAXOS算法是一种基于消息传递且具有高度容错性的共识算法。PAXOS算法的作用:如何快速且正确的在一个分布式系统中就某一个决策达成共识,并且保证不论发生任何异常,都不会破坏整个系统的一致性。

在PAXOS算法中,一共有3类角色:提议者(Proposer),接收者(Acceptor),以及学习者(Learner):

1)Proposer:提议者,负责提出提案(Proposal);

2)Acceptor:接收者。接收并回应Proposer的提案;

3)Learner:学习者。不参与决策,学习最后提案的结果。

注意,一个节点可以既是提议者也是接收者。

PAXOS包含三个阶段:

第一阶段:Prepare阶段(准备阶段):

Proposer为当前请求生成一个全局唯一且递增的Proposal ID(提案号),向所有的Acceptor发送Propose请求。注意,此时这个Propose请求无需携带具体的内容,只携带一个Proposal ID即可。

Acceptor收到Propose请求之后进行Promise承诺:不再接收小于等于当前Proposal ID的Propose请求;不再接受小于当前Proposal ID的Accept请求;不违背以前的承诺前提下,回复已经Accept过的提案中Proposal ID最大的提案的Value和Proposal ID,没有则返回空。

第二阶段:Accept阶段(表决阶段):

当Proposer收到超过半数的Acceptor的Promise应答之后,从应答中选择Proposal ID最大的提案的Value作为本次要发起的提案。如果所有的应答的提案Value都是空,那么则Proposer则可以自己随意决定提案Value。然后携带Proposal ID,向所有的Acceptor发起Propose请求。

Acceptor接收到Propose请求之后,在不违背自己之前做出的承诺的前提下,进行Accept处理(接受并且持久化当前的ProposalID和提案)。

第三阶段:Learn阶段:

Proposer将Propose请求发送给所有的Learner,所有的Learner执行请求。

需要注意,PAXOS算法容易存在活锁问题。

原子广播(Atomic Broadcast)

在ZooKeeper中,主要依赖ZAB协议来实现分布式数据一致性,基于该协议,ZooKeeper实现了一种主备模式的系统架构来保持集群中各副本之间数据的一致性,实现分布式数据一致性的这一过程称为消息广播(原子广播)。

ZAB协议的消息广播过程使用的是原子广播协议,类似于PAXOS的过程,但是不同的地方在于,在ZAB协议中,只能有一个节点(leader)来发起提案,其他的节点(follower)不能发起提案只能表决提案。

ZooKeeper使用一个单一的主进程(leader服务器)来接收并处理客户端的所有事务请求,并采用ZAB的原子广播协议,将服务器数据的状态变更以事务Proposal的形式广播到所有的副本进程(follower或observer)上去。即:所有事务请求必须由一个全局唯一的服务器来协调处理,这样的服务器被称为leader服务器,而余下的其他服务器则成为follower服务器或observer。

具体流程:

1)针对客户端的事务请求,leader服务器会先将该事务写到本地的log文件中;

2)如果记录成功,那么leader服务器会为这次请求生成对应的事务Proposal,并且为这个事务Proposal分配一个全局递增的唯一的事务ID,即Zxid;

3)leader服务器会为每一个follower服务器都各自分配一个单独的队列,将需要广播的事务Proposal依次放入队列中,发送给每一个follower;

4)每一个follower在收到队列之后,会从队列中依次取出事务Proposal,写到本地的事务日志中(这一步操作其实和leader的操作是相同的)。如果写成功了,则follower会给leader返回一个ACK(Acknowledge Character,确认字符)消息;

5)在规定的时间内,如果leader服务器接收到半数的follower返回的ACK响应之后,就会广播一个Commit(提交)消息给所有的follower以通知其进行事务提交,同时leader自身也进行事务提交。

如果需要查看日志文件,则步骤为:

# 进入数据目录

cd ../data/version-2/

# 解析查看日志文件

zkTxnLogToolkit.sh log.200000001

当然,在这种简化了的PAXOS提交模型下,是无法处理Leader服务器崩溃退出而带来的数据不一致问题的,因此在ZAB协议中添加了另一个模式,即采用崩溃恢复模式来解决这个问题。

整个消息广播协议是基于具有FIFO特性的TCP协议来进行网络通信的,因此能够很容易地保证消息广播过程中消息接收与发送的顺序性(即所有节点执行请求的顺序是相同的)。

这个过程中,需要注意的是,如果follower记录日志失败却要执行事务,那么此时follower会给leader发送一个请求,请求重新获取这个事务,重新记录日志,记录成功之后会提交这个事务。

如果一个节点重新连入集群中,那么这个节点会先找到自己最大的事务id,然后将自己最大的事务id发送给leader。leader在收到请求之后会比较这个事务id,如果事务id一致,则跳过;如果事务id不一致,那么leader会将这个节点所欠缺的事务放到队列中返回给这个follower,要求follower在规定时间内完成事务的提交和更新以达到和整个集群相同的水平。

崩溃恢复(Fail Over)

当leader服务器出现崩溃、重启等场景,或者因为网络问题导致过半的follower不能与leader服务器保持正常通信的时候,Zookeeper集群就会进入崩溃恢复模式。进入崩溃恢复模式后,只要集群中存在过半的服务器能够彼此正常通信,那么就可以选举产生一个新的leader。

在Zookeeper集群中,每次新选举出来的leader会自动分配一个全局递增的编号,即Epochid。新选举出来的leader会将自己的Epochid发送给每一个follower,follower收到之后会存储到本地的acceptedEpoch文件中。

当选举产生了新的leader服务器,同时集群中已经有过半的机器与该leader服务器完成了状态同步之后,ZAB协议就会退出崩溃恢复模式。其中,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和leader服务器的数据保持一致。当集群中已经有过半的follower服务器完成了和leader服务器的状态同步,那么整个服务框架就可以进入消息广播模式了。

当一台同样遵守ZAB协议的服务器启动后加入到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器就会自觉地进入数据恢复模式:找到leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中。

Logo

一起探索未来云端世界的核心,云原生技术专区带您领略创新、高效和可扩展的云计算解决方案,引领您在数字化时代的成功之路。

更多推荐