HBase的介绍

HBase是基于hadoop的数据库,分布式可伸缩(可任意增加或减少节点)大型数据存储,随机、实时读写数据,可以存储十亿行,百万列,版本化、非关系型,面向列(列不固定)的数据库,非关系型数据库还有Redis和mongodb,存储形式都是kv对。

HBase的特征

  • 线性模块化扩展方式。
  • 严格一致性读写
  • 自动可配置表切割
  • 区域服务器之间自动容灾
  • HBase支持Hadoop MR作业
  • 易于使用的Java API
  • 块缓存和布隆过滤器用于实时查询
  • 通过服务器端过滤器实现查询预测
  • 支持XML, Protobuf, and binary data等串行化数据技术
  • 可扩展的shell
  • 可视化

HBase的存储机制

面向列存储 ,table是按行排序,HBase的存储结构如下:
在这里插入图片描述
因此定位一个数据需要该数据的行号,列族和列,版本号

HBase的搭建

HBase有两种节点,master(管理节点)和RegionServer(区域服务器)
这里我选择Centos100作为master,Centos101和Centos102作为RS(一般来说应该选择奇数个服务器作为RS,但是本人电脑资源有限只能选择两台)

  1. 安装jdk

  2. 安装hadoop

  3. 安装HBase

    	#解压
    	tar -zvxf hbase-2.0.0-bin.tar.gz -C /soft/
    	
    	#分发
    	xsync.sh hbase-2.0.0
    
    	#配置环境变量
    	export HBASE_HOME=/soft/hbase-2.0.0
    	export PATH=$PATH:$HBASE_HOME/bin
    	source /etc/profile
    
    	#验证环境变量是否配置成功
    	hbase version
    
  4. 修改hbase-2.0.0/conf/hbase-env.sh,然后分发

    	#修改JAVA_HOME路径
    	export JAVA_HOME=/soft/jdk1.8.0_65/
    	#修改zk管理,hbase自己已经集成了zk,如果我们想用自己搭建的zk去管理Hbase,就要把下面的属性设置成false
    	export HBASE_MANAGES_ZK=false
    
  5. HBase本地模式
    hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性

    	<property>
    		<name>hbase.rootdir</name>
    		<value>file:/home/hadoop/HBase/HFiles</value>
    	</property>
    
  6. HBase伪分布式
    hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性

    	<!--启用分布式-->
    	<property>
    		<name>hbase.cluster.distributed</name>
    		<value>true</value>
    	</property>
    	<property>
    		<name>hbase.rootdir</name>
    		<value>hdfs://localhost:8030/hbase</value>
    	</property>
    
  7. HBase完全分布式
    hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性,配置完后分发

    	<!--启用分布式-->
    	<property>
    		<name>hbase.cluster.distributed</name>
    		<value>true</value>
    	</property>
    	
    	<!-- 指定hbase数据在hdfs上的存放路径,注意这里如果配置的是hadoopHA高可用的dfs.nameservices属性的值时需要将hadoop中的hdfs-sit.xml和core-site.xml复制到hbase-2.0.0/conf/路径下,否则不识别-->
    	<property>
    		<name>hbase.rootdir</name>
    		<value>hdfs://mycluster/hbase</value>
    	</property>
    	<!-- 配置zk地址 -->
    	<property>
    		<name>hbase.zookeeper.quorum</name>
    		<value>Centos100:2181,Centos101:2181,Centos102:2181</value>
    	</property>
    	<!-- zk的本地目录 -->
    	<property>
    		<name>hbase.zookeeper.property.dataDir</name>
    		<value>/home/zk/zookeeper</value>
    	</property>
    

    配置hbase-2.0.0/conf/regionservers,指定区域服务器主机名(配置完分发)

    	Centos101
    	Centos102
    
  8. 启动集群(在master节点上启动,注意:完全分布式启动之前需要先启动hadoop和zk)

    	#启动zk
    	zkServer.sh start
    	#启动hadoop
    	start-all.sh
    	#启动hbase
    	start-hbase.sh
    
  9. 启动成功后master节点主机上会有一个进程HMaster,RS节点上会有一个进程HRegionServer
    在这里插入图片描述

  10. 访问hbase的webui,端口是16010
    在这里插入图片描述

  11. 查看zk

    	zkCli.sh
    	ls /
    	#查看RS节点
    	ls /hbase/rs
    	#查看master
    	get /hbase/master
    
  12. 启用备份master

    	#启用备份master直接去其他主机上执行以下命令即可在webui上的Backup Masters中看到(一般启动两个master就可以了)
    	hbase-daemon.sh start master
    
  13. 校时

    	#hbase是以时间戳来作为数据的版本,所以说hbase集群的主机时间必须同步,否则容易发生故障
    

