一、Zookeeper简介

1.1 Zookeeper是什么

Zookeeper是一个分布式协调服务的开源框架。主要用来解决分布式集群中应用系统的一致性问题,例如怎样避免同时操作同一数据造成脏读的问题。分布式系统中数据存在一致性问题!!!

        Zookeeper本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。
        Zookeeper提供给客户端监控存储在zk内部数据的功能,从而可以达到基于数据的集群管理。诸如:同一命名服务(dubbo)、分布式配置管理(solr的配置集中管理)、分布式消息队列(sub/pub)、分布式锁、分布式协调等功能。

1.2 Zookeeper的架构组成

        

         Leader:(不是手动指定的,而是选举出来的)
                Zookeeper集群工作的核心角色;
                集群内部各个服务器的调度者;
                事务请求(写操作)唯一调度和处理者,保证集群事务处理的顺序性;对于create、delete、setData等有写操作的请求,需要统一发送给leader处理,leader需要决定编号、执行操作,这个过程称为事务。

        Follwer:
                处理客户端非事务请求(读操作)。
                转发事务请求给leader。
                参与集群leader选举投票 2n-1台可以做集群投票。

       此处针对访问量较大的Zookeeper集群,还可以增加观察者角色。
        Observer:
                观察Zookeeper集群的最新状态变化并将这些变化同步过来,其对于非事务请求可以进行独立处理,对于事务请求,转发给leader服务器进行处理。
                不会参与任务形式的投票只是提供非事务服务,通常在不影响集群事务处理能力的前提下提升集群非事务处理能力。增加了集群的并发的读请求。

        ZK也是Master/Slave架构,但是与之前不同的是zk集群中的leader不是指定而来,而是通过选举产生的。

1.3 Zookeeper的特点

        1. Zookeeper:一个领导者(leader),多个跟随者(follower)组成的集群。
        2. Leader负责投票的发起和决议,更新系统状态(内部原理)。
        3. Follower用于接受客户请求,并向客户端返回结果,在选举Leader过程中参与投票。
        4. 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
        5. 全局数据一致:每个server保存一份相同的数据副本,Client无论连接到哪个server,数据都是一致的。
        6. 更新请求顺序进行(内部原理)。
        7. 数据更新原子性,一次数据更新要么成功,要么失败。

二、Zookeeper环境搭建

2.1 Zookeeper环境搭建

        Zookeeper安装方式有三种:单机模式、集群模式、伪集群模式。

        单机模式:Zookeeper只运行在一台服务器上,适合测试环境。
        伪集群模式:就是在一台服务器上运行多个Zookeeper服务器。
        集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”。

2.2 Zookeeper集群搭建

        1. 下载稳定版本的zookeeper http://zookeeper.apache.org/releases.html

        2. 将zookeeper压缩包 zookeeper-3.4.14.tar.gz上传到linux系统/opt/lgsoftware

        3. 解压压缩包:tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/

        4. 修改配置文件,创建data与log目录
                #创建zk存储数据目录
                mkdir -p /opt/lagou/servers/zookeeper-3.4.14/data
                #创建zk日志文件目录
                mkdir -p /opt/lagou/servers/zookeeper-3.4.14/data/logs
                #修改zk配置文件
                cd /opt/lagou/servers/zookeeper-3.4.14/conf
                #文件改名
                mv zoo_sample.cfg zoo.cfg
                vim zoo.cfg
                #更新datadir
                dataDir=/opt/lagou/servers/zookeeper-3.4.14/data
                #增加logdir
                dataLogDir=/opt/lagou/servers/zookeeper-3.4.14/data/logs
                #增加集群配置
                ##server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端⼝
                server.1=linux121:2888:3888
                server.2=linux122:2888:3888
                server.3=linux123:2888:3888
                #打开注释
                #ZK提供自动清理事务日志和快照文件的功能,这个参数指定了清理频率,单位是小时
                autopurge.purgeInterval=1
                

         5. 添加myid配置
                在zookeeper的 data 目录下创建一个 myid 文件,内容为1,这个文件就是记录每个服务器的ID。
                cd /opt/lagou/servers/zookeeper-3.4.14/data
                echo 1 > myid

        6. 安装包分发并修改myid
                rsync-script /opt/lagou/servers/zookeeper-3.4.14
                修改myid值 linux122:echo 2 >/opt/lagou/servers/zookeeper-3.4.14/data/myid
                修改myid值 linux123:echo 3 >/opt/lagou/servers/zookeeper-3.4.14/data/myid

        7. 依次启动三个zk示例
                启动命令:/opt/lg/servers/zookeeper-3.4.14/bin/zkServer.sh start

        8. 查看zk启动情况
                /opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh status
        

         9. 集群启动停止脚本
                vim zk.sh

