hive学习之五:java通过zookeeper获取active namenode地址。
在项目开发前期,使用jdbc访问hive数据仓库,在配置文件配置active namenode的地址信息,上线的时候才发现如果namenode宕机了,程序将无法运行。后来改成通过zookeeper获取active namenode地址,这样即使namenode宕机了,也能通过zookeeper获取备机地址从而不影响程序运行。记录下代码:package cp.app.preproce...
·
在项目开发前期,使用jdbc访问hive数据仓库,在配置文件配置active namenode的地址信息,上线的时候才发现如果namenode宕机了,程序将无法运行。后来改成通过zookeeper获取active namenode地址,这样即使namenode宕机了,也能通过zookeeper获取备机地址从而不影响程序运行。记录下代码:
package cp.app.preprocess.service;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import com.google.protobuf.InvalidProtocolBufferException;
import cp.app.batch.utils.ConfigUtil;
/**
* hdfs公共服务类
* @author author
*
*/
public class HDFSService {
private static Logger log = Logger.getLogger(HDFSService.class.getName());
static ConfigUtil conf = new ConfigUtil("conf/zookeeper.properties");
private static final String ZOOKEEPER_IP = conf.getString("ZOOKEEPER_IP");
private static final int ZOOKEEPER_PORT = conf.getInt("ZOOKEEPER_PORT");
private static final int ZOOKEEPER_TIMEOUT = conf.getInt("ZOOKEEPER_TIMEOUT");
private static final String DATA_DIR = conf.getString("DATA_DIR");
/**
* 判断文件是否存在
* @param dirs hdfs路径
* @return 存在ture 否则false
*/
public static boolean isExist(List<String> dirs,String ddate) {
if(dirs==null||dirs.size()<1){
log.info("Please Check your sign directory configure is correct!");
return false;
}
log.info("Check File or Directory, uri is:"+dirs.toString());
Configuration conf = new Configuration();
List<String> flags = new ArrayList<String>();
FileStatus status=null;
String hostname = getHostname(ZOOKEEPER_IP,ZOOKEEPER_PORT,ZOOKEEPER_TIMEOUT,DATA_DIR);
log.info("According to Zookeeper get the active namenode domain:"+hostname);
try {
for(String dir:dirs){
String url = dir.replaceAll("url", hostname);
FileSystem fs = FileSystem.get(URI.create(url+"dt="+ddate), conf);
status = fs.getFileStatus(new Path(url+"dt="+ddate));
if(status==null){
flags.add("false");
}else{
flags.add("true");
}
fs.close();
}
if(flags.contains("false")){
return false;
}else{
return true;
}
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
log.error(e);
return false;
} catch (IOException e) {
// TODO Auto-generated catch block
log.error(e);
return false;
}
}
/**
* ͨ通过zookeeper获取active namenode地址
* @param ZOOKEEPER_IP ip地址
* @param ZOOKEEPER_PORT 端口
* @param ZOOKEEPER_TIMEOUT 超时时间
* @return 地址
*/
public static String getHostname(String ZOOKEEPER_IP, int ZOOKEEPER_PORT,
int ZOOKEEPER_TIMEOUT,String DATA_DIR) {
String hostname = null;
Watcher watcher = new Watcher() {
public void process(org.apache.zookeeper.WatchedEvent event) {
log.info("event:"+event.toString());
}
};
ZooKeeper zk = null;
byte[] data1 = null;
String[] iparr = ZOOKEEPER_IP.split(";");
for (String ip : iparr) {
try {
zk = new ZooKeeper(ip + ":" + ZOOKEEPER_PORT,
ZOOKEEPER_TIMEOUT, watcher);
data1 = zk.getData(DATA_DIR,
true, new Stat());
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("This ip is not active..."+ip);
continue;
}
if (data1 != null) {
log.info("This ip is normal..."+ip);
ActiveNodeInfo activeNodeInfo=null;
try {
activeNodeInfo = HAZKInfoProtos.ActiveNodeInfo.parseFrom(data1);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
log.error(e);
}
hostname = activeNodeInfo.getHostname();
return hostname;
}
}
return hostname;
}
}
zookeeper配置文件zookeeper.properties:
ZOOKEEPER_IP=192.168.1.1;192.168.1.2;192.168.1.3
ZOOKEEPER_PORT=2181
ZOOKEEPER_TIMEOUT=30000
DATA_DIR=/hadoop-ha/hdp/ActiveStandbyElectorLock
读取配置文件工具类:
package com.huateng.spdbccc.mkt24.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Utility for reading a properties file from the classpath.
 *
 * @author author
 */
public class ConfigUtil {
    private final Properties props = new Properties();

    /**
     * Loads the given properties file through this class's class loader.
     *
     * @param file classpath-relative resource name, e.g. {@code conf/zookeeper.properties}
     * @throws IllegalArgumentException if the resource is not found on the classpath
     * @throws IllegalStateException    if the resource exists but cannot be read
     */
    public ConfigUtil(String file) {
        InputStream is = ConfigUtil.class.getClassLoader().getResourceAsStream(file);
        if (is == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear
            // message instead of a NullPointerException from load()/close().
            throw new IllegalArgumentException("Configuration file not found on classpath: " + file);
        }
        try {
            props.load(is);
        } catch (IOException e) {
            // Continuing with a partially loaded configuration would only move the
            // failure to the first lookup, so report it here with its cause.
            throw new IllegalStateException("Failed to load configuration file: " + file, e);
        } finally {
            try {
                is.close();
            } catch (IOException ignored) {
                // A close failure cannot change what has already been loaded.
            }
        }
    }

    /**
     * Returns the value of {@code key} parsed as an int; surrounding whitespace is ignored.
     *
     * @param key property name
     * @return the integer value
     * @throws NumberFormatException if the property is missing or is not a valid integer
     */
    public int getInt(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new NumberFormatException("Missing integer property: " + key);
        }
        return Integer.parseInt(value.trim());
    }

    /**
     * Returns the raw value of {@code key}.
     *
     * @param key property name
     * @return the property value, or {@code null} if the key is not present
     */
    public String getString(String key) {
        return props.getProperty(key);
    }
}
hiveserver2的配置文件:
driver=org.apache.hive.jdbc.HiveDriver
url=jdbc:hive2://192.168.1.1:10010/default
user=hdfs
password=""
更多推荐
已为社区贡献3条内容
所有评论(0)