Hbase在第三方软件中的体现

Hbase数据在Hadoop中存储的位置
  1. 名称空间
    在这里插入图片描述


  2. 在这里插入图片描述

  3. 表数据的分区
    在这里插入图片描述

  4. 列族
    在这里插入图片描述

  5. 列族中的数据
    在这里插入图片描述

Hbase在zookeeper中的存储位置
  1. 名称空间
    在这里插入图片描述

  2. 在这里插入图片描述
  3. meta表所在的服务器
    在这里插入图片描述
    查询数据一般都是先通过zk查到meta所在的服务器,然后查询meta分区表,根据提供的rowKey找到对应分区所在的服务器,查到数据
Hbase在Hbase中的体现
Hbase在webui中的体现
  1. 表格
    在这里插入图片描述
  2. 表格分区信息
    在这里插入图片描述
Hbase在shell中的体现

查询名称空间表和分区
在这里插入图片描述

Hbase命令

	#启动master的命令
	hbase-daemon.sh start master

	#启动RS的命令(这个命令也是在master主机上执行)
	hbase-daemons.sh start regionserver

	#全部启动的命令
	start-hbase.sh

	#关闭hbase集群
	stop-hbase.sh

	#进入hbase的shell命令状态
	hbase shell

	#查看命令帮助
	help

命令分组:

  1. general(常规组)

    	#查看当前系统登录用户
    	whoami
    	
    	#查看版本
    	version
    
  2. ddl(数据定义语言)

    	#在名字空间mydb1下创建表t1,拥有三个列族f1,f2,f3
    	create 'mydb1:t1','f1','f2','f3'
    	
    	#查看表结构
    	describe 'mydb1:t1'
    	desc 'mydb1:t1'
    
    	#清空表
    	truncate 'mybd1:t1'
    	
    	#删除表,需要先禁用表然后才能删除表
    	disable 'mydb1:t1'
    	drop 'mydb1:t1'
    
    	#解禁表,表格禁用后查询插入都不行
    	enable 'mydb1:t1'
    
  3. namespace(名字空间)

    	#列出名字空间,最初会有两个空间,一个是default,一个是hbase,hbase属于系统库
    	list_namespace
    	
    	#列出指定名字空间下的表,注意:hbase中所有的参数都要加上单引号
    	list_namespace_tables 'default'
    	
    	#创建名字空间mydb1
    	create_namespace 'mydb1'	
    
  4. dml

    	#插入数据id=100,name='tom',age=12,三级坐标对应一个值,名称空间和表,行号,列族和列
    	put 'mydb1:t1','row1','f1:id',100
    	put 'mydb1:t1','row1','f1:name','tom'
    	put 'mydb1:t1','row1','f1:age',12
    		
    	#查询指定行的数据,查询必须指定row,因为hbase是按照rowKey切割表的,只能指定了rowkey,hbase才能找到对应的区域
    	get 'mydb1:t1','row1'
    
    	#扫描表t1(上亿数据谨慎使用)
    	scan 'mydb1:t1'
    
    	#统计表的行数
    	count 'mydb1:t1'
    
  5. tools

    	#清理内存缓冲区到磁盘文件
    	flush 'mydb1:t1'
    
    	#切割表,不带参数,默认是从中间切割
    	split 'mydb1:t1'
    
    	#切割表,不带参数,从指定的rowKey切割
    	split 'mydb1:t1','row008888'
    
    	#移动分区,格式:move 分区id,需要分配到哪台服务器上(服务器名称,端口,启动码)
    	move 'e15b2d06eb033f19934c26f5af3eab3d','centos101,16020,1602661324341'
    
    	#合并分区,参数为两个分区id,只能合并两个分区
    	merge_region 'e15b2d06eb033f19934c26f5af3eab3d','8bf5e09451b8b5f516dff93f834927dd'
    