#!/bin/sh
echo "start zookeeper server..."
if(($#==0));then
echo "no params";
exit;
fi
hosts="linux121 linux122 linux123"
for host in $hosts
do
ssh $host "source /etc/profile; /opt/lagou/servers/zookeeper-
3.4.14/bin/zkServer.sh $1"
done

三、Zookeeper数据结构与监听机制

        Zookeeper数据模型Znode
        在Zookeeper中,数据信息被保存在一个个数据节点上,这些节点被称为znode。znode是Zookeeper中最小数据单位,在zNode下面又可以再挂zNode,这样一层层下去就形成了一个层次化命名空间ZNode树,我们称之为ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。如下图:
        

         在Zookeeper中,每一个数据节点都是一个ZNode,上图根目录下有两个节点,分别是:app1 和app2,其中 app1 下面又有三个子节点,所有ZNode按层次化进⾏行组织,形成这么一颗树,ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进⾏行分割的路径表示,开发人员可以向这个节点写入数据,也可以在这个节点下面创建子节点。

3.1 ZNode的类型

        Zookeeper节点类型可以分为三大类:
        持久性节点(Persistent)、临时性节点(Ephemeral)、顺序性节点(Sequential)。

        可以通过组合生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点会有不同的生命周期。

        持久节点:Zookeeper中最常见的一种节点类型,节点被创建后会一直存在服务器,直到删除操作主动清除。
        持久顺序节点:就是有顺序的持久节点,节点特性和持久节点一样,只是额外特性表现在顺序上。顺序特性实质上是在创建节点的时候,在节点名后面加上一个数字后缀,来表示其顺序。
        临时节点:就是会被自动清除掉的节点,它的生命周期和客户端回话绑定在一起,客户端回话结束,节点就会被删除。与持久节点不同的是,临时节点不能创建子节点。
        临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀。

        事务ID
        往在现在的概念中,狭义上的事务通常指的是数据库事务,一般包含了一系列对数据库有序的读写操作,这些数据库事务具有所谓的ACID特性,即原⼦子性(Atomic)、⼀一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
        而在Zookeeper中,事务是指能够改变Zookeeper服务器状态的操作,也称之为事务操作或更新操作。一般包括数据节点的创建与删除、数据节点内容更新等操作。对于每一个事务请求,Zookeeper都会为其分配一个全局唯一的事务ID,用ZXID来表示(这个编号是自增长的),通常是一个64位数字。每一个ZXID对应一次更新操作,从这些ZXID中可以间接的识别出Zookeeper处理这些更新操作请求的全局顺序。

3.2 ZNode的状态信息

        #使⽤用bin/zkCli.sh 连接到zk集群
        [zk: localhost:2181(CONNECTED) 2] get /zookeeper
        cZxid = 0x0
        ctime = Wed Dec 31 19:00:00 EST 1969
        mZxid = 0x0
        mtime = Wed Dec 31 19:00:00 EST 1969
        pZxid = 0x0
        cversion = -1
        dataVersion = 0
        aclVersion = 0
        ephemeralOwner = 0x0
        dataLength = 0
        numChildren = 1

        整个ZNode节点内容包括两部分:节点数据内容和节点状态信息。数据内容是空,其它的属于状态信息。
        cZxid 就是 Create ZXID,表示节点被创建时的事务ID。
        ctime 就是 Create Time,表示节点创建时间。
        mZxid 就是 Modified ZXID,表示节点最后一次被修改时的事务ID。
        mtime 就是 Modified Time,表示节点最后一次被修改的时间。
        pZxid 表示该节点的子节点列表最后一次被修改时的事务 ID。只有子节点列表变更才会更新 pZxid,子节点内容变更不会更新。
        cversion 表示子节点的版本号。
        dataVersion 表示内容版本号。
        aclVersion 标识acl版本
        ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0。
        dataLength 表示数据长度。
        numChildren 表示直系子节点数。

3.3 Watcher机制

        Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能。

        一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使他们能够做出相应的处理。

        在Zookeeper中使用Watcher机制来实现这种分布式的通知功能。Zookeeper允许来客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么zk就会向指定客户端发送一个事件通知来实现分布式的通知功能。

        整个Watcher机制注册与通知过程如图所示:
        

         Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。

        具体工作流程为:
        1. 客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中。
        2. 当Zookeeper服务器触发Watcher事件后,会向客户端发送通知。
        3. 客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑。

四、Zookeeper的基本使用

4.1 Zookeeper命令行使用

        首先进入到Zookeeper的bin目录后,通过zkClient进入Zookeeper客户端命令行
        ./zkcli.sh 连接本地的Zookeeper服务器
        ./zkcli.sh -server ip:port(2181) 连接指定的服务器
        连接成功之后,系统会输出Zookeeper的相关环境及配置信息。输入help之后,屏幕会输出可用的Zookeeper命令,如下图:
        

         

         1. 创建节点
        使用create命令,可以创建一个Zookeeper节点,如
        create [-s][-e] path data
        其中 -s或 -e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点。

        1.1 创建顺序节点
        create -s /zk-test 123
        执行完成后,就在根节点下面创建了一个叫zk-test的节点,该节点内容是123,同时可以看到创建的zk-test节点后面添加了一串数字以示区别。

        1.2 创建临时节点
        create -e /zk-temp 123
        临时节点在客户端会话结束后,就会自动删除,下面使用quit命令退出客户端,再次使用客户端连接服务端,并使用ls /查看根目录下节点,可以看到根目录下已经不存在zk-temp临时节点了。

        1.3 创建永久节点
        create /zk-permanent 123

        2. 读取节点
        与读取相关的命令有ls命令和get命令

        ls命令可以列出Zookeeper指定节点下的所有子节点,但只能查看指定节点下的第一级所有子节点。
        ls path (其中path表示的是指定数据节点的节点路径)
        ls /       (获取跟节点下的所有子节点)

        get命令可以获取Zookeeper指定节点下的数据内容和属性信息。
        get path
        get /permanent
        

         3. 更新节点
        使用set命令,可以更新指定节点的数据内容。
        set path data
        其中data是要更新的新内容,version表示数据的版本,在Zookeeper中,节点的数据是有版本概念的,这个参数用于指定本次更新操作是基于Znode的哪一个数据版本进行的。                           set /permanent 456
        

         现在dataVersion以及变为1了,表示进行了更新。

        4. 删除节点
        使用delete命令可以删除Zookeeper上的指定节点
        delete path
        其中version也是表示数据版本,使用delete /zk-permanent 命令即可删除/zk-permanent节点
        注意:若删除节点存在子节点,那么无法删除此节点,必须先删除子节点,再删除父节点。

4.2 Zookeeper开源客户端

        ZkClient是Github上一个开源的zook客户端,在Zookeeper原生API接口之上进行了包装,是一个更易用的Zookeeper客户端,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。

        添加依赖
        pom.xml

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>

        1. 创建会话
        使用ZkClient创建会话,连接到服务器 

package com.hust.grid.leesf.zkclient.examples;
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
public class CreateSession {
    /*
    创建⼀一个zkClient实例例来进⾏行行连接
    */
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("linux121:2181");
        System.out.println("ZooKeeper session created.");
    }
}

        运⾏行行结果:ZooKeeper session created. 结果表明已经成功创建会话 。

        2. 创建节点
        ZkClient提供了递归创建节点的接口,先完成父节点的创建,再创建子节点。

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Create_Node_Sample {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("linux123:2181");
        System.out.println("ZooKeeper session established.");
        //createParents的值设置为true,可以递归创建节点
        zkClient.createPersistent("/lg-zkClient/lg-c1",true);
        System.out.println("success create znode.");
    }
}

        运⾏行行结果:success create znode.结果表明已经成功创建了节点,值得注意的是ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点 。

        3. 删除节点
        ZkClient提供了递归删除节点的接口,即先删除所有子节点(存在),再删除父节点。

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/lg-zkClient/lg-c1";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.deleteRecursive(path);
        System.out.println("success delete znode.");
    }
}

        运行结果: success delete znode.结果表明ZkClient可直接删除带子节点的父节点,因为其底层先删除其所有子节点,然后再删除父节点 。

        4. 监听节点变化

