dsg_10 Hbase
HbaseHBase的介绍HBase的特征HBase的存储机制HBase的搭建HBase的介绍HBase是基于hadoop的数据库,分布式可伸缩(可任意增加或减少节点)大型数据存储,随机、实时读写数据,可以存储十亿行,百万列,版本化、非关系型,面向列的数据库。HBase的特征线性模块化扩展方式。严格一致性读写自动可配置表切割区域服务器之间自动容灾HBase支持Hadoop MR作业易于使用的Jav
Hbase
HBase的介绍
HBase是基于hadoop的数据库,分布式可伸缩(可任意增加或减少节点)大型数据存储,随机、实时读写数据,可以存储十亿行,百万列,版本化、非关系型,面向列(列不固定)的数据库,非关系型数据库还有Redis和mongodb,存储形式都是kv对。
HBase的特征
- 线性模块化扩展方式。
- 严格一致性读写
- 自动可配置表切割
- 区域服务器之间自动容灾
- HBase支持Hadoop MR作业
- 易于使用的Java API
- 块缓存和布隆过滤器用于实时查询
- 通过服务器端过滤器实现查询预测
- 支持XML, Protobuf, and binary data等串行化数据技术
- 可扩展的shell
- 可视化
HBase的存储机制
面向列存储 ,table是按行排序,HBase的存储结构如下:
因此定位一个数据需要该数据的行号,列族和列,版本号
HBase的搭建
HBase有两种节点,master(管理节点)和RegionServer(区域服务器)
这里我选择Centos100作为master,Centos101和Centos102作为RS(一般来说应该选择奇数个服务器作为RS,但是本人电脑资源有限只能选择两台)
-
安装jdk
-
安装hadoop
-
安装HBase
#解压 tar -zvxf hbase-2.0.0-bin.tar.gz -C /soft/ #分发 xsync.sh hbase-2.0.0 #配置环境变量 export HBASE_HOME=/soft/hbase-2.0.0 export PATH=$PATH:$HBASE_HOME/bin source /etc/profile #验证环境变量是否配置成功 hbase version
-
修改hbase-2.0.0/conf/hbase-env.sh,然后分发
#修改JAVA_HOME路径 export JAVA_HOME=/soft/jdk1.8.0_65/ #修改zk管理,hbase自己已经集成了zk,如果我们想用自己搭建的zk去管理Hbase,就要把下面的属性设置成false export HBASE_MANAGES_ZK=false
-
HBase本地模式
hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性<property> <name>hbase.rootdir</name> <value>file:/home/hadoop/HBase/HFiles</value> </property>
-
HBase伪分布式
hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性<!--启用分布式--> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.rootdir</name> <value>hdfs://localhost:8030/hbase</value> </property>
-
HBase完全分布式
hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性,配置完后分发<!--启用分布式--> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <!-- 指定hbase数据在hdfs上的存放路径,注意这里如果配置的是hadoopHA高可用的dfs.nameservices属性的值时需要将hadoop中的hdfs-sit.xml和core-site.xml复制到hbase-2.0.0/conf/路径下,否则不识别--> <property> <name>hbase.rootdir</name> <value>hdfs://mycluster/hbase</value> </property> <!-- 配置zk地址 --> <property> <name>hbase.zookeeper.quorum</name> <value>Centos100:2181,Centos101:2181,Centos102:2181</value> </property> <!-- zk的本地目录 --> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/home/zk/zookeeper</value> </property>
配置hbase-2.0.0/conf/regionservers,指定区域服务器主机名(配置完分发)
Centos101 Centos102
-
启动集群(在master节点上启动,注意:完全分布式启动之前需要先启动hadoop和zk)
#启动zk zkServer.sh start #启动hadoop start-all.sh #启动hbase start-hbase.sh
-
启动成功后master节点主机上会有一个进程HMaster,RS节点上会有一个进程HRegionServer
-
访问hbase的webui,端口是16010
-
查看zk
zkCli.sh ls / #查看RS节点 ls /hbase/rs #查看master get /hbase/master
-
启用备份master
#启用备份master直接去其他主机上执行以下命令即可在webui上的Backup Masters中看到(一般启动两个master就可以了) hbase-daemon.sh start master
-
校时
#hbase是以时间戳来作为数据的版本,所以说hbase集群的主机时间必须同步,否则容易发生故障
Hbase在第三方软件中的体现
Hbase数据在Hadoop中存储的位置
-
名称空间
-
表
-
表数据的分区
-
列族
-
列族中的数据
Hbase在zookeeper中的存储位置
- 名称空间
- 表
- meta表所在的服务器
查询数据一般都是先通过zk查到meta所在的服务器,然后查询meta分区表,根据提供的rowKey找到对应分区所在的服务器,查到数据
Hbase在Hbase中的体现
Hbase在webui中的体现
- 表格
- 表格分区信息
Hbase在shell中的体现
查询名称空间表和分区
Hbase命令
#启动master的命令
hbase-daemon.sh start master
#启动RS的命令(这个命令也是在master主机上执行)
hbase-daemons.sh start regionserver
#全部启动的命令
start-hbase.sh
#关闭hbase集群
stop-hbase.sh
#进入hbase的shell命令状态
hbase shell
#查看命令帮助
help
命令分组:
-
general(常规组)
#查看当前系统登录用户 whoami #查看版本 version
-
ddl(数据定义语言)
#在名字空间mydb1下创建表t1,拥有三个列族f1,f2,f3 create 'mydb1:t1','f1','f2','f3' #查看表结构 describe 'mydb1:t1' desc 'mydb1:t1' #清空表 truncate 'mybd1:t1' #删除表,需要先禁用表然后才能删除表 disable 'mydb1:t1' drop 'mydb1:t1' #解禁表,表格禁用后查询插入都不行 enable 'mydb1:t1'
-
namespace(名字空间)
#列出名字空间,最初会有两个空间,一个是default,一个是hbase,hbase属于系统库 list_namespace #列出指定名字空间下的表,注意:hbase中所有的参数都要加上单引号 list_namespace_tables 'default' #创建名字空间mydb1 create_namespace 'mydb1'
-
dml
#插入数据id=100,name='tom',age=12,三级坐标对应一个值,名称空间和表,行号,列族和列 put 'mydb1:t1','row1','f1:id',100 put 'mydb1:t1','row1','f1:name','tom' put 'mydb1:t1','row1','f1:age',12 #查询指定行的数据,查询必须指定row,因为hbase是按照rowKey切割表的,只能指定了rowkey,hbase才能找到对应的区域 get 'mydb1:t1','row1' #扫描表t1(上亿数据谨慎使用) scan 'mydb1:t1' #统计表的行数 count 'mydb1:t1'
-
tools
#清理内存缓冲区到磁盘文件 flush 'mydb1:t1' #切割表,不带参数,默认是从中间切割 split 'mydb1:t1' #切割表,不带参数,从指定的rowKey切割 split 'mydb1:t1','row008888' #移动分区,格式:move 分区id,需要分配到哪台服务器上(服务器名称,端口,启动码) move 'e15b2d06eb033f19934c26f5af3eab3d','centos101,16020,1602661324341' #合并分区,参数为两个分区id,只能合并两个分区 merge_region 'e15b2d06eb033f19934c26f5af3eab3d','8bf5e09451b8b5f516dff93f834927dd'
javaAPI操作Hbase
- 将服务器上的hbase-site.xml配置文件拷到项目中的src根目录下,注意如果服务器上的配置文件写的是主机名,那你这边不能把主机名变成ip,需要去修改hosts文件ip映射,如果服务器上配置的ip,那就写ip,总之不要去更改从服务器上考下来的配置文件
- maven环境搭建
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.0.0</version> </dependency>
- 代码实现(注意:当前代码都是在2.0.0版本的基础上写的,不同版本的hbase代码会有差异)
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.text.DecimalFormat; /** * 插入数据 */ public static void put() throws Exception { //创建配置对象 Configuration conf = HBaseConfiguration.create(); //创建连接 Connection conn = ConnectionFactory.createConnection(conf); //创建表名对象 TableName tName = TableName.valueOf("mydb1:t1"); //获得表 Table table = conn.getTable(tName); //将字符串转化成字节数组 byte[] rowId = Bytes.toBytes("row2"); Put put = new Put(rowId); byte[] f1 = Bytes.toBytes("f1"); byte[] id = Bytes.toBytes("id"); byte[] valueId = Bytes.toBytes(101); put.addColumn(f1,id,valueId); byte[] name = Bytes.toBytes("name"); byte[] valueName = Bytes.toBytes("zhangsan"); put.addColumn(f1,name,valueName); byte[] hobby = Bytes.toBytes("hobby"); byte[] valueHobby = Bytes.toBytes("打篮球"); put.addColumn(f1,hobby,valueHobby); //执行插入 table.put(put); conn.close(); } /** * 查询数据 * @throws Exception */ public static void get() throws Exception { //创建配置对象 Configuration conf = HBaseConfiguration.create(); //创建连接 Connection conn = ConnectionFactory.createConnection(conf); //创建表名对象 TableName tName = TableName.valueOf("mydb1:t1"); //获得表 Table table = conn.getTable(tName); //将字符串转化成字节数组 byte[] rowId = Bytes.toBytes("row1"); Get get = new Get(rowId); Result r = table.get(get); byte[] value = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")); System.out.println(Bytes.toString(value)); conn.close(); } /** * 批处理 * @throws Exception */ public static void batch() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb1:t5")); //该集合里可以放get,put,delete操作 List<Row> rows = new ArrayList<Row>(); Put put = new Put(Bytes.toBytes("row003")); put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan")); rows.add(put); Get get = new Get(Bytes.toBytes("row001")); get.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name")); rows.add(get); Delete delete = new Delete(Bytes.toBytes("row002")); delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name")); rows.add(delete); //接收处理结果 Object[] results = new Object[rows.size()]; //处理 table.batch(rows,results); //get返回有结果,put和delete都是无返回结果的 Result r = (Result)results[1]; byte[] value = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")); System.out.println(Bytes.toString(value)); conn.close(); } /** * HBase批处理存储 * @throws Exception */ public static void putAll() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tName = TableName.valueOf("mydb1:t1"); Table table = conn.getTable(tName); //设置不要自动提交缓冲区的内容 List<Put> puts = new ArrayList<Put>(); //设置rowKey的格式,因为rowKey是按位排序的 DecimalFormat df = new DecimalFormat(); df.applyPattern("0000000"); for(int i = 1; i < 100000; i++){ Put put = new Put(Bytes.toBytes("row" + df.format(i))); put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("id"),Bytes.toBytes(i)); put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("name"),Bytes.toBytes("Tom" + i)); put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("age"),Bytes.toBytes(i % 100)); puts.add(put); } table.put(puts); conn.close(); } /** * 创建名称空间 * @throws Exception */ public static void createNameSpace() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //创建名称空间 NamespaceDescriptor nd = NamespaceDescriptor.create("mydb2").build(); admin.createNamespace(nd); conn.close(); } /** * 查询名称空间 * @throws Exception */ public static void listNameSpaces() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); NamespaceDescriptor[] nds = admin.listNamespaceDescriptors(); for(NamespaceDescriptor nd:nds){ System.out.println(nd.getName()); } conn.close(); } /** * 创建表 * @throws Exception */ public static void createTable() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); TableName tn = TableName.valueOf("mydb2:t2"); TableDescriptorBuilder.ModifyableTableDescriptor table = new TableDescriptorBuilder.ModifyableTableDescriptor(tn); //添加列族f1 ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family1 = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f1")); table.setColumnFamily(family1); //添加列族f2 ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family2 = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f2")); table.setColumnFamily(family2); //创建表 admin.createTable(table); conn.close(); } /** * 删除数据 * @throws Exception */ public static void deleteData() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb2:t2")); Delete delete = new Delete(Bytes.toBytes("row000001")); delete.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id")); delete.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name")); //删除第一行的f1列族的id和name table.delete(delete); conn.close(); } /** * 删除表 * @throws Exception */ public static void deleteTable() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //先禁用表 admin.disableTable(TableName.valueOf("mydb2:t2")); //再删除表 admin.deleteTable(TableName.valueOf("mydb2:t2")); conn.close(); } /** * 查询 * @throws Exception */ public static void scan() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); //设置查询范围 Scan scan = new Scan(); //设置每行返回的列数,如果超过这个列数会自动截断,在下一个迭代会返回(调优手段,对结果没有任何影响) scan.setBatch(3); //开启扫描器缓存,默认是-1(不开启),如果不开启,下面迭代数据时每一次调用next()都会向服务器发送一个RPC请求,影响性能(调优手段,对结果没有任何影响) scan.setCaching(1500); //默认包含首不包含尾,如果特别指定第二个参数为true就包含 scan.withStartRow(Bytes.toBytes("row004748"),true); scan.withStopRow(Bytes.toBytes("row004758"),true); ResultScanner rs = table.getScanner(scan); Iterator<Result> iterator = rs.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); Map<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("f1")); for(byte[] key:map.keySet()){ String k = Bytes.toString(key); //注意:所有的值最好在存储时都转为字符串,否则在查询的时候动态查询无法准确定位它的数据类型,都转为字符串会乱码 String v= Bytes.toString(map.get(key)); System.out.print(k + "===" + v + "......"); } System.out.println(); } rs.close(); conn.close(); } /** * 更加细致的动态查询 * @throws Exception */ public static void scan2() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); //设置查询范围 Scan scan = new Scan(); //默认包含首不包含尾,如果特别指定第二个参数为true就包含 scan.withStartRow(Bytes.toBytes("row004748"),true); scan.withStopRow(Bytes.toBytes("row004758"),true); ResultScanner rs = table.getScanner(scan); Iterator<Result> iterator = rs.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); //最外层的map的key是列族,下面一层的map的key是列,最里面一层的map的key是数据版本(时间戳) Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap(); for(byte[] fBytes : map.keySet()){ //得到列族 String f = Bytes.toString(fBytes); NavigableMap<byte[], NavigableMap<Long, byte[]>> map2 = map.get(fBytes); for(byte[] cBytes : map2.keySet()){ //得到列 String c = Bytes.toString(cBytes); NavigableMap<Long, byte[]> map3 = map2.get(cBytes); for(Long e : map3.keySet()){ String val = Bytes.toString(map3.get(e)); System.out.print(f + ":" + c + ":" + e + ":" + val); } } } System.out.println(); } rs.close(); conn.close(); } /** * 获取多版本数据 * @throws Exception */ public static void getVersions() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tName = TableName.valueOf("mydb1:t2"); Table table = conn.getTable(tName); Get get = new Get(Bytes.toBytes("row001")); //获取所有版本的数据 get.readAllVersions(); //获取指定版本个数的数据 //get.readVersions(2); Result r = table.get(get); List<Cell> cells = r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name")); for(Cell cell:cells){ String f = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()); String q = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()); long timestamp = cell.getTimestamp(); String val = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); System.out.println("列族:" + f + ";列:" + q + ";版本:" + timestamp + ";值:" + val); } conn.close(); }
client交互hbase的过程
- hbase集群启动时,master负责分配区域到指定区域服务器。
- 联系zk,找出meta表所在rs(regionserver)
/hbase/meta-region-server - 定位row key,找到对应region server
- 缓存信息在本地。
- 联系RegionServer
- HRegionServer负责open HRegion对象,为每个列族创建Store对象,Store包含多个StoreFile实例,
他们是对HFile的轻量级封装。每个Store还对应了一个MemStore,用于内存存储数据。
Hbase切割文件
Hbase的分区默认是当分区文件的大小超过hbase.hregion.max.filesize属性值时,就会新开辟一个分区存储,默认是10G
<property>
<name>hbase.hregion.max.filesize</name>
<value>10737418240</value>
<source>hbase-default.xml</source>
</property>
手动分区
#切割表,不带参数,默认是从中间切割
split 'mydb1:t1'
#切割分区,在指定的rowKey切割
split 'mydb1:t1,row004927,1602595549576.ea72ecf8920a3525e226c786a3538847.','row008000'
注意: 原数据信息是存在缓冲区里面一段时间,不会长时间保留,只有切割后的分区信息数据会永久保留在磁盘上
#使用手动清理缓冲区可以清理掉原数据的信息,并将新的分区信息存储到磁盘上
flush 'hbase:meta'
手动移动分区
#格式:move 分区id,需要分配到哪台服务器上(服务器名称,端口,启动码)
move 'e15b2d06eb033f19934c26f5af3eab3d','centos101,16020,1602661324341'
分区id:
需要分配到的服务器的名称,端口,启动码:
meta表中显示:
预切割
预切割指的是在创建表的时候就对表划分好分区,这样的话就可以省的我们去不断的给表进行切割
#创建表时指定切割rowKey
create 'mydb1:t2','f1',SPLITS=>['row3000','row6000']
Hbase数据版本控制
#创建t2表,指定列族f1,数据允许保存3个版本
create 'mydb1:t2',{NAME=>'f1',VERSIONS=>'3'}
#查询指定需要看的版本个数
get 'mydb1:t2','row001',{COLUMN=>'f1',VERSIONS=>4}
#查询指定版本的数据
get 'mydb1:t2','row001',{COLUMN=>'f1',TIMESTAMP=>1603076828293}
#查询指定区域版本的数据,左闭右开区间
get 'mydb1:t2','row001',{COLUMN=>'f1',TIMERANGE=>[1603075388967,1603076828293],VERSIONS=>10}
#删除指定版本的数据(不指定默认删除最新版本的数据)
delete 'mydb1:t2','row001','f1:name',1603076828293
#原生扫描(可以把表格指定以外的版本都扫描出来,并且包含已删除的数据)
scan 'mydb1:t2',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}
创表参数
TTL
TTL是创建表的参数,用于指定数据的存活时间,单位秒,不指定默认是永久存活,如果超过这个时间使用get命令是无法查询到数据,但是可以使用原生扫描查询到(低版本的Hbase原生扫描同样查询不到)
#创建一个表,并指定表里的数据可以存活的时间为10秒
create 'mydb1:t3' , {NAME=>'f1',TTL=>10}
KEEP_DELETED_CELLS
KEEP_DELETED_CELLS是创建表的参数,用于指定删除数据的所有版本是否全部删除,布尔值,默认值false,不全部删除,指定true,表示删除之后通过get操作将无法获得表里的数据,但是原生扫描还是可以查询到的
#创建一个表,指定删除操作时删除全部版本的数据
create 'mydb1:t4' , {NAME=>'f1',KEEP_DELETED_CELLS=>true}
#删除数据
delete 'mydb1:t4','row001','f1:name'
#注意删除数据的时候需要清下缓存,否则还能查询到
flush 'mydb1:t4'
过滤器
过滤器的分类
比较过滤器
Hbase的所有比较过滤器都是继承org.apache.hadoop.hbase.filter.CompareFilter这个抽象类,Hbase分以下五种比较过滤器
- 行过滤器(org.apache.hadoop.hbase.filter.RowFilter)
- 列族过滤器(org.apache.hadoop.hbase.filter.FamilyFilter)
- 列过滤器(org.apache.hadoop.hbase.filter.QualifierFilter)
- 值过滤器(org.apache.hadoop.hbase.filter.ValueFilter)
- 依赖过滤器(org.apache.hadoop.hbase.filter.DependentColumnFilter)
专用过滤器
Hbase提供的专用过滤器都是直接继承org.apache.hadoop.hbase.filter.FilterBase,Hbase有以下几种专用过滤器
- 单列值过滤器(org.apache.hadoop.hbase.filter.SingleColumnValueFilter)
- 单列排除过滤器(org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter)
- 前缀过滤器(org.apache.commons.io.filefilter.PrefixFileFilter)
- 分页过滤器(org.apache.hadoop.hbase.filter.PageFilter)
- 行键过滤器(org.apache.hadoop.hbase.filter.KeyOnlyFilter)
- 首次行键过滤器(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter)
- 包含结束的过滤器(org.apache.hadoop.hbase.filter.InclusiveStopFilter)
- 时间戳过滤器(org.apache.hadoop.hbase.filter.TimestampsFilter)
- 列计数过滤器(org.apache.hadoop.hbase.filter.ColumnCountGetFilter)
- 列分页过滤(org.apache.hadoop.hbase.filter.ColumnPaginationFilter)
过滤器列表
org.apache.hadoop.hbase.filter.FilterList用于组合过滤器,他有两个枚举值
- MUST_PASS_ALL:and衔接过滤器
- MUST_PASS_ONE:or衔接过滤器
比较运算符集合类
org.apache.hadoop.hbase.CompareOperator类是Hbase过滤器比较运算符的类,该类下面有七个常量:
常量名称 | 描述 |
---|---|
LESS | 小于 |
LESS_OR_EQUAL | 小于等于 |
EQUAL | 等于 |
NOT_EQUAL | 不等于 |
GREATER_OR_EQUAL | 大于等于 |
GREATER | 大于 |
NO_OP | 排除一切值 |
比较器
HBase的比较器都继承了org.apache.hadoop.hbase.filter.ByteArrayComparable这个抽象类,Hbase共有以下几种比较器
- org.apache.hadoop.hbase.filter.BinaryComparator(按照阈值比较)
- org.apache.hadoop.hbase.filter.BinaryPrefixComparator(匹配前缀)
- org.apache.hadoop.hbase.filter.BitComparator(按位比较字节数组,只能与EQUAL 和NOT_EQUAL 配合使用)
- org.apache.hadoop.hbase.filter.NullComparator(判断当前值是否为null)
- org.apache.hadoop.hbase.filter.RegexStringComparator(与正则表达式匹配,只能与EQUAL 和NOT_EQUAL 配合使用)(正则表达式中的有两个常用的符号:(tom$:表示以tom结尾,^tom:表示以tom开头,相当于sql语句里面的like))
- org.apache.hadoop.hbase.filter.SubstringComparator(匹配子字符串,只能与EQUAL 和NOT_EQUAL 配合使用)
过滤器的用法
/**
* 行级过滤
* @throws Exception
*/
public static void rowFilter() throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Scan scan = new Scan();
//获取小于row000100的数据
RowFilter rf = new RowFilter(CompareOperator.LESS ,new BinaryComparator(Bytes.toBytes("row000100")));
scan.setFilter(rf);
Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
ResultScanner rs = table.getScanner(scan);
for(Result r : rs){
//String val = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
String row = Bytes.toString(r.getRow());
System.out.println(row);
}
rs.close();
conn.close();
}
/**
* 列族过滤
* @throws Exception
*/
public static void familyFilter() throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Scan scan = new Scan();
//获取列族=f1的数据
FamilyFilter ff = new FamilyFilter(CompareOperator.EQUAL , new BinaryComparator(Bytes.toBytes("f1")));
scan.setFilter(ff);
Table table = conn.getTable(TableName.valueOf("mydb1:t6"));
ResultScanner rs = table.getScanner(scan);
for(Result r:rs){
System.out.println(Bytes.toString(r.value()));
}
}
/**
* 列过滤
* @throws Exception
*/
public static void qualifierFilter() throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Get get = new Get(Bytes.toBytes("row000001"));
QualifierFilter qf = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("name")));
get.setFilter(qf);
Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
Result r = table.get(get);
System.out.println(Bytes.toString(r.value()));
conn.close();
}
/**
* 依赖过滤器,根据某个列的值过滤一整行记录,相当于sql语句的条件表达式(重要)
* @param drop
* @param co
* @param bc
* @throws Exception
*/
public static void dependentFilter(boolean drop,CompareOperator co,ByteArrayComparable bc) throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Scan scan = new Scan();
//参数:参考列族,参考列,返回结果是否包含参考列的值(boolean值),比较运算符,比较值
DependentColumnFilter df = new DependentColumnFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),drop,CompareOperator.EQUAL ,new BinaryPrefixComparator(Bytes.toBytes("Tom1")));
//后面两个参数不填表示返回所有
//DependentColumnFilter df = new DependentColumnFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),drop);
scan.setFilter(df);
Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
ResultScanner rs = table.getScanner(scan);
for(Result r : rs){
int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id")));
String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
System.out.println("id:" + id + ";name:" + name + ";age:" + age);
}
rs.close();
conn.close();
}
/**
* 单列值过滤器,根据某一列的值过滤一行记录
* @throws Exception
*/
public static void singleValue() throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Scan scan = new Scan();
//
SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),CompareOperator.EQUAL , new BinaryComparator(Bytes.toBytes("Tom10")));
scan.setFilter(sf);
Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
ResultScanner rs = table.getScanner(scan);
for(Result r : rs){
int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id")));
String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
System.out.println("id:" + id + ";name:" + name + ";age:" + age);
}
rs.close();
conn.close();
}
//过滤器组合
public static void fileList() throws Exception {
//代码逻辑用sql表示:select * from mydb1:t1 where name like '^Tom1' and age = 55
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Scan scan = new Scan();
SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("name"), CompareOperator.EQUAL, new RegexStringComparator("^Tom1"));
SingleColumnValueFilter ageFilter = new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("age"), CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(55)));
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(nameFilter);
filterList.addFilter(ageFilter);
scan.setFilter(filterList);
Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
ResultScanner rs = table.getScanner(scan);
for(Result r : rs){
int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id")));
String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
System.out.println("id:" + id + ";name:" + name + ";age:" + age);
}
rs.close();
conn.close();
}
Hbase计数器
Hbase计数器一般用于收集统计信息或点击量,他能做到实时统计,并且没有高并发的情况,相对于延时比较高的批处理操作更好
计数器的命令
#执行以下命令每执行一次会在hits上加1,对表格并没有什么要求,第一次执行以下命令,就会把该列的默认值设为0,然后再加一
incr 'mydb1:t7','row001','dayily:hits'
#查看计数器列的值(get也可以,但是计数器的数据类型是int类型,在命令行显示的是16进制,可读性比较差,使用get_counter可以很清晰的显示)
get_counter 'mydb1:t7','row001','dayily:hits'
#指定增加的步长,这个步长可以为负数
incr 'mydb1:t7','row001','dayily:hits',10
javaAPI操作计数器
/**
* 计数器
* @throws Exception
*/
public static void incr() throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
//创建计数器对象指定行
Increment increment = new Increment(Bytes.toBytes("row001"));
//指定列族,列和步长
increment.addColumn(Bytes.toBytes("dayily"),Bytes.toBytes("hits"),1);
increment.addColumn(Bytes.toBytes("monthly"),Bytes.toBytes("hits"),1);
increment.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"),1);
Table table = conn.getTable(TableName.valueOf("mydb1:t7"));
table.increment(increment);
conn.close();
}
rowKey的设计
假设我们对全市的过车数据进行设计rowkey,总共有10台服务器
- 过车数据的数据业务一般跟过车时间,车牌号码,卡口ID有关,我们用过车时间,车牌号码,卡口ID设计rowKey
- 计算分区编号,共十台服务器,我们把过车时间,车牌号,卡口ID组合起来,求出hash值,然后用hash值与10求余,得到0-9的编号,根据编号可以把数据对应放在不同服务器上,防止热点问题
- rowkey组合:分区编号,过车时间,车牌号,卡口ID
- 上面我们要查询过车时间,车牌号都是可以的,但是如果要查询卡口的过车数据就很麻烦,那么我们可以创建一个索引表,rowKey也按照上面的组合,但是要卡口ID方在分区编号的后面,然后把主表对应的rowKey作为值放在索引表里面
布隆过滤器
布隆过滤器是优化查询的一种手段,可以通过布隆过滤器判断一个分区中是否包含你查询的rowKey,如果这个文件不包含,它会给予你明确的答复,如果包含,它会给你一个可能性的答复,因此他可以提神get查询的性能,避免了全表扫描
创建一个带有布隆过滤器的表:
public static void createBloomFilter() throws Exception {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
TableName tn = TableName.valueOf("mydb2:t3");
TableDescriptorBuilder.ModifyableTableDescriptor table = new TableDescriptorBuilder.ModifyableTableDescriptor(tn);
//添加列族f1
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f1"));
//设置布隆过滤器,NONE:不启用布隆过滤器(默认值),ROW:行级别布隆过滤器(推荐使用),ROWCOL:行和列级别(消耗资源比较大,不推荐使用)
family.setBloomFilterType(BloomType.ROW);
table.setColumnFamily(family);
admin.createTable(table);
conn.close();
}
创建完成后,用desc查看表格描述,可以看到BLOOMFILTER => 'ROW'
这样一段代码
sql可视化界面
- 下载apache-phoenix-5.0.0-HBase-2.0-bin.tar.gz
- 将其上传到服务器,并解压
- 复制phoenix-5.0.0-HBase-2.0-server.jar到Hbase的lib的目录下,并且分发
- 重启Hbase
- 进入/apache-phoenix-5.0.0-HBase-2.0-bin/bin目录下输入
./sqlline.py Centos100
连接到zk服务器,进入phoenix客户端,输入以下命令#查看表 !tables #查看帮助 !help #创建表,必须指定primary key,这里主键就是rowKey !sql create table mydb1.person(id varchar(10) primary key,name varchar(100),age Integer); #删除表 !sql drop table mydb1.person; #查看表结构 !describe TEST; #插入数据修改数据 upsert into TEST(id,name) values ('row005','lisi'); #删除数据 delete from TEST where id = 'row005'; #退出 !quit
- 查看表格在Hbase中的位置以及表的描述
- 下载squirrel-sql-4.1.0-standard.jar
- 双击squirrel-sql-4.1.0-standard.jar安装squirrel(直接下一步到底)
- 将phoenix-5.0.0-HBase-2.0-client.jar复制到squirrel的安装目录下
- 打开 squirrel
- 点击下方按钮安装驱动
- 添加数据库连接
- 结构
更多推荐
所有评论(0)