【菜鸟教程】Zookeeper基础入门【上】
基本概念大数据生态系统里很多组件的命名都是某种动物,例如Hadoop是????,hive是????,zookeeper就是动物园管理者,是管理大数据生态系统各组件的管理员。zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。应用场景维护配置信息Java编程经常会遇到配置项,例如数据库的user、passwo.
基本概念
大数据生态系统里很多组件的命名都是某种动物,例如Hadoop是🐘,hive是🐝,zookeeper就是动物园管理者,是管理大数据生态系统各组件的管理员。
zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。
应用场景
-
维护配置信息
Java编程经常会遇到配置项,例如数据库的user、password等,通常配置信息会放在配置文件中,再把配置文件放在服务器上。当需要修改配置信息时,要去服务器上修改对应的配置文件,但在分布式系统中很多服务器都需要使用该配置文件,因此必须保证该配置服务的高可用性和各台服务器上配置的一致性。通常会将配置文件部署在一个集群上,但一个集群涉及的服务器数量是很庞大的,如果一台台服务器逐个修改配置文件是效率很低且危险的,因此需要一种服务可以高效快速且可靠地完成配置项的更改工作。
zookeeper就可以提供这种服务,使用Zab一致性协议保证一致性。hbase中客户端就是连接zookeeper获得必要的hbase集群的配置信息才可以进一步操作。在开源消息队列Kafka中,也使用zookeeper来维护broker的信息。在dubbo中也广泛使用zookeeper管理一些配置来实现服务治理。 -
分布式锁服务
一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,在多台服务器运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器故障,释放锁并fall over到其他机器继续执行。 -
集群管理
zookeeper会将服务器加入/移除的情况通知给集群中其他正常工作的服务器,以及即使调整存储和计算等任务的分配和执行等,此外zookeeper还会对故障的服务器做出诊断并尝试修复。 -
生成分布式唯一ID
在过去的单库单表系统中,通常使用数据库字段自带的auto_increment熟悉自动为每条记录生成一个唯一的id。但分库分表后就无法依靠该属性来标识一个唯一的记录。此时可以使用zookeeper在分布式环境下生成全局唯一性id。每次要生成一个新id时,创建一个持久顺序结点,创建操作返回的结点序号,即为新id,然后把比自己结点小的删除。
设计目标
- 高性能
将数据存储在内存中,直接服务于客户端的所有非事务请求,尤其适合读为主的应用场景 - 高可用
一般以集群方式对外提供服务,每台机器都会在内存中维护当前的服务状态,每台机器之间都保持通信。只要集群中超过一般机器都能正常工作,那么整个集群就能够正常对外服务。 - 严格顺序访问
对于客户端的每个更新请求,zookeeper都会生成全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。
数据结构
zookeeper的数据结点可以视为树状结构(或者目录),树中各节点成为znode,一个znode可以有多个子结点。zookeeper结点在结构上表现为树状,使用路径来定位某个znode。
znode兼具文件和目录两种特点,既像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分。
znode大体上分为三部分(使用get命令查看)
①结点的数据
②结点的子结点
③结点的状态
结点类型(在创建时被确定且不能更改)
①临时结点:生命周期依赖于创建它的会话,会话结束结点将被自动删除。临时结点不允许拥有子结点。
②持久化结点:生命周期不依赖于会话,只有在客户端显式执行删除操作时才被删除。
安装(我恨linux)
使用centos8虚拟机,先创建一个zookeeper用户
zookeeper是基于jdk的,先安装jdk1.8:安装JDK
安装zookeeper(我是真的烦Linux 找个下载地址找一年 还慢死):
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
30kb/s 慢慢等吧
终于下载完了,tar -xzvf zookeeper-3.4.14.tar.gz
进行解压
然后进入zookeeper的conf配置目录,复制zoo_sample.cfg 命名zoo.cfg。然后回到上级目录,创建一个data目录。
然后进入conf目录,使用vi命令编辑刚才复制的zoo.cfg配置文件,打开后按a进入编辑模式。
修改datadir为刚才创建的data目录,然后按esc,再按:到文件末尾,使用wq保存退出。
卧槽…居然成功啦
先cd ..
返回zookeeper根目录,再cd bin
进入bin目录,./zkServer.sh start
启动zookeeper,等待一段时间后,./zkServer.sh status
查看zookeeper是否成功启动,Mode显示的是当前是单机状态
通过客户端连接本机zookeeper服务./zkCli.sh
,登录成功后会打印日志信息
关闭zookeeper:
基本命令
新增结点
//-s为有序结点,-e为临时结点
create [-s] [-e] path data
先启动服务器
确实已经启动
通过客户端连接服务器
-
创建持久化结点
path是/hadoop 数据是“123456” 不加选项默认是持久化结点
通过get读取数据
持久化结点与当前会话无关,quit退出会话后再次登陆,get数据还在
-
创建持久化有序结点
创建路径为/a,会自动补为/a0000000001
-
创建临时结点
临时结点在会话结束后会被删除
quit结束会话再次登陆,不能读取到数据
-
创建临时有序结点(用于分布式锁)
更新结点
更新结点的命令是set,可以直接进行修改
修改/hadoop的值从123456变为i want offer
成功
也可以基于版本号更改,类似CAS机制,当传入数据版本号和当前不一致时拒绝修改,初始版本号dataVersion为0,每次修改后会加1
当前版本为1,修改时使用1版本号,修改完变为2
版本号为2时,使用3会失败
删除结点
使用delete命令,同样可以传入版本号,如果版本号不符合也不会执行
版本号为2时,使用3删除失败,使用2删除成功
如果当前结点下有子结点,不能删除,如果要删除需要使用rmr
查看结点
使用get查看结点的属性和数据
cZxid 数据结点创建时的事务id
ctime 数据结点创建时间
mZxid 数据结点最后一次更新时的事务id
mtime 数据结点最后一次更新的时间
pZxid 子结点最后一次修改的事务id
cversion 子结点的更改次数
dataVersion 结点数据更改次数
aclVersion 结点ACL的更改次数
ephemeralOwner 如果是临时结点,表示会话的sessionID;如果是持久结点值为0
dataLength 数据内容长度
numChildren 子结点数
使用stat只返回属性 没有数据
查看结点列表
使用ls path
或ls2 path
,ls2是ls的增强,除了列出子结点还有当前结点的属性
监听器(维护配置信息)
使用get path watch
,监听器只能使用一次
左边客户端使用监听器,右边客户端更改了数据,左边监听到了数据的更改
也可以使用stat path watch
还可以使用ls/ls2 path watch
可以监听到子结点
ACL权限控制
类似linux针对文件的权限控制
acl权限控制使用scheme:id:permission标识,
①scheme :权限模式
- world 只有一个用户,代表登陆zookeeper的所有人(默认)
- ip 对客户端使用ip地址认证
- auth 使用已添加认证的用户认证
- digest 使用用户名:密码方式认证
②id :授权对象
③permission :授予的权限
- create 简写c 可以创建子结点
- delete 简写d 可以删除子结点(仅下一级)
- read 简写r 可以读取结点数据以及显示子结点
- write 简写w 可以设置结点数据
- admin 简写a 可以设置节点访问控制列表权限
例如setAcl /hadoop ip:192.168.2.142:crwda
表示ip地址为192.168.2.142的客户端对/hadoop有全部权限
getAcl path
读取权限
setAcl path acl
设置权限
addauth schema suth
添加认证用户
world授权模式
setAcl path world:anyone:<acl>
例如取消c创建权限,就不能再创建子结点
取消d权限,不能删除子结点
ip授权模式
命令setAcl path ip:<ip>:<acl>
要用两个虚拟机,这个就不演示了…
主要就是限制客户端ip地址的
auth模式
`addauth digest <user>:<password> `添加认证用户
`setAcl <path> auth:<user>:<acl> ` 授权
另一台客户端如果没有添加该用户就不能读取
digest授权模式
setAcl <path> digest:<user>:<password>:<acl>
这里的密码是经过SHA1及BASE64处理的密文
通过echo -n sjh:sjh2019. | openssl dgst -binary -sha1 | openssl base64
生成密文,复制该结果
添加结点/node4,使用digest模式授权,因为之前添加过sjh密码sjh019.的用户,所以可以访问
- 多种授权模式
同一个结点可以使用多种授权模式,用逗号隔开就行,例
- 超级管理员
假设超级管理员的账号是super:admin,先要为其生成密文:
得到密文xQJmxLMiHGwaqBvst5y6rkB6HQs=
在zookeeper目录下/bin/zkServer.sh服务器脚本文件,找到这一行
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="
加入这一句
修改完按esc ,按:, 输入wq保存退出
重启
使用客户端登陆,创建/node6,取消创建子结点权限
此时不能创建子结点
添加超级管理员用户,此时可以成功添加子结点
IDEA操作zookeeper
操作流程
- 连接到zookeeper服务器(好害怕连不上- - 。。。昨天GitHub连上了,不知道今天能不能再撞一次运气。。连不上这篇文章也就到此为止了。。。)
- 定期向服务器发送心跳,否则会过期
- 会话处于活动状态就可以获取/设置znode
- 所有任务完成后,断开连接
连接到zookeeper
创建一个新的Java工程,导入一下jar包
妈的连了快俩小时终于搞定了!!!
导完jar包后还需要导入一个log4j文件,创建一个连接zookeeper的测试类
public class zookeeperTest {
public static void main(String[] args) {
ZooKeeper zooKeeper = null;
try{
//创建一个计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
//第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
zooKeeper=new ZooKeeper("192.168.2.142:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected){
System.out.println("连接创建成功");
//通知主线程解除阻塞
countDownLatch.countDown();
}
}
});
//主线程阻塞,等待连接对象的创建成功
countDownLatch.await();
System.out.println("会话编号"+zooKeeper.getSessionId());
}catch (Exception e){
e.printStackTrace();
}finally {
if(zooKeeper!=null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
由于连接是异步的,所以用countdownlatch确保连接成功再继续
一直失败!
最后关了防火墙。。。就成功了!!!
创建结点
在Linux客户端先创建一个/create结点
在IDEA创建一个子结点/node1
public class zkCreate {
private static final String IP="192.168.2.142:2181";
private static ZooKeeper zooKeeper;
@Before
public void connect() throws Exception{
//创建一个计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
//第一个参数是服务器ip和端口号,第二个参数是客户端与服务器的会话超时时间单位ms,第三个参数是监视器对象
zooKeeper=new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected){
System.out.println("连接创建成功");
//通知主线程解除阻塞
countDownLatch.countDown();
}
}
});
//主线程阻塞,等待连接对象的创建成功
countDownLatch.await();
}
@After
public void close() throws Exception{
zooKeeper.close();
}
@Test
public void create1() throws Exception{
//同步创建结点
// 参数1 结点路径
// 参数2 结点数据
// 参数3权限列表 OPEN_ACL_UNSAFE代表world方式授权 cdrwa
// 参数4 结点类型 persistent表示持久化结点
zooKeeper.create("/create/node1","i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
运行后在Linux客户端查看
创建一个只有读权限的数据
@Test
public void create2() throws Exception{
// OPEN_ACL_UNSAFE代表world方式授权 r只能读
zooKeeper.create("/create/node2","i want offer".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}
运行后查询
自定义设置权限列表
使用world模式,设置有创建和删除权限
@Test
public void create3() throws Exception{
//自定义方式设置权限
List<ACL> acls=new ArrayList<>();
Id id=new Id("world","anyone");
acls.add(new ACL(ZooDefs.Perms.CREATE,id));
acls.add(new ACL(ZooDefs.Perms.DELETE,id));
zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
}
ip模式授权
@Test
public void create4() throws Exception{
//ip方式设置权限
List<ACL> acls=new ArrayList<>();
Id id=new Id("ip","192.168.2.142");
acls.add(new ACL(ZooDefs.Perms.CREATE,id));
zooKeeper.create("/create/node4","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
}
auth方式授权
@Test
public void create5() throws Exception{
//auth方式设置权限
zooKeeper.addAuthInfo("digest","sjh:sjh2019.".getBytes());
zooKeeper.create("/create/node5","i want offer".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}
digest方式授权
@Test
public void create6() throws Exception{
//digest方式设置权限
List<ACL> acls=new ArrayList<>();
Id id=new Id("digest","sjh:base64和sha1加密后的密码");
acls.add(new ACL(ZooDefs.Perms.CREATE,id));
zooKeeper.create("/create/node5","i want offer".getBytes(), acls, CreateMode.PERSISTENT);
}
创建持久化有序结点
@Test
public void create7() throws Exception{
//持久化有序结点
String s = zooKeeper.create("/create/node7", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(s);
}
运行结果:
创建临时结点
@Test
public void create8() throws Exception{
//创建临时结点
String s = zooKeeper.create("/create/node8", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(s);
}
创建临时有序结点
@Test
public void create9() throws Exception{
//创建临时结点
String s = zooKeeper.create("/create/node9", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(s);
}
异步创建结点
@Test
public void create10() throws Exception{
//异步创建结点
zooKeeper.create("/create/node11", "i want offer".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("创建状态: "+rc);//0表示创建成功
System.out.println("path: "+path);//结点路径
System.out.println("name: "+name);//结点路径
System.out.println("ctx: "+ctx);//上下文
}
},"context");
}
运行结果,由于是异步的,可能还没有输出就结束了
调用sleep方法让线程休眠,等待zookeeper操作执行完毕
成功:
更新结点
先查询/create/node1结点的当前值为i wangt offer
同步更新结点:
@Test
public void set1() throws Exception{
//同步更新结点
//第一个参数 结点路径
//第二个参数 要修改的值
//第三个参数 数据版本 -1代表版本号不参与更新
zooKeeper.setData("/create/node1","2020GetGoodOffer".getBytes(),-1);
}
运行完成后,再次查询该结点,值已经更改为2020GetGoodOffer
异步修改结点
@Test
public void set2() throws Exception{
//异步更新结点
//第一个参数 结点路径
//第二个参数 要修改的值
//第三个参数 数据版本 -1代表版本号不参与更新
//第四个参数 匿名回调函数
//第五个参数 上下文参数
zooKeeper.setData("/create/node1", "2020 Get Offer!!!".getBytes(), -1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc: "+rc);//0表示成功
System.out.println("path: "+path);
System.out.println("ctx: "+ctx);
System.out.println("stat: "+stat);
}
},"context");
Thread.sleep(1000);
}
结果:
删除结点
同步删除
@Test
public void del1() throws Exception{
//同步删除数据
//第一个参数表示删除结点的路径
//第二个参数表示删除结点的数据版本 -1表示删除时不考虑版本信息
zooKeeper.delete("/create/node1",-1);
}
此时数据已不存在
异步删除
@Test
public void del2() throws Exception{
//异步删除数据
zooKeeper.delete("/create/node2", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("rc: "+rc);//0表示成功
System.out.println("path: "+path);
System.out.println("ctx: "+ctx);
}
},"context");
Thread.sleep(1000);
}
运行结果
查看结点
同步查看结点
@Test
public void get1() throws Exception{
//同步读取数据
//第一个参数是路径
//第二个参数是watch,先填false(以后在讲)
//第三个参数用于获取结点属性
Stat stat = new Stat();
byte[] data = zooKeeper.getData("/create/node3", false, stat);
System.out.println(new String(data));//数据
System.out.println(stat);//属性
}
结果
异步查看结点
@Test
public void get2() throws Exception{
//异步读取数据
//第一个参数是路径
//第二个参数是watch,先填false(以后在讲)
//第三个参数是匿名回调函数
Stat stat = new Stat();
zooKeeper.getData("/create/node3", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("rc: "+rc);//0表示成功
System.out.println("path: "+path);//结点路径
System.out.println("ctx: "+ctx);//上下文参数
System.out.println(new String(data));//数据
System.out.println(stat);//属性
}
},"context");
Thread.sleep(1000);
}
运行结果:
查看子结点
创建测试数据
同步
@Test
public void getChild1() throws Exception{
//同步
//第一个参数是父路径
//第二个参数是watch,先填false(以后在讲)
List<String> children = zooKeeper.getChildren("/create/father", false);
for(String str:children)
System.out.println(str);
}
运行结果:
异步
@Test
public void getChild2() throws Exception{
//异步
//第一个参数是父路径
//第二个参数是watch,先填false(以后在讲)
//第三个参数是匿名回调函数
zooKeeper.getChildren("/create/father", false, new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
System.out.println("rc: "+rc);//0表示成功
System.out.println("path: "+path);//结点路径
System.out.println("ctx: "+ctx);//上下文参数
System.out.println(children);//子结点信息
}
},"context");
Thread.sleep(1000);
}
运行结果
检查结点是否存在
同步
@Test
public void exists1() throws Exception{
//同步判断
//第一个参数是路径
//第二个参数是watch,先填false(以后在讲)
Stat exists = zooKeeper.exists("/create/null", false);
System.out.println(exists==null?"不存在":"存在");
}
运行结果:
异步
@Test
public void exists2() throws Exception{
//异步判断
//第一个参数是路径
//第二个参数是watch,先填false(以后在讲)
//第三个参数是匿名回调函数
//第四个参数上下文
zooKeeper.exists("/create/null", false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc: "+rc);//0表示成功
System.out.println("path: "+path);//结点路径
System.out.println("ctx: "+ctx);//上下文参数
System.out.println(stat==null?"不存在":"存在");
}
},"context");
Thread.sleep(1000);
}
运行结果:
更多推荐
所有评论(0)