package com.lagou.zk.demo;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import java.util.List;
/*
演示zkClient如何使⽤用监听器器
*/
public class Get_Child_Change {
    public static void main(String[] args) throws InterruptedException {
        //获取到zkClient
        final ZkClient zkClient = new ZkClient("linux121:2181");
        //zkClient对指定⽬目录进⾏行行监听(不不存在⽬目录:/lg-client),指定收到通知之后的逻辑
        //对/lag-client注册了了监听器器,监听器器是⼀一直监听
        zkClient.subscribeChildChanges("/lg-client", new IZkChildListener() {
            //该⽅方法是接收到通知之后的执⾏行行逻辑定义
            public void handleChildChange(String path, List<String> childs) throws Exception {
            //打印节点信息
            System.out.println(path + " childs changes ,current childs " + childs);
            }
        });
    
        //使⽤用zkClient创建节点,删除节点,验证监听器是否运行
        zkClient.createPersistent("/lg-client");
        Thread.sleep(1000); //只是为了了⽅方便便观察结果数据
        zkClient.createPersistent("/lg-client/c1");
        Thread.sleep(1000);
        zkClient.delete("/lg-client/c1");
        Thread.sleep(1000);
        zkClient.delete("/lg-client");
        Thread.sleep(Integer.MAX_VALUE);

        /*
        1 监听器器可以对不不存在的⽬目录进⾏行行监听
        2 一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端
        3 监听⽬目录创建和删除本身也会被监听到
        */
    }
}

        运行结果:
        /lg-zkClient 's child changed, currentChilds:[]
        /lg-zkClient 's child changed, currentChilds:[c1]
        /lg-zkClient 's child changed, currentChilds:[]
        /lg-zkClient 's child changed, currentChilds:null 

        5. 获取数据(节点是否存在、更新、删除)

