目录

一 什么是HBASE

二 安装HBASE

三 hbase初体验

四 HBASE客户端API操作

五 HBASE运行原理

5.1 master职责

5.2 Region Server 职责

5.3 zookeeper集群所起作用

5.4 HBASE读写数据流程

5.4.1 写数据流程

5.4.2 读数据流程

5.4.3 数据flush过程

5.4.4 数据合并过程

5.5 hbase:meta表

5.6 Region Server内部机制

六.HBASE优化

6.1 高可用

6.2 预分区

6.3 RowKey设计

6.4 内存优化

6.5 基础优化


一 什么是HBASE

HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。

HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。

HBASE是Google Bigtable的开源实现,但是也有很多不同之处。比如:Google Bigtable利用GFS作为其文件存储系统,HBASE利用Hadoop HDFS作为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE同样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby作为协同服务,HBASE利用Zookeeper作为对应。

HBASE与mysql、oralce、db2、sqlserver等关系型数据库不同,它是一个NoSQL数据库(非关系型数据库)

  1. Hbase的表模型与关系型数据库的表模型不同:
  2. Hbase的表没有固定的字段定义;
  3. Hbase的表中每行存储的都是一些key-value对
  4. Hbase的表中有列族的划分,用户可以指定将哪些kv插入哪个列族
  5. Hbase的表在物理存储上,是按照列族来分割的,不同列族的数据一定存储在不同的文件中
  6. Hbase的表中的每一行都固定有一个行键,而且每一行的行键在表中不能重复
  7. Hbase中的数据,包含行键,包含key,包含value,都是byte[ ]类型,hbase不负责为用户维护数据类型
  8. HBASE对事务的支持很差

HBASE相比于其他nosql数据库(mongodb、redis、cassendra、hazelcast)的特点:

Hbase的表数据存储在HDFS文件系统中

从而,hbase具备如下特性:存储容量可以线性扩展; 数据存储的安全性可靠性极高!

二 安装HBASE

HBASE是一个分布式系统

其中有一个管理角色:  HMaster(一般2台,一台active,一台backup)

其他的数据节点角色:  HRegionServer(很多台,看数据容量)

2.1 安装准备

需要先有一个java环境

首先,要有一个HDFS集群,并正常运行; regionserver应该跟hdfs中的datanode在一起

其次,还需要一个zookeeper集群,并正常运行

然后,安装HBASE

角色分配如下:

Hdp01:  namenode  datanode  regionserver  hmaster  zookeeper

Hdp02:  datanode   regionserver  zookeeper

Hdp03:  datanode   regionserver  zookeeper

2.2 安装步骤

解压hbase安装包

修改hbase-env.sh

export JAVA_HOME=/root/apps/jdk1.7.0_67

export HBASE_MANAGES_ZK=false

修改hbase-site.xml

<configuration>

<!-- 指定hbase在HDFS上存储的路径 -->

        <property>

                <name>hbase.rootdir</name>

                <value>hdfs://hdp01:9000/hbase</value>

        </property>

<!-- 指定hbase是分布式的 -->

        <property>

                <name>hbase.cluster.distributed</name>

                <value>true</value>

        </property>

<!-- 指定zk的地址,多个用“,”分割 -->

        <property>

                <name>hbase.zookeeper.quorum</name>

                <value>hdp01:2181,hdp02:2181,hdp03:2181</value>

        </property>

</configuration>

修改 regionservers

hdp01

hdp02

hdp03

2.3 启动hbase集群

bin/start-hbase.sh

启动完后,还可以在集群中找任意一台机器启动一个备用的master

bin/hbase-daemon.sh start master

新启的这个master会处于backup状态

三 hbase初体验

3.1 启动hbase命令行客户端

bin/hbase shell

Hbase> list     // 查看表

Hbase> status   // 查看集群状态

Hbase> version  // 查看集群版本

3.2 hbase表模型的特点

  1. 一个表,有表名
  2. 一个表可以分为多个列族(不同列族的数据会存储在不同文件中)
  3. 表中的每一行有一个“行键rowkey”,而且行键在表中不能重复
  4. 表中的每一对kv数据称作一个cell
  5. hbase可以对数据存储多个历史版本(历史版本数量可配置)
  6. 整张表由于数据量过大,会被横向切分成若干个region(用rowkey范围标识),不同region的数据也存储在不同文件中
  7. hbase会对插入的数据按顺序存储:

     要点一:首先会按行键排序

     要点二:同一行里面的kv会按列族排序,再按k排序

3.3 hbase的表中能存储什么数据类型

hbase中只支持byte[]

此处的byte[] 包括了: rowkey,key,value,列族名,表名

3.4 hbase命令行客户端操作

名称

命令表达式

创建表

create '表名', '列族名1','列族名2','列族名N'

查看所有表

list

描述表

describe  ‘表名’

判断表存在

exists  '表名'

判断是否禁用启用表

is_enabled '表名'

is_disabled ‘表名’

添加记录      

put  ‘表名’, ‘rowKey’, ‘列族 : 列‘  ,  '值'

查看记录rowkey下的所有数据

get  '表名' , 'rowKey'

查看表中的记录总数

count  '表名'

获取某个列族

get '表名','rowkey','列族'