javaAPI操作Hbase

  1. 将服务器上的hbase-site.xml配置文件拷到项目中的src根目录下,注意如果服务器上的配置文件写的是主机名,那你这边不能把主机名变成ip,需要去修改hosts文件ip映射,如果服务器上配置的ip,那就写ip,总之不要去更改从服务器上考下来的配置文件
  2. maven环境搭建
    	<dependency>
    		<groupId>org.apache.hbase</groupId>
    		<artifactId>hbase-client</artifactId>
    		<version>2.0.0</version>
    	</dependency>
    
  3. 代码实现(注意:当前代码都是在2.0.0版本的基础上写的,不同版本的hbase代码会有差异)
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import java.text.DecimalFormat;
    	/**
         * 插入数据
         */
        public static void put() throws Exception {
            //创建配置对象
            Configuration conf = HBaseConfiguration.create();
            //创建连接
            Connection conn = ConnectionFactory.createConnection(conf);
            //创建表名对象
            TableName tName = TableName.valueOf("mydb1:t1");
            //获得表
            Table table = conn.getTable(tName);
            //将字符串转化成字节数组
            byte[] rowId = Bytes.toBytes("row2");
    
            Put put = new Put(rowId);
    
            byte[] f1 = Bytes.toBytes("f1");
            byte[] id = Bytes.toBytes("id");
            byte[] valueId = Bytes.toBytes(101);
            put.addColumn(f1,id,valueId);
    
            byte[] name = Bytes.toBytes("name");
            byte[] valueName = Bytes.toBytes("zhangsan");
            put.addColumn(f1,name,valueName);
    
            byte[] hobby = Bytes.toBytes("hobby");
            byte[] valueHobby = Bytes.toBytes("打篮球");
            put.addColumn(f1,hobby,valueHobby);
    
            //执行插入
            table.put(put);
        	conn.close();
        }
    
        /**
         * 查询数据
         * @throws Exception
         */
        public static void get() throws Exception {
            //创建配置对象
            Configuration conf = HBaseConfiguration.create();
            //创建连接
            Connection conn = ConnectionFactory.createConnection(conf);
            //创建表名对象
            TableName tName = TableName.valueOf("mydb1:t1");
            //获得表
            Table table = conn.getTable(tName);
            //将字符串转化成字节数组
            byte[] rowId = Bytes.toBytes("row1");
            Get get = new Get(rowId);
            Result r = table.get(get);
            byte[] value = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(value));
        	conn.close();
        }
    
    	/**
         * 批处理
         * @throws Exception
         */
        public static void batch() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf("mydb1:t5"));
            
            //该集合里可以放get,put,delete操作
            List<Row> rows = new ArrayList<Row>();
    
            Put put = new Put(Bytes.toBytes("row003"));
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan"));
            rows.add(put);
    
            Get get = new Get(Bytes.toBytes("row001"));
            get.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            rows.add(get);
    
            Delete delete = new Delete(Bytes.toBytes("row002"));
            delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            rows.add(delete);
    
            //接收处理结果
            Object[] results = new Object[rows.size()];
            //处理
            table.batch(rows,results);
            
            //get返回有结果,put和delete都是无返回结果的
            Result r = (Result)results[1];
            byte[] value = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(value));
        	conn.close();
        }
    	
    	/**
         * HBase批处理存储
         * @throws Exception
         */
    	public static void putAll() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            TableName tName = TableName.valueOf("mydb1:t1");
            Table table = conn.getTable(tName);
            //设置不要自动提交缓冲区的内容
            List<Put> puts = new ArrayList<Put>();
            //设置rowKey的格式,因为rowKey是按位排序的
            DecimalFormat df = new DecimalFormat();
        	df.applyPattern("0000000");
            for(int i = 1; i < 100000; i++){
                Put put = new Put(Bytes.toBytes("row" + df.format(i)));
                put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("id"),Bytes.toBytes(i));
                put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("name"),Bytes.toBytes("Tom" + i));
                put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("age"),Bytes.toBytes(i % 100));
                puts.add(put);
            }
            table.put(puts);
        	conn.close();
        }
    
    	/**
         * 创建名称空间
         * @throws Exception
         */
        public static void createNameSpace() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            //创建名称空间
            NamespaceDescriptor nd = NamespaceDescriptor.create("mydb2").build();
            admin.createNamespace(nd);
        	conn.close();
        }
    
    	/**
         * 查询名称空间
         * @throws Exception
         */
        public static void listNameSpaces() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            NamespaceDescriptor[] nds = admin.listNamespaceDescriptors();
            for(NamespaceDescriptor nd:nds){
                System.out.println(nd.getName());
            }
        	conn.close();
        }
    
    	/**
         * 创建表
         * @throws Exception
         */
        public static void createTable() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            TableName tn = TableName.valueOf("mydb2:t2");
            TableDescriptorBuilder.ModifyableTableDescriptor table = new TableDescriptorBuilder.ModifyableTableDescriptor(tn);
            //添加列族f1
            ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family1 = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f1"));
            table.setColumnFamily(family1);
            //添加列族f2
            ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family2 = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f2"));
            table.setColumnFamily(family2);
            //创建表
            admin.createTable(table);
        	conn.close();
        }
    
    	/**
         * 删除数据
         * @throws Exception
         */
        public static void deleteData() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf("mydb2:t2"));
            Delete delete = new Delete(Bytes.toBytes("row000001"));
            delete.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"));
            delete.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"));
            //删除第一行的f1列族的id和name
            table.delete(delete);
        	conn.close();
        }
    
    	/**
         * 删除表
         * @throws Exception
         */
        public static void deleteTable() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            //先禁用表
            admin.disableTable(TableName.valueOf("mydb2:t2"));
            //再删除表
            admin.deleteTable(TableName.valueOf("mydb2:t2"));
        	conn.close();
        }
    
    	/**
         * 查询
         * @throws Exception
         */
        public static void scan() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
            //设置查询范围
            Scan scan = new Scan();
            //设置每行返回的列数,如果超过这个列数会自动截断,在下一个迭代会返回(调优手段,对结果没有任何影响)
        	scan.setBatch(3);
            //开启扫描器缓存,默认是-1(不开启),如果不开启,下面迭代数据时每一次调用next()都会向服务器发送一个RPC请求,影响性能(调优手段,对结果没有任何影响)
        	scan.setCaching(1500);
            //默认包含首不包含尾,如果特别指定第二个参数为true就包含
            scan.withStartRow(Bytes.toBytes("row004748"),true);
            scan.withStopRow(Bytes.toBytes("row004758"),true);
            ResultScanner rs = table.getScanner(scan);
            Iterator<Result> iterator = rs.iterator();
            while (iterator.hasNext()){
                Result result = iterator.next();
                Map<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("f1"));
                for(byte[] key:map.keySet()){
                    String k = Bytes.toString(key);
                    //注意:所有的值最好在存储时都转为字符串,否则在查询的时候动态查询无法准确定位它的数据类型,都转为字符串会乱码
                    String v= Bytes.toString(map.get(key));
                    System.out.print(k + "===" + v + "......");
                }
                System.out.println();
            }
            rs.close();
        	conn.close();
        }
    
    	/**
         * 更加细致的动态查询
         * @throws Exception
         */
        public static void scan2() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
            //设置查询范围
            Scan scan = new Scan();
            //默认包含首不包含尾,如果特别指定第二个参数为true就包含
            scan.withStartRow(Bytes.toBytes("row004748"),true);
            scan.withStopRow(Bytes.toBytes("row004758"),true);
            ResultScanner rs = table.getScanner(scan);
            Iterator<Result> iterator = rs.iterator();
            while (iterator.hasNext()){
                Result result = iterator.next();
                //最外层的map的key是列族,下面一层的map的key是列,最里面一层的map的key是数据版本(时间戳)
                Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
                for(byte[] fBytes : map.keySet()){
                    //得到列族
                    String f = Bytes.toString(fBytes);
                    NavigableMap<byte[], NavigableMap<Long, byte[]>> map2 = map.get(fBytes);
                    for(byte[] cBytes : map2.keySet()){
                        //得到列
                        String c = Bytes.toString(cBytes);
                        NavigableMap<Long, byte[]> map3 = map2.get(cBytes);
                        for(Long e : map3.keySet()){
                            String val = Bytes.toString(map3.get(e));
                            System.out.print(f + ":" + c + ":" + e + ":" + val);
                        }
                    }
                }
                System.out.println();
            }
            rs.close();
        	conn.close();
        }
    
    	/**
         * 获取多版本数据
         * @throws Exception
         */
        public static void getVersions() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            TableName tName = TableName.valueOf("mydb1:t2");
            Table table = conn.getTable(tName);
            Get get = new Get(Bytes.toBytes("row001"));
            //获取所有版本的数据
            get.readAllVersions();
            //获取指定版本个数的数据
            //get.readVersions(2);
            Result r = table.get(get);
            List<Cell> cells = r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            for(Cell cell:cells){
                String f = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
                String q = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
                long timestamp = cell.getTimestamp();
                String val = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
                System.out.println("列族:" + f + ";列:" + q + ";版本:" + timestamp + ";值:" + val);
            }
       	 	conn.close();
        }
    