package com.lagou.zk.demo;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
//使⽤用监听器器监听节点数据的变化
public class Get_Data_Change {
    public static void main(String[] args) throws InterruptedException {
        // 获取zkClient对象
        final ZkClient zkClient = new ZkClient("linux121:2181");
    
        //设置⾃自定义的序列列化类型,否则会报错!!
        zkClient.setZkSerializer(new ZkStrSerializer());

        //判断节点是否存在,不不存在创建节点并赋值
        final boolean exists = zkClient.exists("/lg-client1");
        if (!exists) {
            zkClient.createEphemeral("/lg-client1", "123");
        }

        //注册监听器器,节点数据改变的类型,接收通知后的处理理逻辑定义
        zkClient.subscribeDataChanges("/lg-client1", new IZkDataListener() {
            public void handleDataChange(String path, Object data) throws Exception {
                //定义接收通知之后的处理理逻辑
                System.out.println(path + " data is changed ,new data " + data);
            }
            //数据删除--》节点删除
            public void handleDataDeleted(String path) throws Exception {
                System.out.println(path + " is deleted!!");
            }
        });
        
        //更更新节点的数据,删除节点,验证监听器器是否正常运⾏行行
        final Object o = zkClient.readData("/lg-client1");
        System.out.println(o);
        zkClient.writeData("/lg-client1", "new data");
        Thread.sleep(1000);
        //删除节点
        zkClient.delete("/lg-client1");
        Thread.sleep(Integer.MAX_VALUE);
    }
}


package com.lagou.zk.demo;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
public class ZkStrSerializer implements ZkSerializer {
    //序列列化,数据--》byte[]
    public byte[] serialize(Object o) throws ZkMarshallingError {
        return String.valueOf(o).getBytes();
    }
    //反序列列化,byte[]--->数据
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return new String(bytes);
    }    
}

        运行结果:
        123
        /lg-client1 data is changed ,new data new data
        /lg-client1 is deleted!! 
        结果表明可以成功监听节点数据变化或删除事件。

五、Zookeeper的内部原理

5.1 Leader选举

        选举机制
        半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。
        Zookeeper虽然在配置文件中没有指定Master和Slave。但是Zookeeper工作时,是有一个节点为Leader,其它为Follower,Leader是通过内部的选举机制产生的。

        集群首次启动
        假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么?
        

         Zookeeper的选举机制:       
        (1)服务器1启动,此时只有它1台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态。
        (2)服务器2启动,它与最开始启动的服务器1进行通信,相互交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都统一选举它(这个例子半数以上是3)所以服务器1、2还是继续保持LOOKING状态。
        (3)服务器3启动,根据上面的分析,服务器成为1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成了这个选举的Leader。
        (4)启动服务器4,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但由于前面已经有半数以上的服务器选举了服务器3,所以它只能接受当小弟的命了。
        (5)服务器5启动,同4一样成为follower。

        集群非首次启动:
        每个节点在选举时都会参考自身节点的zxid值(事务ID);优先选择zxid值最大的节点成为Leader。

5.2 ZAB一致性协议

        1. 分布式数据一致性问题

        为什么会出现分布式数据一致性问题?
        将数据复制到分布式部署的多台机器中,可以消除单点故障问题,防止系统由于某台(些)机器宕机导致不可用。
        通过负载均衡技术,能够让分布在不同地方的数据副本全都对外提供服务。有效提高系统性能。

        在分布式系统中引入数据复制机制后,多台数据节点之间由于网络等原因很容易产生数据不一致的情况。
        举例:
        当客户端Client1将系统中的一个值K1由V1更新为V2,但是客户端Client2读取的是一个还没有同步更新的副本,K1的值依然是V1,这就导致了数据的不一致性。其中,常见的就是主从数据库之间的复制延时问题。
        
        

         2. ZAB协议

        ZK就是分布式一致性问题的工业解决方案,paxos是其底层理论方法(晦涩难懂著名),其中zab,raft和其它众多开源算法是对paxos的工业级实现。ZK没有完全采用paxos算法,而是使用了一种成为Zookeeper Atomic Brodcast(ZAB,Zookeeper原子消息广播协议)的协议作为其数据一致性的核心算法。

        ZAB协议是为分布式协调服务Zookeeper专门设计的一种支持崩溃恢复和原子广播协议。

        主备模式保持一致性
        

         ZK怎么处理集群中的数据?所有客户端写入数据都写入到Leader中,然后Leader复制到Follower中。ZAB会将服务器数据的状态以事务Proposal的形式广播到所有副本进程上,ZAB协议能够保证了事务操作的一个全局的变更序号(ZXID)。

        广播消息
        ZAB协议的消息广播过程类似于二阶段提交过程。对于客户端发送的写请求,全部由Leader接受,Leader将请求封装成一个事务Proposal(提议),将其发送给所有Follower,如果收到超过半数反馈ACK,则执行commit操作(先提交自己,再发送commit给所有Follower)。
        1. 发送Proposal到Follower
        

         2. Leader接受Follower的ACK
        

         3. 超过半数ACK则Commit
        

         不能正常反馈的Follower,恢复正常后会进入数据同步阶段,最终与Leader保持一致。

        细节:
        Leader收到Client请求后,会将请求封装成一个事务,并给这个事务分配一个全局递增的唯一的ID,称为事务ID(ZXID),ZAB协议要求保证事务的顺序,因此必须将每一个事务按照ZXID进行先后排序然后处理。
        ZK集群为了保证任何事务操作能够有序的顺序执行,只能是Leader服务器接受写请求,即使是Follower服务器接受到客户端的写请求,也会转发到Leader服务器进行处理。

        ZK提供的应该是最终一致性的标准,zk所有节点接收写请求之后,可以在一定时间内保证所有节点都能看到该条数据。!!!

        Leader崩溃问题
        Leader宕机后,ZK集群无法正常工作,ZAB协议提供了一个高效且可靠的Leader选举算法。
        Leader宕机后,被选举的新的Leader需要解决的问题:
                1. ZAB协议确保那些已经在Leader提交的事务,最终会被所有服务器提交。
                2. ZAB协议确保丢弃那些只在Leader提出/复制,但没有提交的事务。
        基于上面的目的,ZAB协议设计了一个选举算法:能够确保已被Leader提交的事务被集群接受,丢弃还没有提交的事务。
        这个选举算法的关键点:保证选举出的新Leader拥有集群中所有节点最大编号的事务(ZXID)。

