原来的项目是通过main方法直接启动的jar包,无法做到高可用。根据领导要求,需要增加高可用:在原有入口前面再包一层,借助ZooKeeper临时顺序节点做主备选举,只有序号最小的节点才真正执行任务。具体见下面的代码:

1、导入Jar包:

		<!-- Apache Curator recipes, version 2.7.1.
		     NOTE(review): HAMain.java below imports org.apache.zookeeper.* directly rather than
		     Curator classes, so this presumably relies on the ZooKeeper client arriving as a
		     transitive dependency - confirm, or declare the ZooKeeper artifact explicitly. -->
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.7.1</version>
		</dependency>


 

2、

HAMain.java

 

package com.chinaunicom;

import java.util.List;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chinaunicom.util.FileReadUtil;

/**
 * High-availability launcher that puts a ZooKeeper based election in front of
 * {@code ClusterClientCore}.
 *
 * <p>Every instance registers an ephemeral sequential node under {@code /dtsEs/<id>}.
 * The instance whose node name sorts first is released from its latch and runs the job;
 * every other instance stays blocked and re-evaluates each time the children of that
 * path change.
 */
public class HAMain {

	private static final Logger log = LoggerFactory.getLogger(HAMain.class.getName());

	/** Names of the child nodes currently registered under the election path, sorted ascending. */
	public static SortedSet<String> servers;
	/** Name (last path segment) of the sequential node that ZooKeeper assigned to this instance. */
	public static String myNodeID;

	private ZooKeeper zk;
	/** Election path; starts as the application root and has the id appended by createParentDirectory. */
	private String spath = "/dtsEs";
	/** Path separator. */
	private final String delimiter = "/";
	/** Argument forwarded to ClusterClientCore.main; filled from the command line. */
	private static String topicName = "";
	/** Properties loaded from dts_es.properties. */
	private static Properties prop;

	/**
	 * Set once this instance has been released to run the job. It is touched both by the
	 * constructing thread and by the watcher callback, hence volatile.
	 */
	private static volatile boolean isUP = false;

	/** Released when this instance owns the smallest node under the election path. */
	private final CountDownLatch connectedSignal = new CountDownLatch(1);