获取某个列族的某个列

get '表名','rowkey','列族:列’

删除记录

delete  ‘表名’ ,‘行名’ , ‘列族:列'

删除整行

deleteall '表名','rowkey'

删除一张表

先要屏蔽该表,才能对该表进行删除

第一步 disable ‘表名’ ,第二步  drop '表名'

清空表

truncate '表名'

查看所有记录

scan "表名"  

查看某个表某个列中所有数据

scan "表名" , {COLUMNS=>'列族名:列名'}

更新记录

就是重写一遍,进行覆盖,hbase没有修改,都是追加

3.4.1 建表

create 't_user_info','base_info','extra_info'

                      表名      列族名   列族名

3.4.2 插入数据

hbase(main):011:0> put 't_user_info','001','base_info:username','zhangsan'

0 row(s) in 0.2420 seconds

hbase(main):012:0> put 't_user_info','001','base_info:age','18'

0 row(s) in 0.0140 seconds

hbase(main):013:0> put 't_user_info','001','base_info:sex','female'

0 row(s) in 0.0070 seconds

hbase(main):014:0> put 't_user_info','001','extra_info:career','it'

0 row(s) in 0.0090 seconds

hbase(main):015:0> put 't_user_info','002','extra_info:career','actoress'

0 row(s) in 0.0090 seconds

hbase(main):016:0> put 't_user_info','002','base_info:username','liuyifei'

0 row(s) in 0.0060 seconds

3.4.3 查询方式一 scan扫描

hbase(main):017:0> scan 't_user_info'

ROW                               COLUMN+CELL                                                                                     

 001                              column=base_info:age, timestamp=1496567924507, value=18                                         

 001                              column=base_info:sex, timestamp=1496567934669, value=female                                     

 001                              column=base_info:username, timestamp=1496567889554, value=zhangsan                              

 001                              column=extra_info:career, timestamp=1496567963992, value=it                                     

 002                              column=base_info:username, timestamp=1496568034187, value=liuyifei                              

 002                              column=extra_info:career, timestamp=1496568008631, value=actoress    

3.4.4 查询方式二 get单行数据

hbase(main):020:0> get 't_user_info','001'

COLUMN                            CELL                                                                                            

 base_info:age                    timestamp=1496568160192, value=19                                                               

 base_info:sex                    timestamp=1496567934669, value=female                                                           

 base_info:username               timestamp=1496567889554, value=zhangsan                                                         

 extra_info:career                timestamp=1496567963992, value=it                                                               

4 row(s) in 0.0770 seconds

3.4.5 删除一个kv数据

hbase(main):021:0> delete 't_user_info','001','base_info:sex'

0 row(s) in 0.0390 seconds

删除整行数据

hbase(main):024:0> deleteall 't_user_info','001'

0 row(s) in 0.0090 seconds

hbase(main):025:0> get 't_user_info','001'

COLUMN                            CELL                                                                                            

0 row(s) in 0.0110 seconds

3.4.6 删除整个表

hbase(main):028:0> disable 't_user_info'

0 row(s) in 2.3640 seconds

hbase(main):029:0> drop 't_user_info'

0 row(s) in 1.2950 seconds

hbase(main):030:0> list

TABLE                                                                                                                             

0 row(s) in 0.0130 seconds

=> []

3.5 Hbase重要特性-排序特性(行键)

与nosql数据库们一样,row key是用来检索记录的主键。访问HBASE table中的行,只有三种方式:

1.通过单个row key访问

2.通过row key的range(正则)

3.全表扫描

Row key行键 (Row key)可以是任意字符串(最大长度 是 64KB,实际应用中长度一般为 10-100bytes),在HBASE内部,row key保存为字节数组。存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)

插入到hbase中去的数据,hbase会自动排序存储:

排序规则:  首先看行键,然后看列族名,然后看列(key)名; 按字典顺序

Hbase的这个特性跟查询效率有极大的关系

比如:一张用来存储用户信息的表,有名字,户籍,年龄,职业....等信息

然后,在业务系统中经常需要:

查询某个省的所有用户

经常需要查询某个省的指定姓的所有用户

思路:如果能将相同省的用户在hbase的存储文件中连续存储,并且能将相同省中相同姓的用户连续存储,那么,上述两个查询需求的效率就会提高!!!

做法:将查询条件拼到rowkey内

四 HBASE客户端API操作

4.1 简洁版

HbaseClientDDL 

package cn.hbase.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.junit.Before;
import org.junit.Test;


/**
 *  
 *  1、构建连接
 *  2、从连接中取到一个表DDL操作工具admin
 *  3、admin.createTable(表描述对象);
 *  4、admin.disableTable(表名);
	5、admin.deleteTable(表名);
	6、admin.modifyTable(表名,表描述对象);	
 *  
 *
 */
public class HbaseClientDDL {
	Connection conn = null;
	
	@Before
	public void getConn() throws Exception{
		// 构建一个连接对象
		Configuration conf = HBaseConfiguration.create(); // 会自动加载hbase-site.xml
		conf.set("hbase.zookeeper.quorum", "hdp-01:2181,hdp-02:2181,hdp-03:2181");
		
		conn = ConnectionFactory.createConnection(conf);
	}
	
	
	