六、Zookeeper应用实践

        Zookeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使用它来进行分布式数据的发布与订阅。另一方面,通过Zookeeper中丰富的数据节点类型进行较差使用,配合Watcher事件通知机制,可以非常方便的构建一系列分布式应用中会涉及的核心功能,如数据发布/订阅、命名服务、集群管理、Master选举、分布式锁和分布式队列等。

        Zookeeper的两大特性:
        1. 客户端如果对Zookeeper的数据节点注册了Watcher监听,那么当该数据节点的内容或其子节点列表发生变化时,Zookeeper服务器将就会向订阅的客户端发生变更通知。
        2. 对Zookeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么临时节点也会被自动删除。
        利用其两大特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册一个Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点:/clusterServers/[HostName],这样系统就能实时检测机器的动态变化。

6.1 服务器动态上下线监听

        分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,而任意⼀一台客户端都要能实时感知到主节点服务器的上下线。

        思路分析:
        

         Zookeeper服务器

package com.lagou.offonline;

import org.I0Itec.zkclient.ZkClient;

public class ZKServer {

    private ZkClient zkClient;

    // 获取到ZK对象
    private void connectZK(){
        zkClient = new ZkClient("linux121:2181,linux122:2181.linux123:2181");
        if (!zkClient.exists("/servers")){
            zkClient.createPersistent("/servers");
        }
    }

    // 注册服务端信息到zk节点
    private void registerServerInfo(String ip, String port){
        // 创建临时顺序节点
        String path = zkClient.createEphemeralSequential("/servers/server", ip + ":" + port);
        System.out.println("服务器注册成功, ip="+ip + "; port="+port + "; 节点路径信息=" + path);
    }

    public static void main(String[] args) {
        ZKServer zkServer = new ZKServer();
        zkServer.connectZK();

        zkServer.registerServerInfo(args[0], args[1]);

        // 启动提个服务线程,提供查询时间
        TimeServer timeServer = new TimeServer(Integer.parseInt(args[1]));
        timeServer.start();
    }
}

        服务端提供时间查询的线程类

package com.lagou.offonline;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

public class TimeServer extends Thread {

    private int port = 0;

    public TimeServer(int port){
        this.port = port;
    }