client交互hbase的过程

  1. hbase集群启动时,master负责分配区域到指定区域服务器。
  2. 联系zk,找出meta表所在rs(regionserver)
    /hbase/meta-region-server
  3. 定位row key,找到对应region server
  4. 缓存信息在本地。
  5. 联系RegionServer
  6. HRegionServer负责open HRegion对象,为每个列族创建Store对象,Store包含多个StoreFile实例,
    他们是对HFile的轻量级封装。每个Store还对应了一个MemStore,用于内存存储数据。

Hbase切割文件

Hbase的分区默认是当分区文件的大小超过hbase.hregion.max.filesize属性值时,就会新开辟一个分区存储,默认是10G

	<property>
		<name>hbase.hregion.max.filesize</name>
		<value>10737418240</value>
		<source>hbase-default.xml</source>
	</property>
手动分区
	#切割表,不带参数,默认是从中间切割
	split 'mydb1:t1'

	#切割分区,在指定的rowKey切割
	split 'mydb1:t1,row004927,1602595549576.ea72ecf8920a3525e226c786a3538847.','row008000'

在这里插入图片描述
在这里插入图片描述
注意: 原数据信息是存在缓冲区里面一段时间,不会长时间保留,只有切割后的分区信息数据会永久保留在磁盘上

	#使用手动清理缓冲区可以清理掉原数据的信息,并将新的分区信息存储到磁盘上
	flush 'hbase:meta'
