Zk笔记(三):Zookeeper内嵌式运行
最近想试试把zk嵌入到程序里面看是否可行,但不赞同这样,因为把zk嵌入到应用程序里面,整个应用的可用性和Zookeeper的可用性被耦合在一起,如果其中一个退出,另一个也必然会退出。Zookeeper常常被用来提供高可用服务,但对于应用中嵌入Zookeeper的方式却降低了其最强的优势。
最近想试试把zk嵌入到程序里面看是否可行,但不赞同这样,因为把zk嵌入到应用程序里面,整个应用的可用性和Zookeeper的可用性被耦合在一起,如果其中一个退出,另一个也必然会退出。Zookeeper常常被用来提供高可用服务,但对于应用中嵌入Zookeeper的方式却降低了其最强的优势。
网上资料不多,看了2篇,Zookeeper实战之嵌入式运行Zookeeper单机模式、 Zookeeper实战之嵌入式运行Zookeeper集群模式,其中单机的我看了下没问题,但集群版的我测试了下发现有问题,3个zk并没有连起来,相互独立,简单的说就是各自为主。根据apache提供的API,稍稍改了下, 集群模式测试OK。
1.单例模式
废话少说,直接贴代码
import java.io.File;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
public class StandaloneZKServer {
private static final Logger logger = Logger.getLogger(StandaloneZKServer.class);
/**
* 启动单例zk server
* @param tickTime Zookeeper中最小时间单元的长度
* @param dataDir Zookeeper服务器存储快照文件的目录
* @param clientPort 当前服务器对外的服务端口
* @param initLimit Leader服务器等待Follower启动,并完成数据同步的时间
* @param syncLimit Leader服务器和Follower之间进行心跳检测的最大延时时间
*/
public static void startStandaloneServer1(String tickTime, String dataDir, String clientPort, String initLimit, String syncLimit) {
Properties props = new Properties();
props.setProperty("tickTime", tickTime);
props.setProperty("dataDir", dataDir);
props.setProperty("clientPort", clientPort);
props.setProperty("initLimit", initLimit);
props.setProperty("syncLimit", syncLimit);
QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
try {
quorumConfig.parseProperties(props);
final ZooKeeperServerMain zkServer = new ZooKeeperServerMain();
final ServerConfig config = new ServerConfig();
config.readFrom(quorumConfig);
zkServer.runFromConfig(config);
} catch (Exception e) {
logger.error("Start standalone server faile", e);
}
}
/**
* 启动单例zk server
* @param tickTime Zookeeper中最小时间单元的长度
* @param dataDir Zookeeper服务器存储快照文件的目录
* @param clientPort 当前服务器对外的服务端口
* @param maxcnxn 客户端最大连接数,通过 IP 来区分不同的客户端
*/
public static void startStandaloneServer2(String tickTime, String dataDir, String clientPort, String maxcnxn) {
ZooKeeperServerMain.main(new String[]{clientPort, dataDir, tickTime, maxcnxn}); // port datadir [ticktime] [maxcnxns]"
}
public static void main(String[] args) throws Exception {
startStandaloneServer1("2000", new File(System.getProperty("java.io.tmpdir"), "zookeeper").getAbsolutePath(), "2181", "10", "5");
//startStandaloneServer2("2000", new File(System.getProperty("java.io.tmpdir"), "zookeeper").getAbsolutePath(), "2181", "44");
}
}
这里有2个启动单例zk的方法,如果看ZooKeeperServerMain源码,会发现二者其实调用的一致,只不过在处理参数上稍稍不同,ZooKeeperServerMain.main(...)更适合处理用于命令行启动的方式。
在第二种方式中,initLimit和syncLimit默认分别为10、5
测试代码:
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class StandaloneZkClient {
private static final Logger logger = Logger.getLogger(StandaloneZkClient.class);
public static void main(String[] args) throws Exception {
String connectString = "127.0.0.1:2181";
//String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZooKeeper zk = new ZooKeeper(connectString, 10000, new Watcher() {
public void process(WatchedEvent event) {
logger.info("Zk event: [" + event.toString() + "]");
}});
System.out.println(zk.getState());
logger.info("Zk Status: " + zk.getState());
zk.create("/nodes", "节点集合".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/nodes/persistent", "持久节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/nodes/persistent_sequential1", "持久顺序节点1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
zk.create("/nodes/persistent_sequential2", "持久顺序节点2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
zk.create("/nodes/ephemeral", "临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.create("/nodes/ephemeral_sequential1", "临时顺序节点1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
zk.create("/nodes/ephemeral_sequential2", "临时顺序节点2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
zk.setData("/nodes/persistent", "改变持久节点".getBytes(), -1);
for (String child : zk.getChildren("/nodes", true)) {
logger.info("Zk nodes: [" + "/nodes/" + child + ": " + new String(zk.getData("/nodes/" + child, true, null)) + "]");
zk.delete("/nodes/" + child, -1);
}
zk.delete("/nodes", -1);
zk.close();
}
}
2.集群模式
先贴关键代码
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
import com.company.conf.Configuration;
import com.google.common.base.Strings;
public class Tool_Zookeeper {
private static final Logger logger = Logger.getLogger(Tool_Zookeeper.class);
private static Properties zoo_properties = new Properties();
static {
try(InputStream zooInput = Tool_Zookeeper.class.getResourceAsStream("/conf/zoo.cfg");) {
zoo_properties.load(zooInput);
} catch (IOException e) {}
logger.info("#读取zoo.cfg");
}
/**
* 启动集群机器中的zookeeper
* @param myid 指定的myid,为空时,根据IP判断myid
* @throws IOException IOException
* @throws ConfigException ConfigException
*/
public static void startClusterZookeeper(String myid) throws IOException, ConfigException {
if (!Strings.isNullOrEmpty(myid) && myid.matches("\\d{1}")) { // 使用指定的myid
FileUtils.writeStringToFile(new File(zoo_properties.get("dataDir").toString() + "/myid"), myid);
} else { // 根据ip找出myid
for (String key : zoo_properties.stringPropertyNames()) {
if (key.matches("server\\.\\d{1}")) { // server.1=127.0.0.1:2888:3888
myid = key.replace("server.", "");
if (zoo_properties.get(key).toString().split(":")[0].equals(Configuration.ip)) {
FileUtils.writeStringToFile(new File(zoo_properties.get("dataDir").toString() + "/myid"), myid);
break;
}
}
}
}
QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
quorumConfig.parseProperties(zoo_properties);
QuorumPeerMain peer = new QuorumPeerMain();
peer.runFromConfig(quorumConfig); // To start the replicated server
}
public static void main(String[] args) throws Exception {
startClusterZookeeper("1");
}
}
startClusterZookeeper方法就用于启动集群中某一台zk,大家应该知道,集群模式中需要myid文件让zk知道这是属于集群中的哪一个节点(不懂可以看Zookeeper的两种安装和配置(Windows):单机模式与集群模式),所以这里需要根据ip来判断节点所对应的myid。因为本地测试3个zk的话ip都一样,所以额外提供一个参数手动指定myid。
测试用的maven项目,其中Configuration类和Launcher类如下,Launcher类中有jetty的启动不会触发,因为startClusterZookeeper会阻塞当前线程,这里不用管jetty。
Configuration类
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import com.company.service.Tool_Zookeeper;
import com.google.common.base.Strings;
public class Configuration {
private static final Logger logger = Logger.getLogger(Configuration.class);
public static final String ip = getLocalIp();
/**
* 获取本地IP
* @return
*/
private static String getLocalIp() {
//查询当前操作系统
Properties p = System.getProperties();
String osName = p.getProperty("os.name");
try {
//如果是windows
if (osName.matches("(?i).*win.*")) {
return InetAddress.getLocalHost().getHostAddress().toString();//获得本机IP
} else {
//根据"systeminfo"命令查询开机时间
Process process = Runtime.getRuntime().exec("ifconfig");
InputStream inputStream = process.getInputStream();
String s = "";
byte[] b = new byte[1024];
while (inputStream.read(b) != -1) {
s = s + new String(b);
}
if (!Strings.isNullOrEmpty(s)) {
Matcher m = Pattern.compile("inet addr:(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})").matcher(s);
if (m.find()) {
process.destroy();
return m.group(1);
}
}
}
} catch (Exception e) {
logger.error("获取ip地址异常", e);
}
return "127.0.0.1";
}
public static void init(String[] args) {
logger.info("#本地IP:" + ip);
// 启动集群中的zk
String myid = null;
try {
for (String str : args) {
if (!Strings.isNullOrEmpty(str) && str.matches("myid=\\d{1}")) {
myid = str.replace("myid=", "");
}
}
logger.info("#启动zookeeper,myid: " + myid);
Tool_Zookeeper.startClusterZookeeper(myid);
} catch (IOException | ConfigException e) {
logger.error("#项目初始化异常,退出!", e);
System.exit(-1);
}
}
}
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.webapp.WebAppContext;
import com.company.conf.Configuration;
public class Launcher {
private static Logger logger = Logger.getLogger(Launcher.class);
private static final int PORT = 8080;
private static final String WEBAPP = "src/main/webapp";
private static final String CONTEXTPATH = "/";
private static final String DESCRIPTOR = "src/main/webapp/WEB-INF/web.xml";
/*
* 创建 Jetty Server,指定其端口、web目录、根目录、web路径
* @param port
* @param webApp
* @param contextPath
* @param descriptor
* @return Server
*/
public static Server createServer(int port, String webApp, String contextPath, String descriptor) {
Server server = new Server();
//设置在JVM退出时关闭Jetty的钩子
//这样就可以在整个功能测试时启动一次Jetty,然后让它在JVM退出时自动关闭
server.setStopAtShutdown(true);
ServerConnector connector = new ServerConnector(server);
connector.setPort(port);
//解决Windows下重复启动Jetty不报告端口冲突的问题
//在Windows下有个Windows + Sun的connector实现的问题,reuseAddress=true时重复启动同一个端口的Jetty不会报错
//所以必须设为false,代价是若上次退出不干净(比如有TIME_WAIT),会导致新的Jetty不能启动,但权衡之下还是应该设为False
connector.setReuseAddress(false);
server.setConnectors(new Connector[]{connector});
WebAppContext webContext = new WebAppContext(webApp, contextPath);
webContext.setDescriptor(descriptor);
// 设置webapp的位置
webContext.setResourceBase(webApp);
webContext.setClassLoader(Thread.currentThread().getContextClassLoader());
server.setHandler(webContext);
return server;
}
/**
* 启动jetty服务
*
*/
public void startJetty() {
final Server server = Launcher.createServer(PORT, WEBAPP, CONTEXTPATH, DESCRIPTOR);
try {
//server.stop();
server.start();
server.join();
} catch (Exception e) {
logger.warn("启动 jetty server 失败", e);
System.exit(-1);
}
}
public static void main(String[] args) {
Configuration.init(args);
new Launcher().startJetty();
// jetty 启动后的测试url
// http://localhost:8080/hello/hello
}
}
项目结构:
下面说下测试步骤
1.修改zoo.cfg
2.执行maven clean package,把target中的classes、lib和zk_demo.jar拷到新建的zk1文件夹中
3.修改zoo.cfg
4.执行maven clean package,把target中的classes、lib和zk_demo.jar拷到新建的zk2文件夹中
5.修改zoo.cfg
6.执行maven clean package,把target中的classes、lib和zk_demo.jar拷到新建的zk3文件夹中
7.分别在zk1、zk2和zk3文件夹下执行"java -jar zk_demo.jar myid=1"、"java -jar zk_demo.jar myid=2"和"java -jar zk_demo.jar myid=3",如图:
当启动zk1而zk2、zk3尚未启动时,会不断出现报错信息,这是正常的,因为集群中的其它节点还未启动,不能进行选主操作,等所有节点启动OK就没问题了。这时候也可以继续用StandaloneZkClient来测试,connectString可改为"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",它会随机连其中的一台;当然,你可以任性的用"zkCli.cmd -server ip:port"来测试,不论在哪一天上更新信息,它会自动同步到其它节点。
上面测试很麻烦,当然,如果有3台独立的机器,或者虚拟机,测试就方便多了。
项目源码:https://github.com/leonzm/zk_demo
参考:
Zookeeper实战之嵌入式运行Zookeeper单机模式
Zookeeper实战之嵌入式运行Zookeeper集群模式
Zk笔记(一):Zookeeper的两种安装和配置(Windows):单机模式与集群模式
更多推荐
所有评论(0)