    @Override
    public void run() {
        // 启动ServerSocket监听一个端口
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            while (true){
                Socket socket = serverSocket.accept();
                OutputStream out = socket.getOutputStream();
                out.write(new Date().toString().getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

         Client端

package com.lagou.offonline;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// 注册监听zk指定目录
// 维护自己本地的一个servers信息,收到通知要进行更新
// 发送时间查询请求,并接受服务端返回的数据
public class Client {

    // 获取zkclient
    private ZkClient zkClient;
    // 维护一个servers信息集合
    private ArrayList<String> infos = new ArrayList<String>();

    // 获取到ZK对象
    private void connectZK(){
        zkClient = new ZkClient("linux121:2181,linux122:2181.linux123:2181");
        // 获取服务器信息,所有子节点
        List<String> childs = zkClient.getChildren("/servers");
        for (String child : childs){
            // 存着ip+port
            Object o = zkClient.readData("/servers/" + child);
            infos.add(String.valueOf(o));
        }

        // 对servers目录进行监听
        zkClient.subscribeChildChanges("/servers", new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> children) throws Exception {
                ArrayList<String> list = new ArrayList<String>();
                for (String path : children){
                    Object o = zkClient.readData("/servers/" + path);
                    list.add(String.valueOf(o));
                }

                // 新数据覆盖老数据
                infos = list;
                System.out.println("==>接受到新通知,最新服务器信息为:"+list);
            }
        });
    }

    // 发送时间查询的请求
    public void sendRequest() throws IOException {
        // 目标服务器地址
        Random random = new Random();
        int i = random.nextInt(infos.size());
        String ipPort = infos.get(i);
        String[] arr = ipPort.split(":");

        // 建立socket连接
        Socket socket = new Socket(arr[0], Integer.parseInt(arr[1]));
        OutputStream out = socket.getOutputStream();
        InputStream in = socket.getInputStream();

        // 发送数据
        out.write("query time".getBytes());
        out.flush();

        // 接受返回数据
        byte[] bs = new byte[1024];
        in.read(bs);
        System.out.println("client接受到server:"+ ipPort + "返回结果:" + new String(bs));

        // 释放资源
        in.close();
        out.close();
        socket.close();
    }

    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
        client.connectZK(); // 开始监听

        while (true){
            try {
                client.sendRequest();
            } catch (IOException e) {
                e.printStackTrace();
            }
            Thread.sleep(9000);
        }
    }
}

6.2 分布式锁

        1. 什么是锁
        在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全(数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串行执行消除并发修改变量。
        对变量或堆代码块做同步本质上就是加锁。目的就是实现多个线程在同一时刻同一代码块只能由一个线程可执行。

        2. 分布式锁
        分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全问题。观察下面问题:
        

         上面设计存在线程安全问题。
        假设Redis 里面的某个商品库存为 1;此时两个用户同时下单,其中一个下单请求执行到第 3 步,更新数据库的库存为 0,但是第 4 步还没有执行。而另外一个用户下单执行到了第 2 步,发现库存还是 1,就继续执行第 3 步。但是商品库存已经为0,所以如果数据库没有限制就会出现超卖的问题。
        解决办法:
        用锁把 2、3、4 步锁住,让他们执行完之后,另一个线程才能进来执行。
        

         公司业务发展迅速,系统应对并发不断提⾼高,解决方案是要增加一台机器,结果会出现更更大的问题。
        

         假设有两个下单请求同时到来,分别由两个机器器执行,那么这两个请求是可以同时执行了,依然存在超卖的问题。
        因为如图所示系统是运行在两个不不同的 JVM 里面,不同的机器上,增加的锁只对自己当前 JVM 里面的线程有效,对于其他 JVM 的线程是无效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同一个锁,此时分布式锁就能解决该问题。

        分布式锁的作用:在整个系统提供一个全局、唯一的锁,在分布式系统中每个系统在进行相关操作时都需要获得到该锁,才能进行相应操作。

        3. zk实现分布式锁
       
利用zookeeper可以创建临时带序号节点的特性来实现一个分布式锁。
        实现思路:
                锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时序列节点,因为zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
                每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
                获取锁失败的线程获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或掉线zk删除临时节点)这个节点会获取到通知,代表获取到了锁。
               流程图:
                

         代码实现:

package com.lagou.dislock;

// 实现分布式锁
public class DisLockTest {
    public static void main(String[] args) {
        // 使用10个线程模拟分布式环境
        for (int i=0; i<10; i++){
            // 启动线程
            new Thread(new DisLockRunnable()).start();
        }
    }
    static class DisLockRunnable implements Runnable{
        @Override
        public void run() {
            // 每个线程具体的任务,每个线程就是抢占锁
            DisClient disClient = new DisClient();
            disClient.getDisLock();
            // 模拟获取锁之后其它动作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 释放锁
            disClient.deleteLock();
        }
    }
}
package com.lagou.dislock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 抢锁
 * 1. 去zk创建临时顺序节点,并获取到序号
 * 2. 判断自己创建的节点序号是否是当前节点最小序号,如果是则获得锁,执行相关操作,最后要释放锁
 * 3. 如果不是最下节点,当前线程需要等待,等待前一个序号的节点被删除,然后再判断自己是否是最小节点
 */
public class DisClient {

    // 获取到zkclient
    private ZkClient zkClient = new ZkClient("linux121:2181,linux122:2181,linux123:2181");
    // 前一个节点
    private String beforNodePath;
    // 当前节点
    private String currentNodePath;

    private CountDownLatch countDownLatch;