	/**
	 * DDL
	 * @throws Exception 
	 */
	@Test
	public void testCreateTable() throws Exception{

		// 从连接中构造一个DDL操作器
		Admin admin = conn.getAdmin();
		
		// 创建一个表定义描述对象
		HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("user_info"));
		
		// 创建列族定义描述对象
		HColumnDescriptor hColumnDescriptor_1 = new HColumnDescriptor("base_info");
		hColumnDescriptor_1.setMaxVersions(3); // 设置该列族中存储数据的最大版本数,默认是1
		
		HColumnDescriptor hColumnDescriptor_2 = new HColumnDescriptor("extra_info");
		
		// 将列族定义信息对象放入表定义对象中
		hTableDescriptor.addFamily(hColumnDescriptor_1);
		hTableDescriptor.addFamily(hColumnDescriptor_2);
		
		
		// 用ddl操作器对象:admin 来建表
		admin.createTable(hTableDescriptor);
		
		// 关闭连接
		admin.close();
		conn.close();
		
	}
	
	
	/**
	 * 删除表
	 * @throws Exception 
	 */
	@Test
	public void testDropTable() throws Exception{
		
		Admin admin = conn.getAdmin();
		
		// 停用表
		admin.disableTable(TableName.valueOf("user_info"));
		// 删除表
		admin.deleteTable(TableName.valueOf("user_info"));
		
		
		admin.close();
		conn.close();
	}
	
	// 修改表定义--添加一个列族
	@Test
	public void testAlterTable() throws Exception{
		
		Admin admin = conn.getAdmin();
		
		// 取出旧的表定义信息
		HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("user_info"));
		
		
		// 新构造一个列族定义
		HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("other_info");
		hColumnDescriptor.setBloomFilterType(BloomType.ROWCOL); // 设置该列族的布隆过滤器类型
		
		// 将列族定义添加到表定义对象中
		tableDescriptor.addFamily(hColumnDescriptor);
		
		
		// 将修改过的表定义交给admin去提交
		admin.modifyTable(TableName.valueOf("user_info"), tableDescriptor);
		
		
		admin.close();
		conn.close();
		
	}
	
	
	/**
	 * DML -- 数据的增删改查
	 */
	
	

}

HbaseClientDML

package cn.hbase.demo;


import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

public class HbaseClientDML {
	Connection conn = null;
	
	@Before
	public void getConn() throws Exception{
		// 构建一个连接对象
		Configuration conf = HBaseConfiguration.create(); // 会自动加载hbase-site.xml
		conf.set("hbase.zookeeper.quorum", "hdp-01:2181,hdp-02:2181,hdp-03:2181");
		
		conn = ConnectionFactory.createConnection(conf);
	}
	
	
	/**
	 * 增
	 * 改:put来覆盖
	 * @throws Exception 
	 */
	@Test
	public void testPut() throws Exception{
		
		// 获取一个操作指定表的table对象,进行DML操作
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		// 构造要插入的数据为一个Put类型(一个put对象只能对应一个rowkey)的对象
		Put put = new Put(Bytes.toBytes("001"));
		put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("张三"));
		put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
		put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("北京"));
		
		
		Put put2 = new Put(Bytes.toBytes("002"));
		put2.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("李四"));
		put2.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes("28"));
		put2.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("上海"));
	
		
		ArrayList<Put> puts = new ArrayList<>();
		puts.add(put);
		puts.add(put2);
		
		
		// 插进去
		table.put(puts);
		
		table.close();
		conn.close();
		
	}
	
	
	/**
	 * 循环插入大量数据
	 * @throws Exception 
	 */
	@Test
	public void testManyPuts() throws Exception{
		
		Table table = conn.getTable(TableName.valueOf("user_info"));
		ArrayList<Put> puts = new ArrayList<>();
		
		for(int i=0;i<100000;i++){
			Put put = new Put(Bytes.toBytes(""+i));
			put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("张三"+i));
			put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes((18+i)+""));
			put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("北京"));
			
			puts.add(put);
		}
		
		table.put(puts);
		
	}
	
	/**
	 * 删
	 * @throws Exception 
	 */
	@Test
	public void testDelete() throws Exception{
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		// 构造一个对象封装要删除的数据信息
		Delete delete1 = new Delete(Bytes.toBytes("001"));
		
		Delete delete2 = new Delete(Bytes.toBytes("002"));
		delete2.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"));
		
		ArrayList<Delete> dels = new ArrayList<>();
		dels.add(delete1);
		dels.add(delete2);
		
		table.delete(dels);
		
		
		table.close();
		conn.close();
	}
	
	/**
	 * 查
	 * @throws Exception 
	 */
	@Test
	public void testGet() throws Exception{
		
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		Get get = new Get("002".getBytes());
		
		Result result = table.get(get);
		
		// 从结果中取用户指定的某个key的value
		byte[] value = result.getValue("base_info".getBytes(), "age".getBytes());
		System.out.println(new String(value));
		
		System.out.println("-------------------------");
		
		// 遍历整行结果中的所有kv单元格
		CellScanner cellScanner = result.cellScanner();
		while(cellScanner.advance()){
			Cell cell = cellScanner.current();
			
			byte[] rowArray = cell.getRowArray();  //本kv所属的行键的字节数组
			byte[] familyArray = cell.getFamilyArray();  //列族名的字节数组
			byte[] qualifierArray = cell.getQualifierArray();  //列名的字节数据
			byte[] valueArray = cell.getValueArray(); // value的字节数组
			
			System.out.println("行键: "+new String(rowArray,cell.getRowOffset(),cell.getRowLength()));
			System.out.println("列族名: "+new String(familyArray,cell.getFamilyOffset(),cell.getFamilyLength()));
			System.out.println("列名: "+new String(qualifierArray,cell.getQualifierOffset(),cell.getQualifierLength()));
			System.out.println("value: "+new String(valueArray,cell.getValueOffset(),cell.getValueLength()));
			
		}
		
		table.close();
		conn.close();
		
	}
	
	
	/**
	 * 按行键范围查询数据
	 * @throws Exception 
	 */
	@Test
	public void testScan() throws Exception{
		
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		// 包含起始行键,不包含结束行键,但是如果真的想查询出末尾的那个行键,那么,可以在末尾行键上拼接一个不可见的字节(\000)
		Scan scan = new Scan("10".getBytes(), "10000\001".getBytes());
		
		ResultScanner scanner = table.getScanner(scan);
		
		Iterator<Result> iterator = scanner.iterator();
		
		while(iterator.hasNext()){
			
			Result result = iterator.next();
			// 遍历整行结果中的所有kv单元格
			CellScanner cellScanner = result.cellScanner();
			while(cellScanner.advance()){
				Cell cell = cellScanner.current();
				
				byte[] rowArray = cell.getRowArray();  //本kv所属的行键的字节数组
				byte[] familyArray = cell.getFamilyArray();  //列族名的字节数组
				byte[] qualifierArray = cell.getQualifierArray();  //列名的字节数据
				byte[] valueArray = cell.getValueArray(); // value的字节数组
				
				System.out.println("行键: "+new String(rowArray,cell.getRowOffset(),cell.getRowLength()));
				System.out.println("列族名: "+new String(familyArray,cell.getFamilyOffset(),cell.getFamilyLength()));
				System.out.println("列名: "+new String(qualifierArray,cell.getQualifierOffset(),cell.getQualifierLength()));
				System.out.println("value: "+new String(valueArray,cell.getValueOffset(),cell.getValueLength()));
			}
			System.out.println("----------------------");
		}
	}
	
	@Test
	public void test(){
		String a = "000";
		String b = "000\0";
		
		System.out.println(a);
		System.out.println(b);
		
		
		byte[] bytes = a.getBytes();
		byte[] bytes2 = b.getBytes();
		
		System.out.println("");
		
	}
	
	

}

