资料:zookeeper入门源码

基本概念

大数据生态系统里很多组件的命名都是某种动物,例如Hadoop是🐘,hive是🐝,zookeeper就是动物园管理者,是管理大数据生态系统各组件的管理员。

zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

应用场景

  • 维护配置信息
    Java编程经常会遇到配置项,例如数据库的user、password等,通常配置信息会放在配置文件中,再把配置文件放在服务器上。当需要修改配置信息时,要去服务器上修改对应的配置文件,但在分布式系统中很多服务器都需要使用该配置文件,因此必须保证该配置服务的高可用性和各台服务器上配置的一致性。通常会将配置文件部署在一个集群上,但一个集群涉及的服务器数量是很庞大的,如果一台台服务器逐个修改配置文件是效率很低且危险的,因此需要一种服务可以高效快速且可靠地完成配置项的更改工作。
    zookeeper就可以提供这种服务,使用Zab一致性协议保证一致性。hbase中客户端就是连接zookeeper获得必要的hbase集群的配置信息才可以进一步操作。在开源消息队列Kafka中,也使用zookeeper来维护broker的信息。在dubbo中也广泛使用zookeeper管理一些配置来实现服务治理。

  • 分布式锁服务
    一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,在多台服务器运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器故障,释放锁并fall over到其他机器继续执行。

  • 集群管理
    zookeeper会将服务器加入/移除的情况通知给集群中其他正常工作的服务器,以及即使调整存储和计算等任务的分配和执行等,此外zookeeper还会对故障的服务器做出诊断并尝试修复。

  • 生成分布式唯一ID
    在过去的单库单表系统中,通常使用数据库字段自带的auto_increment熟悉自动为每条记录生成一个唯一的id。但分库分表后就无法依靠该属性来标识一个唯一的记录。此时可以使用zookeeper在分布式环境下生成全局唯一性id。每次要生成一个新id时,创建一个持久顺序结点,创建操作返回的结点序号,即为新id,然后把比自己结点小的删除。

设计目标

  • 高性能
    将数据存储在内存中,直接服务于客户端的所有非事务请求,尤其适合读为主的应用场景
  • 高可用
    一般以集群方式对外提供服务,每台机器都会在内存中维护当前的服务状态,每台机器之间都保持通信。只要集群中超过一般机器都能正常工作,那么整个集群就能够正常对外服务。
  • 严格顺序访问
    对于客户端的每个更新请求,zookeeper都会生成全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。

数据结构

zookeeper的数据结点可以视为树状结构(或者目录),树中各节点成为znode,一个znode可以有多个子结点。zookeeper结点在结构上表现为树状,使用路径来定位某个znode。
znode兼具文件和目录两种特点,既像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分。

znode大体上分为三部分(使用get命令查看)
①结点的数据
②结点的子结点
③结点的状态

结点类型(在创建时被确定且不能更改)
①临时结点:生命周期依赖于创建它的会话,会话结束结点将被自动删除。临时结点不允许拥有子结点。
②持久化结点:生命周期不依赖于会话,只有在客户端显式执行删除操作时才被删除。

安装(我恨linux)

使用centos8虚拟机,先创建一个zookeeper用户
在这里插入图片描述
zookeeper是基于jdk的,先安装jdk1.8:安装JDK

安装zookeeper(我是真的烦Linux 找个下载地址找一年 还慢死):

wget  http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

30kb/s 慢慢等吧
在这里插入图片描述
终于下载完了,tar -xzvf zookeeper-3.4.14.tar.gz进行解压
在这里插入图片描述
然后进入zookeeper的conf配置目录,复制zoo_sample.cfg 命名zoo.cfg。然后回到上级目录,创建一个data目录。
在这里插入图片描述
然后进入conf目录,使用vi命令编辑刚才复制的zoo.cfg配置文件,打开后按a进入编辑模式。
在这里插入图片描述
修改datadir为刚才创建的data目录,然后按esc,再按:到文件末尾,使用wq保存退出。
在这里插入图片描述
卧槽…居然成功啦
cd ..返回zookeeper根目录,再cd bin进入bin目录,./zkServer.sh start 启动zookeeper,等待一段时间后,./zkServer.sh status查看zookeeper是否成功启动,Mode显示的是当前是单机状态
在这里插入图片描述
通过客户端连接本机zookeeper服务./zkCli.sh,登录成功后会打印日志信息
在这里插入图片描述
关闭zookeeper:
在这里插入图片描述

基本命令

新增结点

//-s为有序结点,-e为临时结点
create [-s] [-e] path data 