    public DisClient() {
        // 初始化zk的/distrilock节点,会出现线程安全问题
        synchronized (DisClient.class){
            if(!zkClient.exists("/distrilock")){
                zkClient.createPersistent("/distrilock");
            }
        }
    }
    /**
     * 完整获取锁方法
     */
    public void getDisLock(){

        // 获取当前线程名
        String threadName = Thread.currentThread().getName();
        // 先调用tryGetLock,判断是否能获取到锁
        if (tryGetLock()){
            // 说明获取到锁
            System.out.println(threadName + ": 获取到了锁!");
        }else {
            // 没有获取到锁
            System.out.println(threadName + ": 获取锁失败,进入等待状态!");
            waitForLock();
            // 递归获取锁
            getDisLock();
        }

    }

    /**
     * 尝试获取锁
     * @return
     */
    public boolean tryGetLock(){
        // 创建临时顺序节点,/distrilock/序号
        if(null == currentNodePath || "".equals(currentNodePath)){
            currentNodePath = zkClient.createEphemeralSequential("/distrilock/", "lock");
        }
        // 获取/distrilock下面的子节点
        List<String> childs = zkClient.getChildren("/distrilock");
        // 对子节点按序号进行排序,默认是升序
        Collections.sort(childs);
        // 取最小序号节点
        String minNode = childs.get(0);
        // 判断当前节点是否是最小序号节点
        if (currentNodePath.equals("/distrilock/"+minNode)){
            // 当前节点就是最小序号节点
            return true;
        }else {
            // 当前节点不是最小序号节点,要监控当前节点的前一个序号节点
            // 当前节点在list中的排序
            int i = Collections.binarySearch(childs, currentNodePath.substring("/distrilock/".length()));
            // 前一个序号节点
            String beforeNode = childs.get(i - 1);
            beforNodePath = "/distrilock/" + beforeNode;
        }
        return false;
    }

    /**
     * 等待之前节点释放锁,如何判断锁被释放,需要唤醒线程继续尝试tryGetLock
     */
    public void waitForLock(){

        // 准备一个监听器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String path) throws Exception {
                // 把值减1,变为0,唤醒之前await的线程。并再次提醒当前线程获取锁
                countDownLatch.countDown();
            }
        };

        // 监控前一个序号节点
        zkClient.subscribeDataChanges(beforNodePath, iZkDataListener);

        // 在监听的通知没来之前,该线程应该是等待状态,先判断一次上一个节点是否存在
        if (zkClient.exists(beforNodePath)){
            // 上一个序号节点存在
            // 开始等待,CountDownLatch:线程同步计数器
            countDownLatch = new CountDownLatch(1);
            try {
                // 阻塞,等待CountDownLatch值变为0
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 解除监听
        zkClient.unsubscribeDataChanges(beforNodePath, iZkDataListener);
    }

    public void deleteLock(){
        if(null != zkClient){
            zkClient.delete(currentNodePath);
            zkClient.close();
        }
    }
}

        分布式锁的实现可以是Redis、Zookeeper,相对来说生产环境如果使用分布式锁,可以考虑Redis而非Zookeeper。

七、Hadoop HA

7.1 HA概述

        1. 所谓HA(High Available),即高可用(7*24小时不中断服务)。
        2. 实现高可用最关键的策略是消除单点故障。Hadoop的HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA。
        3. Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)。
        4. NameNode主要在以下两个方面影响HDFS集群。
                NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启。
                NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用。

        HDFS HA功能通过配置Active/Standby两个NameNode实现在集群中对NameNode的热备来解决上述问题。如果出现故障,如机器崩溃或机器升级需要维护,这时可以通过此种方式将NameNode很快的切换到另一台机器上。

7.2 HDFS-HA工作机制

        通过NameNode消除单点故障(Active/Standby)。

7.2.1 HDFS HA 工作要点

        1. 元数据管理方式需要改变
                内存中各保存一份元数据;
                Edits日志只有Active状态的NameNode节点可以做写操作;
                两个NameNode都可以读Edits;
                共享的Edits放在一个共享存储中管理(qjournal和NFS两个主流实现);

        2. 需要一个状态管理功能模块
                实现了一个zkfailover,常驻在每一个NameNode所在的节点,每一个zkfailover负责监控自己所在的NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时要防止brain split现象(集群中出现两个Active的NameNode)的发生。

        3. 必须保证两个NameNode之间能够ssh无密码登录。

        4. 隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务。