手动移动分区
	#格式:move 分区id,需要分配到哪台服务器上(服务器名称,端口,启动码)
	move 'e15b2d06eb033f19934c26f5af3eab3d','centos101,16020,1602661324341'

分区id:
在这里插入图片描述

需要分配到的服务器的名称,端口,启动码:
在这里插入图片描述
meta表中显示:
在这里插入图片描述

预切割

预切割指的是在创建表的时候就对表划分好分区,这样的话就可以省的我们去不断的给表进行切割

	#创建表时指定切割rowKey
	create 'mydb1:t2','f1',SPLITS=>['row3000','row6000']

Hbase数据版本控制

	#创建t2表,指定列族f1,数据允许保存3个版本
	create 'mydb1:t2',{NAME=>'f1',VERSIONS=>'3'}
	#查询指定需要看的版本个数
	get 'mydb1:t2','row001',{COLUMN=>'f1',VERSIONS=>4}
	#查询指定版本的数据
	get 'mydb1:t2','row001',{COLUMN=>'f1',TIMESTAMP=>1603076828293}
	#查询指定区域版本的数据,左闭右开区间
	get 'mydb1:t2','row001',{COLUMN=>'f1',TIMERANGE=>[1603075388967,1603076828293],VERSIONS=>10}
	#删除指定版本的数据(不指定默认删除最新版本的数据)
	delete 'mydb1:t2','row001','f1:name',1603076828293
	#原生扫描(可以把表格指定以外的版本都扫描出来,并且包含已删除的数据)
	scan 'mydb1:t2',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}

创表参数

TTL

TTL是创建表的参数,用于指定数据的存活时间,单位秒,不指定默认是永久存活,如果超过这个时间使用get命令是无法查询到数据,但是可以使用原生扫描查询到(低版本的Hbase原生扫描同样查询不到)

	#创建一个表,并指定表里的数据可以存活的时间为10秒
	create 'mydb1:t3' , {NAME=>'f1',TTL=>10}