	/**
	 * Reads the ZooKeeper settings, connects, registers this instance and then blocks until it
	 * owns the smallest node under the election path. Once released it runs
	 * {@code ClusterClientCore.main} with the topic name.
	 *
	 * <p>Failures after the configuration has been read are logged and not rethrown, as before.
	 *
	 * @param id identifier used as the last segment of the election path and as node data
	 * @throws IllegalStateException if a required ZooKeeper property is missing
	 * @throws Exception declared for compatibility with existing callers
	 */
	public HAMain(String id) throws Exception {
		FileReadUtil propBean = new FileReadUtil();
		// Load the ZooKeeper connection settings.
		prop = propBean.getProperties("dts_es.properties");
		String connectString = prop.getProperty("zookeeper.connectString");
		String timeoutText = prop.getProperty("zookeeper.sessionTimeout");
		if (connectString == null || timeoutText == null) {
			// Fail with a message that names the missing keys instead of an opaque parse error.
			throw new IllegalStateException(
					"dts_es.properties must define zookeeper.connectString and zookeeper.sessionTimeout");
		}
		int sessionTimeOut = Integer.parseInt(timeoutText.trim());

		// True once the latch has been released and the job is about to start.
		boolean handedOver = false;
		try {
			// Open the session; the watcher below is the default watcher for this client.
			zk = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					log.info("-------node Change:" + event);
					// Only a children change on the election path is relevant: refresh the
					// list, which also re-registers the one-shot watch.
					if (event.getType() == EventType.NodeChildrenChanged
							&& spath.equals(event.getPath())) {
						try {
							updateServerList();
						} catch (Exception e) {
							if (e instanceof InterruptedException) {
								Thread.currentThread().interrupt();
							}
							// NOTE(review): when this refresh fails the children watch is not
							// re-registered, so later changes go unnoticed - consider a retry.
							log.error("failed to refresh the server list under " + spath, e);
						}
					}
				}
			});
			createParentDirectory(id);
			createAppNode(id);
			updateServerList();
			// Block here until this instance owns the smallest node.
			connectedSignal.await();
			handedOver = true;
			log.info("我开始做任务啦!~~~myNodeID:"+HAMain.myNodeID);
			ClusterClientCore coreMain = new ClusterClientCore();
			String param[] = {topicName};
			coreMain.main(param);
			log.error("我挂啦!~~~myNodeID:"+HAMain.myNodeID);
			// NOTE(review): the session is intentionally left open here. Whether
			// ClusterClientCore.main blocks for the lifetime of the job cannot be told from
			// this file; closing the session would hand leadership to a standby.
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			log.error("interrupted while waiting to become the active node", e);
		} catch (Exception e) {
			log.error("HA launcher failed, myNodeID:" + HAMain.myNodeID, e);
		} finally {
			// If startup failed before this instance ever became active, drop the session so
			// its ephemeral node disappears immediately instead of after the session timeout.
			if (!handedOver && zk != null) {
				try {
					zk.close();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}

	/**
	 * Registers this instance as an ephemeral sequential child of the election path and stores
	 * the assigned node name in {@link #myNodeID}.
	 *
	 * @param id stored as the node data (UTF-8)
	 * @throws Exception if the node cannot be created
	 */
	public void createAppNode(String id) throws Exception {
		String createdPath = zk.create(spath + delimiter, id.getBytes("utf-8"),
				Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		// Publish only the final short name so that no reader ever sees the full path.
		myNodeID = createdPath.substring(createdPath.lastIndexOf('/') + 1);
	}

	/**
	 * Appends {@code id} to the election path and makes sure that path exists, creating every
	 * missing ancestor on the way (for example the application root on a fresh ensemble).
	 *
	 * @param id last segment of the election path; also stored as that node's data (UTF-8)
	 * @throws Exception if a node can neither be found nor created
	 */
	public void createParentDirectory(String id) throws Exception {
		spath = spath + delimiter + id;
		byte[] payload = id.getBytes("utf-8");
		// Walk the path left to right so each ancestor exists before its child is created.
		int cut = spath.indexOf(delimiter, 1);
		while (cut > 0) {
			ensurePersistentNode(spath.substring(0, cut), new byte[0]);
			cut = spath.indexOf(delimiter, cut + 1);
		}
		ensurePersistentNode(spath, payload);
	}

	/** Creates a persistent node unless it already exists, tolerating a concurrent creator. */
	private void ensurePersistentNode(String path, byte[] data) throws Exception {
		// No watch is requested: the default watcher only acts on children changes.
		Stat existing = zk.exists(path, false);
		if (existing != null) {
			return;
		}
		try {
			zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (InterruptedException e) {
			throw e;
		} catch (Exception e) {
			// Another instance may have created the node between exists() and create();
			// that is fine as long as the node is there now.
			Stat afterFailure = zk.exists(path, false);
			if (afterFailure == null) {
				throw e;
			}
		}
	}

	/**
	 * Reloads the children of the election path into {@link #servers} and re-registers the
	 * children watch, then checks whether this instance may start.
	 *
	 * @throws Exception if the children cannot be read
	 */
	public void updateServerList() throws Exception {
		// Passing true re-arms the watch; a watch fires once and must be set again each time.
		List<String> children = zk.getChildren(spath, true);
		servers = new TreeSet<String>(children);
		// Release the latch if this instance is now the smallest node.
		cancelAwait();
	}

	/**
	 * Releases the latch when this instance owns the smallest node. Does nothing when the
	 * server list is empty or this instance has no node name yet.
	 */
	public void cancelAwait() {
		SortedSet<String> current = HAMain.servers;
		String self = HAMain.myNodeID;
		if (current == null || current.isEmpty() || self == null) {
			log.warn("no registered node to compare against, myNodeID:" + self);
			return;
		}
		String minNode = current.first();
		log.info("当前节点:"+HAMain.myNodeID + ",最小的节点:  " + minNode);
		// Start only once, and only when this instance is the smallest node.
		if (self.equals(minNode) && !isUP) {
			isUP = true;
			connectedSignal.countDown();
		}
	}

	/**
	 * Closes the ZooKeeper session if one was opened.
	 *
	 * @throws InterruptedException if interrupted while closing
	 */
	public void close() throws InterruptedException {
		if (zk != null) {
			zk.close();
		}
	}

	/**
	 * Program entry point. With exactly one argument, that argument is used both as the node id
	 * and as the topic name; otherwise the id defaults to {@code autoid} and the topic name
	 * stays empty.
	 *
	 * @param args command line arguments
	 * @throws Exception if the launcher cannot be constructed
	 */
	public static void main(String[] args) throws Exception {
		String nodeid = "autoid";
		if (args.length == 1) {
			nodeid = args[0];
			topicName = nodeid;
		}
		new HAMain(nodeid);
	}
}


文件工具类:主要读取配置文件

FileReadUtil:

package com.chinaunicom.util;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.alibaba.fastjson.JSON;


/**
 * File helper, mainly used to read property files and JSON resources from the classpath.
 *
 * @author zsh
 */
public class FileReadUtil {

    private static final Logger log = LoggerFactory.getLogger(FileReadUtil.class);

    /**
     * Manual smoke test: resolves the table JSON resources and discards the result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        FileReadUtil f = new FileReadUtil();
        f.getFileList();
    }

    /**
     * Resolves the JSON files in the {@code tableJson} folder on the classpath, falling back to
     * {@code resources/tableJson} when the first pattern matches nothing.
     *
     * @return the matching resources; {@code null} if the first lookup failed with an
     *         {@link IOException}
     */
    public Resource[] getFileList() {
        Resource[] workflowResources = null;
        try {
            final PathMatchingResourcePatternResolver pmrpr =
                    new PathMatchingResourcePatternResolver(FileReadUtil.class.getClassLoader());
            workflowResources = pmrpr.getResources("classpath*:tableJson/*.json");
            if (workflowResources == null || workflowResources.length == 0) {
                workflowResources = pmrpr.getResources("classpath*:resources/tableJson/*.json");
            }
        } catch (IOException e1) {
            log.error("failed to resolve the tableJson/*.json resources", e1);
        }
        return workflowResources;
    }

    /**
     * Lists the entry names of a directory.
     *
     * @param fileName path of the directory to list
     * @return the names as returned by {@link File#list()}, which is {@code null} when the path
     *         is not a directory or cannot be read
     */
    public String[] getFileName(String fileName) {
        File file = new File(fileName);
        return file.list();
    }

    /**
     * Reads the whole stream and decodes it as UTF-8. Despite the name, the content is read in
     * byte chunks and line breaks are kept as they are. The stream is always closed.
     *
     * @param inputStream stream to drain; may be {@code null}
     * @return the decoded content, or an empty string when the stream is {@code null} or
     *         reading fails
     */
    public String readFileByLines(InputStream inputStream) {
        if (inputStream == null) {
            log.error("readFileByLines was called with a null stream");
            return "";
        }
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        try {
            byte[] data = new byte[1024];
            int count;
            while ((count = inputStream.read(data, 0, data.length)) != -1) {
                outStream.write(data, 0, count);
            }
            return outStream.toString("utf-8");
        } catch (IOException e) {
            log.error("failed to read the stream", e);
            return "";
        } finally {
            // Close on the failure path as well, so the stream is never leaked.
            closeQuietly(inputStream);
        }
    }

    /**
     * Loads a property file from the classpath.
     *
     * @param proName classpath location of the property file
     * @return the loaded properties; possibly incomplete if reading failed part-way
     * @throws NullPointerException if no resource with that name is on the classpath
     */
    public Properties getProperties(String proName) {
        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(proName);
        if (inputStream == null) {
            // Same exception type as before, but now it names the missing resource.
            throw new NullPointerException("property resource not found on the classpath: " + proName);
        }
        Properties props = new Properties();
        try {
            props.load(inputStream);
            // Success is not an error, so log it at info level.
            log.info("加载配置文件属性实体类初始化成功!");
        } catch (IOException e) {
            log.error("failed to load properties from " + proName, e);
        } finally {
            closeQuietly(inputStream);
        }
        return props;
    }

    /** Closes the stream and logs, rather than propagates, a failure to do so. */
    private static void closeQuietly(InputStream in) {
        try {
            in.close();
        } catch (IOException e) {
            log.warn("failed to close the stream", e);
        }
    }
}


代码配置完毕。然后再在pom.xml中把jar包的启动入口(main方法所在的类)修改为HAMain。

 

	<!-- Configures the jar manifest: HAMain becomes the Main-Class used by `java -jar`, and a
	     Class-Path entry is generated whose entries are prefixed with lib/. -->
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<archive>
						<manifest>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
							<!-- "com.**" is a placeholder for the real package; HAMain.java above
							     declares package com.chinaunicom. -->
							<mainClass>com.**.HAMain</mainClass>
						</manifest>
					</archive>
				</configuration>
			</plugin>
		</plugins>

	</build>


至此,高可用配置完毕。把项目打完jar包,放在Linux下,执行 

java -jar ***-0.0.1-SNAPSHOT.jar params> 1.log &    即可执行。

末尾的 & 已经让程序在后台运行;也可以在命令前面再加上 nohup,这样退出终端后进程也不会被挂断。例如:nohup java -jar dtsEs-0.0.1-SNAPSHOT.jar rdsa365i83st0q3h8yi5 > 1.log &

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