7.2.2 HDFS HA工作机制

        配置部署HDFS-HA进行自动故障转移。自动故障转移是为HDFS部署增加了两个新组建:Zookeeper和ZKFailoverController(ZKFC)进程,Zookeeper是维护少量协调数据,通知客户端这些数据的改变和监听客户端故障的高可用服务。

        HA的自动故障转移依赖于Zookeeper的以下功能:
        故障检测:
                集群中每个NameNode在Zookeeper中维护了一个临时会话。如果有机器崩溃,Zookeeper中的会话将终止。Zookeeper会通知另外一个NameNode需要出发故障转移。
        现役NameNode选择:
                Zookeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果现役NameNode崩溃,另外一个节点可能从Zookeeper获得特殊的排外锁表明它应该成为现役的NameNode。

        ZKFC是自动故障转移的另外一个新组件,是Zookeeper的客户端,也是监听和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC线程,ZKFC负责:
        健康检测:
                ZKFC使用一个健康检查命令,定期ping与之在相同主机的NameNode,只要该NameNode及时的回复健康状态,ZKFC就认为该节点是健康的。如果该检点崩溃、冻结或进入不健康状态,健康检测器标识该节点为非健康的。
        Zookeeper会话管理:
                当本地的NameNode是健康的,ZKFC保持一个在Zookeeper中打开的会话。如果本地NameNode处于active状态,zkfc也保持一个特殊的znode锁,该锁使用了zookeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
        基于Zookeeper的选择:
                如果本地NameNode是健康的,且ZKFC发现没有其它节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责故障转移进程以使它的本地NameNode为active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转为active状态。

        

7.3 HDFS-HA集群配置

        https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html

        1. 环境准备
                修改IP
                修改主机名及主机名和IP地址的映射
                关闭防火墙
                ssh免密登录
                安装JDK、配置环境变量等

        2. 集群规划
                

         3. 启动Zookeeper集群
                启动:zk.sh start
                查看状态:zk.sh status

        4. 配置HDFS-HA集群
                先停止HDFS集群:stop-dfs.sh
                在所有节点,opt/lg/servers目录下创建一个ha文件夹
                将opt/lg/servers下的hadoop-2.9.2拷贝到ha目录下:cp -r hadoop-2.9.2 ha
                删除原集群data目录:rm -rf /opt/lg/servers/ha/hadoop-2.9.2/data
                配置hdfs-site.xml

<property>
    <name>dfs.nameservices</name>
    <value>lagoucluster</value>
</property>
<property>
    <name>dfs.ha.namenodes.lagoucluster</name>
    <value>nn1,nn2</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.lagoucluster.nn1</name>
    <value>linux121:9000</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.lagoucluster.nn2</name>
    <value>linux122:9000</value>
</property>
<property>
    <name>dfs.namenode.http-address.lagoucluster.nn1</name>
    <value>linux121:50070</value>
</property>
<property>
    <name>dfs.namenode.http-address.lagoucluster.nn2</name>
    <value>linux122:50070</value>
</property>
<property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://linux121:8485;linux122:8485;linux123:8485/lagou</value>
</property>
<property>
    <name>dfs.client.failover.proxy.provider.lagoucluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence</value>
</property>
<property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/root/.ssh/id_rsa</value>
</property>
<property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/opt/journalnode</value>
</property>
<property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
</property>

        配置core-site.xml

<property>
    <name>fs.defaultFS</name>
    <value>hdfs://lagoucluster</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/lagou/servers/ha/hadoop-2.9.2/data/tmp</value>
</property>
<property>
    <name>ha.zookeeper.quorum</name>
    <value>linux121:2181,linux122:2181,linux123:2181</value>
</property>

         拷贝配置好的hadoop环境到其他节点
                rsync-script /opt/lg/servers/ha/hadoop-2.9.2/

        5. 启动HDFS-HA集群
                 在各个JournalNode节点上,输入以下命令启动journalnode服务(去往HA安装目录,不不使用环境变量量中命令)
                /opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start journalnode

                在[nn1]上,对其进行格式化,并启动
                /opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -format
                /opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start namenode

                在[nn2]上,同步nn1的元数据信息
                /opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -bootstrapStandby

                在[nn1]上初始化zkfc
                /opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs zkfc -formatZK

                在[nn1]上,启动集群
                /opt/lagou/servers/ha/hadoop-2.9.2/sbin/start-dfs.sh

        6. 验证
                将Active NameNode进程kill:kill -9 namenode的进程id

7.4 YARN-HA配置

7.4.1 YARN-HA工作机制

        1. 官网文档:
                https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-
site/ResourceManagerHA.html

        2. YARN工作机制

7.4.2 配置YARN-HA集群

        1. 环境准备
                修改IP
                修改主机名及主机名和IP地址的映射
                关闭防⽕火墙
                ssh免密登录
                安装JDK,配置环境变量等
                配置Zookeeper集群

        2. 规划集群
                

         3. 具体配置
               (1) yarn-site.xml


    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!--启⽤用resourcemanager ha-->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <!--声明两台resourcemanager的地址-->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster-yarn</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>linux122</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>linux123</value>
    </property>
    <!--指定zookeeper集群的地址-->
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>linux121:2181,linux122:2181,linux123:2181</value>
    </property>
    <!--启⽤用⾃自动恢复-->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>
    <!--指定resourcemanager的状态信息存储在zookeeper集群-->
    <property>
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</
value>
    </property>

                (2) 同步更新其它节点的配置信息
                        rsync-script yarn-site.xml

        3. 启动yarn
                sbin/start-yarn.sh

                注意:需要在linux122上单独启动下:yarn-demon.sh start resourcemanager

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