Hbase源码解析和开发实战笔记
Hbase笔记简介1)HBase是一个分布式的、多版本的、面向列的开源数据库2)HBase利用Hadoop HDFS作为其文件存储系统,提供高可靠性、高性能、列存储、可伸缩、实时读写、适用于非结构化数据存储的数据库系统3)HBase利用Hadoop MapReduce来处理HBase中的海量数据4)HBase利用Zookeeper作为分布式协同服务特点:
·
Hbase笔记
HBase基础
1)HBase简介
2)HBase特点
3)HBase数据模型
4)HBase体系结构
5)HBase存储模型
6)HBase应用
简介
1)HBase是一个分布式的、多版本的、面向列的开源数据库
2)HBase利用Hadoop HDFS作为其文件存储系统,提供高可靠性、高性能、列存储、可伸缩、实时读写、适用于非结构化数据存储的数据库系统
3)HBase利用Hadoop MapReduce来处理HBase中的海量数据
4)HBase利用Zookeeper作为分布式协同服务
特点:
1)数据量大:一个表可以有上亿行,上百万列(列多时,插入变慢)
2)面向列:面向列(族)的存储和权限控制,列(族)独立检索
3)稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏
4)多版本:每个cell中的数据可以有多个版本,默认情况下版本号自动分配,是单元格插入时的时间戳
5)无类型:HBase中的数据都是字符串,没有类型
6)强一致性:同一行数据的读写只在同一台Region Server上进行
7)有限查询方式:仅支持三种查询方式(单个rowkey查询,通过rowkey的range查询,全表扫描)
8)高性能随机读写
数据模型
1)行:同一个key对应的所有数据
2)列族:相似的列数据通常被划分成一个列族,建表时确定
3)列:列名在写入时确定
4)Cell及时间戳(版本):
每个cell有任意多的版本
建表时设置每个列族可以保留多少个版本
5)三维有序
SortedMap(RowKey,List(SortedMap(Column,List(Value,TimeStamp))))
rowkey(ASC)+columnLabel(ASC)+Version(DESC)->value
|------------|--------------|--------------------|----------------------------|---------------|
|Row key |Time Stamp | Column "contents:" | Column "anchor:" | Column "mime:"|
|------------|--------------|--------------------|------------------|---------| |
|com.cnn.www |t9 | | anchor:cnnsi.com |CNN | |
|------------|--------------|--------------------|------------------|---------|---------------|
| |t8 | | achor:my.look.ca |CNN.com | |
|------------|--------------|--------------------|------------------|---------|---------------|
| |t6 | "<html>..." | | "text/html" |
|------------|--------------|--------------------|----------------------------|---------------|
| |t5 | "<html>..." | | |
|------------|--------------|--------------------|----------------------------|---------------|
| |t3 | "<html>..." | | |
|------------|--------------|--------------------|----------------------------|---------------|
--------------------------------------------------
1)面向列的存储
2)一张表可以被划分成若干个region
3)行按照rowkey进行字典序排序
4)支持随机读写
5)region是负载均衡调度的最小单位
------------------------------------------------------------------------------
体系结构
1)Client
包含访问HBase的接口并维护cache来加快对HBase的访问
2)Zookeeper
保证任何时候,集群中只有一个master
存储所有Region的寻址入口:
实时监控Region server的上线和下线信息,并实时通知给Master;
存储HBase的schema和table元数据
3)Master
为Region server分配region;
负责Region server的负载均衡
发现失效的Region server并重新分配其上的 region
管理用户对table的增删改查操作
4)Region Server
负责维护region,处理对这些region的IO请求
负责切分在运行过程中变得过大的region
5)ROOT表
记录META表中的每个region的位置,ROOT表最多只有一个region Zookeeper中记录了ROOT表的location
6)META表
记录各个表每个region所在的region server,META表可能包含多个region
------------------------------------------------------------------------------------------
HBase操作
1)flush
内存容量有限,需要定期将内存中的数据flush到磁盘
每次flush,每个region的每个column family都会产生一个HFile
读取操作,region server会把多个HFile数据归并到一起
2)compaction
flush操作产生的HFile会越来越低,需要归并来减少HFile的数量
旧数据会被清理
3)split
HFile大小增长到某个阀值就会split,同时把Region split成两个region,这两个region被分发到其他不同的region server上
4)scan
hbase原生提供的方法,顺序扫库;当然可以使用MapReduce并发扫库的方法
5)Bulk Load
快速导入大批量数据的方法
-------------------------------------------------------------------------------------------
存储模型
1)一个table中的一个region会被随机分配给一个region server
2)region是分布式存储和负载均衡的最小单位
------------------------------------------------------
最开始table只有一个region,但是随着数据的put,到达某个阀值的时候,一个大的region会split成几个小region,被分配到其他region server
------------------------------------------------
HFile结构
1)基于Block的存储结构
2)Block的索引驻留内存
---------------------------------
HBase应用
1)HBase环境搭建
配置hbase-env.sh
hbase-site.xml
regionservers
运行start-hbase.sh
启动后执行hbase shell进入交互式命令行
2)HBase API使用举例
Put/Get
Scan
.....
-----------------------------------------------------------------------------------------------------------
Hbase_test.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
public class Hbase_test {
public static void main(String[] args){
Configuration conf = new Configuration();
HBaseConfiguration hbconf = new HBaseConfiguration(conf);
/*try {
HBaseAdmin admin = new HBaseAdmin(hbconf);
HTableDescriptor tableDesc = new HTableDescriptor("test");
tableDesc.addFamily(new HColumnDescriptor("info"));
admin.createTable(tableDesc);
} catch (MasterNotRunningException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
try {
HTable table = new HTable(hbconf,"test");
/*Put put = new Put("row1".getBytes());
put.add("info".getBytes(),"A".getBytes(),"1".getBytes());
table.put(put);
table.close();*/
Scan scan = new Scan();
ResultScanner res = table.getScanner(scan);
for(Result r:res){
for(KeyValue kv: r.raw()){
System.out.println("rowkey=> " + new String(r.getRow())
+ " family=> " + new String(kv.getFamily())
+ " qualifier=> " + new String(kv.getQualifier())
+ " timestamp=> " + kv.getTimestamp()
+ " value=> " + new String(kv.getValue()));
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
----------------------------------------------------------------------------------------------------------------------
HBase系统搭建与部署
1)开源社区版本下载
-http://hbase.apache.org/
2)Cloudera发行版下载
-http://archive-primary.cloudera.com/cdh5/cdh/5/
Hadoop2.0部署
1)配置hosts 确保涉及的主机名均可以解析
2)编辑hadoop-env.sh,mapred-env.sh,yarn-env.sh
3)编辑core-site.xml,hdfs-site.xml,yam-site.xml
5)编辑slaves文件
6)把Hadoop复制到其他节点
7)启动Hadoop
8)验证启动
Hadoop2.0 HA部署
采用QJM方案
1、启动JN
./sbin/hadoop-daemon.sh start jourmalnode
./sbin/hadoop-daemons.sh start journalnode
hdfs namenode -initialize SharedEdits
2、启动Active NN
hdfs namenode -format
bin/hadoop-daemon.sh start namenode
3、启动Standby NN
hdfs namenode -bootstrap Standby
./sbin/hadoop-daemon.sh start namenode
4、启动Automatic Failover
hdfs zkfc -formatZK
./sbin/hadoop-daemon.sh start zkfc
hdfs haadmin -transitionToStandby nn1
hdfs haadmin -failover nn1 nn2
hdfs haadmin -getServiceState nn1
-----------------------------------------------------------------------------------------------------------------------------
HBase Shell 操作
(1)建立一个表scores,有两个列族grad和courese
>>create 'scores','grade','course'
(2)查看Hbase中的所有表
>>list
(3)查看表结构
>>describe 'scores'
Shell操作
4、按设计的表结构插入值
put 'scores','Tom','grade:','5'
put 'scores','Tom','course:math','97'
put 'scores','Tom','course:art','87'
put 'scores','Jim','grade:','4'
put 'scores','Jim','course:','89'
put 'scores','Jim','course:','80'
put命令比较简单,只有这一种用法:
put 't1','r1','c1','value',ts1
t1指表名,r1指定键名,c1指列名,value指单元格值。ts1指时间戳,一般都省略掉了。
5、根据键值查询数据
>>get 'scores','Jim'
>>get 'scores','Jim','grade'
get 't1','r1'
get 't1','r1',{TIMERANGE=>[ts1,ts2]}
get 't1','r1',{COLUMN=>'c1'}
get 't1','r1',{COLUMN=>['c1','c2','c3']}
get 't1','r1',{COLUMN=>'c1',TIMESTAMP=>ts1}
get 't1','r1',{COLUMN=>'c1',TIMERANGE=>[ts1,ts2],VERSIONS=>4}
get 't1','r1',{COLUMN=>'c1',TIMESTAMP=>ts1,VERSION=>4}
get 't1','r1','c1'
get 't1','r1','c1','c2'
get 't1','r1',['c1','c2']
6、扫描所有数据
scan 'scores'
也可以指定一些修饰词:TIMERANGE,FILTER,LIMIT,STARTROW,STOPROW,TIMESTAMP,MAXLENGTH,or COLUMNS.没任何修饰词,就是上边例句,就会显示所有数据行
scan '.META.'
scan '.META.',{COLUMNS=>'info:regioninfo'}
scan 't1',{COLUMNS=>['c1','c2'],LIMIT=>10,STARTROW=>'xyz'}
scan 't1',{COLUMNS=>'c1',TIMERANGE=>[1303668804,1303668904]}
scan 't1',{FILTER=>"(PrefixFilter('row2')AND(QualifierFilter(>=,'binary:xyz')))AND(TimestampsFilter(123,456))"}
scan 't1',{FILTER=>org.apache,hadoop,hbase.filter,ColumnPaginationFilter.new(1,0)}
7、删除指定数据
delete 'scores','Jim','grade'
delete 'scores','JIm'
删除数据命令也没太多变化,只有一个:
delete 't1','r1','c1','ts1'
另外有一个deleteall命令,可以进行整行的范围的删除操作,慎用!
如果需要进行全表删除操作,就使用truncate命令,其实没有直接的全表删除命令,这个命令也是disable,drop,create三个命令组合出来的。
8、修改表结构
disable 'scores'
alter 'scores',NAME=>'info'
enable 'scores'
alter命令使用如下(如果无法成功的版本,需要先通过表disable):
a、改变或添加一个列族
alter 't1',NAME=>'f1',VERSIONS=>5
b、删除一个列族:
alter 't1',NAME=>'f1',METHOD=>'delete'
alter 't1','delete'=>'f1'
9、统计行数
>>count 't1'
>>count 't1',INTERVAL=>100000
>>count 't1',CACHE=>1000
>>count 't1',INTERVAL=>10,CACHE=>1000
count一般会比较耗时,使用MapReduce进行统计,统计结果会缓存,默认是10行。统计间隔默认的是1000行(INTERVAL)
10、表的删除
先禁用表 distable 'table'
删除表 drop 'table'
------------------------
客户端API使用操作
|--------------------|--------------------------------|
|Java类 | HBase数据模型 |
|--------------------|--------------------------------|
|HBaseAdmin | 数据库(DataBase) |
|HBaseConfiguration | |
|--------------------|--------------------------------|
|HTable | 表(Table) |
|--------------------|--------------------------------|
|HTableDescriptor | 列族(Column Family) |
|--------------------|--------------------------------|
|Put | |
|Get | 列修饰符(Column Qualifier)|
|Scanner | |
|--------------------|--------------------------------|
HBaseConfiguration
包名:org.apache.hadoop.hbase.HBaseConfiguration
作用: 对HBase进行配置
用法示例:
HBaseConfiguration hconfig = new HBaseConfiguration();
hconfig.set("hbase.zookeeper.property.clientPort","2181");
|--------|-----------------------------------------|------------------------------------------------------------------------|
|返回值 | 函数 | 描述 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | addResource(Path file) |通过给定的路径所指的文件来添加资源 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | deal() |清空所有设置的属性 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|string | get(String name) |获取属性名对应的值 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|String | getBoolean |获取为boolean类型的属性值,如果其属性值类型部位boolean,则返回默认属性值|
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | set(String name,String value) |通过属性名来设置值 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | setBoolean(String name,boolean value) |设置boolean类型的属性值 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
HBaseAdmin
包名:org.apache.hadoop.hbase.client.HBaseAdmin
作用:提供了一个接口来管理HBase数据库的表信息。它提供的方法包括:创建表,删除表,列出表项,使表有效或无效,以及添加或删除表列族成员等。
用法示例:
HbaseAdmin admin = new HBaseAdmin(config);
admin.disableTable("tablename")
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|返回值 | 函数 | 描述 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|void | addColumn(String tableName,HColumnDescriptor column) | 向一个已经存在的表添加列 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | checkHBaseAvailable(HBaseConfiguration conf) | 静态函数,查看HBase是否处于运行状态| |
| |--------------------------------------------------------|-----------------------------------------------------|
| | create Table(HTableDescriptor desc) | 创建一个表,同步操作 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | delete Table(byte[] tableName) | 删除一个已经存在的表 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | enable Table(byte[] tableName) | 是表处于有效状态 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | disable Table(byte[] tableName) | 使表处于无效状态 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|HTableDescription[] | list Tables() | 列出所有用户控件表项 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|void | modify Table(byte[] tableName,HTableDescriptor htd) | 修改表的模式,是异步的操作,可能需要花费一定的时间 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|boolean | tableExists(String tableName) | 检查表是否存在 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
HTableDescriptor
包名:org.apache.hadoop.hbase.HTableDescriptor
作用:包含了表的名字极其对应表的列族
用法示例:
HTableDescriptor htd = new HTableDescrptor(table);
htd.addFamily(new HcolumnDescriptor("family"));
|------------------|-----------------------------------|--------------|
|返回值 | 函数 | 描述 |
|------------------|-----------------------------------|--------------|
|void | addFamily(HColumnDescriptor) | 添加一个列族 |
|------------------|-----------------------------------|--------------|
|HColumnDescriptor | removeFamily(byte[] column) | 移除一个列族 |
|------------------|-----------------------------------|--------------|
|byte[] | getName() | 获取表的名字 |
|------------------|-----------------------------------|--------------|
|byte[] | getValue(byte[] key) | 获取属性的值 |
|------------------|-----------------------------------|--------------|
|void | setValue(String key,String value) | 设置属性的值 |
|------------------|-----------------------------------|--------------|
HColumnDescriptor
包名:org.apache.hadoop.hbase.HColumnDescriptor
作用:维护着关于列族的信息,例如版本号,压缩设置等。它通常在创建表或者为表添加列族的时候使用。列族被创建后不能直接修改,只能通过删除然后重新创建的方式。列族被删除的时候,列族里面的数据也会同时被删除。
用法示例:
HTableDescriptor htd = new HTableDescriptor(tablename);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
|-------|-----------------------------------|------------------------|
|返回值 | 函数 | 描述 |
|-------|-----------------------------------|------------------------|
|byte[] | getName() | 获取列族的名字 |
|-------|-----------------------------------|------------------------|
|byte[] | getValue(byte[] key) | 获取对应的属性的值 |
|-------|-----------------------------------|------------------------|
|void | setValue(String key,String value)| 设置对应属性的值 |
|-------|-----------------------------------|------------------------|
HTable
包名:org.apache.hadoop.hbase.client.HTable
作用:可以用来和HBase表直接通信。此方法对于更新操作来说是非线程安全的
用法示例:
HTable table = new HTable(conf,Bytes.toBytes(tablename));
ResultScanner scanner = table.getScanner(family);
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|返回值 | 函数 | 描述 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|void | checkAdnPut(byte[] row,byte[] family,byte[] qualifier,byte[] value,Put put)| 自动的检查row/family/qualifier是否与给定值匹配 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|void | close() | 释放所有的资源或挂起内部缓冲中的更新 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|Boolean | exists(Get get) | 检查Get实例所指定的值是否存在于HTable的列中 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|Result | get(Get get) | 获取指定行的某些单元格所对应的值 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|byte[][] | getEndKeys() | 获取当前一打开的表每个区域的结束键值 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|ResultScanner | getScanner(byte[] family) | 获取当前给定列族的scanner实例 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|HtableDescriptor | getTableDescriptor() | 获取当前表的HTableDescriptor实例 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|byte[] | getTableName() | 获取表名 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|static boolean | isTableEnabled(HBaseConfiguration conf,String tableName) | 检查表是否有效 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|void | put(Put put) | 向表中添加值 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
Put
包名:org.apache.hadoop.hbase.client.Put
作用:用来对单个行执行添加操作
用法示例:
Htable table = new HTable(conf.Bytes.toBytes(tablename));
Put p = new Put(brow);//为指定行创建一个Put操作
p.add(family.qualifier.value);
table.put(p);
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|返回值 | 函数 | 描述 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|void | checkAndPut(byte[] row,byte[] family,byte[] qualifier,byte[] value,Put put) |自动的检查row/family/qualifier是否与给定的值匹配 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|void | close() |释放所有的资源或挂起内部缓冲区中的更新 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|Boolean | exists(Get get) exists(Get get) |检查Get实例所指定的值是否存在于HTable的列中 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|Result | get(Get get) |获取指定行的某些单元格所对应的值 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|byte[][] | getEndKeys() |获取当前一打开的表每个区域的结束键值 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|ResultScanner | getScanner(byte[] family) |获取当前给定列族的scanner实例 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|HTableDescriptor | getTableDescriptor() |获取当前表的HTableDescription实例 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|byte[] | getTableName() |获取表名 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|static boolean | is TableEnabled(HBaseConfiguration conf,String tableName) |检查表是否有效 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|void | put(Put put) |向表中添加值 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
Get
包名:org.apache.hadoop.hbase.client.Get
作用:用来获取单个行的相关信息
用法示例:
HTable table = new HTable(conf,Bytes.toBytes(tablename));
Get g = new Get(Bytes.toBytes(row));
table.get(g);
|--------|-------------------------------------------|-----------------------------------|
|返回值 |函数 | 描述 |
|--------|-------------------------------------------|-----------------------------------|
|Get |addColumn(byte[] family,byte[] qualifier) |获取指定列族和列修饰符对应的列 |
|--------|-------------------------------------------|-----------------------------------|
|Get |addFamily(byte[] family) |通过指定的列族获取对应的所有列 |
|--------|-------------------------------------------|-----------------------------------|
|Get |setTimeRange(long minStamp,long maxStamp) |获取指定取件的列的版本号 |
|--------|-------------------------------------------|-----------------------------------|
|Get |setFilter(Filter filter) |当执行Get操作时设置服务器端的过滤器|
|--------|-------------------------------------------|-----------------------------------|
Result
包名:org.apache.hadoop.hbase.client.Result
作用:存储Get或者Scan操作后获取表的单行值。使用此类提供的方法可以直接获取值或者各种Map结构(key-value对)
|----------------------------|------------------------------------------------|-----------------------------------------------|
|返回值 | 函数 | 描述 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
|boolean | containsColumn(byte[] family,byte[] qualifier) | 检查指定的列是否存在 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
|NavigableMap<byte[],byte[]> | getFamilyMap(byte[] family) | 获取对应列族所包含的修饰符与值的键值对 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
|byte[] | getValue(byte[] family,byte[] qualifier) | 获取对应列的最新值 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
ResultScanner
包名:org.apache.hadoop.hbase.client.ResultScanner
作用:存储Get或者Scan操作后获取表的单行值。使用此类提供的方法可以直接获取值或者各种Map结构(key-value对)
|--------|---------|----------------------------------|
|返回值 |函数 | 描述 |
|--------|---------|----------------------------------|
|void |close() | 关闭scanner并释放分配给它的资源|
|--------|---------|----------------------------------|
|Result |next() | 获取下一行的值 |
|--------|---------|----------------------------------|
HTablePool
包名:org.apache.hadoop.hbase.client.HTablePool
作用:可以解决HTable存在的线程不安全问题,同时通过维护固定数量的HTable对象,能够在程序运行期间复用这些HTable资源对象
说明:
1、HTablePool可以自动创建HTable对象,而且对客户端来说使用上是完全透明的,可以避免多线程间数据并发修改问题。
2、HTablePool中的HTable对象之间是公用Configuration连接的,能够可以减少网络开销。
HTablePool的使用很简单:每次进行操作前,通过HTablePool的getTable方法HTabelPool的putTable方法将HTable对象放回到TablePool中。
------------------------------------------------------------------------------------------------------------------
create 'students_age','f1'
-------------------------创建表程序-----------------------------------------------------------------------------------------
package com.hbase;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseConnection{
private String rootDir;
private String zkServer;
private String port;
private Configuration conf;
private HConnection hConn = null;
public HbaseConnection(String rootDir,String zkServer,String port){
this.rootDir = rootDir;
this.zkServer = zkServer;
this.port = port;
conf = HBaseConfiguration.create();
conf.set("hbase.rootdir",rootDir);
conf.set("hbase.zookeeper.quorum",zkServer);
conf.set("hbase.zookeeper.property.clientPort",port);
try{
hConn = HConnectionManager.createConnection(conf);
}catch(IOException e){
e.printStackTrace();
}
}
public void createTable(String tableName,List<String> cols){
try{
HBaseAdmin admin = new HBaseAdmin(conf);
if(admin.tableExists(tableName))
throw new IOException("table exists");
else{
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(String col : cols){
HColumnDescriptor colDesc = new HColumnDescriptor(col);
colDesc.setCompressionType(Algorithm.GZ);
colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
tableDesc.addFamily(colDesc);
}
admin.createTable(tableDesc);
}
}catch(MasterNotRunningException e){
e.printStackTrace();
}catch(ZooKeeperConnectionException e){
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
}
public void saveData(String tableName,List<Put> puts){
try{
HTableInterface table = hConn.getTable(tableName);
table.put(puts);
table.setAutoFlush(false);
table.flushCommits();
}catch(IOException e){
e.printStackTrace();
}
}
public Result getData(String tableName,String rowkey){
try{
HTableInterface table = hConn.getTable(tableName);
Get get = new Get(Bytes.toBytes(rowkey));
return table.get(get);
}catch(IOException e){
e.printStackTrace();
}
return null;
}
public void format(Result result){
String rowkey = Bytes.toString(result.getRow());
KeyValue[] kvs = result.raw();
for(KeyValue kv: kvs){
String family = Bytes.toString(kv.getFamily());
String qualifier = Bytes.toString(kv.getQualifier());
System.out.println("rowkey->" + rowkey + "family->" + family + "qualifier->" + qualifier);
}
}
public void hbaseScan(String tableName){
Scan scan = new Scan();
scan.setCaching(1000);
try{
HTableInterface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res)
}
}catch(IOException e){
e.printStackTrace();
}
}
public void filterTest(String tableName){
Scan scan = new Scan():
scan.setCaching(1000);
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
///new BinaryComparator(Bytes.tobytes("Jack")));
new RegexStringComparator("J\\w+")));
PageFilter filter = new PageFilter(15);//分页显示
scan.setFilter(filter);
try{
HtableIntegerface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
}
}catch(IOException e){
e.printStackTrace();
}
}
public void pageFilterTest(String tableName){
PageFilter filter = new PageFilter(4);
byte[] lastRow = null;
int pageCount = 0;
try{
HtableInterface table = hConn.getTable(tableName);
while(++pageCount > 0){
System.out.println("pageCount = " + pageCount);
Scan scan = new Scan();
scan.setFilter(filter);
if(lastRow != null){
scan.setStartRow(lastRow);
}
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
lastRow = res.getRow();
if(++count > 3)
break;
format(res);
}
if(count < 3)
break;
}
}catch(IOException e){
e.printStackTrace();
}
}
public static void main(String[] args){
String rootDir = "hdfs://hbase";
String zkServer = "192.168.1.198";
String port = "2181";
HbaseConnection conn = new HbaseConnection(rootDir,zkServer,port);
/* List<String> cols = new LinkedList<>();
cols.add("basicInfo");
cols.add("moreInfo");
conn.createTable("students",cols);*/
List<Put> puts = new LinkedList<Put>();
Put put1 = new Put(Bytes.toBytes("Tom"));
put1.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("27"));
put1.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11232"));
Put put2 = new Put(Bytes.toBytes("Jim"));
put2.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("28"));
put2.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11233"));
puts.add(put1);
puts.add(put2);
conn.saveData("students",puts);
Result result = conn.getData("students","Tom");
conn.format(result);
conn.hbaseScan("studnets");
conn.filterTest("students");
}
}
-------------------------------------------------------------------------------------------------------------------------
machine
1、公网ip
内网ip
2-4 内网ip
log4j.properties
log4j.rootLogger = info,stdout,hbase
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.hbase = org.apache.log4j.DailyRollingFileAppender
log4j.appender.hbase.File = ./log.log
log4j.appender.hbase.Append = true
log4j.appender.hbase.layout = org.apache.log4j.PatternLayout
log4j.appender.hbase.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%t:%r] - [%p] %m%n
---------------------------------------------------------------------------------------------------------
HBase 扫描器
Scanner扫描器
Hbase在扫描数据的时候,使用scanner表扫描器。
HTable通过一个Scan实例,调用getScanner(scan)来获取扫描器。可以配置扫描起止位,以及其他的过滤条件。
通过迭代器返回查询结果,使用起来虽然不是很方便,不过并不复杂。但是这里有一点可能被忽略的地方,就是返回的scanner迭代器,每次调用next的获取下一条记录的时候,默认配置下会访问一次RegionServer。这在网络不是很好的情况下,对性能的影响是很大的。
建议配置扫描器缓存。
扫描器使用
扫描器缓存
hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的而时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以进行配置:
1)在HBase的conf配置文件中进行配置
2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置
3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。
Hbase过滤器
过滤器简介
1、使用过滤器可以提高操作表的效率,HBase中两种数据读取函数get()和scan()都支持过滤器,支持直接访问和通过指定起止行键来访问,但是缺少细粒度的筛选功能,如基于正则表达式对键或值进行筛选的功能。
2、可以使用预定义好的过滤器或者是实现自定义过滤器
3、过滤器在客户端创建,通过RPC传送到服务器端,在服务器端执行过滤操作,把数据返回给客户端
scan 'studnets',{STARTROW=>'J',ENDROW=>'K'}
过滤器:
1、Comparsion Filter(比较过滤器)
RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter
2、Dedicated Filter(专用过滤器)
SingleColumnValueFilter
SingleColumnValueExcluderFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
TimestampsFilter
RandomRowFilter
3、Decorating Filter(附加过滤器)
SkipFilter
WhileMatchFilters
HBase协处理器
HBase作为列数据库最经常被人的特性包括:
无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。
比如,在旧版本的(<0.92)HBase中,统计数据表的总行数,需要使用Counter方法,执行一次MapReduce Job才能得到。虽然HBase在数据存储层中集成了MapReduce,能够有效用于数据表的分布式计算。
然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。于是,HBase在0.92以后引入了处理器(coprocessors),
实现了一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器以及访问控制等。
HBase协处理器的灵感来自于JEFFDean 09年的演讲。它根据该演讲实现了类似于bigtable的协处理器,包括以下特性:
1)每个表服务器的任意子表都可以运行代码
2)客户端的高层调用接口(客户端能够直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用)
3)提供一个非常灵活的、可用于建立分布式服务的数据模型
4)能够自动化扩展、负载均衡、应用请求路由
HBase的协处理器灵感来自bigtable,但是实现细节不尽相同。HBase建立一个框架,它为用户提供类库和运行时环境,使得他们的代码能够在HBase region server和master上处理。
协处理器分两种类型,系统协处理器可以全局导入region servre上的所有数据表,表协处理器即是用户可以指定一张表使用协处理器。
协处理器框架为了更好支持其行为的灵活性,提供了两个不同方面的插件。一个观察者(observer),类似于关系数据库的触发器。另一个是终端(endpoint),动态的终端有点像存储过程。
Observer
观察者的设计意图是允许用户通过代码来重载处理器框架的upcall方法,而具体的事件触发的callback方法有HBase的核心代码来执行。处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。
以Hbase0.92版本为例,它提供了三种观察者接口:
RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
WALObserver:提供WAL相关操作钩子。
MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等。
这些接口可以同时使用在同一个地方,按照不同优先级顺序执行,用户可以任意基于协调处理器实现复杂的HBase功能层。HBase有很多种事件可以触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会由于各种原因有所改动,不同版本的接口改动比较大。
-----------------------------------------------------------------
EndPoint
终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。
用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。终端的使用,如下面流程所示:
定义一个新的protocol接口,必须继承CoprocessorProtocol。
实现终端接口,该实现会被导入region环境执行。
继承抽象类BaseEndpointCoprocessor。
在客户端,终端可以被两个新的Hbase Client API调用。单个region:
HTableInterface.coprocessorProxy(Class<T> protocol,byte[] row).regions区域:HTableInterface.coprocessorExec(Class<T> protocol,byte[] startkey,byte[] endKey,Batch.Call<T,R> callable)
有三个方法对Endpoint进行设置:
A.启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site。xml这个文件来实现,只需要添加如下代码:
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.RowCountEndpoint</value>
</property>
B.启动表aggregation,只对特定的表生效。通过HBase Shell来实现。
(1)disable指定表。
hbase>> disable 'students'
(2)添加aggregation hbase>> alter ‘students’,METHOD =>'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.RowCountEndpoint||'
alter ‘students’,'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint||'
协处理器命令:
增加协处理器方法一:alter 'students','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint||'
-----------------------------------------------------------------------------------------------------------------------------------------------------
增加协处理器方法二:
package coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
public class RegionObserverTest extends BaseRegionObserver{
private static byte[] fixed_rowkey = Bytes.toBytes("John");
@Override
public void preGet(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, List<KeyValue> result) throws IOException {
// TODO Auto-generated method stub
if(Bytes.equals(get.getRow(), fixed_rowkey)){
KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("time"),Bytes.toBytes("time"),Bytes.toBytes(System.currentTimeMillis()));
result.add(kv);
}
}
}
alter 'students','coprocessor'=>'hdfs://192.168.1.198:8020/coprocessor.jar|coprocessor.RegionObserverTest||'
enable 'students'
java -cp .:/usr/lib/hbase/lib/* HbaseConnection
备注:程序打包,上传到hdfs根目录下,执行alter。
----------------------------------------------------------------------------------------------------------------------------------------------------------
enable 'students'
describe 'students'
删除协处理器
alter 'students',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
sudo jar tf /usr/lib/hbase/lib/hbase-examples-1.0.0-cdh5.5.0.jar | grep RowCountEndpoint
org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.class
hadoop fs -put coprocessor.jar /
javac -cp /home/cloudera/softback/hbase-0.96.2-hadoop2/lib/* HbaseConnection.java
java -cp /usr/lib/hbase/lib/* HbaseConnection
(3)重启指定表hbase > enable 'mytable'
C.API调用
HTableDescriptor htd = new HTableDescriptor("testTable");
htd.setValue("CORPROCESSOR$1",path.toString + "|" + RowCountEndpoint.class.getCanonicalName() + "|" + Coprocessor.Priority.USER);
---------------------------------------------------------
CoprocessorRowCounter.java
public class CoprocessorRowCounter{
public static void main(String[] args) throws IOException{
Configuration conf = HbaseConfiguration.create();
HTable table = new HTable(conf,"students");
final ExampleProtos.CountRequest request = ExampleProcess.CountRequest.getDefaultInstance();
Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,null,null,new Batch.Call<ExampleProtos.RowCountService,Long>(){
public Long call(ExampleProtos.RowCountService counter) throws IOException{
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<ExamplerProtos.CountResponse>();
counter.getRowCount(controller,request,rpcCallback);
ExampleProtos.CountResponse response = rpcCallback.get();
if(controller.failedOnException()){
throw controller.getFailedOn();
}
return (response != null && response.hasCount())?response.getCount():0;
}
});
}
}
几点说明:
1、协调处理器配置的加载顺序:先加载配置文件中定义的协处理器,后加载表描述符中的协处理器
2、COPROCESSOR$<number>中的number定义了加载的顺序
3、协处理器配置格式
Coprocess can also be configured to load on a per table basis,via a shell command “alter”
hbase > alter "t1".METHOD => 'table_alt'
'coprocessor1'=>'hdfs://foo.jar|com.foo.FooRegionObserver[100]arg1=1,arg2=2'
RegionObserverTest.java
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
public class RegionObserverTest extends BaseRegionObserver{
}
>>describe 'students'
alter 'students',METHOD=>'table_att_unset',Name=>'coprocessor$1'
scan 'students'
---------------------------------------------------------------------------
MapReduce on HBase
MapReduce on Hbase
1、可以使用Mapreduce的方法操作hbase数据库
2、Hadoop Mapreduce 提供相关API,可以与hbase数据库无缝连接
3、API link: http://hbase.apache.org/devapidocs/index.html
--------------------HbaseMR.java---------------------------------------------------------------------------------
package com.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class HbaseMR {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf,"mapreduce on hbase");
job.setJarByClass(HbaseMR.class);
Scan scan = new Scan();
scan.setCaching(1000);
TableMapReduceUtil.initTableMapperJob("students", scan,MyMapper.class, Text.class, Text.class, job);
TableMapReduceUtil.initTableReducerJob("students_age",MyReducer.class, job);
job.waitForCompletion(true);
}
}
class MyMapper extends TableMapper<Text,Text>{
@Override
protected void map(
ImmutableBytesWritable key,
Result value,
Context context)
throws IOException, InterruptedException {
Text k = new Text(Bytes.toString(key.get()));
Text v = new Text(value.getValue(Bytes.toBytes("basicInfo"),Bytes.toBytes("age")));
context.write(v, k);
}
}
class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{
@Override
protected void reduce(
Text key,
Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
Put put = new Put(Bytes.toBytes(key.toString()));
for(Text value: values){
put.add(Bytes.toBytes("f1"),Bytes.toBytes(value.toString()),Bytes.toBytes(value.toString()));
}
context.write(null, put);
}
}
------------------------------------------------------------------------------------------------------
数据存储格式
B+树
特点:
1、有n棵子数的结点中含有n个关键字,每个关键字不保存数据,只用来索引,所有数据都保存在叶子节点。
2、所有的叶子结点中包含了全部关键字的信息,及指向含这些关键字记录的指针,且叶子结点本身依关键字的大小自小而大顺序链接。
3、所有的非终端结点可以看成是索引部分,结点中仅含其子树(根结点)中的最大(或最小)关键字。
HFile格式
1、数据块-保存表中的数据,每一个数据块由块头和一些keyValue(record)组成,key的值是严格按照顺序存储的。块大小默认为64K(由建表时创建cf时指定或者HColumnDescriptor.setBlockSize(size)),
这一部分可以压缩存储。在查询数据时,是以数据块为单位从硬盘load到内存。查找数据时,是顺序的遍历该块中的keyValue对。
2、元数据块(可选的)-保存用户自定义的kv对,可以被压缩。比如booleanfilter就是存在元数据块中的,该块只保留value值,key值保存在元数据索引块中。每一个元数据块由块头和value值组成。可以
快速判断key是都在这个HFile中。
3、File Info-Hfile的元信息,不被压缩,用户也可以在这一部分添加自己的元信息。
4、数据索引块-Data Block的索引,每条索引的key是被索引的block的第一条记录的key(格式为:头信息,数据块offset数据块大小块第一个记录的key,.....)
----------------------------------------------------------------------------------------------------
WAL机制
WAL预写日志
1)Client向RegionServer端提交数据的时候,会优先写WAL日志(HLog),只有当WAL日志写成功以后,Client才会被告诉提交数据成功,如果写WAL失败会告知客户端提交失败
2)一个RegionServer上所有的Region都共享一个HLog,一次数据的提交是先写WAL,再写memstore
HLog类
实现了WAL的类叫做HLog。当HRegion被实例化时,HLog实例会被当做一个餐所以和到HRegion的构造器中。当一个Region接收到一个更新操作时,它可以直接把数据保存到一个共享的WAL实例中去。
HLogKey类
1、当前的WAL使用的是Hadoop的SequenceFile格式,其key是HLogKey实例。HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括sequence number和timestamp,timestamp是“写入时间”,
sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
2、HLog Sequence File的Value是Hbase的KeyValue对象,即对应HFile中的KeyValue
WALEdit类
客户端发送的每个修改都会封装成WALEdit类,一个WALEdit类包含了多个更新操作,可以说一个WALEdit就是一个原子操作,包含若干个操作的集合。
LogSyncer类
Table在创建的时候,有一个参数可以设置,是否每次写Log日志都需要往集群里的其他机器同步一次,默认是每次都同步,同步的开销是比较大的,但不及时同步又可能因为机器宕而丢日志。
同步的操作现在是通过Pipeline的方式来实现的,Pipeline是指datanode接受数据后,再传给另外一台datanode,是一种串行的方式;n-Way Wirtess是指多datanode同时接受数据,最慢的一台结束就是整个结束。
差别在于一个延迟大,一个并发高,hdfs现在正在开发中,以便可以选择是按Pipeline还是n-Way Write来实现写操作。
Table如果设置每次不同步,则写操作会被RegionServer缓存,并启动一个LogSyncer线程来定时同步日志,定时时间默认是一秒也是由hbase.regionserver.optionallogflushinterval设置。
LogRoller类
1、日志写入的大小是有限制的。LogRoller类会作为一个后台线程运行,在特定的时间间隔内滚动日志。通过hbase.regionserver.logroll.period属性控制,默认1小时。
---------------------------------------------------------
Hbase在线数据备份
Hbase Replication
HBase复制是一种不同HBase部署中复制数据的方法。它可以作为一种故障恢复的方法,并提供HBase层次的高可用性。
HBase复制中最基本的架构模式是“主推送”(master-push),因为每个region server都有自己的WAL(或HLog),所以很容易保存现在正在复制的位置。正如众所周知的解决方案-MySql的主/从复制,只使用二进制
文件来跟踪修改。一个主集群可以将数据复制到任意数目的从集群,每个region server都会参与复制自己的修改。
来自每个region server的HLog是Hbase复制的基础,并且只要它们需要将数据复制到从集群,它们就必须被保存到HDFS上。每个region server从它需要的最老的日志开始复制,同时在zookeeper中保存当前恢复的位置
来简化错误恢复。每个从集群恢复的位置可能不同,但它们处理的HLog队列内容是相同的。参与复制的集群的规模可以不对等。主集群会通过随机分配尽量均衡从集群的负载。
解决问题:
数据管理人员的失误,不可逆的DDL操作。
底层HDFS文件BLOCK块corruption
短时间过度的读数据对集群造成的压力,增加服务器应对这种情况比较浪费资源
系统升级,维护,诊断问题会造成集群不可用时间增长。
双写的原子性难以保证
不可预计的一些原因。(如机房断电,大规模硬件损坏,断网等)
离线应用的MR计算对在线读写造成的较大的延迟影响
对于数据中心的数据冗余的备份方案,目前从一致性,事务性,延迟,吞吐量,数据损失,Failover几个角度来分析有一下几种方案。
简单备份模式通过定时不定时的Dump出集群数据保证数据的安全性,通常可以通过snapshot或设置时间戳来dump数据来实现这种方案,如果方案简介,设计优雅可以做到对在线数据中心低于干扰或无干扰的数据备份。但这
中方案缺点也是显而易见的,只是对时间点之前的数据安全性得到保障,如果发生突发事件会导致不可避免的整段时间的数据丢失,为很多人无法接受。
主从模式(Maser-Slave)这种模式比起简单的备份模式多了很多优点,可以通过最终一致性保证数据的一致,数据从主集群到备集群延时较低,异步写入不会对主集群带来性能压力,基本不会产生多少性能的影响,突发
事件来临时数据丢失很少,并且主集群的事务在备集群也可以得以保证。一般通过构造较好的Log系统加上check Point来实现,可以实现读写分离,主集群可以担当读写服务,但备集群一般只承担读服务。
主主模式(Master-Master)原理总体类似于主从模式,不同的是2个集群可以互相承担写的分离,都可承担读写服务。
2阶段提交这种方案保证了强一致性和事务,服务器返回给客户端成功则表明数据一定已经成功备份,不会造成任何数据丢失。每台服务器都可以承担读写服务。但缺点是造成集群延迟较高,总体吞吐下降。
Paxos算法基于Paxos算法的实现的强一致性方案,同一客户端连接的server能保证数据的一致性。缺点是实现复杂,集群延迟和吞吐随着集群服务器增加而边差。
部署步骤
1)编辑集群中所有机器的${HBASE_HOME}/conf/hbase-site.xml文件,增加如下配置:
<property>
<name>hbase.replication</name>
<value>true</value>
</property>
修改完成后,重启HBase集群,使配置生效
2)在HBase shell中运行如下命令:
>>add_peer 'ID''CLUSTER_KEY'
>>start_replication
第一条命令是为从集群设置zookeeper集群信息,这样可以使得修改被同步到从集群上。第二条命令真正将修改过的记录发布到从集群上。为了保证工作能按照预期进行,用户必须保证已经在从集群上建立了一个相同的表的副本,
表可以为空,但必须有相同的模式和表名。
注意:hbase-0.96和hbase-0.98已经没有start_replication命令和stop_replication命令。hbase-0.98相较hbase-0.96,新增了set_peer_tableCFs、show_peer_tablesCFs命令。在设定复制时,hbase-0.98需要使用set_peer_tableCFs设置。
具体的有对应的帮助命令可供参考。
ID必须是一个短整数,CLUSTER_KEY的内容请参考以下模板:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
比如,zk.server.com:2181:/hbase
注意:如果两个集群使用相同的zookeeper集群,你不得不使用不同的zookeeper.znode.parent,因为它们不能写入相同的文件夹中。
3)一旦你有一个对等(从)集群,你需要在你的列簇上使复制可用,要想达到这样的效果,可以在Hbase shell中执行如下命令:
disable ‘you——table’
alter ‘your-table’,{NAME =>'family_name',replication_scope=>'1'}
enable 'your_table'
scope值为0(默认值)意味着它不会被复制,而scope值为1意味着它将被复制.
4)运行如下命令可以列出所有配置的对等(从)集群:
》》list_peers
5)运行如下命令将使对等(从)集群不可用:
》》disable_peer 'ID'
运行完命令后,HBase将停止将向对等(从)集群发送修改,但是它将一直跟踪所有新的WALs文件,以便当从集群可用时继续复制。
6)可以运行如下命令将使之前设置为了不可用的对等(从)集群可用:
>>enable_peer'ID'
7)运行下面的命令,可以移除一个从集群:
stop_replication
remove_peer 'ID'
需要注意的是,停止复制仍会完成所有已在队列里的修改的复制,但是之后所有的处理都被停止了。为了确认你的配置都正常工作,你可以查看任何一个regionserver的日志文件,看是否由类似下面几行的内容:
Considering 1 rs,with ratio 0.1
Getting 1 rs from peer cluster#0
Choosing peer 10.10.1.49:62020
Hbase集群数据迁移
1)静态迁移方案
Hadoop distcp
在hbase停止的状态下进行数据的迁移
distcp + add_table.rb
------------------------------------------------------
2)动态迁移方案
-Replication备份方案
-CopyTable方案
-Export and Import方案
动态迁移方案
1、Replication备份方案
修改hbase-site.xml配置,增加hbase.replication属性
增加表属性REPLICATION_SCOPE属性
add_peer
2、CopyTable方案
命令:./hbase org.apache.hadoop.hbase.mapreduce.CopyTable--peer.adr=new cluster ip:2181:/hbase_table
说明:
(1)拷贝完成,不需要重启机器,在new cluster中就可以看到该表
(2)稳定性还需要考虑
3、Export and Import方案
步骤:(1)在old cluster上执行:./hbase
org.apache.hadoop.hbase.mapreduce.Export test hdfs://new clusterip:9000/xxx
(2)在new cluster上执行:./hbase
org.apache.hadoop.hbase.mapreduce.Import test hdfs://new clusterip:9000/xxx
说明:(1)一定要写全路径,不能写相对路径
(2)在Import前,需要将表事先在new cluster中创建好
-----------------------------------------------------------------------------------
3)手动方式
把hbase元数据表打包 tar zcvf ...
hadoop fs -put studnets /hbase/data/default
1、从源hbase集群中复制出HBase数据库表到本地目录
hadoop fs -get
2、目标HBase导入
hadoop fs -put
3、修复.META表
hbase hbck -fixMeta
4、重新分配数据到各RegionServer
hbase hbck -fixAssignments
---------------------------------------------------------------------------------
数据导入方案
1、利用ImportTsv将csv文件导入到HBase 备注:表要前创建
2、利用completebulkload将数据导入到HBase
3、利用Import将数据导入到HBase
数据导入方案一
利用ImportTsv将csv文件导入到HBase
命令:bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv-Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tbl-001 /simple.csv
simple.csv内容如下:
1、“tom”
2、“sam”
3、“jerry”
4、“marry”
5、“john”
-----------------------------------------------------------------------------
数据导入方案二
HBase支持bulkload的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合
mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。
通过使用先生成HFile,然后再BulkLoad到HBase的方式来代替之前直接调用HTableOutputFormat的方法有如下好处:
(1)消除了对HBase集群的插入压力
(2)提高了Job的运行速度,降低了Job的执行时间
利用completebulkload将数据导入到HBase
1)先通过ImportTsv生成HFile
命令:bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=/hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tbl-001 /simple.csv
2)通过completebulkload将数据导入表hbase-tbl-002
命令:hadoop jar lib/hbase-server-0.96.0.jar completebulkload /hfile_tmp hbase-tbl-002
利用Import将数据导入到HBase
1)HBase export工具导出的数据格式是sequence file。比如,在执行完命令“bin/hbase org.apache.hadoop.hbase.mapreduce.Export hbase-tbl-002/test-output”后,hbase会启动一个MapReduce作业,作业
完成后会在hdfs上面会生成sequence file格式的数据文件
2)对于这类Sequence file格式的数据文件,HBase是可以通过Import工具直接将它导入到HBase的表里面的。执行命令“bin/hbase org.apache.hadoop.hbase.mapreduce.Import hbase-tbl-003 /test-output”,
随后hbase会启动一个MapReduce作业,然后表test会成功导入数据。
---------------------------------------------------------------------------
Hbase二级索引
MapReduce方案
IndexBuilder:利用MR的方式构建Index
优点:并发批量构建Index
缺点:不能实时构建Index
举例:
原表:row 1 f1:name zhangsan
row 2 f1:name lisi
row 3 f1:name wangwu
索引表:row zhangsan f1:id 1
row lisi f2:id 2
row wangwu f3:id 3
create 'student','f1'
put 'student','1','f1:name','lisi'
put 'student','2','f1:name','zhangsan'
put 'student','3','f1:name','wangwu'
create 'student-name','f1'
程序放到/usr/lib/hbase/lib/目录下
>>hbase com.hbase.IndexBuilder student f1 name
给表student 添加二级索引 studnet-name
示例如下:
-----------------------------------------IndexBuilder.java----------------------------------------------------------------------------
package com.hbase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Set;
import javax.ws.rs.PUT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
public class IndexBuilder {
static class MyMapper extends TableMapper<ImmutableBytesWritable,Put>{
private HashMap<byte[], ImmutableBytesWritable> indexes = new HashMap<byte[],ImmutableBytesWritable>();
private String familyName;
@Override
protected void map(
ImmutableBytesWritable key,
Result value,
Context context)
throws IOException, InterruptedException {
Set<byte[]> keys = indexes.keySet();
for(byte[] k:keys){
ImmutableBytesWritable indexTableName = indexes.get(k);
byte[] val = value.getValue(Bytes.toBytes(familyName),k);
if(val !=null){
Put put = new Put(val);
put.add(Bytes.toBytes("f1"),Bytes.toBytes("id"),key.get());
context.write(indexTableName, put);
}
}
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String tableName = conf.get("tableName");
String familyName = conf.get("familyName");
String[] qualifiers = conf.getStrings("qualifiers");
for(String q: qualifiers){
indexes.put(Bytes.toBytes(q),new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + q)));
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length < 3){
System.exit(-1);
}
String tableName = args[0];
String columnFamily = args[1];
conf.set("tableName", tableName);
conf.set("columnFamily", columnFamily);
String[] qualifiers = new String[otherArgs.length -2];
for(int i =0; i < qualifiers.length;i++){
qualifiers[i] = otherArgs[i+2];
}
conf.setStrings("qualifiers", qualifiers);
Job job = new Job(conf,tableName);
job.setJarByClass(IndexBuilder.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
Scan scan = new Scan();
scan.setCaching(1000);
TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class, PUT.class, job);
job.waitForCompletion(true);
}
}
------------------------------Hbase二级索引------------------------------------------------------------------------------------------------
1、Mapreduce方案
2、ITHBASE方案
3、IHBASE方案
4、Coprocessor方案
5、Solr + hbase方案
IHBASE方案
优点:IHBase(Indexed HBase)是HBASE的一个扩展,用于支持更快的扫描。
缺点:需要重构hbase
原理:在Memstore满了以后刷磁盘时,IHBase会进行拦截请求并为这个memstore的数据构建索引,索引另一个CF的方式存储在表内。scan的时候,IHBase会结合索引列中的标记,来加速scan
ITHBASE方案
优点:ITHBASE(Indexed Transactional HBase)是HBase的一个事务型的带索引的扩展
缺点:需要重构hbase,几年没有更新
Coprocessor方案
HIndex-来自华为的HBase二级索引
Solr方案
Solr是一个独立的企业级搜索应用服务器,它对外提供类似于Web-service的API接口。用户可以通过http请求,向搜索引擎服务器提交一定格式的XML文件,生成索引;也可以通过Http Get操作
提出查找请求,并得到XML格式的返回结果。
Solr是一个高性能,采用Java5开发,基于Lucene的全文搜索服务器。同时对其进行扩展,提供了比Lucene更为丰富的查询语言,同时实现了可配置、可扩展并对查询性能进行了优化,并且提供了一个
完善的功能管理界面,是一款非常优秀的全文搜索引擎。
HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力
基于Solr的HBase多条件查询原理很简单,将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,通过Solr的多条件查询快速获得符合过滤条件的rowkey值,拿到这些rowkey之后在HBase中通过
指定rowkey进行查询。
--------------------------------------------------------------------------------------------------------
Hbase快照技术
Snapshot(快照)
什么是快照
快照就是一份元信息的集合,允许管理员恢复到表的先前状态。快照不是表的复制而是一个文件名称列表,因而不会复制数据。
完全快照恢复是指恢复到之前的“表结构”以及当时的数据,快照之后发生的数据不会恢复。
快照的作用
hbase中存在的备份或克隆表的方法就是使用复制、导出表或者在关闭表之后拷贝HDFS中所有的hfile。复制/导出是通过一系列工具调用MapReduce来扫描并复制表,这样会对RegionServer有直接的影响。
关闭表会停止所有的读写操作,实际环境中往往无法接受。
相比之下Hbase快照允许管理员不拷贝数据直接克隆一张表,这对域服务器产生的影响最小。将快照导出至其他集群不会直接影响到任何域服务器;导出只是带有一些额外逻辑的群间数据同步。
快照使用场景
从用户、应用异常中还原
从一个已知的安全状态恢复、还原
查看之前的快照并有选择性的合并不同写入产品环境
当主应用程序升级或改版时保存快照
在指定时间审查和、或报告数据
按照规定捕获月度数据
生成日终、月末、季末报告
应用测试
通过快照模拟生产环境下结构或应用发生的变化,测试完成即可丢弃
例如:生成快照,利用快照中内容构建新表(原有结构+数据)并且修改新表的结构,添加或删除列之类。(原始表、快照和新表保持互相独立)
减少工作压力
生成快照,导入到其他集群,然后运行MapReduce jobs。因为导出的快照是HDFS级别,所以不会像复制表那样降低HBase主集群的效率。
快照操作
生成快照:本操作尝试对指定表生成快照。如果集群在执行数据均衡、分隔或合并等操作是,可能会引起操作失败。
克隆快照:本操作使用与指定快照相同的结构数据构建一张新表。操作结果会生成一张有完整功能的表,对该表的任意修改不会对原表或快照产生影响。
还原快照:本操作将表结构和数据恢复到生成快照时的状态。(注意:本操作会舍弃快照生成后任何改变)。
删除快照:本操作将系统中的快照删除,释放未共享的磁盘空间,而且不会影响其他克隆或快照。
导出快照:本操作将快照数据和元数据复制到其他集群。操作只会涉及HDFS,不会与Master或域服务器产生任何联系,因此HBase集群可以关闭。
快照优势
导出快照与复制、导出表除了更好的保持一致性外,主要的不同在于导出快照是在HDFS的层面操作的。
这意味着Master和域服务器与操作无关。因此不需要为不必要的数据创建缓存空间,也不会有扫描过程因为大量对象创建引起的GC暂停。对于HBase来说主要性能影响就是DataNode额外的网络和磁盘负载。
snapshot 'students','students_snapshot'
list_snapshots
clone_snapshot 'students_snapshot','students_new'
数据导入方案三
通过检查hbase-site.xml的hbase.snapshot.enable是否设置为true确认打开了快照许可。
1、获取指定表的快照使用snapshot命令(不产生文件复制)
snapshot 'tableName','snapshotName'
2、列出所有的快照,使用list_snapshot命令。会展示出快照名称,源表,以及创建日期和时间
list_snapshots
3、删除快照使用deleted_snapshot命令。删除快照不会影响到克隆表或者之后生成的快照。
delete_snapshot 'snapshotName'
4、使用clone_snapshot命令从指定的快照生成新表(克隆)。由于不会产生数据复制,所以最终用到的数据不会是之前的两倍。
clone_snapshot 'snapshotName','newTableName'
5、使用restore_snapshct命令指定快照内容替换当前表结构/数据。
disable 'students'
restore_snapshot 'snapshotName'
enable 'students'
6、使用ExportSnapshot工具将现有快照导出至其他集群。导出工具不会影响到域服务器负载,只是工作在HDFS层面所以需要指定HDFS路径(其他集群的hbase根目录)
hbase org.apache.hadoop.hbase.snapshot ExportSnapshot -snapshot
SnapshotName -copy -to hdfs://srv2:8020/hbase
delete 'students','Tom','moreInfo:tel'
-----------------------------------------------------------------------
Hbase BloomFilter(布隆过滤器)
集合表示和元素查询
下面我们具体来看BloomFilter是如何用位数组表示集合的。初始状态时,BloomFilter是一个包含m位的位数组,每一位都置为0
为了表达S=(x1,x2,x3,。。。xn)这样一个n个元素的集合,BloomFilter使用k个相互独立的哈希函数(HashFunction),它们分别将集合中的每个元素映射到(1,。。。m)的范围中。对任意一个元素x,
第i个哈希函数映射的位置h1(x)就会被置为1(1<=i<=k).注意,如果一个位置多次被置为1,那么只有第一次会起作用,后面几次将没有任何效果。在下图中,k=3,且有两个和希函数选中同一个位置(从左边数第五位)
在判断y是否属于这个集合时,我们对y应用k次哈希函数,如果所有hi(y)的位置都是1(1<=i<=k),那么我们就认为y是集合中的元素,否则就认为y不是集合中的元素,下图中y1就不是集合中的元素。y2或者属于这个集合,
或者刚好是一个false positive。
Bloomfilter在HBase中的作用?
HBase利用Bloomfilter来提高随机读(Get)的性能,对于顺序读(Scan)而言,设置Bloomfilter是没有作用的(0.92以后,如果设置了bloomfilter为ROWCOL,对于指定了qualifier的Scan有一定的优化)
Bloomfilter在Hbase中的开销?
Bloomfilter是一个列族cf级别的配置属性,如果在表中设置了Bloomfilter,那么HBase会在生成StoreFile时包含一份bloomfilter结构的数据,称其为MeataBlock;MetaBlock与DataBlock(真实的KeyValue数据)
一起由LRUBlockCache维护。所以开启bloomfilter会有一定的存储及内存cache开销。
HBase中的Bloomfilter的类型及使用?
a)ROW,根据KeyValue中的row来过滤storefile
举例:假设有2个storefile文件sf1和sf2
sf1包含kv1(r1 cf:q1 v)、kv2(r2 cf:q1 v)
sf2包含kv3(r3 cf:q1 v)、kv4(r4 cf:q1 v)
如果设置了CF属性中的bloomfilter为ROW,那么get(r1)时就会过滤sf2,get(r3)就会过滤sf1
b)ROWCOL,根据KeyValue中的row+qualifier来过滤storefile
举例:假设有2个storefile文件sf1和sf2,
sf1包含kv1(r1 cf:q1 v)、kv2(r2 cf:q1 v)
sf2包含kv3(r1 cf:q2 v)、kv4(r2 cf:q2 v)
如果设置了CF属性中的bloomfilter为ROW,无论get(r1,q1)还是get(r1,q2),都会读取sf1+sf2;而如果设置了CF属性中的bloomfilter为ROWCOL,那么get(r1,q1)就会过滤sf2,get(r1,q2)就会过滤sf1
ROWCOL和ROW对比
a)ROWCOL只对指定列(Qualifier)的随机读(Get)有效,如果应用中的随机读get,只含row,而没有指定读哪个qualifier,那么设置ROWCOL是没有效果的,这种场景就应该使用ROW
b)如果随机读中指定的列(Qualifier)的数目大于等于2,在0.90版本中ROWCOL是无效的,而在0.92版本以后,HBase-2794对这一情景作了优化,是有效的(通过KeyValueScanner#seekExactly)
c)如果同一row多个列的数据在应用上是同一时间put的,那么ROW与ROWCOL的效果近似相同,而ROWCOL只对指定了列的随机读才会有效,所以设置为ROW更佳
d)ROWCOL与ROW只在名称上有联系,ROWCOL并不是ROW的扩展,不能取代ROW
--------------------------------------------------------------------------------------------
基于Solr的实时查询
实时查询方案
hbase+solr+hbase indexer
1)hbase提供海量数据存储
2)solr提供索引构建与查询
3)hbase indexer提供自动化索引构建(从hbase到solr)
----------------------------------------------------------------------------
部署与安装
CDH5.x发行版中提供了hbase、solr以及hbase indexer的下载
实验选取CDH5.0.2版本,选择需下载以下三个:
hbase-0.96.1.1-cdh5.0.2.tar.gz
hbase-solr-1.3-cdh5.0.2.tar.gz
solr-4.4.0-cdh5.0.2.tar.gz
其中hbase-solr-xxx,来源NGDATA的开源项目,使用及说明参考
hbase->hbase inexder -> solr
---------------------------------------------------------------------------------
Solr查询
利用Solr操作solr API,使用SolrJ操作Solr会比利用httpClient来操作Solr要简单
查询使用的类:
1、HttpSolrServer
2、SolrQuery
192.168.1.198:8983/solr/select?q=1astname_s:Louis
Main.java
public class Main{
public static void main(String[] args){
String url = "http://192.168.1.198:8983/solr";
HttpSolrServer serve = new HttpSolrServer(url);
server.setConnectionTimeout(1000);
server.setSoTimeout(3000);
server.setDefaultMaxConnectionPerHost(1000);
server.setFollowRedirects(false);
server.setAllowCompression(true);
server.setMaxRetries(1);
SolrQuery query = new SolrQuery();
query.setQuery("firstname s:Jim");
try{
QueryResponse resp = server.query(query);
List<Item> item = resp.getBeans(Item.class);
for(Item item: items){
System.out.println(item.id);
conn.format(conn.getData("indexdemo-user",item.id));
}
}catch(SolrServerException e){
e.printStackTrace();
}
}
}
Item.java
public class Item{
@Field
String id;
@Field
String firstname_s;
@Field
String lastname_s;
}
---------------------------------------------------------------------------------------------------
HBaseConnection.java
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseConnection{
private String rootDir;
private String zkServer;
private String port;
private Configuration conf;
private HConnection hConn = null;
public HbaseConnection(String rootDir,String zkServer,String port){
this.rootDir = rootDir;
this.zkServer = zkServer;
this.port = port;
conf = HBaseConfiguration.create();
conf.set("hbase.rootdir",rootDir);
conf.set("hbase.zookeeper.quorum",zkServer);
conf.set("hbase.zookeeper.property.clientPort",port);
try{
hConn = HConnectionManager.createConnection(conf);
}catch(IOException e){
e.printStackTrace();
}
}
public void createTable(String tableName,List<String> cols){
try{
HBaseAdmin admin = new HBaseAdmin(conf);
if(admin.tableExists(tableName))
throw new IOException("table exists");
else{
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(String col : cols){
HColumnDescriptor colDesc = new HColumnDescriptor(col);
colDesc.setCompressionType(Algorithm.GZ);
colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
tableDesc.addFamily(colDesc);
}
admin.createTable(tableDesc);
}
}catch(MasterNotRunningException e){
e.printStackTrace();
}catch(ZooKeeperConnectionException e){
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
}
public void saveData(String tableName,List<Put> puts){
try{
HTableInterface table = hConn.getTable(tableName);
table.put(puts);
table.setAutoFlush(false);
table.flushCommits();
}catch(IOException e){
e.printStackTrace();
}
}
public Result getData(String tableName,String rowkey){
try{
HTableInterface table = hConn.getTable(tableName);
Get get = new Get(Bytes.toBytes(rowkey));
return table.get(get);
}catch(IOException e){
e.printStackTrace();
}
return null;
}
public void format(Result result){
String rowkey = Bytes.toString(result.getRow());
KeyValue[] kvs = result.raw();
for(KeyValue kv: kvs){
String family = Bytes.toString(kv.getFamily());
String qualifier = Bytes.toString(kv.getQualifier());
System.out.println("rowkey->" + rowkey + "family->" + family + "qualifier->" + qualifier);
}
}
public void hbaseScan(String tableName){
Scan scan = new Scan();
scan.setCaching(1000);
try{
HTableInterface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res)
}
}catch(IOException e){
e.printStackTrace();
}
}
public void filterTest(String tableName){
Scan scan = new Scan():
scan.setCaching(1000);
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
///new BinaryComparator(Bytes.tobytes("Jack")));
new RegexStringComparator("J\\w+")));
PageFilter filter = new PageFilter(15);//分页显示
scan.setFilter(filter);
try{
HtableIntegerface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
}
}catch(IOException e){
e.printStackTrace();
}
}
public void pageFilterTest(String tableName){
PageFilter filter = new PageFilter(4);
byte[] lastRow = null;
int pageCount = 0;
try{
HtableInterface table = hConn.getTable(tableName);
while(++pageCount > 0){
System.out.println("pageCount = " + pageCount);
Scan scan = new Scan();
scan.setFilter(filter);
if(lastRow != null){
scan.setStartRow(lastRow);
}
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
lastRow = res.getRow();
if(++count > 3)
break;
format(res);
}
if(count < 3)
break;
}
}catch(IOException e){
e.printStackTrace();
}
}
public static void main(String[] args){
String rootDir = "hdfs://hbase";
String zkServer = "192.168.1.198";
String port = "2181";
HbaseConnection conn = new HbaseConnection(rootDir,zkServer,port);
/* List<String> cols = new LinkedList<>();
cols.add("basicInfo");
cols.add("moreInfo");
conn.createTable("students",cols);*/
List<Put> puts = new LinkedList<Put>();
Put put1 = new Put(Bytes.toBytes("Tom"));
put1.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("27"));
put1.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11232"));
Put put2 = new Put(Bytes.toBytes("Jim"));
put2.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("28"));
put2.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11233"));
puts.add(put1);
puts.add(put2);
conn.saveData("students",puts);
Result result = conn.getData("students","Tom");
conn.format(result);
conn.hbaseScan("studnets");
conn.filterTest("students");
}
}
备注:因为solr和zookpeer有问题,实例没有通过,有时间在调整。
--------------------------------------------------------------------------------------------
HBase基础
1)HBase简介
2)HBase特点
3)HBase数据模型
4)HBase体系结构
5)HBase存储模型
6)HBase应用
简介
1)HBase是一个分布式的、多版本的、面向列的开源数据库
2)HBase利用Hadoop HDFS作为其文件存储系统,提供高可靠性、高性能、列存储、可伸缩、实时读写、适用于非结构化数据存储的数据库系统
3)HBase利用Hadoop MapReduce来处理HBase中的海量数据
4)HBase利用Zookeeper作为分布式协同服务
特点:
1)数据量大:一个表可以有上亿行,上百万列(列多时,插入变慢)
2)面向列:面向列(族)的存储和权限控制,列(族)独立检索
3)稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏
4)多版本:每个cell中的数据可以有多个版本,默认情况下版本号自动分配,是单元格插入时的时间戳
5)无类型:HBase中的数据都是字符串,没有类型
6)强一致性:同一行数据的读写只在同一台Region Server上进行
7)有限查询方式:仅支持三种查询方式(单个rowkey查询,通过rowkey的range查询,全表扫描)
8)高性能随机读写
数据模型
1)行:同一个key对应的所有数据
2)列族:相似的列数据通常被划分成一个列族,建表时确定
3)列:列名在写入时确定
4)Cell及时间戳(版本):
每个cell有任意多的版本
建表时设置每个列族可以保留多少个版本
5)三维有序
SortedMap(RowKey,List(SortedMap(Column,List(Value,TimeStamp))))
rowkey(ASC)+columnLabel(ASC)+Version(DESC)->value
|------------|--------------|--------------------|----------------------------|---------------|
|Row key |Time Stamp | Column "contents:" | Column "anchor:" | Column "mime:"|
|------------|--------------|--------------------|------------------|---------| |
|com.cnn.www |t9 | | anchor:cnnsi.com |CNN | |
|------------|--------------|--------------------|------------------|---------|---------------|
| |t8 | | achor:my.look.ca |CNN.com | |
|------------|--------------|--------------------|------------------|---------|---------------|
| |t6 | "<html>..." | | "text/html" |
|------------|--------------|--------------------|----------------------------|---------------|
| |t5 | "<html>..." | | |
|------------|--------------|--------------------|----------------------------|---------------|
| |t3 | "<html>..." | | |
|------------|--------------|--------------------|----------------------------|---------------|
--------------------------------------------------
1)面向列的存储
2)一张表可以被划分成若干个region
3)行按照rowkey进行字典序排序
4)支持随机读写
5)region是负载均衡调度的最小单位
------------------------------------------------------------------------------
体系结构
1)Client
包含访问HBase的接口并维护cache来加快对HBase的访问
2)Zookeeper
保证任何时候,集群中只有一个master
存储所有Region的寻址入口:
实时监控Region server的上线和下线信息,并实时通知给Master;
存储HBase的schema和table元数据
3)Master
为Region server分配region;
负责Region server的负载均衡
发现失效的Region server并重新分配其上的 region
管理用户对table的增删改查操作
4)Region Server
负责维护region,处理对这些region的IO请求
负责切分在运行过程中变得过大的region
5)ROOT表
记录META表中的每个region的位置,ROOT表最多只有一个region Zookeeper中记录了ROOT表的location
6)META表
记录各个表每个region所在的region server,META表可能包含多个region
------------------------------------------------------------------------------------------
HBase操作
1)flush
内存容量有限,需要定期将内存中的数据flush到磁盘
每次flush,每个region的每个column family都会产生一个HFile
读取操作,region server会把多个HFile数据归并到一起
2)compaction
flush操作产生的HFile会越来越低,需要归并来减少HFile的数量
旧数据会被清理
3)split
HFile大小增长到某个阀值就会split,同时把Region split成两个region,这两个region被分发到其他不同的region server上
4)scan
hbase原生提供的方法,顺序扫库;当然可以使用MapReduce并发扫库的方法
5)Bulk Load
快速导入大批量数据的方法
-------------------------------------------------------------------------------------------
存储模型
1)一个table中的一个region会被随机分配给一个region server
2)region是分布式存储和负载均衡的最小单位
------------------------------------------------------
最开始table只有一个region,但是随着数据的put,到达某个阀值的时候,一个大的region会split成几个小region,被分配到其他region server
------------------------------------------------
HFile结构
1)基于Block的存储结构
2)Block的索引驻留内存
---------------------------------
HBase应用
1)HBase环境搭建
配置hbase-env.sh
hbase-site.xml
regionservers
运行start-hbase.sh
启动后执行hbase shell进入交互式命令行
2)HBase API使用举例
Put/Get
Scan
.....
-----------------------------------------------------------------------------------------------------------
Hbase_test.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
public class Hbase_test {
public static void main(String[] args){
Configuration conf = new Configuration();
HBaseConfiguration hbconf = new HBaseConfiguration(conf);
/*try {
HBaseAdmin admin = new HBaseAdmin(hbconf);
HTableDescriptor tableDesc = new HTableDescriptor("test");
tableDesc.addFamily(new HColumnDescriptor("info"));
admin.createTable(tableDesc);
} catch (MasterNotRunningException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
try {
HTable table = new HTable(hbconf,"test");
/*Put put = new Put("row1".getBytes());
put.add("info".getBytes(),"A".getBytes(),"1".getBytes());
table.put(put);
table.close();*/
Scan scan = new Scan();
ResultScanner res = table.getScanner(scan);
for(Result r:res){
for(KeyValue kv: r.raw()){
System.out.println("rowkey=> " + new String(r.getRow())
+ " family=> " + new String(kv.getFamily())
+ " qualifier=> " + new String(kv.getQualifier())
+ " timestamp=> " + kv.getTimestamp()
+ " value=> " + new String(kv.getValue()));
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
----------------------------------------------------------------------------------------------------------------------
HBase系统搭建与部署
1)开源社区版本下载
-http://hbase.apache.org/
2)Cloudera发行版下载
-http://archive-primary.cloudera.com/cdh5/cdh/5/
Hadoop2.0部署
1)配置hosts 确保涉及的主机名均可以解析
2)编辑hadoop-env.sh,mapred-env.sh,yarn-env.sh
3)编辑core-site.xml,hdfs-site.xml,yam-site.xml
5)编辑slaves文件
6)把Hadoop复制到其他节点
7)启动Hadoop
8)验证启动
Hadoop2.0 HA部署
采用QJM方案
1、启动JN
./sbin/hadoop-daemon.sh start jourmalnode
./sbin/hadoop-daemons.sh start journalnode
hdfs namenode -initialize SharedEdits
2、启动Active NN
hdfs namenode -format
bin/hadoop-daemon.sh start namenode
3、启动Standby NN
hdfs namenode -bootstrap Standby
./sbin/hadoop-daemon.sh start namenode
4、启动Automatic Failover
hdfs zkfc -formatZK
./sbin/hadoop-daemon.sh start zkfc
hdfs haadmin -transitionToStandby nn1
hdfs haadmin -failover nn1 nn2
hdfs haadmin -getServiceState nn1
-----------------------------------------------------------------------------------------------------------------------------
HBase Shell 操作
(1)建立一个表scores,有两个列族grad和courese
>>create 'scores','grade','course'
(2)查看Hbase中的所有表
>>list
(3)查看表结构
>>describe 'scores'
Shell操作
4、按设计的表结构插入值
put 'scores','Tom','grade:','5'
put 'scores','Tom','course:math','97'
put 'scores','Tom','course:art','87'
put 'scores','Jim','grade:','4'
put 'scores','Jim','course:','89'
put 'scores','Jim','course:','80'
put命令比较简单,只有这一种用法:
put 't1','r1','c1','value',ts1
t1指表名,r1指定键名,c1指列名,value指单元格值。ts1指时间戳,一般都省略掉了。
5、根据键值查询数据
>>get 'scores','Jim'
>>get 'scores','Jim','grade'
get 't1','r1'
get 't1','r1',{TIMERANGE=>[ts1,ts2]}
get 't1','r1',{COLUMN=>'c1'}
get 't1','r1',{COLUMN=>['c1','c2','c3']}
get 't1','r1',{COLUMN=>'c1',TIMESTAMP=>ts1}
get 't1','r1',{COLUMN=>'c1',TIMERANGE=>[ts1,ts2],VERSIONS=>4}
get 't1','r1',{COLUMN=>'c1',TIMESTAMP=>ts1,VERSION=>4}
get 't1','r1','c1'
get 't1','r1','c1','c2'
get 't1','r1',['c1','c2']
6、扫描所有数据
scan 'scores'
也可以指定一些修饰词:TIMERANGE,FILTER,LIMIT,STARTROW,STOPROW,TIMESTAMP,MAXLENGTH,or COLUMNS.没任何修饰词,就是上边例句,就会显示所有数据行
scan '.META.'
scan '.META.',{COLUMNS=>'info:regioninfo'}
scan 't1',{COLUMNS=>['c1','c2'],LIMIT=>10,STARTROW=>'xyz'}
scan 't1',{COLUMNS=>'c1',TIMERANGE=>[1303668804,1303668904]}
scan 't1',{FILTER=>"(PrefixFilter('row2')AND(QualifierFilter(>=,'binary:xyz')))AND(TimestampsFilter(123,456))"}
scan 't1',{FILTER=>org.apache,hadoop,hbase.filter,ColumnPaginationFilter.new(1,0)}
7、删除指定数据
delete 'scores','Jim','grade'
delete 'scores','JIm'
删除数据命令也没太多变化,只有一个:
delete 't1','r1','c1','ts1'
另外有一个deleteall命令,可以进行整行的范围的删除操作,慎用!
如果需要进行全表删除操作,就使用truncate命令,其实没有直接的全表删除命令,这个命令也是disable,drop,create三个命令组合出来的。
8、修改表结构
disable 'scores'
alter 'scores',NAME=>'info'
enable 'scores'
alter命令使用如下(如果无法成功的版本,需要先通过表disable):
a、改变或添加一个列族
alter 't1',NAME=>'f1',VERSIONS=>5
b、删除一个列族:
alter 't1',NAME=>'f1',METHOD=>'delete'
alter 't1','delete'=>'f1'
9、统计行数
>>count 't1'
>>count 't1',INTERVAL=>100000
>>count 't1',CACHE=>1000
>>count 't1',INTERVAL=>10,CACHE=>1000
count一般会比较耗时,使用MapReduce进行统计,统计结果会缓存,默认是10行。统计间隔默认的是1000行(INTERVAL)
10、表的删除
先禁用表 distable 'table'
删除表 drop 'table'
------------------------
客户端API使用操作
|--------------------|--------------------------------|
|Java类 | HBase数据模型 |
|--------------------|--------------------------------|
|HBaseAdmin | 数据库(DataBase) |
|HBaseConfiguration | |
|--------------------|--------------------------------|
|HTable | 表(Table) |
|--------------------|--------------------------------|
|HTableDescriptor | 列族(Column Family) |
|--------------------|--------------------------------|
|Put | |
|Get | 列修饰符(Column Qualifier)|
|Scanner | |
|--------------------|--------------------------------|
HBaseConfiguration
包名:org.apache.hadoop.hbase.HBaseConfiguration
作用: 对HBase进行配置
用法示例:
HBaseConfiguration hconfig = new HBaseConfiguration();
hconfig.set("hbase.zookeeper.property.clientPort","2181");
|--------|-----------------------------------------|------------------------------------------------------------------------|
|返回值 | 函数 | 描述 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | addResource(Path file) |通过给定的路径所指的文件来添加资源 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | deal() |清空所有设置的属性 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|string | get(String name) |获取属性名对应的值 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|String | getBoolean |获取为boolean类型的属性值,如果其属性值类型部位boolean,则返回默认属性值|
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | set(String name,String value) |通过属性名来设置值 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
|void | setBoolean(String name,boolean value) |设置boolean类型的属性值 |
|--------|-----------------------------------------|------------------------------------------------------------------------|
HBaseAdmin
包名:org.apache.hadoop.hbase.client.HBaseAdmin
作用:提供了一个接口来管理HBase数据库的表信息。它提供的方法包括:创建表,删除表,列出表项,使表有效或无效,以及添加或删除表列族成员等。
用法示例:
HbaseAdmin admin = new HBaseAdmin(config);
admin.disableTable("tablename")
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|返回值 | 函数 | 描述 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|void | addColumn(String tableName,HColumnDescriptor column) | 向一个已经存在的表添加列 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | checkHBaseAvailable(HBaseConfiguration conf) | 静态函数,查看HBase是否处于运行状态| |
| |--------------------------------------------------------|-----------------------------------------------------|
| | create Table(HTableDescriptor desc) | 创建一个表,同步操作 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | delete Table(byte[] tableName) | 删除一个已经存在的表 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | enable Table(byte[] tableName) | 是表处于有效状态 |
| |--------------------------------------------------------|-----------------------------------------------------|
| | disable Table(byte[] tableName) | 使表处于无效状态 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|HTableDescription[] | list Tables() | 列出所有用户控件表项 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|void | modify Table(byte[] tableName,HTableDescriptor htd) | 修改表的模式,是异步的操作,可能需要花费一定的时间 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
|boolean | tableExists(String tableName) | 检查表是否存在 |
|------------------------|--------------------------------------------------------|-----------------------------------------------------|
HTableDescriptor
包名:org.apache.hadoop.hbase.HTableDescriptor
作用:包含了表的名字极其对应表的列族
用法示例:
HTableDescriptor htd = new HTableDescrptor(table);
htd.addFamily(new HcolumnDescriptor("family"));
|------------------|-----------------------------------|--------------|
|返回值 | 函数 | 描述 |
|------------------|-----------------------------------|--------------|
|void | addFamily(HColumnDescriptor) | 添加一个列族 |
|------------------|-----------------------------------|--------------|
|HColumnDescriptor | removeFamily(byte[] column) | 移除一个列族 |
|------------------|-----------------------------------|--------------|
|byte[] | getName() | 获取表的名字 |
|------------------|-----------------------------------|--------------|
|byte[] | getValue(byte[] key) | 获取属性的值 |
|------------------|-----------------------------------|--------------|
|void | setValue(String key,String value) | 设置属性的值 |
|------------------|-----------------------------------|--------------|
HColumnDescriptor
包名:org.apache.hadoop.hbase.HColumnDescriptor
作用:维护着关于列族的信息,例如版本号,压缩设置等。它通常在创建表或者为表添加列族的时候使用。列族被创建后不能直接修改,只能通过删除然后重新创建的方式。列族被删除的时候,列族里面的数据也会同时被删除。
用法示例:
HTableDescriptor htd = new HTableDescriptor(tablename);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
|-------|-----------------------------------|------------------------|
|返回值 | 函数 | 描述 |
|-------|-----------------------------------|------------------------|
|byte[] | getName() | 获取列族的名字 |
|-------|-----------------------------------|------------------------|
|byte[] | getValue(byte[] key) | 获取对应的属性的值 |
|-------|-----------------------------------|------------------------|
|void | setValue(String key,String value)| 设置对应属性的值 |
|-------|-----------------------------------|------------------------|
HTable
包名:org.apache.hadoop.hbase.client.HTable
作用:可以用来和HBase表直接通信。此方法对于更新操作来说是非线程安全的
用法示例:
HTable table = new HTable(conf,Bytes.toBytes(tablename));
ResultScanner scanner = table.getScanner(family);
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|返回值 | 函数 | 描述 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|void | checkAdnPut(byte[] row,byte[] family,byte[] qualifier,byte[] value,Put put)| 自动的检查row/family/qualifier是否与给定值匹配 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|void | close() | 释放所有的资源或挂起内部缓冲中的更新 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|Boolean | exists(Get get) | 检查Get实例所指定的值是否存在于HTable的列中 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|Result | get(Get get) | 获取指定行的某些单元格所对应的值 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|byte[][] | getEndKeys() | 获取当前一打开的表每个区域的结束键值 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|ResultScanner | getScanner(byte[] family) | 获取当前给定列族的scanner实例 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|HtableDescriptor | getTableDescriptor() | 获取当前表的HTableDescriptor实例 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|byte[] | getTableName() | 获取表名 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|static boolean | isTableEnabled(HBaseConfiguration conf,String tableName) | 检查表是否有效 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
|void | put(Put put) | 向表中添加值 |
|------------------|----------------------------------------------------------------------------|--------------------------------------------------|
Put
包名:org.apache.hadoop.hbase.client.Put
作用:用来对单个行执行添加操作
用法示例:
Htable table = new HTable(conf.Bytes.toBytes(tablename));
Put p = new Put(brow);//为指定行创建一个Put操作
p.add(family.qualifier.value);
table.put(p);
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|返回值 | 函数 | 描述 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|void | checkAndPut(byte[] row,byte[] family,byte[] qualifier,byte[] value,Put put) |自动的检查row/family/qualifier是否与给定的值匹配 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|void | close() |释放所有的资源或挂起内部缓冲区中的更新 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|Boolean | exists(Get get) exists(Get get) |检查Get实例所指定的值是否存在于HTable的列中 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|Result | get(Get get) |获取指定行的某些单元格所对应的值 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|byte[][] | getEndKeys() |获取当前一打开的表每个区域的结束键值 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|ResultScanner | getScanner(byte[] family) |获取当前给定列族的scanner实例 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|HTableDescriptor | getTableDescriptor() |获取当前表的HTableDescription实例 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|byte[] | getTableName() |获取表名 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|static boolean | is TableEnabled(HBaseConfiguration conf,String tableName) |检查表是否有效 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
|void | put(Put put) |向表中添加值 |
|--------------------|-----------------------------------------------------------------------------------------|-----------------------------------------------------|
Get
包名:org.apache.hadoop.hbase.client.Get
作用:用来获取单个行的相关信息
用法示例:
HTable table = new HTable(conf,Bytes.toBytes(tablename));
Get g = new Get(Bytes.toBytes(row));
table.get(g);
|--------|-------------------------------------------|-----------------------------------|
|返回值 |函数 | 描述 |
|--------|-------------------------------------------|-----------------------------------|
|Get |addColumn(byte[] family,byte[] qualifier) |获取指定列族和列修饰符对应的列 |
|--------|-------------------------------------------|-----------------------------------|
|Get |addFamily(byte[] family) |通过指定的列族获取对应的所有列 |
|--------|-------------------------------------------|-----------------------------------|
|Get |setTimeRange(long minStamp,long maxStamp) |获取指定取件的列的版本号 |
|--------|-------------------------------------------|-----------------------------------|
|Get |setFilter(Filter filter) |当执行Get操作时设置服务器端的过滤器|
|--------|-------------------------------------------|-----------------------------------|
Result
包名:org.apache.hadoop.hbase.client.Result
作用:存储Get或者Scan操作后获取表的单行值。使用此类提供的方法可以直接获取值或者各种Map结构(key-value对)
|----------------------------|------------------------------------------------|-----------------------------------------------|
|返回值 | 函数 | 描述 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
|boolean | containsColumn(byte[] family,byte[] qualifier) | 检查指定的列是否存在 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
|NavigableMap<byte[],byte[]> | getFamilyMap(byte[] family) | 获取对应列族所包含的修饰符与值的键值对 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
|byte[] | getValue(byte[] family,byte[] qualifier) | 获取对应列的最新值 |
|----------------------------|------------------------------------------------|-----------------------------------------------|
ResultScanner
包名:org.apache.hadoop.hbase.client.ResultScanner
作用:存储Get或者Scan操作后获取表的单行值。使用此类提供的方法可以直接获取值或者各种Map结构(key-value对)
|--------|---------|----------------------------------|
|返回值 |函数 | 描述 |
|--------|---------|----------------------------------|
|void |close() | 关闭scanner并释放分配给它的资源|
|--------|---------|----------------------------------|
|Result |next() | 获取下一行的值 |
|--------|---------|----------------------------------|
HTablePool
包名:org.apache.hadoop.hbase.client.HTablePool
作用:可以解决HTable存在的线程不安全问题,同时通过维护固定数量的HTable对象,能够在程序运行期间复用这些HTable资源对象
说明:
1、HTablePool可以自动创建HTable对象,而且对客户端来说使用上是完全透明的,可以避免多线程间数据并发修改问题。
2、HTablePool中的HTable对象之间是公用Configuration连接的,能够可以减少网络开销。
HTablePool的使用很简单:每次进行操作前,通过HTablePool的getTable方法HTabelPool的putTable方法将HTable对象放回到TablePool中。
------------------------------------------------------------------------------------------------------------------
create 'students_age','f1'
-------------------------创建表程序-----------------------------------------------------------------------------------------
package com.hbase;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseConnection{
private String rootDir;
private String zkServer;
private String port;
private Configuration conf;
private HConnection hConn = null;
public HbaseConnection(String rootDir,String zkServer,String port){
this.rootDir = rootDir;
this.zkServer = zkServer;
this.port = port;
conf = HBaseConfiguration.create();
conf.set("hbase.rootdir",rootDir);
conf.set("hbase.zookeeper.quorum",zkServer);
conf.set("hbase.zookeeper.property.clientPort",port);
try{
hConn = HConnectionManager.createConnection(conf);
}catch(IOException e){
e.printStackTrace();
}
}
public void createTable(String tableName,List<String> cols){
try{
HBaseAdmin admin = new HBaseAdmin(conf);
if(admin.tableExists(tableName))
throw new IOException("table exists");
else{
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(String col : cols){
HColumnDescriptor colDesc = new HColumnDescriptor(col);
colDesc.setCompressionType(Algorithm.GZ);
colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
tableDesc.addFamily(colDesc);
}
admin.createTable(tableDesc);
}
}catch(MasterNotRunningException e){
e.printStackTrace();
}catch(ZooKeeperConnectionException e){
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
}
public void saveData(String tableName,List<Put> puts){
try{
HTableInterface table = hConn.getTable(tableName);
table.put(puts);
table.setAutoFlush(false);
table.flushCommits();
}catch(IOException e){
e.printStackTrace();
}
}
public Result getData(String tableName,String rowkey){
try{
HTableInterface table = hConn.getTable(tableName);
Get get = new Get(Bytes.toBytes(rowkey));
return table.get(get);
}catch(IOException e){
e.printStackTrace();
}
return null;
}
public void format(Result result){
String rowkey = Bytes.toString(result.getRow());
KeyValue[] kvs = result.raw();
for(KeyValue kv: kvs){
String family = Bytes.toString(kv.getFamily());
String qualifier = Bytes.toString(kv.getQualifier());
System.out.println("rowkey->" + rowkey + "family->" + family + "qualifier->" + qualifier);
}
}
public void hbaseScan(String tableName){
Scan scan = new Scan();
scan.setCaching(1000);
try{
HTableInterface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res)
}
}catch(IOException e){
e.printStackTrace();
}
}
public void filterTest(String tableName){
Scan scan = new Scan():
scan.setCaching(1000);
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
///new BinaryComparator(Bytes.tobytes("Jack")));
new RegexStringComparator("J\\w+")));
PageFilter filter = new PageFilter(15);//分页显示
scan.setFilter(filter);
try{
HtableIntegerface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
}
}catch(IOException e){
e.printStackTrace();
}
}
public void pageFilterTest(String tableName){
PageFilter filter = new PageFilter(4);
byte[] lastRow = null;
int pageCount = 0;
try{
HtableInterface table = hConn.getTable(tableName);
while(++pageCount > 0){
System.out.println("pageCount = " + pageCount);
Scan scan = new Scan();
scan.setFilter(filter);
if(lastRow != null){
scan.setStartRow(lastRow);
}
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
lastRow = res.getRow();
if(++count > 3)
break;
format(res);
}
if(count < 3)
break;
}
}catch(IOException e){
e.printStackTrace();
}
}
public static void main(String[] args){
String rootDir = "hdfs://hbase";
String zkServer = "192.168.1.198";
String port = "2181";
HbaseConnection conn = new HbaseConnection(rootDir,zkServer,port);
/* List<String> cols = new LinkedList<>();
cols.add("basicInfo");
cols.add("moreInfo");
conn.createTable("students",cols);*/
List<Put> puts = new LinkedList<Put>();
Put put1 = new Put(Bytes.toBytes("Tom"));
put1.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("27"));
put1.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11232"));
Put put2 = new Put(Bytes.toBytes("Jim"));
put2.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("28"));
put2.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11233"));
puts.add(put1);
puts.add(put2);
conn.saveData("students",puts);
Result result = conn.getData("students","Tom");
conn.format(result);
conn.hbaseScan("studnets");
conn.filterTest("students");
}
}
-------------------------------------------------------------------------------------------------------------------------
machine
1、公网ip
内网ip
2-4 内网ip
log4j.properties
log4j.rootLogger = info,stdout,hbase
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.hbase = org.apache.log4j.DailyRollingFileAppender
log4j.appender.hbase.File = ./log.log
log4j.appender.hbase.Append = true
log4j.appender.hbase.layout = org.apache.log4j.PatternLayout
log4j.appender.hbase.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%t:%r] - [%p] %m%n
---------------------------------------------------------------------------------------------------------
HBase 扫描器
Scanner扫描器
Hbase在扫描数据的时候,使用scanner表扫描器。
HTable通过一个Scan实例,调用getScanner(scan)来获取扫描器。可以配置扫描起止位,以及其他的过滤条件。
通过迭代器返回查询结果,使用起来虽然不是很方便,不过并不复杂。但是这里有一点可能被忽略的地方,就是返回的scanner迭代器,每次调用next的获取下一条记录的时候,默认配置下会访问一次RegionServer。这在网络不是很好的情况下,对性能的影响是很大的。
建议配置扫描器缓存。
扫描器使用
扫描器缓存
hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的而时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以进行配置:
1)在HBase的conf配置文件中进行配置
2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置
3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。
Hbase过滤器
过滤器简介
1、使用过滤器可以提高操作表的效率,HBase中两种数据读取函数get()和scan()都支持过滤器,支持直接访问和通过指定起止行键来访问,但是缺少细粒度的筛选功能,如基于正则表达式对键或值进行筛选的功能。
2、可以使用预定义好的过滤器或者是实现自定义过滤器
3、过滤器在客户端创建,通过RPC传送到服务器端,在服务器端执行过滤操作,把数据返回给客户端
scan 'studnets',{STARTROW=>'J',ENDROW=>'K'}
过滤器:
1、Comparsion Filter(比较过滤器)
RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter
2、Dedicated Filter(专用过滤器)
SingleColumnValueFilter
SingleColumnValueExcluderFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
TimestampsFilter
RandomRowFilter
3、Decorating Filter(附加过滤器)
SkipFilter
WhileMatchFilters
HBase协处理器
HBase作为列数据库最经常被人的特性包括:
无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。
比如,在旧版本的(<0.92)HBase中,统计数据表的总行数,需要使用Counter方法,执行一次MapReduce Job才能得到。虽然HBase在数据存储层中集成了MapReduce,能够有效用于数据表的分布式计算。
然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。于是,HBase在0.92以后引入了处理器(coprocessors),
实现了一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器以及访问控制等。
HBase协处理器的灵感来自于JEFFDean 09年的演讲。它根据该演讲实现了类似于bigtable的协处理器,包括以下特性:
1)每个表服务器的任意子表都可以运行代码
2)客户端的高层调用接口(客户端能够直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用)
3)提供一个非常灵活的、可用于建立分布式服务的数据模型
4)能够自动化扩展、负载均衡、应用请求路由
HBase的协处理器灵感来自bigtable,但是实现细节不尽相同。HBase建立一个框架,它为用户提供类库和运行时环境,使得他们的代码能够在HBase region server和master上处理。
协处理器分两种类型,系统协处理器可以全局导入region servre上的所有数据表,表协处理器即是用户可以指定一张表使用协处理器。
协处理器框架为了更好支持其行为的灵活性,提供了两个不同方面的插件。一个观察者(observer),类似于关系数据库的触发器。另一个是终端(endpoint),动态的终端有点像存储过程。
Observer
观察者的设计意图是允许用户通过代码来重载处理器框架的upcall方法,而具体的事件触发的callback方法有HBase的核心代码来执行。处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。
以Hbase0.92版本为例,它提供了三种观察者接口:
RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
WALObserver:提供WAL相关操作钩子。
MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等。
这些接口可以同时使用在同一个地方,按照不同优先级顺序执行,用户可以任意基于协调处理器实现复杂的HBase功能层。HBase有很多种事件可以触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会由于各种原因有所改动,不同版本的接口改动比较大。
-----------------------------------------------------------------
EndPoint
终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。
用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。终端的使用,如下面流程所示:
定义一个新的protocol接口,必须继承CoprocessorProtocol。
实现终端接口,该实现会被导入region环境执行。
继承抽象类BaseEndpointCoprocessor。
在客户端,终端可以被两个新的Hbase Client API调用。单个region:
HTableInterface.coprocessorProxy(Class<T> protocol,byte[] row).regions区域:HTableInterface.coprocessorExec(Class<T> protocol,byte[] startkey,byte[] endKey,Batch.Call<T,R> callable)
有三个方法对Endpoint进行设置:
A.启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site。xml这个文件来实现,只需要添加如下代码:
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.RowCountEndpoint</value>
</property>
B.启动表aggregation,只对特定的表生效。通过HBase Shell来实现。
(1)disable指定表。
hbase>> disable 'students'
(2)添加aggregation hbase>> alter ‘students’,METHOD =>'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.RowCountEndpoint||'
alter ‘students’,'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint||'
协处理器命令:
增加协处理器方法一:alter 'students','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint||'
-----------------------------------------------------------------------------------------------------------------------------------------------------
增加协处理器方法二:
package coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
public class RegionObserverTest extends BaseRegionObserver{
private static byte[] fixed_rowkey = Bytes.toBytes("John");
@Override
public void preGet(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, List<KeyValue> result) throws IOException {
// TODO Auto-generated method stub
if(Bytes.equals(get.getRow(), fixed_rowkey)){
KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("time"),Bytes.toBytes("time"),Bytes.toBytes(System.currentTimeMillis()));
result.add(kv);
}
}
}
alter 'students','coprocessor'=>'hdfs://192.168.1.198:8020/coprocessor.jar|coprocessor.RegionObserverTest||'
enable 'students'
java -cp .:/usr/lib/hbase/lib/* HbaseConnection
备注:程序打包,上传到hdfs根目录下,执行alter。
----------------------------------------------------------------------------------------------------------------------------------------------------------
enable 'students'
describe 'students'
删除协处理器
alter 'students',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
sudo jar tf /usr/lib/hbase/lib/hbase-examples-1.0.0-cdh5.5.0.jar | grep RowCountEndpoint
org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.class
hadoop fs -put coprocessor.jar /
javac -cp /home/cloudera/softback/hbase-0.96.2-hadoop2/lib/* HbaseConnection.java
java -cp /usr/lib/hbase/lib/* HbaseConnection
(3)重启指定表hbase > enable 'mytable'
C.API调用
HTableDescriptor htd = new HTableDescriptor("testTable");
htd.setValue("CORPROCESSOR$1",path.toString + "|" + RowCountEndpoint.class.getCanonicalName() + "|" + Coprocessor.Priority.USER);
---------------------------------------------------------
CoprocessorRowCounter.java
public class CoprocessorRowCounter{
public static void main(String[] args) throws IOException{
Configuration conf = HbaseConfiguration.create();
HTable table = new HTable(conf,"students");
final ExampleProtos.CountRequest request = ExampleProcess.CountRequest.getDefaultInstance();
Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,null,null,new Batch.Call<ExampleProtos.RowCountService,Long>(){
public Long call(ExampleProtos.RowCountService counter) throws IOException{
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<ExamplerProtos.CountResponse>();
counter.getRowCount(controller,request,rpcCallback);
ExampleProtos.CountResponse response = rpcCallback.get();
if(controller.failedOnException()){
throw controller.getFailedOn();
}
return (response != null && response.hasCount())?response.getCount():0;
}
});
}
}
几点说明:
1、协调处理器配置的加载顺序:先加载配置文件中定义的协处理器,后加载表描述符中的协处理器
2、COPROCESSOR$<number>中的number定义了加载的顺序
3、协处理器配置格式
Coprocess can also be configured to load on a per table basis,via a shell command “alter”
hbase > alter "t1".METHOD => 'table_alt'
'coprocessor1'=>'hdfs://foo.jar|com.foo.FooRegionObserver[100]arg1=1,arg2=2'
RegionObserverTest.java
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
public class RegionObserverTest extends BaseRegionObserver{
}
>>describe 'students'
alter 'students',METHOD=>'table_att_unset',Name=>'coprocessor$1'
scan 'students'
---------------------------------------------------------------------------
MapReduce on HBase
MapReduce on Hbase
1、可以使用Mapreduce的方法操作hbase数据库
2、Hadoop Mapreduce 提供相关API,可以与hbase数据库无缝连接
3、API link: http://hbase.apache.org/devapidocs/index.html
--------------------HbaseMR.java---------------------------------------------------------------------------------
package com.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class HbaseMR {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf,"mapreduce on hbase");
job.setJarByClass(HbaseMR.class);
Scan scan = new Scan();
scan.setCaching(1000);
TableMapReduceUtil.initTableMapperJob("students", scan,MyMapper.class, Text.class, Text.class, job);
TableMapReduceUtil.initTableReducerJob("students_age",MyReducer.class, job);
job.waitForCompletion(true);
}
}
class MyMapper extends TableMapper<Text,Text>{
@Override
protected void map(
ImmutableBytesWritable key,
Result value,
Context context)
throws IOException, InterruptedException {
Text k = new Text(Bytes.toString(key.get()));
Text v = new Text(value.getValue(Bytes.toBytes("basicInfo"),Bytes.toBytes("age")));
context.write(v, k);
}
}
class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{
@Override
protected void reduce(
Text key,
Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
Put put = new Put(Bytes.toBytes(key.toString()));
for(Text value: values){
put.add(Bytes.toBytes("f1"),Bytes.toBytes(value.toString()),Bytes.toBytes(value.toString()));
}
context.write(null, put);
}
}
------------------------------------------------------------------------------------------------------
数据存储格式
B+树
特点:
1、有n棵子数的结点中含有n个关键字,每个关键字不保存数据,只用来索引,所有数据都保存在叶子节点。
2、所有的叶子结点中包含了全部关键字的信息,及指向含这些关键字记录的指针,且叶子结点本身依关键字的大小自小而大顺序链接。
3、所有的非终端结点可以看成是索引部分,结点中仅含其子树(根结点)中的最大(或最小)关键字。
HFile格式
1、数据块-保存表中的数据,每一个数据块由块头和一些keyValue(record)组成,key的值是严格按照顺序存储的。块大小默认为64K(由建表时创建cf时指定或者HColumnDescriptor.setBlockSize(size)),
这一部分可以压缩存储。在查询数据时,是以数据块为单位从硬盘load到内存。查找数据时,是顺序的遍历该块中的keyValue对。
2、元数据块(可选的)-保存用户自定义的kv对,可以被压缩。比如booleanfilter就是存在元数据块中的,该块只保留value值,key值保存在元数据索引块中。每一个元数据块由块头和value值组成。可以
快速判断key是都在这个HFile中。
3、File Info-Hfile的元信息,不被压缩,用户也可以在这一部分添加自己的元信息。
4、数据索引块-Data Block的索引,每条索引的key是被索引的block的第一条记录的key(格式为:头信息,数据块offset数据块大小块第一个记录的key,.....)
----------------------------------------------------------------------------------------------------
WAL机制
WAL预写日志
1)Client向RegionServer端提交数据的时候,会优先写WAL日志(HLog),只有当WAL日志写成功以后,Client才会被告诉提交数据成功,如果写WAL失败会告知客户端提交失败
2)一个RegionServer上所有的Region都共享一个HLog,一次数据的提交是先写WAL,再写memstore
HLog类
实现了WAL的类叫做HLog。当HRegion被实例化时,HLog实例会被当做一个餐所以和到HRegion的构造器中。当一个Region接收到一个更新操作时,它可以直接把数据保存到一个共享的WAL实例中去。
HLogKey类
1、当前的WAL使用的是Hadoop的SequenceFile格式,其key是HLogKey实例。HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括sequence number和timestamp,timestamp是“写入时间”,
sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
2、HLog Sequence File的Value是Hbase的KeyValue对象,即对应HFile中的KeyValue
WALEdit类
客户端发送的每个修改都会封装成WALEdit类,一个WALEdit类包含了多个更新操作,可以说一个WALEdit就是一个原子操作,包含若干个操作的集合。
LogSyncer类
Table在创建的时候,有一个参数可以设置,是否每次写Log日志都需要往集群里的其他机器同步一次,默认是每次都同步,同步的开销是比较大的,但不及时同步又可能因为机器宕而丢日志。
同步的操作现在是通过Pipeline的方式来实现的,Pipeline是指datanode接受数据后,再传给另外一台datanode,是一种串行的方式;n-Way Wirtess是指多datanode同时接受数据,最慢的一台结束就是整个结束。
差别在于一个延迟大,一个并发高,hdfs现在正在开发中,以便可以选择是按Pipeline还是n-Way Write来实现写操作。
Table如果设置每次不同步,则写操作会被RegionServer缓存,并启动一个LogSyncer线程来定时同步日志,定时时间默认是一秒也是由hbase.regionserver.optionallogflushinterval设置。
LogRoller类
1、日志写入的大小是有限制的。LogRoller类会作为一个后台线程运行,在特定的时间间隔内滚动日志。通过hbase.regionserver.logroll.period属性控制,默认1小时。
---------------------------------------------------------
Hbase在线数据备份
Hbase Replication
HBase复制是一种不同HBase部署中复制数据的方法。它可以作为一种故障恢复的方法,并提供HBase层次的高可用性。
HBase复制中最基本的架构模式是“主推送”(master-push),因为每个region server都有自己的WAL(或HLog),所以很容易保存现在正在复制的位置。正如众所周知的解决方案-MySql的主/从复制,只使用二进制
文件来跟踪修改。一个主集群可以将数据复制到任意数目的从集群,每个region server都会参与复制自己的修改。
来自每个region server的HLog是Hbase复制的基础,并且只要它们需要将数据复制到从集群,它们就必须被保存到HDFS上。每个region server从它需要的最老的日志开始复制,同时在zookeeper中保存当前恢复的位置
来简化错误恢复。每个从集群恢复的位置可能不同,但它们处理的HLog队列内容是相同的。参与复制的集群的规模可以不对等。主集群会通过随机分配尽量均衡从集群的负载。
解决问题:
数据管理人员的失误,不可逆的DDL操作。
底层HDFS文件BLOCK块corruption
短时间过度的读数据对集群造成的压力,增加服务器应对这种情况比较浪费资源
系统升级,维护,诊断问题会造成集群不可用时间增长。
双写的原子性难以保证
不可预计的一些原因。(如机房断电,大规模硬件损坏,断网等)
离线应用的MR计算对在线读写造成的较大的延迟影响
对于数据中心的数据冗余的备份方案,目前从一致性,事务性,延迟,吞吐量,数据损失,Failover几个角度来分析有一下几种方案。
简单备份模式通过定时不定时的Dump出集群数据保证数据的安全性,通常可以通过snapshot或设置时间戳来dump数据来实现这种方案,如果方案简介,设计优雅可以做到对在线数据中心低于干扰或无干扰的数据备份。但这
中方案缺点也是显而易见的,只是对时间点之前的数据安全性得到保障,如果发生突发事件会导致不可避免的整段时间的数据丢失,为很多人无法接受。
主从模式(Maser-Slave)这种模式比起简单的备份模式多了很多优点,可以通过最终一致性保证数据的一致,数据从主集群到备集群延时较低,异步写入不会对主集群带来性能压力,基本不会产生多少性能的影响,突发
事件来临时数据丢失很少,并且主集群的事务在备集群也可以得以保证。一般通过构造较好的Log系统加上check Point来实现,可以实现读写分离,主集群可以担当读写服务,但备集群一般只承担读服务。
主主模式(Master-Master)原理总体类似于主从模式,不同的是2个集群可以互相承担写的分离,都可承担读写服务。
2阶段提交这种方案保证了强一致性和事务,服务器返回给客户端成功则表明数据一定已经成功备份,不会造成任何数据丢失。每台服务器都可以承担读写服务。但缺点是造成集群延迟较高,总体吞吐下降。
Paxos算法基于Paxos算法的实现的强一致性方案,同一客户端连接的server能保证数据的一致性。缺点是实现复杂,集群延迟和吞吐随着集群服务器增加而边差。
部署步骤
1)编辑集群中所有机器的${HBASE_HOME}/conf/hbase-site.xml文件,增加如下配置:
<property>
<name>hbase.replication</name>
<value>true</value>
</property>
修改完成后,重启HBase集群,使配置生效
2)在HBase shell中运行如下命令:
>>add_peer 'ID''CLUSTER_KEY'
>>start_replication
第一条命令是为从集群设置zookeeper集群信息,这样可以使得修改被同步到从集群上。第二条命令真正将修改过的记录发布到从集群上。为了保证工作能按照预期进行,用户必须保证已经在从集群上建立了一个相同的表的副本,
表可以为空,但必须有相同的模式和表名。
注意:hbase-0.96和hbase-0.98已经没有start_replication命令和stop_replication命令。hbase-0.98相较hbase-0.96,新增了set_peer_tableCFs、show_peer_tablesCFs命令。在设定复制时,hbase-0.98需要使用set_peer_tableCFs设置。
具体的有对应的帮助命令可供参考。
ID必须是一个短整数,CLUSTER_KEY的内容请参考以下模板:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
比如,zk.server.com:2181:/hbase
注意:如果两个集群使用相同的zookeeper集群,你不得不使用不同的zookeeper.znode.parent,因为它们不能写入相同的文件夹中。
3)一旦你有一个对等(从)集群,你需要在你的列簇上使复制可用,要想达到这样的效果,可以在Hbase shell中执行如下命令:
disable ‘you——table’
alter ‘your-table’,{NAME =>'family_name',replication_scope=>'1'}
enable 'your_table'
scope值为0(默认值)意味着它不会被复制,而scope值为1意味着它将被复制.
4)运行如下命令可以列出所有配置的对等(从)集群:
》》list_peers
5)运行如下命令将使对等(从)集群不可用:
》》disable_peer 'ID'
运行完命令后,HBase将停止将向对等(从)集群发送修改,但是它将一直跟踪所有新的WALs文件,以便当从集群可用时继续复制。
6)可以运行如下命令将使之前设置为了不可用的对等(从)集群可用:
>>enable_peer'ID'
7)运行下面的命令,可以移除一个从集群:
stop_replication
remove_peer 'ID'
需要注意的是,停止复制仍会完成所有已在队列里的修改的复制,但是之后所有的处理都被停止了。为了确认你的配置都正常工作,你可以查看任何一个regionserver的日志文件,看是否由类似下面几行的内容:
Considering 1 rs,with ratio 0.1
Getting 1 rs from peer cluster#0
Choosing peer 10.10.1.49:62020
Hbase集群数据迁移
1)静态迁移方案
Hadoop distcp
在hbase停止的状态下进行数据的迁移
distcp + add_table.rb
------------------------------------------------------
2)动态迁移方案
-Replication备份方案
-CopyTable方案
-Export and Import方案
动态迁移方案
1、Replication备份方案
修改hbase-site.xml配置,增加hbase.replication属性
增加表属性REPLICATION_SCOPE属性
add_peer
2、CopyTable方案
命令:./hbase org.apache.hadoop.hbase.mapreduce.CopyTable--peer.adr=new cluster ip:2181:/hbase_table
说明:
(1)拷贝完成,不需要重启机器,在new cluster中就可以看到该表
(2)稳定性还需要考虑
3、Export and Import方案
步骤:(1)在old cluster上执行:./hbase
org.apache.hadoop.hbase.mapreduce.Export test hdfs://new clusterip:9000/xxx
(2)在new cluster上执行:./hbase
org.apache.hadoop.hbase.mapreduce.Import test hdfs://new clusterip:9000/xxx
说明:(1)一定要写全路径,不能写相对路径
(2)在Import前,需要将表事先在new cluster中创建好
-----------------------------------------------------------------------------------
3)手动方式
把hbase元数据表打包 tar zcvf ...
hadoop fs -put studnets /hbase/data/default
1、从源hbase集群中复制出HBase数据库表到本地目录
hadoop fs -get
2、目标HBase导入
hadoop fs -put
3、修复.META表
hbase hbck -fixMeta
4、重新分配数据到各RegionServer
hbase hbck -fixAssignments
---------------------------------------------------------------------------------
数据导入方案
1、利用ImportTsv将csv文件导入到HBase 备注:表要前创建
2、利用completebulkload将数据导入到HBase
3、利用Import将数据导入到HBase
数据导入方案一
利用ImportTsv将csv文件导入到HBase
命令:bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv-Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tbl-001 /simple.csv
simple.csv内容如下:
1、“tom”
2、“sam”
3、“jerry”
4、“marry”
5、“john”
-----------------------------------------------------------------------------
数据导入方案二
HBase支持bulkload的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合
mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。
通过使用先生成HFile,然后再BulkLoad到HBase的方式来代替之前直接调用HTableOutputFormat的方法有如下好处:
(1)消除了对HBase集群的插入压力
(2)提高了Job的运行速度,降低了Job的执行时间
利用completebulkload将数据导入到HBase
1)先通过ImportTsv生成HFile
命令:bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=/hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tbl-001 /simple.csv
2)通过completebulkload将数据导入表hbase-tbl-002
命令:hadoop jar lib/hbase-server-0.96.0.jar completebulkload /hfile_tmp hbase-tbl-002
利用Import将数据导入到HBase
1)HBase export工具导出的数据格式是sequence file。比如,在执行完命令“bin/hbase org.apache.hadoop.hbase.mapreduce.Export hbase-tbl-002/test-output”后,hbase会启动一个MapReduce作业,作业
完成后会在hdfs上面会生成sequence file格式的数据文件
2)对于这类Sequence file格式的数据文件,HBase是可以通过Import工具直接将它导入到HBase的表里面的。执行命令“bin/hbase org.apache.hadoop.hbase.mapreduce.Import hbase-tbl-003 /test-output”,
随后hbase会启动一个MapReduce作业,然后表test会成功导入数据。
---------------------------------------------------------------------------
Hbase二级索引
MapReduce方案
IndexBuilder:利用MR的方式构建Index
优点:并发批量构建Index
缺点:不能实时构建Index
举例:
原表:row 1 f1:name zhangsan
row 2 f1:name lisi
row 3 f1:name wangwu
索引表:row zhangsan f1:id 1
row lisi f2:id 2
row wangwu f3:id 3
create 'student','f1'
put 'student','1','f1:name','lisi'
put 'student','2','f1:name','zhangsan'
put 'student','3','f1:name','wangwu'
create 'student-name','f1'
程序放到/usr/lib/hbase/lib/目录下
>>hbase com.hbase.IndexBuilder student f1 name
给表student 添加二级索引 studnet-name
示例如下:
-----------------------------------------IndexBuilder.java----------------------------------------------------------------------------
package com.hbase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Set;
import javax.ws.rs.PUT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
public class IndexBuilder {
static class MyMapper extends TableMapper<ImmutableBytesWritable,Put>{
private HashMap<byte[], ImmutableBytesWritable> indexes = new HashMap<byte[],ImmutableBytesWritable>();
private String familyName;
@Override
protected void map(
ImmutableBytesWritable key,
Result value,
Context context)
throws IOException, InterruptedException {
Set<byte[]> keys = indexes.keySet();
for(byte[] k:keys){
ImmutableBytesWritable indexTableName = indexes.get(k);
byte[] val = value.getValue(Bytes.toBytes(familyName),k);
if(val !=null){
Put put = new Put(val);
put.add(Bytes.toBytes("f1"),Bytes.toBytes("id"),key.get());
context.write(indexTableName, put);
}
}
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String tableName = conf.get("tableName");
String familyName = conf.get("familyName");
String[] qualifiers = conf.getStrings("qualifiers");
for(String q: qualifiers){
indexes.put(Bytes.toBytes(q),new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + q)));
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length < 3){
System.exit(-1);
}
String tableName = args[0];
String columnFamily = args[1];
conf.set("tableName", tableName);
conf.set("columnFamily", columnFamily);
String[] qualifiers = new String[otherArgs.length -2];
for(int i =0; i < qualifiers.length;i++){
qualifiers[i] = otherArgs[i+2];
}
conf.setStrings("qualifiers", qualifiers);
Job job = new Job(conf,tableName);
job.setJarByClass(IndexBuilder.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
Scan scan = new Scan();
scan.setCaching(1000);
TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class, PUT.class, job);
job.waitForCompletion(true);
}
}
------------------------------Hbase二级索引------------------------------------------------------------------------------------------------
1、Mapreduce方案
2、ITHBASE方案
3、IHBASE方案
4、Coprocessor方案
5、Solr + hbase方案
IHBASE方案
优点:IHBase(Indexed HBase)是HBASE的一个扩展,用于支持更快的扫描。
缺点:需要重构hbase
原理:在Memstore满了以后刷磁盘时,IHBase会进行拦截请求并为这个memstore的数据构建索引,索引另一个CF的方式存储在表内。scan的时候,IHBase会结合索引列中的标记,来加速scan
ITHBASE方案
优点:ITHBASE(Indexed Transactional HBase)是HBase的一个事务型的带索引的扩展
缺点:需要重构hbase,几年没有更新
Coprocessor方案
HIndex-来自华为的HBase二级索引
Solr方案
Solr是一个独立的企业级搜索应用服务器,它对外提供类似于Web-service的API接口。用户可以通过http请求,向搜索引擎服务器提交一定格式的XML文件,生成索引;也可以通过Http Get操作
提出查找请求,并得到XML格式的返回结果。
Solr是一个高性能,采用Java5开发,基于Lucene的全文搜索服务器。同时对其进行扩展,提供了比Lucene更为丰富的查询语言,同时实现了可配置、可扩展并对查询性能进行了优化,并且提供了一个
完善的功能管理界面,是一款非常优秀的全文搜索引擎。
HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力
基于Solr的HBase多条件查询原理很简单,将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,通过Solr的多条件查询快速获得符合过滤条件的rowkey值,拿到这些rowkey之后在HBase中通过
指定rowkey进行查询。
--------------------------------------------------------------------------------------------------------
Hbase快照技术
Snapshot(快照)
什么是快照
快照就是一份元信息的集合,允许管理员恢复到表的先前状态。快照不是表的复制而是一个文件名称列表,因而不会复制数据。
完全快照恢复是指恢复到之前的“表结构”以及当时的数据,快照之后发生的数据不会恢复。
快照的作用
hbase中存在的备份或克隆表的方法就是使用复制、导出表或者在关闭表之后拷贝HDFS中所有的hfile。复制/导出是通过一系列工具调用MapReduce来扫描并复制表,这样会对RegionServer有直接的影响。
关闭表会停止所有的读写操作,实际环境中往往无法接受。
相比之下Hbase快照允许管理员不拷贝数据直接克隆一张表,这对域服务器产生的影响最小。将快照导出至其他集群不会直接影响到任何域服务器;导出只是带有一些额外逻辑的群间数据同步。
快照使用场景
从用户、应用异常中还原
从一个已知的安全状态恢复、还原
查看之前的快照并有选择性的合并不同写入产品环境
当主应用程序升级或改版时保存快照
在指定时间审查和、或报告数据
按照规定捕获月度数据
生成日终、月末、季末报告
应用测试
通过快照模拟生产环境下结构或应用发生的变化,测试完成即可丢弃
例如:生成快照,利用快照中内容构建新表(原有结构+数据)并且修改新表的结构,添加或删除列之类。(原始表、快照和新表保持互相独立)
减少工作压力
生成快照,导入到其他集群,然后运行MapReduce jobs。因为导出的快照是HDFS级别,所以不会像复制表那样降低HBase主集群的效率。
快照操作
生成快照:本操作尝试对指定表生成快照。如果集群在执行数据均衡、分隔或合并等操作是,可能会引起操作失败。
克隆快照:本操作使用与指定快照相同的结构数据构建一张新表。操作结果会生成一张有完整功能的表,对该表的任意修改不会对原表或快照产生影响。
还原快照:本操作将表结构和数据恢复到生成快照时的状态。(注意:本操作会舍弃快照生成后任何改变)。
删除快照:本操作将系统中的快照删除,释放未共享的磁盘空间,而且不会影响其他克隆或快照。
导出快照:本操作将快照数据和元数据复制到其他集群。操作只会涉及HDFS,不会与Master或域服务器产生任何联系,因此HBase集群可以关闭。
快照优势
导出快照与复制、导出表除了更好的保持一致性外,主要的不同在于导出快照是在HDFS的层面操作的。
这意味着Master和域服务器与操作无关。因此不需要为不必要的数据创建缓存空间,也不会有扫描过程因为大量对象创建引起的GC暂停。对于HBase来说主要性能影响就是DataNode额外的网络和磁盘负载。
snapshot 'students','students_snapshot'
list_snapshots
clone_snapshot 'students_snapshot','students_new'
数据导入方案三
通过检查hbase-site.xml的hbase.snapshot.enable是否设置为true确认打开了快照许可。
1、获取指定表的快照使用snapshot命令(不产生文件复制)
snapshot 'tableName','snapshotName'
2、列出所有的快照,使用list_snapshot命令。会展示出快照名称,源表,以及创建日期和时间
list_snapshots
3、删除快照使用deleted_snapshot命令。删除快照不会影响到克隆表或者之后生成的快照。
delete_snapshot 'snapshotName'
4、使用clone_snapshot命令从指定的快照生成新表(克隆)。由于不会产生数据复制,所以最终用到的数据不会是之前的两倍。
clone_snapshot 'snapshotName','newTableName'
5、使用restore_snapshct命令指定快照内容替换当前表结构/数据。
disable 'students'
restore_snapshot 'snapshotName'
enable 'students'
6、使用ExportSnapshot工具将现有快照导出至其他集群。导出工具不会影响到域服务器负载,只是工作在HDFS层面所以需要指定HDFS路径(其他集群的hbase根目录)
hbase org.apache.hadoop.hbase.snapshot ExportSnapshot -snapshot
SnapshotName -copy -to hdfs://srv2:8020/hbase
delete 'students','Tom','moreInfo:tel'
-----------------------------------------------------------------------
Hbase BloomFilter(布隆过滤器)
集合表示和元素查询
下面我们具体来看BloomFilter是如何用位数组表示集合的。初始状态时,BloomFilter是一个包含m位的位数组,每一位都置为0
为了表达S=(x1,x2,x3,。。。xn)这样一个n个元素的集合,BloomFilter使用k个相互独立的哈希函数(HashFunction),它们分别将集合中的每个元素映射到(1,。。。m)的范围中。对任意一个元素x,
第i个哈希函数映射的位置h1(x)就会被置为1(1<=i<=k).注意,如果一个位置多次被置为1,那么只有第一次会起作用,后面几次将没有任何效果。在下图中,k=3,且有两个和希函数选中同一个位置(从左边数第五位)
在判断y是否属于这个集合时,我们对y应用k次哈希函数,如果所有hi(y)的位置都是1(1<=i<=k),那么我们就认为y是集合中的元素,否则就认为y不是集合中的元素,下图中y1就不是集合中的元素。y2或者属于这个集合,
或者刚好是一个false positive。
Bloomfilter在HBase中的作用?
HBase利用Bloomfilter来提高随机读(Get)的性能,对于顺序读(Scan)而言,设置Bloomfilter是没有作用的(0.92以后,如果设置了bloomfilter为ROWCOL,对于指定了qualifier的Scan有一定的优化)
Bloomfilter在Hbase中的开销?
Bloomfilter是一个列族cf级别的配置属性,如果在表中设置了Bloomfilter,那么HBase会在生成StoreFile时包含一份bloomfilter结构的数据,称其为MeataBlock;MetaBlock与DataBlock(真实的KeyValue数据)
一起由LRUBlockCache维护。所以开启bloomfilter会有一定的存储及内存cache开销。
HBase中的Bloomfilter的类型及使用?
a)ROW,根据KeyValue中的row来过滤storefile
举例:假设有2个storefile文件sf1和sf2
sf1包含kv1(r1 cf:q1 v)、kv2(r2 cf:q1 v)
sf2包含kv3(r3 cf:q1 v)、kv4(r4 cf:q1 v)
如果设置了CF属性中的bloomfilter为ROW,那么get(r1)时就会过滤sf2,get(r3)就会过滤sf1
b)ROWCOL,根据KeyValue中的row+qualifier来过滤storefile
举例:假设有2个storefile文件sf1和sf2,
sf1包含kv1(r1 cf:q1 v)、kv2(r2 cf:q1 v)
sf2包含kv3(r1 cf:q2 v)、kv4(r2 cf:q2 v)
如果设置了CF属性中的bloomfilter为ROW,无论get(r1,q1)还是get(r1,q2),都会读取sf1+sf2;而如果设置了CF属性中的bloomfilter为ROWCOL,那么get(r1,q1)就会过滤sf2,get(r1,q2)就会过滤sf1
ROWCOL和ROW对比
a)ROWCOL只对指定列(Qualifier)的随机读(Get)有效,如果应用中的随机读get,只含row,而没有指定读哪个qualifier,那么设置ROWCOL是没有效果的,这种场景就应该使用ROW
b)如果随机读中指定的列(Qualifier)的数目大于等于2,在0.90版本中ROWCOL是无效的,而在0.92版本以后,HBase-2794对这一情景作了优化,是有效的(通过KeyValueScanner#seekExactly)
c)如果同一row多个列的数据在应用上是同一时间put的,那么ROW与ROWCOL的效果近似相同,而ROWCOL只对指定了列的随机读才会有效,所以设置为ROW更佳
d)ROWCOL与ROW只在名称上有联系,ROWCOL并不是ROW的扩展,不能取代ROW
--------------------------------------------------------------------------------------------
基于Solr的实时查询
实时查询方案
hbase+solr+hbase indexer
1)hbase提供海量数据存储
2)solr提供索引构建与查询
3)hbase indexer提供自动化索引构建(从hbase到solr)
----------------------------------------------------------------------------
部署与安装
CDH5.x发行版中提供了hbase、solr以及hbase indexer的下载
实验选取CDH5.0.2版本,选择需下载以下三个:
hbase-0.96.1.1-cdh5.0.2.tar.gz
hbase-solr-1.3-cdh5.0.2.tar.gz
solr-4.4.0-cdh5.0.2.tar.gz
其中hbase-solr-xxx,来源NGDATA的开源项目,使用及说明参考
hbase->hbase inexder -> solr
---------------------------------------------------------------------------------
Solr查询
利用Solr操作solr API,使用SolrJ操作Solr会比利用httpClient来操作Solr要简单
查询使用的类:
1、HttpSolrServer
2、SolrQuery
192.168.1.198:8983/solr/select?q=1astname_s:Louis
Main.java
public class Main{
public static void main(String[] args){
String url = "http://192.168.1.198:8983/solr";
HttpSolrServer serve = new HttpSolrServer(url);
server.setConnectionTimeout(1000);
server.setSoTimeout(3000);
server.setDefaultMaxConnectionPerHost(1000);
server.setFollowRedirects(false);
server.setAllowCompression(true);
server.setMaxRetries(1);
SolrQuery query = new SolrQuery();
query.setQuery("firstname s:Jim");
try{
QueryResponse resp = server.query(query);
List<Item> item = resp.getBeans(Item.class);
for(Item item: items){
System.out.println(item.id);
conn.format(conn.getData("indexdemo-user",item.id));
}
}catch(SolrServerException e){
e.printStackTrace();
}
}
}
Item.java
public class Item{
@Field
String id;
@Field
String firstname_s;
@Field
String lastname_s;
}
---------------------------------------------------------------------------------------------------
HBaseConnection.java
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseConnection{
private String rootDir;
private String zkServer;
private String port;
private Configuration conf;
private HConnection hConn = null;
public HbaseConnection(String rootDir,String zkServer,String port){
this.rootDir = rootDir;
this.zkServer = zkServer;
this.port = port;
conf = HBaseConfiguration.create();
conf.set("hbase.rootdir",rootDir);
conf.set("hbase.zookeeper.quorum",zkServer);
conf.set("hbase.zookeeper.property.clientPort",port);
try{
hConn = HConnectionManager.createConnection(conf);
}catch(IOException e){
e.printStackTrace();
}
}
public void createTable(String tableName,List<String> cols){
try{
HBaseAdmin admin = new HBaseAdmin(conf);
if(admin.tableExists(tableName))
throw new IOException("table exists");
else{
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(String col : cols){
HColumnDescriptor colDesc = new HColumnDescriptor(col);
colDesc.setCompressionType(Algorithm.GZ);
colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
tableDesc.addFamily(colDesc);
}
admin.createTable(tableDesc);
}
}catch(MasterNotRunningException e){
e.printStackTrace();
}catch(ZooKeeperConnectionException e){
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
}
public void saveData(String tableName,List<Put> puts){
try{
HTableInterface table = hConn.getTable(tableName);
table.put(puts);
table.setAutoFlush(false);
table.flushCommits();
}catch(IOException e){
e.printStackTrace();
}
}
public Result getData(String tableName,String rowkey){
try{
HTableInterface table = hConn.getTable(tableName);
Get get = new Get(Bytes.toBytes(rowkey));
return table.get(get);
}catch(IOException e){
e.printStackTrace();
}
return null;
}
public void format(Result result){
String rowkey = Bytes.toString(result.getRow());
KeyValue[] kvs = result.raw();
for(KeyValue kv: kvs){
String family = Bytes.toString(kv.getFamily());
String qualifier = Bytes.toString(kv.getQualifier());
System.out.println("rowkey->" + rowkey + "family->" + family + "qualifier->" + qualifier);
}
}
public void hbaseScan(String tableName){
Scan scan = new Scan();
scan.setCaching(1000);
try{
HTableInterface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res)
}
}catch(IOException e){
e.printStackTrace();
}
}
public void filterTest(String tableName){
Scan scan = new Scan():
scan.setCaching(1000);
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
///new BinaryComparator(Bytes.tobytes("Jack")));
new RegexStringComparator("J\\w+")));
PageFilter filter = new PageFilter(15);//分页显示
scan.setFilter(filter);
try{
HtableIntegerface table = hConn.getTable(tableName);
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
}
}catch(IOException e){
e.printStackTrace();
}
}
public void pageFilterTest(String tableName){
PageFilter filter = new PageFilter(4);
byte[] lastRow = null;
int pageCount = 0;
try{
HtableInterface table = hConn.getTable(tableName);
while(++pageCount > 0){
System.out.println("pageCount = " + pageCount);
Scan scan = new Scan();
scan.setFilter(filter);
if(lastRow != null){
scan.setStartRow(lastRow);
}
ResultScanner scanner = table.getScanner(scan);
for(Result res: scanner){
format(res);
lastRow = res.getRow();
if(++count > 3)
break;
format(res);
}
if(count < 3)
break;
}
}catch(IOException e){
e.printStackTrace();
}
}
public static void main(String[] args){
String rootDir = "hdfs://hbase";
String zkServer = "192.168.1.198";
String port = "2181";
HbaseConnection conn = new HbaseConnection(rootDir,zkServer,port);
/* List<String> cols = new LinkedList<>();
cols.add("basicInfo");
cols.add("moreInfo");
conn.createTable("students",cols);*/
List<Put> puts = new LinkedList<Put>();
Put put1 = new Put(Bytes.toBytes("Tom"));
put1.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("27"));
put1.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11232"));
Put put2 = new Put(Bytes.toBytes("Jim"));
put2.add(Bytes.toBytes("basicInfo"),Bytes.toBytes("age"),Bytes.toBytes("28"));
put2.add(Bytes.toBytes("moreInfo"),Bytes.toBytes("tel"),Bytes.toBytes("11233"));
puts.add(put1);
puts.add(put2);
conn.saveData("students",puts);
Result result = conn.getData("students","Tom");
conn.format(result);
conn.hbaseScan("studnets");
conn.filterTest("students");
}
}
备注:因为solr和zookpeer有问题,实例没有通过,有时间在调整。
--------------------------------------------------------------------------------------------
更多推荐
已为社区贡献1条内容
所有评论(0)