zookeeper使用
源码:https://gitee.com/suwenguang/testzookeeper集群角色:leader 主follower 从observer 观察者 :不参与写的选举,但是提供读概念:数据模型zookeeper的数据模型和文件系统类似,每一个节点称为:znode.是zookeeper中的最小数据单元。每一个znode上都可以保存数据和挂载子...
个人博客:https://suveng.github.io/blog/
源码:https://gitee.com/suwenguang/test
zookeeper集群角色:
- leader 主
- follower 从
- observer 观察者 :不参与写的选举,但是提供读
概念:
-
数据模型
zookeeper的数据模型和文件系统类似,每一个节点称为:znode. 是zookeeper中的最小数据单元。每一个znode上都可以
保存数据和挂载子节点。 从而构成一个层次化的属性结构
节点特性
持久化节点 : 节点创建后会一直存在zookeeper服务器上,直到主动删除
持久化有序节点 :每个节点都会为它的一级子节点维护一个顺序
临时节点 : 临时节点的生命周期和客户端的会话保持一致。当客户端会话失效,该节点自动清理
临时有序节点 : 在临时节点上多勒一个顺序性特性
-
会话
-
watcherzookeeper提供了分布式数据发布/订阅****,zookeeper****允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候
**会触发watcher。服务端会向客户端发送一个事件通知****watcher的通知是一次性,一旦触发一次通知后,该watcher就失效
-
ACL
zookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。CREATE /READ/WRITE/DELETE/ADMIN
zookeeper的命令操作
-
create [-s] [-e] path data acl
-s 表示节点是否有序
-e 表示是否为临时节点
默认情况下,是持久化节点 -
get path [watch]
获得指定 path的信息 -
set path data [version]
修改节点 path对应的data
乐观锁的概念
数据库里面有一个 version 字段去控制数据行的版本号 -
delete path [version]
删除节点
stat信息
cversion = 0 子节点的版本号
aclVersion = 0 表示acl的版本号,修改节点权限
dataVersion = 1 表示的是当前节点数据的版本号
czxid 节点被创建时的事务ID
mzxid 节点最后一次被更新的事务ID
pzxid 当前节点下的子节点最后一次被修改时的事务ID
ctime = Sat Aug 05 20:48:26 CST 2017
mtime = Sat Aug 05 20:48:50 CST 2017
java API操作zookeeper
demo实例我已经放在gitee上面。在zookeeper_demo的模块下面。
https://gitee.com/suwenguang/test
注意需要导入jar
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency>
package 操作zookeeper.javaAPI;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* author Veng Su
* email 1344114844@qq.com
* date 18-9-16 下午1:19
*/
//这里实现watcher借口是为了方便构造zookeeper对象
public class APIdemo implements Watcher {
public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";
public static CountDownLatch countDownLatch = new CountDownLatch(1);//计数器用于同步连接
public static ZooKeeper zooKeeper;//zookeeper对象
public static Stat stat = new Stat();//不给的全局,状态会话会丢失,导致执行失败
public static void main(String[] args) throws Exception {
APIdemo apIdemo = new APIdemo();
zooKeeper = new ZooKeeper(CONNECTSTRING, 500, new APIdemo());//创建zookeeper对象
countDownLatch.await();
System.out.println(zooKeeper.getState());
//注意我都是写死的节点。便于学习
// System.out.println(zooKeeper.getData("/suveng",apIdemo,stat));
// apIdemo.delete();//这里是删除节点,注意要有才能删除
// apIdemo.get(apIdemo);//这里是获取节点
// apIdemo.create();//创建节点
// apIdemo.set();//修改节点
// Thread.sleep(2000);
//
// Thread.sleep(1000);
// zooKeeper.create("/suveng/sds", "dddd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
// System.out.println(zooKeeper.getChildren("/suveng",true));
// 权限控制部分:ip,digest,world,super
// zooKeeper.addAuthInfo("digest","root:root".getBytes());
//第一种方式做权限
ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));
List<ACL> acls=new ArrayList<>();
acls.add(acl);
zooKeeper.create("/suveng/auth","2122".getBytes(),acls,CreateMode.PERSISTENT);
//第二种方式做权限
// zooKeeper.addAuthInfo("digest","root:root".getBytes());
//创建新的客户端
APIdemo apIdemo1=new APIdemo();
ZooKeeper zooKeeper1=new ZooKeeper(CONNECTSTRING,5000,apIdemo1);
zooKeeper1.getData("/suveng/auth",true,new Stat());
}
private void create() {
//create
String res = null;
try {
res = zooKeeper.create("/suveng", "hello,suwenguang".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println("create " + res);
}
private String set() throws KeeperException, InterruptedException {
zooKeeper.setData("/suveng", "dddd".getBytes(), -1);
return null;
}
private void get(APIdemo apIdemo) throws KeeperException, InterruptedException {
zooKeeper.getData("/suveng", apIdemo, stat);
}
private void delete() throws KeeperException, InterruptedException {
zooKeeper.delete("/suveng",-1);
}
//watcher 的实现方法
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
if (watchedEvent.getType() == Event.EventType.None && watchedEvent.getPath() == null) {
countDownLatch.countDown();
System.out.println(watchedEvent.getState());
} else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println("path---->" + watchedEvent.getPath() + " |data change--->" +
zooKeeper.getData(watchedEvent.getPath(), true, stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else if (watchedEvent.getType() == Event.EventType.NodeCreated) {
try {
System.out.println("path---->" + watchedEvent.getPath() + " |created--->" +
zooKeeper.getData(watchedEvent.getPath(), true, stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
try {
System.out.println("path---->" + watchedEvent.getPath() + " |data deleted--->" +
zooKeeper.getData(watchedEvent.getPath(), true, stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
try {
System.out.println("path---->" + watchedEvent.getPath() + " |data child changed--->" +
zooKeeper.getData(watchedEvent.getPath(), true, stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
权限控制模式
schema:授权对象
ip : 192.168.1.1
Digest : username:password
world : 开放式的权限控制模式,数据节点的访问权限对所有用户开放。 world:anyone
super :超级用户,可以对zookeeper上的数据节点进行操作
连接状态
KeeperStat.Expired 在一定时间内客户端没有收到服务器的通知, 则认为当前的会话已经过期了。
KeeperStat.Disconnected 断开连接的状态
KeeperStat.SyncConnected 客户端和服务器端在某一个节点上建立连接,并且完成一次version、zxid同步
KeeperStat.authFailed 授权失败
事件类型
NodeCreated 当节点被创建的时候,触发
NodeChildrenChanged 表示子节点被创建、被删除、子节点数据发生变化
NodeDataChanged 节点数据发生变化
NodeDeleted 节点被删除
None 客户端和服务器端连接状态发生变化的时候,事件类型就是None
zkClient 操作zookeeper
注意:需要导入jar
zkClient只是简单的封装了Java的zookeeper API。但是总体比Java的要好用,支持递归创建和递归删除。订阅的时候也比较方便。
实例代码依然是放在了gitee上面。
package 操作zookeeper.zkclientdemo;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* author Veng Su
* email 1344114844@qq.com
* date 18-9-16 下午4:24
*/
public class Zkclientdemo {
public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";
public static ZkClient geetInstance() {
return new ZkClient(CONNECTSTRING, 5000);
}
@Test
public void testcreate() {
ZkClient zkClient = Zkclientdemo.geetInstance();
zkClient.createEphemeral("/zktest1");
ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));
List<ACL> acls=new ArrayList<>();
acls.add(acl);
zkClient.createEphemeral("/zktest2",acls);
zkClient.createEphemeral("/zktest3","324".getBytes(),acls);
zkClient.create("/zktest","21".getBytes(), CreateMode.PERSISTENT);
//递归创建
zkClient.createPersistent("/digui0/digui1/digui2/digui3",true);
System.out.println("success");
}
@Test
public void testdelete() {
ZkClient zkClient = Zkclientdemo.geetInstance();
//普通删除
zkClient.delete("/zktest");
zkClient.delete("/zktest1");
zkClient.delete("/zktest2");
zkClient.delete("/zktest3");
//递归删除
zkClient.deleteRecursive("/digui0");
boolean is = zkClient.exists("/suveng");
System.out.println(is);
}
@Test
public void testWatchers() throws InterruptedException {
ZkClient zkClient = Zkclientdemo.geetInstance();
zkClient.subscribeDataChanges("/suveng", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println(s+"->"+o);
}
@Override
public void handleDataDeleted(String s) throws Exception {
}
});
zkClient.writeData("/suveng","suwenguang");
Thread.sleep(1000);
}
}
curator操作zookeeper
curator是netflix公司开源的。提供了各种使用场景的封装
curator-framework 提供了fluent 风格api
curator-replice 提供实现封装
fluent风格介绍。也就是链式操作如下
package 操作zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* author Veng Su
* email 1344114844@qq.com
* date 18-9-16 下午6:32
*/
public class CuratorDemo {
public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";
public static CuratorFramework getInstance() {
return CuratorFrameworkFactory
.builder()
.connectString(CONNECTSTRING)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.build();
}
// 测试创建
@Test
public void testCreate() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/suveng/testcurator");
}
// 测试删除
@Test
public void testDelete() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
curatorFramework.delete()
.deletingChildrenIfNeeded()
// .withVersion()
.forPath("/suveng/testcurator");
}
//测试getdata
@Test
public void testGet() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
Stat stat = new Stat();
byte[] bytes = curatorFramework.getData()
.storingStatIn(stat)
.forPath("/suveng");
System.out.println(new String(bytes));
System.out.println("stat->" + stat);
}
// 测试setdata
@Test
public void testSet() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
curatorFramework.setData().forPath("/suveng", "knickknack".getBytes());
}
//测试异步
@Test
public void testasynchronous() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
ExecutorService executorService = Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch = new CountDownLatch(1);
//创建临时节点
curatorFramework.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(Thread.currentThread().getName() + "这是异步的回调函数" + " result code->" + curatorEvent.getResultCode() + "" +
"curator type" + curatorEvent.getType());
countDownLatch.countDown();
}
}, executorService).forPath("/suveng/linshi");//不给其他线程默认是当前线程。
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "main线程");
executorService.shutdown();
}
//事务
@Test
public void testTransaction() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
Collection<CuratorTransactionResult> curatorTransactions = curatorFramework.inTransaction().create().withMode(CreateMode.EPHEMERAL).forPath("/suveng/transaction")
.and()
.setData()
.forPath("/suveng", "sdfsdfsd".getBytes()).and()
.commit();
for (CuratorTransactionResult result : curatorTransactions) {
System.out.println(result.getForPath() + "->" + result.getType());
}
}
// 测试watcher
// patchcache 监视一个节点的子节点下的创建,删除,更新
// nodecache 监视一个节点的创建,删除,更新
// Treecache patchcache + nodecache 合体 监视路径下的创建,删除,更新,并且缓存路径 所有子节点的数据
@Test
public void testWatcher() throws Exception {
CuratorFramework curatorFramework = CuratorDemo.getInstance();
curatorFramework.start();
// NodeCache nodeCache=new NodeCache(curatorFramework,"/suveng",false);
// nodeCache.start();
//
// nodeCache.getListenable().addListener(()-> System.out.println("节点数据发生改变,改变后的数据"+new String(nodeCache.getCurrentData().getData())));
// curatorFramework.setData().forPath("/suveng","sdfasfasfda".getBytes());
// Thread.sleep(2000);
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/suveng", true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
switch (pathChildrenCacheEvent.getType()) {
case CONNECTION_RECONNECTED:
System.out.println("reconnetc");
break;
case CHILD_ADDED:
System.out.println("child add");
break;
case CHILD_REMOVED:
System.out.println("child remove");
break;
case CHILD_UPDATED:
System.out.println("child update");
break;
default:
break;
}
});
// curatorFramework.setData().forPath("/suveng","sdfsd".getBytes());
// curatorFramework.delete().forPath("/suveng/test");
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/suveng/test");
TimeUnit.SECONDS.sleep(1);
curatorFramework.setData().forPath("/suveng/test","sfs".getBytes());
TimeUnit.SECONDS.sleep(1);
}
@Test
public void testInit() {
// 创建会话的两种方式
// 第一种 normal
CuratorFramework curatorFramework = CuratorFrameworkFactory
.newClient(CONNECTSTRING, 5000, 5000, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
// 第二种 fluent
CuratorFramework curatorFramework2 = CuratorFrameworkFactory
.builder()
.connectString(CONNECTSTRING)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.build();
curatorFramework2.start();
System.out.println("success ");
}
}
private void makeFluent(Customer customer) {
customer.newOrder()
.with(6, "TAL")
.with(5, "HPK").skippable()
.with(3, "LGV")
.priorityRush();
}
curator的重试策略
ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUnitilElapsed 一直重试知道规定的时间
更多推荐
所有评论(0)