先启动服务器
在这里插入图片描述
确实已经启动
在这里插入图片描述
通过客户端连接服务器
在这里插入图片描述

  • 创建持久化结点
    path是/hadoop 数据是“123456” 不加选项默认是持久化结点
    在这里插入图片描述
    通过get读取数据
    在这里插入图片描述
    持久化结点与当前会话无关,quit退出会话后再次登陆,get数据还在
    在这里插入图片描述

  • 创建持久化有序结点
    创建路径为/a,会自动补为/a0000000001
    在这里插入图片描述

  • 创建临时结点
    临时结点在会话结束后会被删除
    在这里插入图片描述
    quit结束会话再次登陆,不能读取到数据
    在这里插入图片描述

  • 创建临时有序结点(用于分布式锁)
    在这里插入图片描述

更新结点

更新结点的命令是set,可以直接进行修改

修改/hadoop的值从123456变为i want offer
在这里插入图片描述
成功
在这里插入图片描述

也可以基于版本号更改,类似CAS机制,当传入数据版本号和当前不一致时拒绝修改,初始版本号dataVersion为0,每次修改后会加1

当前版本为1,修改时使用1版本号,修改完变为2
在这里插入图片描述
版本号为2时,使用3会失败
在这里插入图片描述


删除结点

使用delete命令,同样可以传入版本号,如果版本号不符合也不会执行
版本号为2时,使用3删除失败,使用2删除成功
在这里插入图片描述
如果当前结点下有子结点,不能删除,如果要删除需要使用rmr
在这里插入图片描述

查看结点

使用get查看结点的属性和数据
在这里插入图片描述
cZxid 数据结点创建时的事务id
ctime 数据结点创建时间
mZxid 数据结点最后一次更新时的事务id
mtime 数据结点最后一次更新的时间
pZxid 子结点最后一次修改的事务id
cversion 子结点的更改次数
dataVersion 结点数据更改次数
aclVersion 结点ACL的更改次数
ephemeralOwner 如果是临时结点,表示会话的sessionID;如果是持久结点值为0
dataLength 数据内容长度
numChildren 子结点数

使用stat只返回属性 没有数据
在这里插入图片描述

查看结点列表
使用ls pathls2 path,ls2是ls的增强,除了列出子结点还有当前结点的属性
在这里插入图片描述

监听器(维护配置信息)

使用get path watch,监听器只能使用一次
左边客户端使用监听器,右边客户端更改了数据,左边监听到了数据的更改
在这里插入图片描述
也可以使用stat path watch
在这里插入图片描述
还可以使用ls/ls2 path watch 可以监听到子结点

ACL权限控制

类似linux针对文件的权限控制
acl权限控制使用scheme:id:permission标识,
①scheme :权限模式

  • world 只有一个用户,代表登陆zookeeper的所有人(默认)
  • ip 对客户端使用ip地址认证
  • auth 使用已添加认证的用户认证
  • digest 使用用户名:密码方式认证

②id :授权对象
③permission :授予的权限

  • create 简写c 可以创建子结点
  • delete 简写d 可以删除子结点(仅下一级)
  • read 简写r 可以读取结点数据以及显示子结点
  • write 简写w 可以设置结点数据
  • admin 简写a 可以设置节点访问控制列表权限

例如setAcl /hadoop ip:192.168.2.142:crwda 表示ip地址为192.168.2.142的客户端对/hadoop有全部权限
getAcl path 读取权限
setAcl path acl 设置权限
addauth schema suth 添加认证用户

world授权模式

setAcl path world:anyone:<acl>
例如取消c创建权限,就不能再创建子结点
在这里插入图片描述
取消d权限,不能删除子结点
在这里插入图片描述

ip授权模式

命令setAcl path ip:<ip>:<acl>
要用两个虚拟机,这个就不演示了…
主要就是限制客户端ip地址的

auth模式

`addauth digest <user>:<password> `添加认证用户
`setAcl <path> auth:<user>:<acl> 	` 授权

在这里插入图片描述
另一台客户端如果没有添加该用户就不能读取

digest授权模式

setAcl <path> digest:<user>:<password>:<acl>
这里的密码是经过SHA1及BASE64处理的密文
通过echo -n sjh:sjh2019. | openssl dgst -binary -sha1 | openssl base64生成密文,复制该结果
在这里插入图片描述
添加结点/node4,使用digest模式授权,因为之前添加过sjh密码sjh019.的用户,所以可以访问
在这里插入图片描述

  • 多种授权模式
    同一个结点可以使用多种授权模式,用逗号隔开就行,例
    在这里插入图片描述
  • 超级管理员
    假设超级管理员的账号是super:admin,先要为其生成密文:
    在这里插入图片描述
    得到密文xQJmxLMiHGwaqBvst5y6rkB6HQs=
    在zookeeper目录下/bin/zkServer.sh服务器脚本文件,找到这一行
    在这里插入图片描述
    "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="加入这一句
    在这里插入图片描述
    修改完按esc ,按:, 输入wq保存退出
    重启
    在这里插入图片描述
    使用客户端登陆,创建/node6,取消创建子结点权限
    在这里插入图片描述
    此时不能创建子结点
    在这里插入图片描述
    添加超级管理员用户,此时可以成功添加子结点
    在这里插入图片描述