4.2 完整版

package com.zgcbank.hbase;

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HbaseTest {

	/**
	 * 配置ss
	 */
	static Configuration config = null;
	private Connection connection = null;
	private Table table = null;

	@Before
	public void init() throws Exception {
		config = HBaseConfiguration.create();// 配置
		config.set("hbase.zookeeper.quorum", "master,work1,work2");// zookeeper地址
		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
		connection = ConnectionFactory.createConnection(config);
		table = connection.getTable(TableName.valueOf("user"));
	}

	/**
	 * 创建一个表
	 * 
	 * @throws Exception
	 */
	@Test
	public void createTable() throws Exception {
		// 创建表管理类
		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
		// 创建表描述类
		TableName tableName = TableName.valueOf("test3"); // 表名称
		HTableDescriptor desc = new HTableDescriptor(tableName);
		// 创建列族的描述类
		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
		// 将列族添加到表中
		desc.addFamily(family);
		HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
		// 将列族添加到表中
		desc.addFamily(family2);
		// 创建表
		admin.createTable(desc); // 创建表
	}

	@Test
	@SuppressWarnings("deprecation")
	public void deleteTable() throws MasterNotRunningException,
			ZooKeeperConnectionException, Exception {
		HBaseAdmin admin = new HBaseAdmin(config);
		admin.disableTable("test3");
		admin.deleteTable("test3");
		admin.close();
	}

	/**
	 * 向hbase中增加数据
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings({ "deprecation", "resource" })
	@Test
	public void insertData() throws Exception {
		table.setAutoFlushTo(false);
		table.setWriteBufferSize(534534534);
		ArrayList<Put> arrayList = new ArrayList<Put>();
		for (int i = 21; i < 50; i++) {
			Put put = new Put(Bytes.toBytes("1234"+i));
			put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"+i));
			put.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes(1234+i));
			arrayList.add(put);
		}
		
		//插入数据
		table.put(arrayList);
		//提交
		table.flushCommits();
	}

	/**
	 * 修改数据
	 * 
	 * @throws Exception
	 */
	@Test
	public void uodateData() throws Exception {
		Put put = new Put(Bytes.toBytes("1234"));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("namessss"), Bytes.toBytes("lisi1234"));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes(1234));
		//插入数据
		table.put(put);
		//提交
		table.flushCommits();
	}

	/**
	 * 删除数据
	 * 
	 * @throws Exception
	 */
	@Test
	public void deleteDate() throws Exception {
		Delete delete = new Delete(Bytes.toBytes("1234"));
		table.delete(delete);
		table.flushCommits();
	}

	/**
	 * 单条查询
	 * 
	 * @throws Exception
	 */
	@Test
	public void queryData() throws Exception {
		Get get = new Get(Bytes.toBytes("1234"));
		Result result = table.get(get);
		System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
		System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("namessss"))));
		System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex"))));
	}

	/**
	 * 全表扫描
	 * 
	 * @throws Exception
	 */
	@Test
	public void scanData() throws Exception {
		Scan scan = new Scan();
		//scan.addFamily(Bytes.toBytes("info"));
		//scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
		scan.setStartRow(Bytes.toBytes("wangsf_0"));
		scan.setStopRow(Bytes.toBytes("wangwu"));
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
			//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
			//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
		}
	}

	/**
	 * 全表扫描的过滤器
	 * 列值过滤器
	 * 
	 * @throws Exception
	 */
	@Test
	public void scanDataByFilter1() throws Exception {

		// 创建全表扫描的scan
		Scan scan = new Scan();
		//过滤器:列值过滤器
		SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),
				Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
				Bytes.toBytes("zhangsan2"));
		// 设置过滤器
		scan.setFilter(filter);

		// 打印结果集
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
			//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
			//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
		}

	}
	/**
	 * rowkey过滤器
	 * @throws Exception
	 */
	@Test
	public void scanDataByFilter2() throws Exception {
		
		// 创建全表扫描的scan
		Scan scan = new Scan();
		//匹配rowkey以wangsenfeng开头的
		RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^12341"));
		// 设置过滤器
		scan.setFilter(filter);
		// 打印结果集
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
			//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
			//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
		}

		
	}
	
	/**
	 * 匹配列名前缀
	 * @throws Exception
	 */
	@Test
	public void scanDataByFilter3() throws Exception {
		
		// 创建全表扫描的scan
		Scan scan = new Scan();
		//匹配rowkey以wangsenfeng开头的
		ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
		// 设置过滤器
		scan.setFilter(filter);
		// 打印结果集
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println("rowkey:" + Bytes.toString(result.getRow()));
			System.out.println("info:name:"
					+ Bytes.toString(result.getValue(Bytes.toBytes("info"),
							Bytes.toBytes("name"))));
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")) != null) {
				System.out.println("info:age:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
								Bytes.toBytes("age"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex")) != null) {
				System.out.println("infi:sex:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
								Bytes.toBytes("sex"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name")) != null) {
				System.out
				.println("info2:name:"
						+ Bytes.toString(result.getValue(
								Bytes.toBytes("info2"),
								Bytes.toBytes("name"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age")) != null) {
				System.out.println("info2:age:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
								Bytes.toBytes("age"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("sex")) != null) {
				System.out.println("info2:sex:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
								Bytes.toBytes("sex"))));
			}
		}
		
	}
	/**
	 * 过滤器集合
	 * @throws Exception
	 */
	@Test
	public void scanDataByFilter4() throws Exception {
		
		// 创建全表扫描的scan
		Scan scan = new Scan();
		//过滤器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)
		FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
		//匹配rowkey以wangsenfeng开头的
		RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^wangsenfeng"));
		//匹配name的值等于wangsenfeng
		SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("info"),
				Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
				Bytes.toBytes("zhangsan"));
		filterList.addFilter(filter);
		filterList.addFilter(filter2);
		// 设置过滤器
		scan.setFilter(filterList);
		// 打印结果集
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println("rowkey:" + Bytes.toString(result.getRow()));
			System.out.println("info:name:"
					+ Bytes.toString(result.getValue(Bytes.toBytes("info"),
							Bytes.toBytes("name"))));
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")) != null) {
				System.out.println("info:age:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
								Bytes.toBytes("age"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex")) != null) {
				System.out.println("infi:sex:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
								Bytes.toBytes("sex"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name")) != null) {
				System.out
				.println("info2:name:"
						+ Bytes.toString(result.getValue(
								Bytes.toBytes("info2"),
								Bytes.toBytes("name"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age")) != null) {
				System.out.println("info2:age:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
								Bytes.toBytes("age"))));
			}
			// 判断取出来的值是否为空
			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("sex")) != null) {
				System.out.println("info2:sex:"
						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
								Bytes.toBytes("sex"))));
			}
		}
		
	}

	@After
	public void close() throws Exception {
		table.close();
		connection.close();
	}

}

4.3 MapReduce操作Hbase

4.3.1 实现方法

Hbase对MapReduce提供支持,它实现了TableMapper类和TableReducer类,我们只需要继承这两个类即可。

1、写个mapper继承TableMapper<Text, IntWritable>

参数:Text:mapper的输出key类型; IntWritable:mapper的输出value类型。

      其中的map方法如下:

map(ImmutableBytesWritable key, Result value,Context context)

 参数:key:rowKey;value: Result ,一行数据; context上下文

2、写个reduce继承TableReducer<Text, IntWritable, ImmutableBytesWritable>

参数:Text:reducer的输入key; IntWritable:reduce的输入value;

 ImmutableBytesWritable:reduce输出到hbase中的rowKey类型。

      其中的reduce方法如下:

reduce(Text key, Iterable<IntWritable> values,Context context)

参数: key:reduce的输入key;values:reduce的输入value;

4.3.2 准备表

1、建立数据来源表‘word’,包含一个列族‘content’

向表中添加数据,在列族中放入列‘info’,并将短文数据放入该列中,如此插入多行,行键为不同的数据即可

2、建立输出表‘stat’,包含一个列族‘content’

3、通过Mr操作Hbase的‘word’表,对‘content:info’中的短文做词频统计,并将统计结果写入‘stat’表的‘content:info中’,行键为单词

4.3.3 实现

package com.zgcbank.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * mapreduce操作hbase
 *
 */
public class HBaseMr {
	/**
	 * 创建hbase配置
	 */
	static Configuration config = null;
	static {
		config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
		config.set("hbase.zookeeper.property.clientPort", "2181");
	}
	/**
	 * 表信息
	 */
	public static final String tableName = "word";//表名1
	public static final String colf = "content";//列族
	public static final String col = "info";//列
	public static final String tableName2 = "stat";//表名2
	/**
	 * 初始化表结构,及其数据
	 */
	public static void initTB() {
		HTable table=null;
		HBaseAdmin admin=null;
		try {
			admin = new HBaseAdmin(config);//创建表管理
			/*删除表*/
			if (admin.tableExists(tableName)||admin.tableExists(tableName2)) {
				System.out.println("table is already exists!");
				admin.disableTable(tableName);
				admin.deleteTable(tableName);
				admin.disableTable(tableName2);
				admin.deleteTable(tableName2);
			}
			/*创建表*/
				HTableDescriptor desc = new HTableDescriptor(tableName);
				HColumnDescriptor family = new HColumnDescriptor(colf);
				desc.addFamily(family);
				admin.createTable(desc);
				HTableDescriptor desc2 = new HTableDescriptor(tableName2);
				HColumnDescriptor family2 = new HColumnDescriptor(colf);
				desc2.addFamily(family2);
				admin.createTable(desc2);
			/*插入数据*/
				table = new HTable(config,tableName);
				table.setAutoFlush(false);
				table.setWriteBufferSize(500);
				List<Put> lp = new ArrayList<Put>();
				Put p1 = new Put(Bytes.toBytes("1"));
				p1.add(colf.getBytes(), col.getBytes(),	("The Apache Hadoop software library is a framework").getBytes());
				lp.add(p1);
				Put p2 = new Put(Bytes.toBytes("2"));p2.add(colf.getBytes(),col.getBytes(),("The common utilities that support the other Hadoop modules").getBytes());
				lp.add(p2);
				Put p3 = new Put(Bytes.toBytes("3"));
				p3.add(colf.getBytes(), col.getBytes(),("Hadoop by reading the documentation").getBytes());
				lp.add(p3);
				Put p4 = new Put(Bytes.toBytes("4"));
				p4.add(colf.getBytes(), col.getBytes(),("Hadoop from the release page").getBytes());
				lp.add(p4);
				Put p5 = new Put(Bytes.toBytes("5"));
				p5.add(colf.getBytes(), col.getBytes(),("Hadoop on the mailing list").getBytes());
				lp.add(p5);
				table.put(lp);
				table.flushCommits();
				lp.clear();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if(table!=null){
					table.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	/**
	 * MyMapper 继承 TableMapper
	 * TableMapper<Text,IntWritable> 
	 * Text:输出的key类型,
	 * IntWritable:输出的value类型
	 */
	public static class MyMapper extends TableMapper<Text, IntWritable> {
		private static IntWritable one = new IntWritable(1);
		private static Text word = new Text();
		@Override
		//输入的类型为:key:rowKey; value:一行数据的结果集Result
		protected void map(ImmutableBytesWritable key, Result value,
				Context context) throws IOException, InterruptedException {
			//获取一行数据中的colf:col
			String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));// 表里面只有一个列族,所以我就直接获取每一行的值
			//按空格分割
			String itr[] = words.toString().split(" ");
			//循环输出word和1
			for (int i = 0; i < itr.length; i++) {
				word.set(itr[i]);
				context.write(word, one);
			}
		}
	}
	/**
	 * MyReducer 继承 TableReducer
	 * TableReducer<Text,IntWritable> 
	 * Text:输入的key类型,
	 * IntWritable:输入的value类型,
	 * ImmutableBytesWritable:输出类型,表示rowkey的类型
	 */
	public static class MyReducer extends
			TableReducer<Text, IntWritable, ImmutableBytesWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			//对mapper的数据求和
			int sum = 0;
			for (IntWritable val : values) {//叠加
				sum += val.get();
			}
			// 创建put,设置rowkey为单词
			Put put = new Put(Bytes.toBytes(key.toString()));
			// 封装数据
			put.add(Bytes.toBytes(colf), Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));
			//写到hbase,需要指定rowkey、put
			context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
		}
	}
	
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		config.set("df.default.name", "hdfs://master:9000/");//设置hdfs的默认路径
		config.set("hadoop.job.ugi", "hadoop,hadoop");//用户名,组
		config.set("mapred.job.tracker", "master:9001");//设置jobtracker在哪
		//初始化表
		initTB();//初始化表
		//创建job
		Job job = new Job(config, "HBaseMr");//job
		job.setJarByClass(HBaseMr.class);//主类
		//创建scan
		Scan scan = new Scan();
		//可以指定查询某一列
		scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));
		//创建查询hbase的mapper,设置表名、scan、mapper类、mapper的输出key、mapper的输出value
		TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,Text.class, IntWritable.class, job);
		//创建写入hbase的reducer,指定表名、reducer类、job
		TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

