Curator参数说明

1.connectString zookeeper服务器的连接

2.retryPolicy 重试策略,内置四种实现:ExponentialBackoffRetry、RetryNTimes、RetryOneTime、RetryUntilElapsed

3.sessionTimeoutMs 会话超时时间,默认60 000ms

4.connectionTimeoutMs 连接创建超时时间, 默认15 000ms

RetryPolicy

1.retryCount 已经重试的次数,如果是第一次,那么该值为0

2.elapsedTimeMs 从第一次尝试开始已经花费的时间

3.sleeper 用于sleep指定时间。不建议使用Thread.sleep()操作

ExponentialBackoffRetry

1.baseSleepTimeMs 初始sleep时间

2.maxRetries 最大重试次数

3.maxSleepMs 最大sleep时间

典型场景使用 

1.事件监听 简化原生zookeeper api反复注册监听

NodeCache --监控当前节点数据变化 

 1.client 客户端实例 

 2.path 数据节点的路径 

 3.dataIsCompressed 是否进行数据压缩

package demo6;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import demo4.MyConnectionStateListener;

public class Client {
	
	private static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	private static CuratorFramework client = null;
	
	public static void main(String[] args) {
		try {
			client = createSimple(connectString);
			client.start();
			MyConnectionStateListener stateListener = new MyConnectionStateListener("/app2", "this is recon");
			client.getConnectionStateListenable().addListener(stateListener);
			/**
			 * 监控当前节点数据变化
			*/
			final NodeCache nodeCache = new NodeCache(client, "/app2", false);
			nodeCache.start(true);
			nodeCache.getListenable().addListener(new NodeCacheListener() {
				
				@Override
				public void nodeChanged() throws Exception {
					System.out.println("current data : "+nodeCache.getCurrentData().getData());
				}
			}); 

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	private static CuratorFramework createSimple(String connects) {
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		//DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
	    //DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
		return CuratorFrameworkFactory.newClient(connects, retryPolicy);
	}
	
	 public static CuratorFramework createWithOptions(String connects, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
	        // using the CuratorFrameworkFactory.builder() gives fine grained control
	        // over creation options. See the CuratorFrameworkFactory.Builder javadoc details
	        return CuratorFrameworkFactory.builder().connectString(connects)
	                .retryPolicy(retryPolicy)
	                .connectionTimeoutMs(connectionTimeoutMs)
	                .sessionTimeoutMs(sessionTimeoutMs)
	                // etc. etc.
	                .build();
	    }

}


PathChildrenCache  --监控当前节点子节点的变化 新增 数据改变 删除

 1.client 客户端实例 

 2.path 数据节点的路径 

 3.dataIsCompressed 是否进行数据压缩 

 4.cacheData 是否把节点内容缓存起来,如果为true,收到子节点变更通知的同时也能够获取节点的数据内容 

 5.threadFactory 和 executorService 构造单独线程池处理事件通知

package demo6;

import java.util.concurrent.CountDownLatch;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * PathChildrenCache demo: watches the children of {@code /app2} and prints a line for
 * every child that is added, removed or updated, as well as for connection state events.
 */
public class Client2 {
	
	private static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	private static CuratorFramework client = null;
	// Never counted down: only used to keep main() blocked while events arrive.
	private static CountDownLatch cl = new CountDownLatch(1);
	
	/**
	 * Starts the client, attaches a child listener to {@code /app2} and blocks forever.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			client = createSimple(connectString);
			client.start();
//			MyConnectionStateListener stateListener = new MyConnectionStateListener("/app2", "this is recon");
//			client.getConnectionStateListenable().addListener(stateListener);
			// Watch the children of the current node: added, data changed, removed.
			PathChildrenCache cache = new PathChildrenCache(client, "/app2", true);
			// Register the listener BEFORE starting the cache: events are dispatched
			// asynchronously once the cache starts, so a listener attached afterwards
			// can miss the initial CHILD_ADDED notifications.
			cache.getListenable().addListener(new PathChildrenCacheListener() {
				
				@Override
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
						throws Exception {
					switch (event.getType()){
					case CHILD_ADDED:
						System.out.println("add:"+event.getData().getPath());
						break;
					case CHILD_REMOVED:
						System.out.println("remove:"+event.getData().getPath());
						break;
					case CHILD_UPDATED:
						System.out.println("data change");
						break;
					case CONNECTION_LOST:
						System.out.println("lost");
						break;
					case CONNECTION_RECONNECTED:
						System.out.println("recon");
						break;
					case CONNECTION_SUSPENDED:
						System.out.println("susp");
						break;
					default:
						// Other event types (e.g. the post-initialization event) are ignored.
						break;
					}
				}
			});
			cache.start(StartMode.POST_INITIALIZED_EVENT);
			cl.await();
			

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Creates a client with an exponential-backoff retry policy (base sleep 1000 ms,
	 * at most 3 retries) and the factory's default timeouts.
	 *
	 * @param connects ZooKeeper connection string
	 * @return a client that has not been started yet
	 */
	private static CuratorFramework createSimple(String connects) {
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		// Defaults used by newClient(connectString, retryPolicy):
		//DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
		//DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
		return CuratorFrameworkFactory.newClient(connects, retryPolicy);
	}
	
	/**
	 * Creates a client through the builder, which gives fine-grained control over the
	 * creation options (see the CuratorFrameworkFactory.Builder javadoc for details).
	 *
	 * @param connects            ZooKeeper connection string
	 * @param retryPolicy         retry policy to use
	 * @param connectionTimeoutMs connection timeout in milliseconds
	 * @param sessionTimeoutMs    session timeout in milliseconds
	 * @return a client that has not been started yet
	 */
	public static CuratorFramework createWithOptions(String connects, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
		return CuratorFrameworkFactory.builder().connectString(connects)
				.retryPolicy(retryPolicy)
				.connectionTimeoutMs(connectionTimeoutMs)
				.sessionTimeoutMs(sessionTimeoutMs)
				// etc. etc.
				.build();
	}

}

Master选举

对于复杂的任务,仅需要集群中一台服务器进行处理,可以进行master选举
package demo7;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Master election demo: this process competes for leadership on {@code /master},
 * holds it for a few seconds, gives it up and queues again.
 */
public class Client1 {

	static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	static String master_path = "/master";
	static CuratorFramework client = CuratorFrameworkFactory.builder()
			.connectString(connectString)
			.retryPolicy(new ExponentialBackoffRetry(1000, 3))
			.build();

	/**
	 * Starts the client and the leader selector, then keeps the process alive.
	 *
	 * @param args unused
	 * @throws Exception if the main thread is interrupted while sleeping
	 */
	public static void main(String[] args) throws Exception {
		client.start();

		LeaderSelectorListenerAdapter leadershipTask = new LeaderSelectorListenerAdapter() {

			/**
			 * Runs while this client is the master. Leadership is given up when this
			 * method returns; only then can the next candidate acquire it.
			 */
			@Override
			public void takeLeadership(CuratorFramework client) throws Exception {
				final long holdMillis = 6000;
				System.out.println("client 1 成为 master");
				Thread.sleep(holdMillis);
				System.out.println("client 1 完成操作,释放 master权利");
			}
		};

		LeaderSelector selector = new LeaderSelector(client, master_path, leadershipTask);
		// After giving up mastership, queue up again for the next election.
		selector.autoRequeue();
		selector.start();

		// Keep the process alive so that elections can keep happening.
		Thread.sleep(Integer.MAX_VALUE);
	}

}

分布式锁

package demo8;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Distributed lock demo: 30 threads are released at the same moment and each one
 * generates a timestamp-based order number while holding the shared mutex on
 * {@code /lock}.
 *
 * @author ywd
 */
public class Client1 {

	static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	static String path = "/lock";
	static CuratorFramework client = CuratorFrameworkFactory.builder()
										.connectString(connectString)
										.retryPolicy(new ExponentialBackoffRetry(1000, 3))
										.build();
	
	/**
	 * Starts the client, spawns the worker threads and opens the start gate.
	 *
	 * @param args unused
	 * @throws Exception declared for the Curator calls made while setting up
	 */
	public static void main(String[] args) throws Exception {
		client.start();
		// Start gate: every worker waits on it so that they all contend for the lock at once.
		final CountDownLatch downLatch = new CountDownLatch(1);
		final InterProcessMutex lock = new InterProcessMutex(client, path);
		for (int i = 0; i < 30; i++) {
			new Thread(new Runnable(){

				@Override
				public void run() {
					try {
						downLatch.await();
						// Acquire the lock.
						lock.acquire();
					} catch (InterruptedException e) {
						// Lock not held: restore the interrupt flag and give up.
						Thread.currentThread().interrupt();
						return;
					} catch (Exception e) {
						// Lock not held: entering the critical section (and releasing a
						// lock this thread does not own) would be wrong, so stop here.
						e.printStackTrace();
						return;
					}
					try {
						SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
						String orderNo = sdf.format(new Date());
						System.out.println("生成订单号是:" + orderNo);
					} finally {
						// Always release the lock, even if the critical section fails.
						try {
							lock.release();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				}
				
			}).start();
		}
		downLatch.countDown();
	}

}
package demo8;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

/**
 * Distributed counter demo: adds 8 to the distributed integer stored at
 * {@code /lock} and prints whether the update succeeded.
 *
 * @author ywd
 */
public class Client2 {

	static String connectString = "10.1.65.32:2181,10.1.65.31:2181,10.1.65.30:2181";
	static String path = "/lock";
	static CuratorFramework client = CuratorFrameworkFactory.builder()
			.connectString(connectString)
			.retryPolicy(new ExponentialBackoffRetry(1000, 3))
			.build();

	/**
	 * Starts the client, performs one add on the counter and reports the outcome.
	 *
	 * @param args unused
	 * @throws Exception if the counter operation fails
	 */
	public static void main(String[] args) throws Exception {
		client.start();

		// Retry policy used by the counter itself: RetryNTimes(3, 1000).
		RetryNTimes counterRetry = new RetryNTimes(3, 1000);
		DistributedAtomicInteger counter = new DistributedAtomicInteger(client, path, counterRetry);

		AtomicValue<Integer> outcome = counter.add(8);
		boolean applied = outcome.succeeded();
		System.out.println("Result: " + applied);
	}

}




Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