IDEA操作zookeeper

操作流程

  • 连接到zookeeper服务器(好害怕连不上- - 。。。昨天GitHub连上了,不知道今天能不能再撞一次运气。。连不上这篇文章也就到此为止了。。。)
  • 定期向服务器发送心跳,否则会过期
  • 会话处于活动状态就可以获取/设置znode
  • 所有任务完成后,断开连接

连接到zookeeper

创建一个新的Java工程,导入一下jar包
在这里插入图片描述
妈的连了快俩小时终于搞定了!!!
在这里插入图片描述
导完jar包后还需要导入一个log4j文件,创建一个连接zookeeper的测试类

public class zookeeperTest {

    public static void main(String[] args) {
        ZooKeeper zooKeeper = null;
        try{
            //创建一个计数器对象
            CountDownLatch countDownLatch=new CountDownLatch(1);
            //第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
            zooKeeper=new ZooKeeper("192.168.2.142:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()==Event.KeeperState.SyncConnected){
                        System.out.println("连接创建成功");
                        //通知主线程解除阻塞
                        countDownLatch.countDown();
                    }
                }
            });
            //主线程阻塞,等待连接对象的创建成功
            countDownLatch.await();
            System.out.println("会话编号"+zooKeeper.getSessionId());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(zooKeeper!=null) {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

由于连接是异步的,所以用countdownlatch确保连接成功再继续
一直失败!
最后关了防火墙。。。就成功了!!!
在这里插入图片描述
在这里插入图片描述


创建结点

在Linux客户端先创建一个/create结点
在这里插入图片描述
在IDEA创建一个子结点/node1

public class zkCreate {

    private static final String IP="192.168.2.142:2181";
    private static ZooKeeper zooKeeper;

    @Before
    public void connect() throws Exception{
        //创建一个计数器对象
        CountDownLatch countDownLatch=new CountDownLatch(1);
        //第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
        zooKeeper=new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()==Event.KeeperState.SyncConnected){
                    System.out.println("连接创建成功");
                    //通知主线程解除阻塞
                    countDownLatch.countDown();
                }
            }
        });
        //主线程阻塞,等待连接对象的创建成功
        countDownLatch.await();
    }

    @After
    public void close() throws Exception{
        zooKeeper.close();
    }

    @Test
    public void create1() throws Exception{
        //同步创建结点
        // 参数1 结点路径
        // 参数2 结点数据
        // 参数3权限列表 OPEN_ACL_UNSAFE代表world方式授权 cdrwa
        // 参数4 结点类型 persistent表示持久化结点
        zooKeeper.create("/create/node1","i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

运行后在Linux客户端查看
在这里插入图片描述
创建一个只有读权限的数据

	@Test
    public void create2() throws Exception{
        //  OPEN_ACL_UNSAFE代表world方式授权 r只能读
        zooKeeper.create("/create/node2","i want offer".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

运行后查询
在这里插入图片描述
自定义设置权限列表
使用world模式,设置有创建和删除权限

	@Test
    public void create3() throws Exception{
        //自定义方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("world","anyone");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        acls.add(new ACL(ZooDefs.Perms.DELETE,id));
        zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }

ip模式授权

@Test
    public void create4() throws Exception{
        //ip方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("ip","192.168.2.142");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }

auth方式授权

@Test
    public void create5() throws Exception{
        //auth方式设置权限
        zooKeeper.addAuthInfo("digest","sjh:sjh2019.".getBytes());
        zooKeeper.create("/create/node5","i want offer".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    }

digest方式授权

 @Test
    public void create6() throws Exception{
        //digest方式设置权限
        List<ACL> acls=new ArrayList<>();
        Id id=new Id("digest","sjh:base64和sha1加密后的密码");
        acls.add(new ACL(ZooDefs.Perms.CREATE,id));
        zooKeeper.create("/create/node5","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
    }

创建持久化有序结点

@Test
    public void create7() throws Exception{
        //持久化有序结点
        String s = zooKeeper.create("/create/node7", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println(s);
    }

运行结果:
在这里插入图片描述
创建临时结点

@Test
    public void create8() throws Exception{
        //创建临时结点
        String s = zooKeeper.create("/create/node8", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(s);
    }

创建临时有序结点

@Test
    public void create9() throws Exception{
        //创建临时结点
        String s = zooKeeper.create("/create/node9", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(s);
    }

异步创建结点

	 @Test
    public void create10() throws Exception{
        //异步创建结点
        zooKeeper.create("/create/node11", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                System.out.println("创建状态: "+rc);//0表示创建成功
                System.out.println("path: "+path);//结点路径
                System.out.println("name: "+name);//结点路径
                System.out.println("ctx: "+ctx);//上下文
            }
        },"context");
    }

运行结果,由于是异步的,可能还没有输出就结束了
在这里插入图片描述
调用sleep方法让线程休眠,等待zookeeper操作执行完毕
在这里插入图片描述
成功:
在这里插入图片描述


更新结点

先查询/create/node1结点的当前值为i wangt offer
在这里插入图片描述
同步更新结点:

	@Test
    public void set1() throws Exception{
        //同步更新结点
        //第一个参数 结点路径
        //第二个参数 要修改的值
        //第三个参数 数据版本 -1代表版本号不参与更新
        zooKeeper.setData("/create/node1","2020GetGoodOffer".getBytes(),-1);
    }

运行完成后,再次查询该结点,值已经更改为2020GetGoodOffer
在这里插入图片描述
异步修改结点

@Test
    public void set2() throws Exception{
        //异步更新结点
        //第一个参数 结点路径
        //第二个参数 要修改的值
        //第三个参数 数据版本 -1代表版本号不参与更新
        //第四个参数 匿名回调函数
        //第五个参数 上下文参数
        zooKeeper.setData("/create/node1", "2020 Get Offer!!!".getBytes(), -1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);
                System.out.println("ctx: "+ctx);
                System.out.println("stat: "+stat);
            }
        },"context");
        Thread.sleep(1000);
    }

结果:
在这里插入图片描述

删除结点

同步删除

@Test
    public void del1() throws Exception{
        //同步删除数据
        //第一个参数表示删除结点的路径
        //第二个参数表示删除结点的数据版本 -1表示删除时不考虑版本信息
        zooKeeper.delete("/create/node1",-1);
    }

此时数据已不存在
在这里插入图片描述
异步删除

 @Test
    public void del2() throws Exception{
        //异步删除数据
        zooKeeper.delete("/create/node2", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);
                System.out.println("ctx: "+ctx);
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果
在这里插入图片描述


查看结点

同步查看结点

@Test
    public void get1() throws Exception{
        //同步读取数据
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数用于获取结点属性
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/create/node3", false, stat);
        System.out.println(new String(data));//数据
        System.out.println(stat);//属性
    }

结果
在这里插入图片描述
异步查看结点

@Test
    public void get2() throws Exception{
        //异步读取数据
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数是匿名回调函数
        Stat stat = new Stat();
        zooKeeper.getData("/create/node3", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);//结点路径
                System.out.println("ctx: "+ctx);//上下文参数
                System.out.println(new String(data));//数据
                System.out.println(stat);//属性
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果:
在这里插入图片描述


查看子结点

创建测试数据
在这里插入图片描述
同步

	@Test
    public void getChild1() throws Exception{
        //同步
        //第一个参数是父路径
        //第二个参数是watch,先填false(以后在讲)
        List<String> children = zooKeeper.getChildren("/create/father", false);
        for(String str:children)
            System.out.println(str);
    }

运行结果:
在这里插入图片描述
异步

@Test
    public void getChild2() throws Exception{
        //异步
        //第一个参数是父路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数是匿名回调函数
        zooKeeper.getChildren("/create/father", false, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);//结点路径
                System.out.println("ctx: "+ctx);//上下文参数
                System.out.println(children);//子结点信息
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果
在这里插入图片描述


检查结点是否存在

同步

	@Test
    public void exists1() throws Exception{
        //同步判断
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        Stat exists = zooKeeper.exists("/create/null", false);
        System.out.println(exists==null?"不存在":"存在");
    }

运行结果:
在这里插入图片描述

异步

@Test
    public void exists2() throws Exception{
        //异步判断
        //第一个参数是路径
        //第二个参数是watch,先填false(以后在讲)
        //第三个参数是匿名回调函数
        //第四个参数上下文
        zooKeeper.exists("/create/null", false, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc: "+rc);//0表示成功
                System.out.println("path: "+path);//结点路径
                System.out.println("ctx: "+ctx);//上下文参数
                System.out.println(stat==null?"不存在":"存在");
            }
        },"context");
        Thread.sleep(1000);
    }

运行结果:
在这里插入图片描述


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