zookeeper之curator
Curator参数说明1.connectString zookeeper服务器的连接2.retryPolicy 重试策略,默认有四个ExponentialBackoffRetry、RetryNtime、Retryonetime、RetryUtilElapsed3.sessionTimeoutMs 回话超时时间,默认60 000ms4.connectionTimeoutMs 连接创建
Curator参数说明
1.connectString zookeeper服务器的连接
2.retryPolicy 重试策略,默认有四个ExponentialBackoffRetry、RetryNtime、Retryonetime、RetryUtilElapsed
3.sessionTimeoutMs 回话超时时间,默认60 000ms
4.connectionTimeoutMs 连接创建超时时间, 默认15 000ms
RetryPolicy
1.retryCount 已经重试的次数,如果是第一次,那么改值为0
2.elapsedTimeMs 从第一次尝试开始已经花费的时间
3.Sleepeer 用于sleep指定时间。不建议使用Thread.sleep()操作
ExponentialBackoffRetry
1.baseSleepTimeMs 初始sleep时间
2.maxRetries 最大重试次数
3.maxSleepMs 最大sleep时间
典型场景使用
1.事件监听 简化原生zookeeper api反复注册监听
NodeCache --监控当前节点数据变化
1.client 客户端实例
2.path 数据节点的路径
3.dataIsCompressed 是否进行数据压缩
package demo6; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import demo4.MyConnectionStateListener; public class Client { private static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181"; private static CuratorFramework client = null; public static void main(String[] args) { try { client = createSimple(connectString); client.start(); MyConnectionStateListener stateListener = new MyConnectionStateListener("/app2", "this is recon"); client.getConnectionStateListenable().addListener(stateListener); /** * 监控当前节点数据变化 */ final NodeCache nodeCache = new NodeCache(client, "/app2", false); nodeCache.start(true); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("current data : "+nodeCache.getCurrentData().getData()); } }); } catch (Exception e) { e.printStackTrace(); } } private static CuratorFramework createSimple(String connects) { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); //DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000); //DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000); return CuratorFrameworkFactory.newClient(connects, retryPolicy); } public static CuratorFramework createWithOptions(String connects, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { // using the CuratorFrameworkFactory.builder() gives fine grained control // over creation options. See the CuratorFrameworkFactory.Builder javadoc details return CuratorFrameworkFactory.builder().connectString(connects) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) // etc. etc. .build(); } }
PathChildrenCache --监控当前节点子节点的变化 新增 数据改变 删除
1.client 客户端实例
2.path 数据节点的路径
3.dataIsCompressed 是否进行数据压缩
4.cacheDate 是否把节点内容缓存起来,如果true,收到节点的数据内容同时也能够获取节点的数据内容
5.threadFactory and executor Service 构造单独线程池处理事件通知
package demo6; import java.util.concurrent.CountDownLatch; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.ExponentialBackoffRetry; public class Client2 { private static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181"; private static CuratorFramework client = null; private static CountDownLatch cl = new CountDownLatch(1); public static void main(String[] args) { try { client = createSimple(connectString); client.start(); // MyConnectionStateListener stateListener = new MyConnectionStateListener("/app2", "this is recon"); // client.getConnectionStateListenable().addListener(stateListener); /** * 监控当前节点子节点的变化 新增 数据改变 删除 */ PathChildrenCache cache = new PathChildrenCache(client, "/app2", true); cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()){ case CHILD_ADDED: System.out.println("add:"+event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("remove:"+event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("data change"); break; case CONNECTION_LOST: System.out.println("lost"); break; case CONNECTION_RECONNECTED: System.out.println("recon"); break; case CONNECTION_SUSPENDED: System.out.println("susp"); break; default: break; } } }); cl.await(); } catch (Exception e) { e.printStackTrace(); } } private static CuratorFramework createSimple(String connects) { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); //DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000); //DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000); return CuratorFrameworkFactory.newClient(connects, retryPolicy); } public static CuratorFramework createWithOptions(String connects, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { // using the CuratorFrameworkFactory.builder() gives fine grained control // over creation options. See the CuratorFrameworkFactory.Builder javadoc details return CuratorFrameworkFactory.builder().connectString(connects) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) // etc. etc. .build(); } }
Master选举
对于复杂得得任务,仅需要集群中一台服务器进行处理,可以进行master选举package demo7; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.retry.ExponentialBackoffRetry; public class Client1 { static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181"; static String master_path = "/master"; static CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); public static void main(String[] args) throws Exception { client.start(); LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework client) throws Exception { //执行完后,后续的才开始获取锁,执行改方法 System.out.println("client 1 成为 master"); Thread.sleep(6000); System.out.println("client 1 完成操作,释放 master权利"); } }); //释放master后,重新排队 selector.autoRequeue(); selector.start(); Thread.sleep(Integer.MAX_VALUE); } }
分布式锁
package demo8; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; /** * 分布式锁 * @author ywd * */ public class Client1 { static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181"; static String path = "/lock"; static CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); public static void main(String[] args) throws Exception { client.start(); final CountDownLatch downLatch = new CountDownLatch(1); final InterProcessMutex lock = new InterProcessMutex(client, path); for (int i = 0; i < 30; i++) { new Thread(new Runnable(){ @Override public void run() { try { downLatch.await(); //获取锁 lock.acquire(); } catch (Exception e) { } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("生成订单号是:" + orderNo); try { //释放锁 lock.release(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } downLatch.countDown(); } }
package demo8; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; /** * 分布式计数器 * @author ywd * */ public class Client2 { static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181"; static String path = "/lock"; static CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); public static void main(String[] args) throws Exception { client.start(); DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, path, new RetryNTimes(3, 1000)); AtomicValue<Integer> rc = atomicInteger.add(8); System.out.println("Result: " + rc.succeeded()); } }
更多推荐
所有评论(0)