KEEP_DELETED_CELLS

KEEP_DELETED_CELLS是创建表的参数,用于指定删除数据的所有版本是否全部删除,布尔值,默认值false,不全部删除,指定true,表示删除之后通过get操作将无法获得表里的数据,但是原生扫描还是可以查询到的

	#创建一个表,指定删除操作时删除全部版本的数据
	create 'mydb1:t4' , {NAME=>'f1',KEEP_DELETED_CELLS=>true}
	#删除数据
	delete 'mydb1:t4','row001','f1:name'
	#注意删除数据的时候需要清下缓存,否则还能查询到
	flush 'mydb1:t4'

过滤器

过滤器的分类
比较过滤器

Hbase的所有比较过滤器都是继承org.apache.hadoop.hbase.filter.CompareFilter这个抽象类,Hbase分以下五种比较过滤器

  • 行过滤器(org.apache.hadoop.hbase.filter.RowFilter)
  • 列族过滤器(org.apache.hadoop.hbase.filter.FamilyFilter)
  • 列过滤器(org.apache.hadoop.hbase.filter.QualifierFilter)
  • 值过滤器(org.apache.hadoop.hbase.filter.ValueFilter)
  • 依赖过滤器(org.apache.hadoop.hbase.filter.DependentColumnFilter)
专用过滤器

Hbase提供的专用过滤器都是直接继承org.apache.hadoop.hbase.filter.FilterBase,Hbase有以下几种专用过滤器

  • 单列值过滤器(org.apache.hadoop.hbase.filter.SingleColumnValueFilter)
  • 单列排除过滤器(org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter)
  • 前缀过滤器(org.apache.commons.io.filefilter.PrefixFileFilter)
  • 分页过滤器(org.apache.hadoop.hbase.filter.PageFilter)
  • 行键过滤器(org.apache.hadoop.hbase.filter.KeyOnlyFilter)
  • 首次行键过滤器(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter)
  • 包含结束的过滤器(org.apache.hadoop.hbase.filter.InclusiveStopFilter)
  • 时间戳过滤器(org.apache.hadoop.hbase.filter.TimestampsFilter)
  • 列计数过滤器(org.apache.hadoop.hbase.filter.ColumnCountGetFilter)
  • 列分页过滤(org.apache.hadoop.hbase.filter.ColumnPaginationFilter)
过滤器列表

org.apache.hadoop.hbase.filter.FilterList用于组合过滤器,他有两个枚举值

  • MUST_PASS_ALL:and衔接过滤器
  • MUST_PASS_ONE:or衔接过滤器
比较运算符集合类

org.apache.hadoop.hbase.CompareOperator类是Hbase过滤器比较运算符的类,该类下面有七个常量:

常量名称描述
LESS小于
LESS_OR_EQUAL小于等于
EQUAL等于
NOT_EQUAL不等于
GREATER_OR_EQUAL大于等于
GREATER大于
NO_OP排除一切值
比较器

HBase的比较器都继承了org.apache.hadoop.hbase.filter.ByteArrayComparable这个抽象类,Hbase共有以下几种比较器

  • org.apache.hadoop.hbase.filter.BinaryComparator(按照阈值比较)
  • org.apache.hadoop.hbase.filter.BinaryPrefixComparator(匹配前缀)
  • org.apache.hadoop.hbase.filter.BitComparator(按位比较字节数组,只能与EQUAL 和NOT_EQUAL 配合使用)
  • org.apache.hadoop.hbase.filter.NullComparator(判断当前值是否为null)
  • org.apache.hadoop.hbase.filter.RegexStringComparator(与正则表达式匹配,只能与EQUAL 和NOT_EQUAL 配合使用)(正则表达式中的有两个常用的符号:(tom$:表示以tom结尾,^tom:表示以tom开头,相当于sql语句里面的like))
  • org.apache.hadoop.hbase.filter.SubstringComparator(匹配子字符串,只能与EQUAL 和NOT_EQUAL 配合使用)
