HBase Learning, Part 2: Paginated Queries in HBase
In HBase a Scan covers simple queries, but multi-condition queries need filters, and you can even write a custom filter for special requirements. My project needed paginated queries, so the core code is recorded here for future reference.
Contents of the zookeeper.properties configuration file:
hbase_zookeeper_quorum=xxx.com,xxx.com,xxx.com
zookeeper_znode_parent=/hbase
This is the ZooKeeper cluster configuration; HBase normally delegates its coordination to ZooKeeper.
ConfigUtil is the class that reads this configuration file:
package cp.app.batch.utils;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Reads key/value settings from a properties file on the classpath.
*
* @author author
*
*/
public class ConfigUtil {
private Properties props = new Properties();
public ConfigUtil(String file) {
String path = ConfigUtil.class.getClassLoader()
.getResource(file).getPath();
InputStream is = null;
try {
is = new FileInputStream(path);
props.load(is);
} catch (IOException e) {
e.printStackTrace();
} finally {
// close the stream only if it was actually opened
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public int getInt(String key) {
return Integer.parseInt(props.getProperty(key));
}
public String getString(String key) {
return props.getProperty(key);
}
}
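A quick usage sketch of ConfigUtil (assumed here, not part of the original post); the file path and keys are the ones the service class below relies on:

ConfigUtil util = new ConfigUtil("conf/zookeeper.properties");
String quorum = util.getString("hbase_zookeeper_quorum");       // "xxx.com,xxx.com,xxx.com"
String znodeParent = util.getString("zookeeper_znode_parent");  // "/hbase"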
The HBase query utility class:
package cp.app.service.impl;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import cp.app.batch.utils.ConfigUtil;
import cp.app.comm.CpConstants;
import cp.app.service.HBaseService;
/**
* Utility class for HBase query and insert operations.
*
* @author author
*
*/
// Wired up as a Spring bean; HBaseService is the query interface we defined and is optional.
@Service
public class HBaseServiceImpl implements HBaseService{
private static Logger log = Logger.getLogger(HBaseServiceImpl.class.getName());
static ConfigUtil util = new ConfigUtil("conf/zookeeper.properties");
private static final String HBASE_ZOOKEEPER_QUORUM = util
.getString("hbase_zookeeper_quorum");
private static final String ZOOKEEPER_ZNODE_PARENT = util
.getString("zookeeper_znode_parent");
private static Configuration conf = HBaseConfiguration.create();
static {
conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM);
conf.set("zookeeper.znode.parent", ZOOKEEPER_ZNODE_PARENT);
}
/**
* Create a table.
*
* @param tableName
* table name
* @param columnFamily
* list of column families
* @return true on success, false on failure
*/
@SuppressWarnings("resource")
public boolean createTable(String tableName, List<String> columnFamily) {
try {
if (StringUtils.isBlank(tableName) || columnFamily == null
|| columnFamily.isEmpty()) {
log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
return false;
}
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
return true;
} else {
HTableDescriptor tableDescriptor = new HTableDescriptor(
TableName.valueOf(tableName));
for (String cf : columnFamily) {
tableDescriptor.addFamily(new HColumnDescriptor(cf));
}
admin.createTable(tableDescriptor);
log.info("===Create Table " + tableName
+ " Success!columnFamily:" + columnFamily.toString()
+ "===");
}
} catch (MasterNotRunningException e) {
log.error(e);
return false;
} catch (ZooKeeperConnectionException e) {
log.error(e);
return false;
} catch (IOException e) {
log.error(e);
return false;
}
return true;
}
/**
* Query a single record by row key.
*
* @param tableName
* table name
* @param rowKey
* row key value
* @return the matching record
*/
public List<Map<String, String>> selectOneByRowKey(String tableName,
String rowKey) {
if (StringUtils.isBlank(rowKey) || StringUtils.isBlank(tableName)) {
log.error("===Parameters tableName|rowKey should not be blank,Please check!===");
return null;
}
List<Map<String, String>> rowList = new ArrayList<Map<String, String>>();
try {
Get get = new Get(Bytes.toBytes(rowKey));
HTableInterface hTable = getHTable(tableName);
if (hTable != null) {
Result result = hTable.get(get);
Map<String, String> cellMap = getRowByResult(result);
rowList.add(cellMap);
// close inside the null check to avoid a NullPointerException
hTable.close();
}
} catch (IOException e) {
log.error(e);
}
return rowList;
}
/**
* Query table data page by page.
*
* @param tableName
* table name
* @param ddate
* data date
* @param pageSize
* page size
* @param lastrowKey
* starting row key (the last row key of the previous page)
* @return the result set for the requested page
*/
public List<Map<String, String>> selectAllByPage(String tableName,
String ddate, int pageSize, String lastrowKey) {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
|| pageSize <= 0
|| StringUtils.isBlank(lastrowKey)) {
log.error("===Parameters tableName|ddate|pageSize|lastrowKey should not be blank,Please check!===");
return null;
}
HTable hTable = (HTable) getHTable(tableName);
Scan scan = new Scan();
FilterList filterList = new FilterList(
FilterList.Operator.MUST_PASS_ALL);
Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(ddate));
Filter pageFilter = new PageFilter(pageSize);
filterList.addFilter(rowFilter1);
filterList.addFilter(pageFilter);
if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER,
new BinaryComparator(Bytes.toBytes(lastrowKey)));
filterList.addFilter(rowFilter2);
}
scan.setFilter(filterList);
List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
try {
ResultScanner rs = hTable.getScanner(scan);
for (Result result : rs) {
lists.add(getRowByResult(result));
}
rs.close();
hTable.close();
} catch (IOException e) {
log.error(e);
}
return lists;
}
/**
* Query table data page by page, filtered by send status.
*
* @param tableName
* table name
* @param ddate
* data date
* @param pageSize
* page size
* @param lastrowKey
* starting row key (the last row key of the previous page)
* @param status
* send status
* @return the result set for the requested page
*/
public List<Map<String, String>> selectAllByPageStatus(String tableName,
String ddate, int pageSize, String lastrowKey, String status) {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
|| pageSize <= 0
|| StringUtils.isBlank(lastrowKey)) {
log.error("===Parameters tableName|ddate|pageSize|lastrowKey should not be blank,Please check!===");
return null;
}
HTable hTable = (HTable) getHTable(tableName);
Scan scan = new Scan();
FilterList filterList = new FilterList(
FilterList.Operator.MUST_PASS_ALL);
filterList
.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),
Bytes.toBytes("status"), CompareOp.EQUAL, Bytes
.toBytes(status)));
Filter rowFilter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(ddate));
Filter pageFilter = new PageFilter(pageSize);
filterList.addFilter(rowFilter1);
filterList.addFilter(pageFilter);
if (!CpConstants.ROWKEY_FIRST.equals(lastrowKey)) {
Filter rowFilter2 = new RowFilter(CompareFilter.CompareOp.GREATER,
new BinaryComparator(Bytes.toBytes(lastrowKey)));
filterList.addFilter(rowFilter2);
}
scan.setFilter(filterList);
List<Map<String, String>> lists = new ArrayList<Map<String, String>>();
try {
ResultScanner rs = hTable.getScanner(scan);
for (Result result : rs) {
lists.add(getRowByResult(result));
}
rs.close();
hTable.close();
} catch (IOException e) {
log.error(e);
}
return lists;
}
/**
* Get the total number of pages.
*
* @param tableName
* table name
* @param ddate
* data date
* @param pageSize
* page size
* @return the number of pages
*/
public int getPages(String tableName, String ddate, int pageSize) {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
|| pageSize <= 0) {
log.error("===Parameters tableName|ddate|pageSize should not be blank,Please check!===");
return 0;
}
enableAggregation(tableName);
int total = 0;
try {
HTable hTable = (HTable) getHTable(tableName);
Scan scan = new Scan();
Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(ddate));
scan.setFilter(rowFilter);
AggregationClient aggregation = new AggregationClient(conf);
Long count = aggregation.rowCount(hTable,
new LongColumnInterpreter(), scan);
total = count.intValue();
hTable.close();
} catch (Throwable e) {
log.error(e);
}
return (total % pageSize == 0) ? total / pageSize
: (total / pageSize) + 1;
}
/**
* Get the number of pages for a given send status.
*
* @param tableName
* table name
* @param ddate
* data date
* @param pageSize
* page size
* @param status
* send status
* @return the number of pages
*/
public int getPagesByStatus(String tableName, String ddate, int pageSize,
String status) {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(ddate)
|| pageSize <= 0
|| StringUtils.isBlank(status)) {
log.error("===Parameters tableName|ddate|pageSize|status should not be blank,Please check!===");
return 0;
}
enableAggregation(tableName);
int total = 0;
try {
HTable hTable = (HTable) getHTable(tableName);
Scan scan = new Scan();
FilterList filterList = new FilterList(
FilterList.Operator.MUST_PASS_ALL);
Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator(ddate));
filterList.addFilter(rowFilter);
filterList.addFilter(new SingleColumnValueFilter(Bytes
.toBytes("info"), Bytes.toBytes("status"), CompareOp.EQUAL,
Bytes.toBytes(status)));
scan.setFilter(filterList);
AggregationClient aggregation = new AggregationClient(conf);
Long count = aggregation.rowCount(hTable,
new LongColumnInterpreter(), scan);
total = count.intValue();
hTable.close();
} catch (Throwable e) {
log.error(e);
}
return (total % pageSize == 0) ? total / pageSize
: (total / pageSize) + 1;
}
/**
* Collect the cells belonging to one row key into a map.
*
* @param result
* result set
* @return a map of qualifier to value, plus the row key and column family
*/
private Map<String, String> getRowByResult(Result result) {
if (result == null) {
log.error("===Parameter |result| should not be null,Please check!===");
return null;
}
Map<String, String> cellMap = new HashMap<String, String>();
for (Cell cell : result.listCells()) {
String rowkey = Bytes.toString(cell.getRowArray(),
cell.getRowOffset(), cell.getRowLength());
String cf = Bytes.toString(cell.getFamilyArray(),
cell.getFamilyOffset(), cell.getFamilyLength());
String qf = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
cellMap.put(CpConstants.HBASE_TABLE_PROP_ROWKEY, rowkey);
cellMap.put(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY, cf);
cellMap.put(qf, value);
}
return cellMap;
}
/**
* Get an HTableInterface.
*
* @param tableName
* table name
* @return an HTableInterface instance
*/
private HTableInterface getHTable(String tableName) {
if (StringUtils.isBlank(tableName)) {
log.error("===Parameter |tableName| should not be blank,Please check!===");
return null;
}
HTableInterface hTable = null;
try {
// Note: this opens a new HConnection on every call and never closes it;
// in practice the connection should be shared and closed when finished.
HConnection conn = HConnectionManager.createConnection(conf);
hTable = conn.getTable(Bytes.toBytes(tableName));
} catch (IOException e) {
log.error(e);
return null;
}
return hTable;
}
/**
* Batch insert or update.
*
* @param tableName
* table name
* @param paraList
* parameters after being assembled from the JSON or XML payload
* @return true on success, false on failure
*/
public boolean batchPut(String tableName, List<Map<String, String>> paraList) {
try {
List<Put> puts = new ArrayList<Put>();
for (Map<String, String> map : paraList) {
Put put = getPutByMap(map);
puts.add(put);
}
HTable hTable = (HTable) getHTable(tableName);
hTable.put(puts);
hTable.close();
} catch (RetriesExhaustedWithDetailsException e) {
log.error(e);
return false;
} catch (InterruptedIOException e) {
log.error(e);
return false;
} catch (IOException e) {
log.error(e);
return false;
}
return true;
}
/**
* Build a Put from a parameter map.
*
* @param paraMap
* parameter map
* @return the Put
*/
private Put getPutByMap(Map<String, String> paraMap) {
if (paraMap == null) {
log.error("===Parameter |paraMap| should not be null,Please check!===");
return null;
}
Set<Entry<String, String>> set = paraMap.entrySet();
Iterator<Entry<String, String>> it = set.iterator();
byte[] rowkey = Bytes.toBytes(paraMap
.get(CpConstants.HBASE_TABLE_PROP_ROWKEY));
byte[] columnfamily = Bytes.toBytes(paraMap
.get(CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY));
Put put = new Put(rowkey);
while (it.hasNext()) {
Entry<String, String> entry = it.next();
String key = entry.getKey();
if (!CpConstants.HBASE_TABLE_PROP_ROWKEY.equals(key)
&& !CpConstants.HBASE_TABLE_PROP_COLUMNFAMILY.equals(key)) {
String value = entry.getValue();
put.add(columnfamily, Bytes.toBytes(key), Bytes.toBytes(value));
}
}
return put;
}
/**
* Enable the aggregation coprocessor on a table.
*
* @param tableName
* table name
*/
@SuppressWarnings("resource")
private void enableAggregation(String tableName) {
String coprocessorName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
try {
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor htd = admin.getTableDescriptor(Bytes
.toBytes(tableName));
// only skip if the aggregation coprocessor is already registered
List<String> coprocessors = htd.getCoprocessors();
if (coprocessors != null && coprocessors.contains(coprocessorName)) {
return;
} else {
admin.disableTable(tableName);
htd.addCoprocessor(coprocessorName);
admin.modifyTable(tableName, htd);
admin.enableTable(tableName);
}
} catch (TableNotFoundException e) {
log.error(e);
} catch (MasterNotRunningException e) {
log.error(e);
} catch (ZooKeeperConnectionException e) {
log.error(e);
} catch (IOException e) {
log.error(e);
}
}
}
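Finally, a minimal caller-side sketch of how the pagination is meant to be driven (this part is assumed, not from the original code: the table name and date are made-up examples, and it assumes HBaseService declares getPages and selectAllByPage). Pass CpConstants.ROWKEY_FIRST for the first page, then feed the last row key of each page back in; the RowFilter(GREATER, ...) above turns it into the starting point of the next page:

HBaseService service = new HBaseServiceImpl();
String tableName = "cp_record";   // made-up table name, for illustration only
String ddate = "20150101";        // made-up data date, for illustration only
int pageSize = 100;

int pages = service.getPages(tableName, ddate, pageSize);
String lastRowKey = CpConstants.ROWKEY_FIRST; // sentinel meaning "start from the beginning"
for (int i = 0; i < pages; i++) {
    List<Map<String, String>> rows = service.selectAllByPage(tableName, ddate, pageSize, lastRowKey);
    if (rows == null || rows.isEmpty()) {
        break;
    }
    // ... process the current page here ...
    // remember the row key of the last record; the next scan starts after it
    lastRowKey = rows.get(rows.size() - 1).get(CpConstants.HBASE_TABLE_PROP_ROWKEY);
}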