五 HBASE运行原理

5.1 master职责

1.管理监控HRegionServer,实现其负载均衡。

2.处理region的分配或转移,比如在HRegion split时分配新的HRegion;在HRegionServer退出时迁移其负责的HRegion到其他        HRegionServer上。

3.处理元数据的变更

4.管理namespace和table的元数据(实际存储在HDFS上)。

5.权限控制(ACL)。

6.监控集群中所有HRegionServer的状态(通过Heartbeat和监听ZooKeeper中的状态)。

5.2 Region Server 职责

  1. 管理自己所负责的region数据的读写。
  2. 读写HDFS,管理Table中的数据。
  3. Client直接通过HRegionServer读写数据(从HMaster中获取元数据,找到RowKey所在的HRegion/HRegionServer后)。
  4. 刷新缓存到HDFS
  5. 维护Hlog
  6. 执行压缩
  7. 负责处理Region分片

5.3 zookeeper集群所起作用

  1. 存放整个HBase集群的元数据以及集群的状态信息。
  2. 实现HMaster主从节点的failover。

注: HMaster通过监听ZooKeeper中的Ephemeral节点(默认:/hbase/rs/*)来监控HRegionServer的加入和宕机。

在第一个HMaster连接到ZooKeeper时会创建Ephemeral节点(默认:/hbasae/master)来表示Active的HMaster,其后加进来的HMaster则监听该Ephemeral节点

如果当前Active的HMaster宕机,则该节点消失,因而其他HMaster得到通知,而将自身转换成Active的HMaster,在变为Active的HMaster之前,它会在/hbase/masters/下创建自己的Ephemeral节点。

5.4 HBASE读写数据流程

5.4.1 写数据流程

客户端现在要插入一条数据,rowkey=r000001, 这条数据应该写入到table表中的那个region中呢?

1/ 客户端要连接zookeeper, 从zk的/hbase节点找到hbase:meta表所在的regionserver(host:port);

2/ regionserver扫描hbase:meta中的每个region的起始行健,对比r000001这条数据在那个region的范围内;

3/ 从对应的 info:server key中存储了region是有哪个regionserver(host:port)在负责的;

4/ 客户端直接请求对应的regionserver;

5/ regionserver接收到客户端发来的请求之后,就会将数据写入到region中

5.4.2 读数据流程

客户端现在要查询rowkey=r000001这条数据,那么这个流程是什么样子的呢?

1/ 首先Client连接zookeeper, 找到hbase:meta表所在的regionserver;

2/ 请求对应的regionserver,扫描hbase:meta表,根据namespace、表名和rowkey在meta表中找到r00001所在的region是由那个regionserver负责的;

3/找到这个region对应的regionserver

4/ regionserver收到了请求之后,扫描对应的region返回数据到Client

(先从MemStore找数据,如果没有,再到BlockCache里面读;BlockCache还没有,再到StoreFile上读(为了读取的效率);

如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。)

注:客户会缓存这些位置信息,然而第二步它只是缓存当前RowKey对应的HRegion的位置,因而如果下一个要查的RowKey不在同一个HRegion中,则需要继续查询hbase:meta所在的HRegion,然而随着时间的推移,客户端缓存的位置信息越来越多,以至于不需要再次查找hbase:meta Table的信息,除非某个HRegion因为宕机或Split被移动,此时需要重新查询并且更新缓存。

5.4.3 数据flush过程

1)当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;

2)并将数据存储到HDFS中;

3)在HLog中做标记点。

5.4.4 数据合并过程

1)当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并;

2)当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理;

3)当HregionServer宕机后,将HregionServer上的hlog拆分,然后分配给不同的HregionServer加载,修改.META.;

4)注意:HLog会同步到HDFS。

5.5 hbase:meta表

hbase:meta表存储了所有用户HRegion的位置信息:

Rowkey:tableName,regionStartKey,regionId,replicaId等;

info列族:这个列族包含三个列,他们分别是:

info:regioninfo列:

regionId,tableName,startKey,endKey,offline,split,replicaId;

info:server列:HRegionServer对应的server:port;

info:serverstartcode列:HRegionServer的启动时间戳。

5.6 Region Server内部机制

  • WAL即Write Ahead Log,在早期版本中称为HLog,它是HDFS上的一个文件,如其名字所表示的,所有写操作都会先保证将数据写入这个Log文件后,才会真正更新MemStore,最后写入HFile中。WAL文件存储在/hbase/WALs/${HRegionServer_Name}的目录中
  • BlockCache是一个读缓存,即“引用局部性”原理(也应用于CPU,分空间局部性和时间局部性,空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。
  • HRegion是一个Table中的一个Region在一个HRegionServer中的表达。一个Table可以有一个或多个Region,他们可以在一个相同的HRegionServer上,也可以分布在不同的HRegionServer上,一个HRegionServer可以有多个HRegion,他们分别属于不同的Table。HRegion由多个Store(HStore)构成,每个HStore对应了一个Table在这个HRegion中的一个Column Family,即每个Column Family就是一个集中的存储单元,因而最好将具有相近IO特性的Column存储在一个Column Family,以实现高效读取(数据局部性原理,可以提高缓存的命中率)。HStore是HBase中存储的核心,它实现了读写HDFS功能,一个HStore由一个MemStore 和0个或多个StoreFile组成。
  • MemStore是一个写缓存(In Memory Sorted Buffer),所有数据的写在完成WAL日志写后,会 写入MemStore中,由MemStore根据一定的算法将数据Flush到地层HDFS文件中(HFile),通常每个HRegion中的每个 Column Family有一个自己的MemStore。
  • HFile(StoreFile) 用于存储HBase的数据(Cell/KeyValue)。在HFile中的数据是按RowKey、Column Family、Column排序,对相同的Cell(即这三个值都一样),则按timestamp倒序排列。
  • FLUSH详述
  1. 每一次Put/Delete请求都是先写入到MemStore中,当MemStore满后会Flush成一个新的StoreFile(底层实现是HFile),即一个HStore(Column Family)可以有0个或多个StoreFile(HFile)。
  2. 当一个HRegion中的所有MemStore的大小总和超过了hbase.hregion.memstore.flush.size的大小,默认128MB。此时当前的HRegion中所有的MemStore会Flush到HDFS中。
  3. 当全局MemStore的大小超过了hbase.regionserver.global.memstore.upperLimit的大小,默认40%的内存使用量。此时当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush顺序是MemStore大小的倒序(一个HRegion中所有MemStore总和作为该HRegion的MemStore的大小还是选取最大的MemStore作为参考?有待考证),直到总体的MemStore使用量低于hbase.regionserver.global.memstore.lowerLimit,默认38%的内存使用量。
  4. 当前HRegionServer中WAL的大小超过了hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs的数量,当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush使用时间顺序,最早的MemStore先Flush直到WAL的数量少于hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs这里说这两个相乘的默认大小是2GB,查代码,hbase.regionserver.max.logs默认值是32,而hbase.regionserver.hlog.blocksize默认是32MB。但不管怎么样,因为这个大小超过限制引起的Flush不是一件好事,可能引起长时间的延迟

六.HBASE优化

6.1 高可用

在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。

1.关闭HBase集群(如果没有开启则跳过此步)

[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh

2.在conf目录下创建backup-masters文件

[atguigu@hadoop102 hbase]$ touch conf/backup-masters

3.在backup-masters文件中配置高可用HMaster节点

[atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters

4.将整个conf目录scp到其他节点

[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/

[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/

6.2 预分区

每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。

1.手动设定预分区

hbase> create 'staff1','info','partition1',SPLITS => ['1000','2000','3000','4000']

2.生成16进制序列预分区

create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

3.按照文件中设置的规则预分区

创建splits.txt文件内容如下:

aaaa

bbbb

cccc

dddd

然后执行:

create 'staff3','partition3',SPLITS_FILE => 'splits.txt'

4.使用JavaAPI创建预分区

//自定义算法,产生一系列Hash散列值存储在二维数组中

byte[][] splitKeys = 某个散列值函数

//创建HBaseAdmin实例

HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());

//创建HTableDescriptor实例

HTableDescriptor tableDesc = new HTableDescriptor(tableName);

//通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表

hAdmin.createTable(tableDesc, splitKeys);

6.3 RowKey设计

一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。

1.生成随机数、hash、散列值

比如:

原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7

原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd

原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913

在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。

2.字符串反转

20170524000001转成10000042507102

20170524000002转成20000042507102

3.字符串拼接

20170524000001_a12e

20170524000001_93i7

6.4 内存优化

HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。

6.5 基础优化

1.允许在HDFS的文件中追加内容

hdfs-site.xml、hbase-site.xml

属性:dfs.support.append

解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。

2.优化DataNode允许的最大文件打开数

hdfs-site.xml

属性:dfs.datanode.max.transfer.threads

解释:HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096

3.优化延迟高的数据操作的等待时间

hdfs-site.xml

属性:dfs.image.transfer.timeout

解释:如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。

4.优化数据的写入效率

mapred-site.xml

属性:

mapreduce.map.output.compress

mapreduce.map.output.compress.codec

解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。

5.设置RPC监听数量

hbase-site.xml

属性:hbase.regionserver.handler.count

解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。

6.优化HStore文件大小

hbase-site.xml

属性:hbase.hregion.max.filesize

解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。

7.优化hbase客户端缓存

hbase-site.xml

属性:hbase.client.write.buffer

解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。

8.指定scan.next扫描HBase所获取的行数

hbase-site.xml

属性:hbase.client.scanner.caching

解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。

9.flush、compact、split机制

当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。

涉及属性:

128M就是Memstore的默认阈值

hbase.hregion.memstore.flush.size:134217728

即:这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。

hbase.regionserver.global.memstore.upperLimit:0.4

hbase.regionserver.global.memstore.lowerLimit:0.38

即:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