过滤器的用法
	/**
     * 行级过滤
     * @throws Exception
     */
    public static void rowFilter() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        //获取小于row000100的数据
        RowFilter rf = new RowFilter(CompareOperator.LESS ,new BinaryComparator(Bytes.toBytes("row000100")));
        scan.setFilter(rf);
        Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
        ResultScanner rs = table.getScanner(scan);
        for(Result r : rs){
            //String val = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
            String row = Bytes.toString(r.getRow());
            System.out.println(row);
        }
        rs.close();
        conn.close();
    }

	/**
     * 列族过滤
     * @throws Exception
     */
    public static void familyFilter() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        //获取列族=f1的数据
        FamilyFilter ff = new FamilyFilter(CompareOperator.EQUAL , new BinaryComparator(Bytes.toBytes("f1")));
        scan.setFilter(ff);
        Table table = conn.getTable(TableName.valueOf("mydb1:t6"));
        ResultScanner rs = table.getScanner(scan);
        for(Result r:rs){
            System.out.println(Bytes.toString(r.value()));
        }
    }

	/**
     * 列过滤
     * @throws Exception
     */
    public static void qualifierFilter() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Get get = new Get(Bytes.toBytes("row000001"));
        QualifierFilter qf = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("name")));
        get.setFilter(qf);
        Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
        Result r = table.get(get);
        System.out.println(Bytes.toString(r.value()));
        conn.close();
    }

	/**
     * 依赖过滤器,根据某个列的值过滤一整行记录,相当于sql语句的条件表达式(重要)
     * @param drop
     * @param co
     * @param bc
     * @throws Exception
     */
    public static void dependentFilter(boolean drop,CompareOperator co,ByteArrayComparable bc) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        //参数:参考列族,参考列,返回结果是否包含参考列的值(boolean值),比较运算符,比较值
        DependentColumnFilter df = new DependentColumnFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),drop,CompareOperator.EQUAL ,new BinaryPrefixComparator(Bytes.toBytes("Tom1")));
        //后面两个参数不填表示返回所有
        //DependentColumnFilter df = new DependentColumnFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),drop);
        scan.setFilter(df);
        Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
        ResultScanner rs = table.getScanner(scan);
        for(Result r : rs){
            int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id")));
            String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
            int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
            System.out.println("id:" + id + ";name:" + name + ";age:" + age);
        }
        rs.close();
        conn.close();
    }

	/**
     * 单列值过滤器,根据某一列的值过滤一行记录
     * @throws Exception
     */
    public static void singleValue() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        //
        SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),CompareOperator.EQUAL , new BinaryComparator(Bytes.toBytes("Tom10")));
        scan.setFilter(sf);
        Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
        ResultScanner rs = table.getScanner(scan);
        for(Result r : rs){
            int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id")));
            String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
            int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
            System.out.println("id:" + id + ";name:" + name + ";age:" + age);
        }
        rs.close();
        conn.close();
    }

	//过滤器组合
    public static void fileList() throws Exception {
        //代码逻辑用sql表示:select * from mydb1:t1 where name like  '^Tom1' and age = 55
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("name"), CompareOperator.EQUAL, new RegexStringComparator("^Tom1"));
        SingleColumnValueFilter ageFilter = new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("age"), CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(55)));
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filterList.addFilter(nameFilter);
        filterList.addFilter(ageFilter);
        scan.setFilter(filterList);
        Table table = conn.getTable(TableName.valueOf("mydb1:t1"));
        ResultScanner rs = table.getScanner(scan);
        for(Result r : rs){
            int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id")));
            String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
            int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
            System.out.println("id:" + id + ";name:" + name + ";age:" + age);
        }
        rs.close();
        conn.close();
    }

Hbase计数器

Hbase计数器一般用于收集统计信息或点击量,他能做到实时统计,并且没有高并发的情况,相对于延时比较高的批处理操作更好

计数器的命令
	#执行以下命令每执行一次会在hits上加1,对表格并没有什么要求,第一次执行以下命令,就会把该列的默认值设为0,然后再加一
	incr 'mydb1:t7','row001','dayily:hits'
	#查看计数器列的值(get也可以,但是计数器的数据类型是int类型,在命令行显示的是16进制,可读性比较差,使用get_counter可以很清晰的显示)
	get_counter 'mydb1:t7','row001','dayily:hits'
	#指定增加的步长,这个步长可以为负数
	incr 'mydb1:t7','row001','dayily:hits',10
