Implementing High Availability with ZooKeeper
The original project is a jar started directly from a main method, which offers no high availability. At the request of the team lead, high availability was added by wrapping an extra layer in front of it. The details are in the code below:
1. Import the dependency:
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>2.7.1</version>
</dependency>
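curator-recipes also pulls in the ZooKeeper client jar transitively, and it is that raw ZooKeeper API that the HAMain code below actually uses. For comparison only, here is a minimal sketch of how the same "smallest node wins" election could be expressed with the LeaderLatch recipe that ships in this artifact; the connect string, the /dtsEs/leader latch path, and the TaskRunner call are illustrative assumptions, not part of the original project. The rest of the post keeps the hand-rolled version.

// Sketch only: leader election via Curator's LeaderLatch instead of the raw ZooKeeper API.
// The connect string, latch path and TaskRunner class are illustrative assumptions.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181",           // assumed ZooKeeper connect string
                new ExponentialBackoffRetry(1000, 3));  // retry policy
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/dtsEs/leader");
        latch.start();
        latch.await();  // blocks until this instance becomes the leader

        // Only the leader reaches this point; the original job would run here.
        // new TaskRunner().run();
    }
}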
2. HAMain.java
package com.chinaunicom;

import java.util.List;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chinaunicom.util.FileReadUtil;

public class HAMain {

    private static Logger log = LoggerFactory.getLogger(HAMain.class.getName());

    // List of machine nodes registered in ZooKeeper
    public static SortedSet<String> servers;
    // Node id assigned by ZooKeeper's auto-incrementing sequence
    public static String myNodeID;

    private ZooKeeper zk;
    private final Stat stat = new Stat();
    // Top-level path of the application
    private String spath = "/dtsEs";
    // Path delimiter
    private final String delimiter = "/";
    // topicName
    private static String topicName = "";
    // Properties file
    private static Properties prop;
    private static boolean isUP = false;
    private CountDownLatch connectedSignal = new CountDownLatch(1);

    /**
     * Main flow: reads the ZooKeeper configuration, connects to ZooKeeper and
     * watches for node changes. When the children change, it checks whether the
     * current node is the smallest one; if so, it runs the main program.
     * @param id
     * @throws Exception
     */
    public HAMain(String id) throws Exception {
        FileReadUtil propBean = new FileReadUtil();
        // Load the ZooKeeper configuration
        prop = propBean.getProperties("dts_es.properties");
        String connectString = prop.getProperty("zookeeper.connectString");
        int sessionTimeOut = Integer.parseInt(prop.getProperty("zookeeper.sessionTimeout"));
        try {
            // Create a connection to the ZooKeeper ensemble
            zk = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    log.info("-------node Change:" + event);
                    // If the children of spath changed, refresh the server list
                    // and re-register the watch
                    if (event.getType() == EventType.NodeChildrenChanged
                            && spath.equals(event.getPath())) {
                        try {
                            updateServerList();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            createParentDirectory(id);
            createAppNode(id);
            updateServerList();
            // Block here until this node becomes the leader
            connectedSignal.await();
            // Once the latch is released, start the actual task
            log.info("Starting the task!~~~myNodeID:" + HAMain.myNodeID);
            ClusterClientCore coreMain = new ClusterClientCore();
            String[] param = { topicName };
            coreMain.main(param);
            log.error("Task terminated!~~~myNodeID:" + HAMain.myNodeID);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Create an ephemeral, sequential child node for this instance
     * @param id
     * @throws Exception
     */
    public void createAppNode(String id) throws Exception {
        myNodeID = zk.create(spath + delimiter, id.getBytes("utf-8"),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        myNodeID = myNodeID.substring(myNodeID.lastIndexOf('/') + 1);
    }

    /**
     * Create the top-level directory if it does not exist yet
     * @param id
     * @throws Exception
     */
    public void createParentDirectory(String id) throws Exception {
        spath = spath + delimiter + id;
        Stat stat = null;
        try {
            stat = zk.exists(spath, true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (stat == null) {
            zk.create(spath, id.getBytes("utf-8"),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * Refresh the list of server nodes
     * @throws Exception
     */
    public void updateServerList() throws Exception {
        SortedSet<String> set = new TreeSet<String>();
        // Get the children of spath and watch for changes.
        // The watch parameter is true, so child-change events are delivered.
        // The watch must be re-registered every time, because a watch fires only once;
        // to keep watching, it has to be set again after each event.
        List<String> subList = zk.getChildren(spath, true);
        for (String subNode : subList) {
            // Fetch the server address stored under each child node
            // byte[] data = zk.getData(spath + delimiter + subNode, false, stat);
            // log.info(subNode + "\t" + stat);
            // String sdata = new String(data, "utf-8");
            set.add(subNode);
        }
        servers = set;
        // Release the latch if this node is now the leader
        cancelAwait();
    }

    /**
     * If the current node is the smallest one, release the latch
     */
    public void cancelAwait() {
        String minNode = HAMain.servers.first();
        log.info("Current node: " + HAMain.myNodeID + ", smallest node: " + minNode);
        if (HAMain.myNodeID.equals(minNode)) { // check whether this instance holds the smallest node
            if (!isUP) { // only start once
                isUP = true;
                connectedSignal.countDown();
            }
        }
    }

    // Close the connection
    public void close() throws InterruptedException {
        zk.close();
    }

    /**
     * Program entry point
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        String nodeid = "autoid";
        if (args.length == 1) {
            nodeid = args[0];
            topicName = nodeid;
        }
        new HAMain(nodeid);
    }
}
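HAMain reads its ZooKeeper settings from dts_es.properties on the classpath (via the FileReadUtil class below). The two keys it expects are zookeeper.connectString and zookeeper.sessionTimeout; a minimal example of the file, with placeholder values, would look like this:

# dts_es.properties (values are examples only)
zookeeper.connectString=zk1:2181,zk2:2181,zk3:2181
zookeeper.sessionTimeout=30000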
File utility class, mainly used to read the configuration file:
FileReadUtil.java
package com.chinaunicom.util;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

/**
 * Utility class for reading property and resource files
 * @author zsh
 *
 */
public class FileReadUtil {

    private static Logger log = LoggerFactory.getLogger(FileReadUtil.class);

    public static void main(String[] args) {
        FileReadUtil f = new FileReadUtil();
        f.getFileList();
    }

    /**
     * Get the set of files under a folder inside the jar
     * @return
     */
    public Resource[] getFileList() {
        Resource[] workflowResources = null;
        try {
            final PathMatchingResourcePatternResolver pmrpr =
                    new PathMatchingResourcePatternResolver(FileReadUtil.class.getClassLoader());
            workflowResources = pmrpr.getResources("classpath*:tableJson/*.json");
            if (workflowResources == null || workflowResources.length == 0) {
                workflowResources = pmrpr.getResources("classpath*:resources/tableJson/*.json");
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        return workflowResources;
    }

    /**
     * Get all file names in a folder
     * @param fileName
     * @return
     */
    public String[] getFileName(String fileName) {
        File file = new File(fileName);
        String[] fileNames = file.list();
        return fileNames;
    }

    /**
     * Read the whole file into a string; commonly used for line-oriented formatted files
     */
    public String readFileByLines(InputStream inputStream) {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        try {
            byte[] data = new byte[1024];
            int count = -1;
            while ((count = inputStream.read(data, 0, 1024)) != -1) {
                outStream.write(data, 0, count);
            }
            inputStream.close();
            outStream.close();
            return new String(outStream.toByteArray(), "utf-8");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Read the contents of a properties file from the classpath
     * @param proName
     * @return
     */
    public Properties getProperties(String proName) {
        Properties props = null;
        InputStream inputStream = null;
        try {
            inputStream = this.getClass().getClassLoader().getResourceAsStream(proName);
            props = new Properties();
            props.load(inputStream);
            log.info("Configuration file loaded successfully!");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        }
        return props;
    }
}
With the code in place, set the main-method entry point in pom.xml:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <classpathPrefix>lib/</classpathPrefix>
            <mainClass>com.**.HAMain</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
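Because classpathPrefix points at lib/, the dependency jars have to end up in a lib/ folder next to the application jar. One way to do that, sketched here as an assumption rather than taken from the original project, is the maven-dependency-plugin:

<!-- Sketch: copy dependency jars into target/lib so the lib/ classpath prefix resolves at runtime -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/lib</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

The lib/ directory then needs to be shipped to the Linux machine together with the application jar.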
That completes the high-availability setup. Package the project into a jar, put it on the Linux machine, and run:
java -jar ***-0.0.1-SNAPSHOT.jar params > 1.log &
You can also prefix the command with nohup to keep it running in the background, for example: nohup java -jar dtsEs-0.0.1-SNAPSHOT.jar rdsa365i83st0q3h8yi5 > 1.log &
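To sanity-check the failover, you could start the same jar on two machines and then kill the process on whichever machine grabbed the smallest node; the surviving instance's log should show the "Starting the task!" line. The commands below are illustrative only; the jar name and topic argument follow the example above and <pid> is a placeholder.

# on machine A and machine B
nohup java -jar dtsEs-0.0.1-SNAPSHOT.jar rdsa365i83st0q3h8yi5 > 1.log &
# find and kill the current leader on machine A, then watch machine B's log
ps -ef | grep dtsEs-0.0.1-SNAPSHOT.jar
kill -9 <pid>
tail -f 1.log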