zookeeper学习之一安装和简单使用
总结:该篇文章主要从Zookeeper的概念,安装启动,脚本命令和javaApi,从学到用比较详细的介绍了Zookeeper组件的使用,是一个不错的针对Zookeeper组件学习的入门参考,但是其中在操作javaApi时候我们发现操作会比较繁琐和一些场景局限性,比如不支持序列化,连接异步,无法递归创建和删除,这里推荐两个对zookeeper原生API封装不错的框架:ZkClient和Curator
目录
一、zookeeper概述
ZooKeeper 本质上是一个分布式提供目录树方式存储管理数据的文件存储系统,并可以监控和维护数据变化。
- 文件存储 (dubbo注册中心、分布式配置中心 配置信息存储)
- 多个进程访问临时资源互不影响,有序访问 (分布式协调,分布式消息队列,分布式锁)
PS: 分布式消息队列:有序访问可以基于节点建立分布式先进先出队列,从而实现分布式消息队列。
Zookeeper的特性
- 全局数据的一致:每个 server 保存一份相同的数据副本,client 无论链接到哪个 server,展示的数据都是一致的
- 可靠性:如果消息被其中一台服务器接受,那么将被所有的服务器接受
- 顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息 a 在消息 b 前发布,则在所有 server 上消息 a 在消息 b 前被发布,偏序是指如果一个消息 b 在消息 a 后被同一个发送者发布,a 必须将排在 b 前面
- 数据更新原子性:一次数据更新要么成功,要么失败,不存在中间状态
- 实时性:ZooKeeper 保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息
二、zookeeper安装和使用
环境配置
| 环境 | 配置 |
| liunx | Centos 7.7 |
| zk | zookeeper-3.6.2 |
1、普通下载安装
Zk安装包下载
# 安装包下载 wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz # 解压文件 (进入下载文件目录) tar tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/zookeeper/zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/zookeeper/ # 进入解压目录 ls
#conf目录下 复制其中的zoo_sample.cfg 为zoo.zfg # bin目录启动zk 服务端 bin/zkServer.sh start
#可连接客户端佐证。 bin/zkCli.sh |
单机zookeeper服务 安装包安装完成。
2、Docker安装
#查看一下镜像 docker search zookeeper #拉取指定版本zk镜像 docker pull zookeeper:3.6.2 #查看image ID docker images #创建docker数据存储目录 mkdir -p /tmp/docker/zookeeper/data
# docker 运行 # -d 后台运行 # -p 端口映射 主机端口:zk容器端口 # -v 设置容器对应的宿主机上的位置 主机的目录:映射到容器的目录 docker run -d -p 2182:2182 -v /tmp/docker/zookeeper/data:/tmp/docker/zookeeper/data/ --name zookeeper-docker zookeeper:3.6.2
#docker 进入容器 sudo docker exec -it #容器id# /bin/bash
#使用客户端连接zookeeper 验证连接成功 bin/zkCli.sh
|
3、zookeeper集群安装
集群安装单机安装相同,只是zoo.cfg的配置需要添加集群机器的所有ip相关信息,配置如下:
#机器集群ip 多个ip写多个 #server.${id}=${host}:${port}:${port} server.1=127.0.0.1:2888:3888 server.2=127.0.0.2:2888:3888 server.3=127.0.0.3:2888:3888 |
同时在每个zookeeper的服务对应的数据存储目录下创建myid文件 里面的内容为集群对应的server.${id}中的id
4、zookeeper的相关配置
参数 | 说明 |
clientPort | 客户端连接server的端口,即zk对外服务端口,一般设置为2181。 |
dataDir | 就是把内存中的数据存储成快照文件snapshot的目录,同时myid也存储在这个目录下(myid中的内容为本机server服务的标识)。写快照不需要单独的磁盘,而且是使用后台线程进行异步写数据到磁盘,因此不会对内存数据有影响。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能。 |
tickTime | ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置的。例如,session的最小超时时间是2*tickTime。默认3000毫秒。这个单元时间不能设置过大或过小,过大会加大超时时间,也就加大了集群检测session失效时间;设置过小会导致session很容易超时,并且会导致网络通讯负载较重(心跳时间缩短) |
dataLogDir | 事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能。 由于事务日志输出时是顺序且同步写到磁盘,只有从磁盘写完日志后才会触发follower和leader发回事务日志确认消息(zk事务采用两阶段提交),因此需要单独磁盘避免随机读写和磁盘缓存导致事务日志写入较慢或存储在缓存中没有写入。 |
globalOutstandingLimit | 最大请求堆积数。默认是1000。ZK运行的时候, 尽管server已经没有空闲来处理更多的客户端请求了,但是还是允许客户端将请求提交到服务器上来,以提高吞吐性能。当然,为了防止Server内存溢出,这个请求堆积数还是需要限制下的。当有非常多的客户端并且请求都比较大时,可以减少这个值,不过这种情况很少。 (Java system property:zookeeper.globalOutstandingLimit) |
preAllocSize | 预先开辟磁盘空间,用于后续写入事务日志。默认是64M,每个事务日志大小就是64M,这个默认大小是按snapCount为100000且每个事务信息为512b来计算的。如果ZK的快照频率较大的话,建议适当减小这个参数。(Java system property:zookeeper.preAllocSize)。当事务日志文件不会增长得太大的话,这个大小是可以减小的。比如1000次事务会新产生一个快照(参数为snapCount),新产生快照后会用新的事务日志文件,假设一个事务信息大小100b,那么事务日志预开辟的大小为100kb会比较好。 |
snapCount | 每进行snapCount次事务日志输出后,触发一次快照(snapshot), 此时,ZK会生成一个snapshot.*文件,同时创建一个新的事务日志文件log.*。默认是100000.(真正的代码实现中,会进行一定的随机数处理,以避免所有服务器在同一时间进行快照而影响性能)(Java system property:zookeeper.snapCount)。在通过快照和事务日志恢复数据时,使用的时间为读取快照时间和读取在这个快照之后产生的事务日志的时间,因此snapCount太大会导致读取事务日志的数量较多,snapCount较小会导致产生快照文件很多。 |
traceFile | 用于记录所有请求的log,一般调试过程中可以使用,但是生产环境不建议使用,会严重影响性能。(Java system property:requestTraceFile) |
maxClientCnxns | 单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。指定客户端IP的限制策略,这里有一个patch,可以尝试一下:http://rdc.taobao.com/team/jm/archives/1334(No Java system property) |
clientPortAddress | 对于多网卡的机器,可以为每个IP指定不同的监听端口。默认情况是所有IP都监听clientPort指定的端口。New in 3.3.0 |
minSessionTimeoutmaxSessionTimeout | Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。默认的Session超时时间是在2 * tickTime ~ 20 * tickTime这个范围 New in 3.3.0 |
fsync.warningthresholdms | 事务日志输出时,如果调用fsync方法超过指定的超时时间,那么会在日志中输出警告信息。默认是1000ms。(Java system property: fsync.warningthresholdms) New in 3.3.4 |
autopurge.purgeInterval | 在上文中已经提到,3.4.0及之后版本,ZK提供了自动清理事务日志和快照文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个1或更大的整数,默认是0,表示不开启自动清理功能,但可以运行bin/zkCleanup.sh来手动清理zk日志。(No Java system property) New in 3.4.0 |
autopurge.snapRetainCount | 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。(No Java system property) New in 3.4.0 |
electionAlg | 在之前的版本中, 这个参数配置是允许我们选择leader选举算法,但是由于在以后的版本中,只会留下一种“TCP-based version of fast leader election”算法,所以这个参数目前看来没有用了,这里也不详细展开说了。(No Java system property) |
initLimit | Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。Leader允许Follower在initLimit时间内完成这个工作。通常情况下,我们不用太在意这个参数的设置。如果ZK集群的数据量确实很大了,Follower在启动的时候,从Leader上同步数据的时间也会相应变长,因此在这种情况下,有必要适当调大这个参数了。默认值为10,即10 * tickTime (No Java system property) |
syncLimit | 在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果Leader发出心跳包在syncLimit之后,还没有从Follower那里收到响应,那么就认为这个Follower已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题,设置大小依赖与网络延迟和吞吐情况。默认为5,即5 * tickTime (No Java system property) |
leaderServes | 默认情况下,Leader是会接受客户端连接,并提供正常的读写服务。但是,如果你想让Leader专注于集群中机器的协调,那么可以将这个参数设置为no,这样一来,会大大提高写操作的性能。默认为yes(Java system property: zookeeper.leaderServes)。 |
server.x=[hostname]:n:n | 这里的x是一个数字,与myid文件中的id是一致的,用来标识这个zk server,大小为1-255。右边可以配置两个端口,第一个端口用于Follower和Leader之间的数据同步和其它通信,第二个端口用于Leader选举过程中投票通信。Zk启动时,会读取myid中的值,从而得到server.x的配置为本机配置,并且也可以通过这个id找到和其他zk通信的地址和端口。hostname为机器ip,第一个端口n为事务发送的通信端口,第二个n为leader选举的通信端口,默认为2888:3888。(No Java system property) |
group.x=nnnnn[:nnnnn] weight.x=nnnnn | 对机器分组和权重设置,可以 参见这里(No Java system property) |
cnxTimeout | Leader选举过程中,打开一次连接(选举的server互相通信建立连接)的超时时间,默认是5s。(Java system property: zookeeper.cnxTimeout) |
zookeeper.DigestAuthenticationProvider .superDigest | ZK权限设置相关,具体参见《使用super身份对有权限的节点进行操作》 和 《ZooKeeper权限控制》 |
skipACL | 对所有客户端请求都不作ACL检查。如果之前节点上设置有权限限制,一旦服务器上打开这个开头,那么也将失效。(Java system property:zookeeper.skipACL) |
forceSync | 这个参数确定了是否需要在事务日志提交的时候调用FileChannel.force来保证数据完全同步到磁盘。(Java system property:zookeeper.forceSync) |
jute.maxbuffer | 每个节点最大数据量,是默认是1M。这个限制必须在server和client端都进行设置才会生效。(Java system property:jute.maxbuffer) |
server.x:hostname:n:n:observer | 配置observer,表示本机是一个观察者(观察者不参与事务和选举,但会转发更新请求给leader)。比如:server.4:localhost:2181:3181:observer |
peerType=observer | 结合上面一条配置,表示这个zookeeper为观察者 |
三、zk相关命令
使用客户端 zkCli.sh /zkCli.cmd 连接zk服务 help命令可以查看其所有命令,这里仅仅罗列出常用命令,本文涉及不到的命令请自行学习使用
1、创建节点命令
# -s 创建顺序节点 # -e 创建临时节点 # -t ttl 创建一个具有生命周期(毫秒)的节点(如果该节点在指定ttl时间没有被修改 会被删除) # -c 不详 # path 节点路径 # data 节点数据 # acl 节点访问的权限控制 create [-s] [-e] [-c] [-t ttl] path [data] [acl] |
2、读取节点命令
ls命令
# -s 查看该节点的详细信息 比如创建该节点的事务id(cZxid) 等 # -w 不详 # -R 递归形式显示 ls [-s] [-w] [-R] path |
ls -s /
ls -R /
get命令
# 查看该节点的详细信息 比如创建该节点的事务id(cZxid) 等 # -w 不详 get [-s] [-w] path |
get -s / 与ls -s / 效果意向
3、修改节点命令
# -s 查看该节点的详细信息 比如创建该节点的事务id(cZxid) 等 # -v version 版本 # path 节点路径 # data 节点数据 set [-s] [-v version] path data |
4、删除节点命令
# -v version 版本 delete [-v version] path |
注意如果该节点下包含子节点 则该节点无法删除
四、javaApi的使用
前面我们使用客户端脚本命令进行了会话建立和节点的CURD相关命令,接下来我们进行使用java 相关api对zookeeper的操作。
1、创建会话(建立连接)
zookeeper 建立连接是异步的 会直接返回 可以使用CountDownLatch进行阻塞通过Watch监控对象来获取其是否建立成功,相关代码如下:
//异步回调通知 比如建立连接过程
@Override
public void process(WatchedEvent event) {
System.out.println("监听到zk事件:"+event);
log.info("监听到zk事件:{}",event);
if(event.getState() == Event.KeeperState.SyncConnected){
//连接已经建立停止阻塞
countDownLatch.countDown();
log.info("zk连接已经建立,连接状态:{}",event.getState());
}
}
//1、zookeeper 建立连接是异步的 会直接返回 可以使用CountDownLatch进行阻塞通过Watch监控对象来获取其是否建立成功
/**
* 创建zookeeper 连接api 相关参数
* connetionString: 连接ip和host组成的字符串 ip:host
* sessionTimeout: 建立连接的绘画超时时间,超过该时间需要重新建立连接
* Watcher: zk所有事件发生的回调函数接口 用于通知连接建立和节点变化
* canBeReadOnly: 对zk上节点读写权限 true 不能创建节点 只能获取节点数据
*/
//创建连接
@BeforeAll
public static void getConnection(){
try {
if(zooKeeper==null){
zooKeeper = new ZooKeeper(connetionString,sessionTimeOut,new ZkClient(),false);
log.info("zk 开始了异步连接,连接状态:{}",zooKeeper.getState());
countDownLatch.await();
//使用sessionId和sessionPasswd 复用会话
long sessionId = zooKeeper.getSessionId();
byte[] sessionPasswd = zooKeeper.getSessionPasswd();
// ZooKeeper repeatZk = new ZooKeeper(connetionString,sessionTimeOut,new ZkClient(),sessionId,sessionPasswd,false);
// log.info("复用zk:{}",repeatZk.hashCode());
// log.info("初始zk:{}",zooKeeper.hashCode());
//两者不一样
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
2、创建节点
2.1、同步创建节点
/**
* 创建同步节点节点相关参数
* path:节点对应的路径 (zk是一个基于树结构的文件目录系统 有路径)
* data: 节点对应的存储数据(byte[]形式)
* ACLs: access control List 目录节点访问需要相关的权限控制 该参数是对于节点访问的ACL权限控制集合
* CreateMode:创建节点模式 临时、持久化等
* PERSISTENT 持久化节点
* PERSISTENT_SEQUENTIAL 持久顺序节点
* EPHEMERAL 临时节点
* EPHEMERAL_SEQUENTIAL 临时自动编号节点
* PERSISTENT_WITH_TTL 持久化节点 客户端关闭不会被删除,但是在ttl时间内没有被修改过会被删除
* PERSISTENT_SEQUENTIAL_WITH_TTL 持久化顺序节点 作用同 PERSISTENT_WITH_TTL
*/
//创建同步节点
@Test
public void createSyncNode(){
try {
//创建一个顺序持久化节点 节点始终存在 节点权限对所有人开放
String persistentNodePath = zooKeeper.create(SYNC_NODE_PREFIX+"create", "persistent".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
//创建一个临时节点 连接关闭该节点会被删除
String ephemeralNodePath = zooKeeper.create(SYNC_NODE_PREFIX+"temp", "ephemeral".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//获取节点 的相关性质:路径 数据 节点属性
log.info("创建同步持久化节点成功。节点路径:{}",persistentNodePath);
log.info("创建同步临时节点成功。节点路径:{}",ephemeralNodePath);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2.2、异步创建节点
/**
* 异步常见节点不会直接返回节点信息,而是通过AsyncCallback的StringCallback和Create2Callback回调函数进行创建完成/创建异常的回调通知
* 两个回调函数区别
* StringCallback 上下文对象必须是String类型
* Create2Callback 上下文为Object对象且回调函数多了一个节点属性信息Stat对象
* 回调函数参数:
* int rc为服务器的响应码,0表示调用成功,-4表示连接已断开,-110表示指定节点已存在,-112表示会话已过期。
* String path调用create方法时传入的path。
* Object ctx调用create方法时传入的ctx。
* String name创建成功的节点名称。
*/
//创建异步节点
@Test
public void createASyncNode(){
//异步节点创建成功后的接口回调函数为StringCallback
zooKeeper.create(ASYNC_NODE_PREFIX + "create", "persistent".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info("异步创建节点成功节点信息:rc:{},path:{},ctx:{},name:{}",rc,path,ctx,name);
}
},"睡觉");
//异步节点创建成功后的接口回调函数为Create2Callback 比StringCallBack多了一个节点属性对象Stat
zooKeeper.create(ASYNC_NODE_PREFIX + "create", "persistent".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
log.info("异步创建子节点成功节点信息:rc:{},path:{},ctx:{},name:{}",rc,path,ctx,name);
log.info("节点属性信息:{}",stat.toString());
}
},"睡觉");
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zookeeper创建节点有同步和异步两种,无论同步和异步节点 都无法递归的创建节点,即无法在父节点不存在的情况下创建子节点,节点存在再次创建会抛出NodeExistsException,同时zookeeper对于节点数据存储形式只只支持byte字节数组形式的存储,开发者需要自己进行序列化和反序列化。
3、读取节点
获取节点有两种方式
- getChildren(path,watch)
path:查看当前路径下的一级子节点列表(注意是节点列表而非节点数据)
Watch:可以添加Watch 监听该节点路径发生变化(节点新增子节点或者节点删除的) NodeChildrenChanged事件。(可使用boolean 注册默认的Watch)
- getData(path,watch,DataCallback,ctx):
path : 获取该节点的数据信息(同步获取和异步获取)
watch : 监听该节点的数据变化(数据改变或者数据删除)
DataCallback : 异步获取数据结果是通过该回调函数通知的
ctx:异步过程中的上下文环境对象
@Test
public void getNode(){
try {
//获取指定路径下的字节点列表 同时添加一个Watch监听指定节点路径的变化
zooKeeper.getChildren(SYNC_NODE_PREFIX+"create",new ZkClient());
zooKeeper.create(SYNC_NODE_PREFIX+"create/son2", "sonNew".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
Stat stat = new Stat();
//同步获取节点数据
byte[] data = zooKeeper.getData(SYNC_NODE_PREFIX + "create", new ZkClient(), stat);
zooKeeper.setData(SYNC_NODE_PREFIX+"create","改变节点".getBytes(),-1);
//异步获取数据
zooKeeper.getData(SYNC_NODE_PREFIX + "create", new ZkClient(), new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
log.info("获取数据响应码rc:{},path:{},ctx:{},data:{},stat:{}",rc,path,ctx,new String(data),stat);
}
},"环境数据对象");
zooKeeper.delete(SYNC_NODE_PREFIX+"create",-1);
log.info("节点数据:{},属性信息stat:{}",new String(data),stat);
Thread.sleep(1000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
4、修改节点
Zookeeper更新节点数据有版本的概念,为了解决并发修改的问题,更改数据必须带上版本,如果当前版本数据已经被修改 则该次修改不成功KeeperErrorCode = BadVersion for /xiu-sync-zk-create
/**
* 更新节点数据
* Zookeeper更新节点数据有版本的概念,为了解决并发修改的问题,更改数据必须带上版本,
* 如果当前版本数据已经被修改 则该次修改不成功。version=-1表示每次使用最新版本进行更新
*/
@Test
public void updateData(){
try {
//同步更新节点
Stat stat = zooKeeper.setData(SYNC_NODE_PREFIX + "create", "修改节点".getBytes(), -1);
zooKeeper.setData(SYNC_NODE_PREFIX + "create", "修改节点3".getBytes(), stat.getVersion()-1);
//异步更新节点 且使用旧版本更新(会出现错误)
zooKeeper.setData(SYNC_NODE_PREFIX + "create", "修改节点2".getBytes(), stat.getVersion()-1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
log.info("更新数据响应码rc:{},path:{},ctx:{},stat:{}",rc,path,ctx,stat);
}
}
, "更新数据");
Thread.sleep(1000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
5、删除节点
删除和上面的创建一样均有异步和同步,但是zookeeper不支持递归删除即在存在子节点情况下,无法直接删除该父节点
@Test
public void deleteNode(){
try {
//同步删除节点 -1删除最新版本
zooKeeper.delete(ASYNC_NODE_PREFIX+"create",-1);
//异步删除节点
zooKeeper.delete(ASYNC_NODE_PREFIX + "create", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
log.info("更新数据响应码rc:{},path:{},ctx:{}",rc,path,ctx);
}
},"上下文");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
五、zookeeper相关补充问题
1、Watch监视器?
在我们操作API的过程中发现常用的API操作都需要添加一个Watch监控器,要么使用默认的Watch,那么zookeeper为啥要使用Watch机制?这个需要从Zookeeper的解决问题来决定的,因为Zookeeper是一个分布式一致协调器,其工作原理为:多个分布式进程通过其API来操作共享的Zookeeper内存数据对象来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型,与Java的多线程并发模型一致,他们的线程或进程都是”共享式内存通信“。但是他摒弃了java多线程状态变更的主动沦陷和notify通知机制,基于性能的考虑采用了类似的异步非阻塞的主动通知模式即Watch机制,使得分布式进程之间的“共享状态通信”更加实时高效。
2、AsyncCallback家族
所有异步操作过程中都需要使用其AsyncCallback中的内部类进行接口回调处理,AsyncCallback为所有异步回调处理的总类。
才疏学浅,聊以自慰,如有不当,敬请指正,手打不易,点赞评论。
更多推荐
所有评论(0)