最近想试试把zk嵌入到程序里面看是否可行,但不赞同这样,因为把zk嵌入到应用程序里面,整个应用的可用性和Zookeeper的可用性被耦合在一起,如果其中一个退出,另一个也必然会退出。Zookeeper常常被用来提供高可用服务,但对于应用中嵌入Zookeeper的方式却降低了其最强的优势。

       网上资料不多,看了2篇,Zookeeper实战之嵌入式运行Zookeeper单机模式 Zookeeper实战之嵌入式运行Zookeeper集群模式,其中单机的我看了下没问题,但集群版的我测试了下发现有问题,3个zk并没有连起来,相互独立,简单的说就是各自为主。根据apache提供的API,稍稍改了下, 集群模式测试OK。


1.单例模式

废话少说,直接贴代码

import java.io.File;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

public class StandaloneZKServer {
	
	private static final Logger logger = Logger.getLogger(StandaloneZKServer.class);
	
	/**
	 * 启动单例zk server
	 * @param tickTime Zookeeper中最小时间单元的长度
	 * @param dataDir Zookeeper服务器存储快照文件的目录
	 * @param clientPort 当前服务器对外的服务端口
	 * @param initLimit Leader服务器等待Follower启动,并完成数据同步的时间
	 * @param syncLimit Leader服务器和Follower之间进行心跳检测的最大延时时间
	 */
	public static void startStandaloneServer1(String tickTime, String dataDir, String clientPort, String initLimit, String syncLimit) {
		Properties props = new Properties();
		props.setProperty("tickTime", tickTime);
		props.setProperty("dataDir", dataDir);
		props.setProperty("clientPort", clientPort);
		props.setProperty("initLimit", initLimit);
		props.setProperty("syncLimit", syncLimit);

		QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
		try {
			quorumConfig.parseProperties(props);
			final ZooKeeperServerMain zkServer = new ZooKeeperServerMain();
			final ServerConfig config = new ServerConfig();
			config.readFrom(quorumConfig);
			zkServer.runFromConfig(config);
		} catch (Exception e) {
			logger.error("Start standalone server faile", e);
		}
	}
	
	/**
	 * 启动单例zk server
	 * @param tickTime Zookeeper中最小时间单元的长度
	 * @param dataDir  Zookeeper服务器存储快照文件的目录
	 * @param clientPort 当前服务器对外的服务端口
	 * @param maxcnxn 客户端最大连接数,通过 IP 来区分不同的客户端
	 */
	public static void startStandaloneServer2(String tickTime, String dataDir, String clientPort, String maxcnxn) {
		ZooKeeperServerMain.main(new String[]{clientPort, dataDir, tickTime, maxcnxn}); // port datadir [ticktime] [maxcnxns]"
	}

	public static void main(String[] args) throws Exception {  
		startStandaloneServer1("2000", new File(System.getProperty("java.io.tmpdir"), "zookeeper").getAbsolutePath(), "2181", "10", "5");
		//startStandaloneServer2("2000", new File(System.getProperty("java.io.tmpdir"), "zookeeper").getAbsolutePath(), "2181", "44");
    }
	
}

这里有2个启动单例zk的方法,如果看ZooKeeperServerMain源码,会发现二者其实调用的一致,只不过在处理参数上稍稍不同,ZooKeeperServerMain.main(...)更适合处理用于命令行启动的方式。

在第二种方式中,initLimit和syncLimit默认分别为10、5

测试代码:


这个没啥好说的,对节点的创建、更新、遍历、删除,节点类型有持久节点、持久顺序节点、临时节点、临时顺序节点。

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class StandaloneZkClient {
	
	private static final Logger logger = Logger.getLogger(StandaloneZkClient.class);

	public static void main(String[] args) throws Exception {
		String connectString = "127.0.0.1:2181";
		//String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        ZooKeeper zk = new ZooKeeper(connectString, 10000, new Watcher() {

			public void process(WatchedEvent event) {
				logger.info("Zk event: [" + event.toString() + "]");
			}});  
   
        System.out.println(zk.getState());
        logger.info("Zk Status: " + zk.getState());
        
        zk.create("/nodes", "节点集合".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/nodes/persistent", "持久节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/nodes/persistent_sequential1", "持久顺序节点1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        zk.create("/nodes/persistent_sequential2", "持久顺序节点2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        zk.create("/nodes/ephemeral", "临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zk.create("/nodes/ephemeral_sequential1", "临时顺序节点1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        zk.create("/nodes/ephemeral_sequential2", "临时顺序节点2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
   
        zk.setData("/nodes/persistent", "改变持久节点".getBytes(), -1);
        
        for (String child : zk.getChildren("/nodes", true)) {
        	logger.info("Zk nodes: [" + "/nodes/" + child + ": " + new String(zk.getData("/nodes/" + child, true, null)) + "]");
        	zk.delete("/nodes/" + child, -1);
        }
        
        zk.delete("/nodes", -1);
        
        zk.close();
    }
	
}


2.集群模式

先贴关键代码

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;

import com.company.conf.Configuration;
import com.google.common.base.Strings;

public class Tool_Zookeeper {

	private static final Logger logger = Logger.getLogger(Tool_Zookeeper.class);
	
	private static Properties zoo_properties = new Properties();
	static {
		try(InputStream zooInput = Tool_Zookeeper.class.getResourceAsStream("/conf/zoo.cfg");) {
			zoo_properties.load(zooInput);
		} catch (IOException e) {}
		logger.info("#读取zoo.cfg");
	}
	
	/**
	 * 启动集群机器中的zookeeper
	 * @param myid 指定的myid,为空时,根据IP判断myid
	 * @throws IOException IOException
	 * @throws ConfigException ConfigException
	 */
	public static void startClusterZookeeper(String myid) throws IOException, ConfigException {
		if (!Strings.isNullOrEmpty(myid) && myid.matches("\\d{1}")) { // 使用指定的myid
			FileUtils.writeStringToFile(new File(zoo_properties.get("dataDir").toString() +  "/myid"), myid);
		} else { // 根据ip找出myid
			for (String key : zoo_properties.stringPropertyNames()) {
				if (key.matches("server\\.\\d{1}")) { // server.1=127.0.0.1:2888:3888
					myid = key.replace("server.", "");
					if (zoo_properties.get(key).toString().split(":")[0].equals(Configuration.ip)) {
						FileUtils.writeStringToFile(new File(zoo_properties.get("dataDir").toString() +  "/myid"), myid);
						break;
					}
				}
			}
		}
		QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
		quorumConfig.parseProperties(zoo_properties);
		QuorumPeerMain peer = new QuorumPeerMain();
		peer.runFromConfig(quorumConfig); // To start the replicated server
	}

	public static void main(String[] args) throws Exception {
		startClusterZookeeper("1");
	}
	
}
startClusterZookeeper方法就用于启动集群中某一台zk,大家应该知道,集群模式中需要myid文件让zk知道这是属于集群中的哪一个节点(不懂可以看Zookeeper的两种安装和配置(Windows):单机模式与集群模式),所以这里需要根据ip来判断节点所对应的myid。因为本地测试3个zk的话ip都一样,所以额外提供一个参数手动指定myid。

测试用的maven项目,其中Configuration类和Launcher类如下,Launcher类中有jetty的启动不会触发,因为startClusterZookeeper会阻塞当前线程,这里不用管jetty。

Configuration类

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;

import com.company.service.Tool_Zookeeper;
import com.google.common.base.Strings;

public class Configuration {
	
	private static final Logger logger = Logger.getLogger(Configuration.class);
	
	public static final String ip = getLocalIp();

	/**
	 * 获取本地IP
	 * @return
	 */
	private static String getLocalIp() {
        //查询当前操作系统
        Properties p = System.getProperties();
        String osName = p.getProperty("os.name");

        try {
            //如果是windows
            if (osName.matches("(?i).*win.*")) {
                return InetAddress.getLocalHost().getHostAddress().toString();//获得本机IP
            } else {

                //根据"systeminfo"命令查询开机时间
                Process process = Runtime.getRuntime().exec("ifconfig");
                InputStream inputStream = process.getInputStream();
                String s = "";
                byte[] b = new byte[1024];
                while (inputStream.read(b) != -1) {
                    s = s + new String(b);
                }
                if (!Strings.isNullOrEmpty(s)) {
                    Matcher m = Pattern.compile("inet addr:(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})").matcher(s);
                    if (m.find()) {
                        process.destroy();
                        return m.group(1);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("获取ip地址异常", e);
        }

        return "127.0.0.1";
    }
	
	public static void init(String[] args) {
		logger.info("#本地IP:" + ip);
		// 启动集群中的zk
		String myid = null;
		try {
			for (String str : args) {
				if (!Strings.isNullOrEmpty(str) && str.matches("myid=\\d{1}")) {
					myid = str.replace("myid=", "");
				}
			}
			logger.info("#启动zookeeper,myid: " + myid);
			Tool_Zookeeper.startClusterZookeeper(myid);
		} catch (IOException | ConfigException e) {
			logger.error("#项目初始化异常,退出!", e);
			System.exit(-1);
		}
	}
	
}


Launcher类
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.webapp.WebAppContext;

import com.company.conf.Configuration;

public class Launcher {
	
	private static Logger logger = Logger.getLogger(Launcher.class);
	
	private static final int PORT = 8080;
	private static final String WEBAPP = "src/main/webapp";
	private static final String CONTEXTPATH = "/";
	private static final String DESCRIPTOR = "src/main/webapp/WEB-INF/web.xml";

	/*
	 * 创建 Jetty Server,指定其端口、web目录、根目录、web路径
	 * @param port
	 * @param webApp
	 * @param contextPath
	 * @param descriptor
	 * @return Server
	 */
	public static Server createServer(int port, String webApp, String contextPath, String descriptor) {
		Server server = new Server();
		//设置在JVM退出时关闭Jetty的钩子
		//这样就可以在整个功能测试时启动一次Jetty,然后让它在JVM退出时自动关闭
		server.setStopAtShutdown(true);
		
		ServerConnector connector = new ServerConnector(server); 
		connector.setPort(port); 
		//解决Windows下重复启动Jetty不报告端口冲突的问题
		//在Windows下有个Windows + Sun的connector实现的问题,reuseAddress=true时重复启动同一个端口的Jetty不会报错
		//所以必须设为false,代价是若上次退出不干净(比如有TIME_WAIT),会导致新的Jetty不能启动,但权衡之下还是应该设为False
		connector.setReuseAddress(false);
		server.setConnectors(new Connector[]{connector});
		
		WebAppContext webContext = new WebAppContext(webApp, contextPath);
		webContext.setDescriptor(descriptor);
		// 设置webapp的位置
		webContext.setResourceBase(webApp);
		webContext.setClassLoader(Thread.currentThread().getContextClassLoader());
				
		server.setHandler(webContext);
		
		return server;
	}
	
	/**
	 * 启动jetty服务
	 * 
	 */
	public void startJetty() {
		final Server server = Launcher.createServer(PORT, WEBAPP, CONTEXTPATH, DESCRIPTOR);
		
		try {
			//server.stop();
			server.start();
			server.join();
			
		} catch (Exception e) {
			logger.warn("启动 jetty server 失败", e);
			System.exit(-1);
		}
	}
	
	public static void main(String[] args) {
		Configuration.init(args);
		new Launcher().startJetty();
		// jetty 启动后的测试url
		// http://localhost:8080/hello/hello
	}
	
}


项目结构:



下面说下测试步骤

1.修改zoo.cfg


2.执行maven clean package,把target中的classes、lib和zk_demo.jar拷到新建的zk1文件夹中

3.修改zoo.cfg


4.执行maven clean package,把target中的classes、lib和zk_demo.jar拷到新建的zk2文件夹中

5.修改zoo.cfg


6.执行maven clean package,把target中的classes、lib和zk_demo.jar拷到新建的zk3文件夹中

7.分别在zk1、zk2和zk3文件夹下执行"java -jar zk_demo.jar myid=1"、"java -jar zk_demo.jar myid=2"和"java -jar zk_demo.jar myid=3",如图:


当启动zk1而zk2、zk3尚未启动时,会不断出现报错信息,这是正常的,因为集群中的其它节点还未启动,不能进行选主操作,等所有节点启动OK就没问题了。这时候也可以继续用StandaloneZkClient来测试,connectString可改为"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",它会随机连其中的一台;当然,你可以任性的用"zkCli.cmd -server ip:port"来测试,不论在哪一天上更新信息,它会自动同步到其它节点。

上面测试很麻烦,当然,如果有3台独立的机器,或者虚拟机,测试就方便多了。


项目源码:https://github.com/leonzm/zk_demo

参考:

Zookeeper实战之嵌入式运行Zookeeper单机模式

Zookeeper实战之嵌入式运行Zookeeper集群模式

Apache Zookeeper API

Zk笔记(一):Zookeeper的两种安装和配置(Windows):单机模式与集群模式

ZooKeeper的配置

ZooKeeper 节点类型

Zookeeper Api(java)入门与应用(转)


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