javaAPI操作计数器
	/**
     * 计数器
     * @throws Exception
     */
    public static void incr() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        //创建计数器对象指定行
        Increment increment = new Increment(Bytes.toBytes("row001"));
        //指定列族,列和步长
        increment.addColumn(Bytes.toBytes("dayily"),Bytes.toBytes("hits"),1);
        increment.addColumn(Bytes.toBytes("monthly"),Bytes.toBytes("hits"),1);
        increment.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"),1);
        Table table = conn.getTable(TableName.valueOf("mydb1:t7"));
        table.increment(increment);
        conn.close();
    }

rowKey的设计

假设我们对全市的过车数据进行设计rowkey,总共有10台服务器

  1. 过车数据的数据业务一般跟过车时间,车牌号码,卡口ID有关,我们用过车时间,车牌号码,卡口ID设计rowKey
  2. 计算分区编号,共十台服务器,我们把过车时间,车牌号,卡口ID组合起来,求出hash值,然后用hash值与10求余,得到0-9的编号,根据编号可以把数据对应放在不同服务器上,防止热点问题
  3. rowkey组合:分区编号,过车时间,车牌号,卡口ID
  4. 上面我们要查询过车时间,车牌号都是可以的,但是如果要查询卡口的过车数据就很麻烦,那么我们可以创建一个索引表,rowKey也按照上面的组合,但是要卡口ID方在分区编号的后面,然后把主表对应的rowKey作为值放在索引表里面

布隆过滤器

布隆过滤器是优化查询的一种手段,可以通过布隆过滤器判断一个分区中是否包含你查询的rowKey,如果这个文件不包含,它会给予你明确的答复,如果包含,它会给你一个可能性的答复,因此他可以提神get查询的性能,避免了全表扫描
创建一个带有布隆过滤器的表:

	public static void createBloomFilter() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        TableName tn = TableName.valueOf("mydb2:t3");
        TableDescriptorBuilder.ModifyableTableDescriptor table = new TableDescriptorBuilder.ModifyableTableDescriptor(tn);
        //添加列族f1
        ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f1"));
        //设置布隆过滤器,NONE:不启用布隆过滤器(默认值),ROW:行级别布隆过滤器(推荐使用),ROWCOL:行和列级别(消耗资源比较大,不推荐使用)
        family.setBloomFilterType(BloomType.ROW);
        table.setColumnFamily(family);
        admin.createTable(table);
        conn.close();
    }

创建完成后,用desc查看表格描述,可以看到BLOOMFILTER => 'ROW'这样一段代码

sql可视化界面

  1. 下载apache-phoenix-5.0.0-HBase-2.0-bin.tar.gz
  2. 将其上传到服务器,并解压
  3. 复制phoenix-5.0.0-HBase-2.0-server.jar到Hbase的lib的目录下,并且分发
  4. 重启Hbase
  5. 进入/apache-phoenix-5.0.0-HBase-2.0-bin/bin目录下输入./sqlline.py Centos100连接到zk服务器,进入phoenix客户端,输入以下命令
    	#查看表
    	!tables
    	#查看帮助
    	!help
    	#创建表,必须指定primary key,这里主键就是rowKey
    	!sql create table mydb1.person(id varchar(10) primary key,name varchar(100),age Integer);
    	#删除表
    	!sql drop table mydb1.person;
    	#查看表结构
    	!describe TEST;
    	#插入数据修改数据
    	upsert into TEST(id,name) values ('row005','lisi');
    	#删除数据
    	delete from TEST where id = 'row005';
    	#退出
    	!quit
    
  6. 查看表格在Hbase中的位置以及表的描述
    在这里插入图片描述
  7. 下载squirrel-sql-4.1.0-standard.jar
  8. 双击squirrel-sql-4.1.0-standard.jar安装squirrel(直接下一步到底)
  9. 将phoenix-5.0.0-HBase-2.0-client.jar复制到squirrel的安装目录下
  10. 打开 squirrel
  11. 点击下方按钮安装驱动
    在这里插入图片描述
  12. 添加数据库连接
    在这里插入图片描述
  13. 结构
    在这里插入图片描述
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