Hbase入门


官方地址:http://hbase.apache.org/

1.概述

HBase–HadoopDatabase,是一个高可靠性、高性能、面向列、可伸缩、实时读写的分布式数据库,利用HadoopHDFS作为其文件存储系统,利用HadoopMapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务,主要用来存储非结构化和半结构化的松散数据(列存NoSQL数据库)
文档:http://hbase.apache.org/book.html#_architecture

2.HBase体系架构

hbase体系结构图片

http://www.blogjava.net/DLevin/archive/2015/08/22/426877.html
http://blog.csdn.net/cnbird2008/article/details/9151585

  • client
    • 包含访问HBase的接口并维护cache来加快对HBase的访问
  • Zookeeper
    • 保证任何时候,集群中只有一个master
    • 存贮所有Region的寻址入口。
    • 实时监控Regionserver的上线和下线信息。并实时通知Master
    • 存储HBase的schema和table元数据
  • Master
    • 为Regionserver分配region
    • 负责Regionserver的负载均衡
    • 发现失效的Regionserver并重新分配其上的region
    • 管理用户对table的增删改操作
  • RegionServer
    • Regionserver维护region,处理对这些region的IO请求
    • Regionserver负责切分在运行过程中变得过大的region

3.HBase数据架构


hbase逻辑视图

文档 Conceptual View此处输入图片的描述

Region

HBase自动把表水平划分成多个区域(region),每个region会保存一个表
里面某段连续的数据;每个表一开始只有一个region,随着数据不断插
入表,region不断增大,当增大到一个阀值的时候,region就会等分会
两个新的region(裂变);
当table中的行不断增多,就会有越来越多的region。这样一张完整的表
被保存在多个Regionserver上

Memstore与storefile

一个region由多个store组成,一个store对应一个CF(列族)
store包括位于内存中的memstore和位于磁盘的storefile写操作先写入
memstore,当memstore中的数据达到某个阈值,hregionserver会启动
flashcache进程写入storefile,每次写入形成单独的一个storefile
当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、
major compaction),在合并过程中会进行版本合并和删除工作
(majar),形成更大的storefile。
当一个region所有storefile的大小和超过一定阈值后,会把当前的region
分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载
均衡客户端检索数据,先在memstore找,找不到再找storefile

4.HBase安装

以下使用hbase-1.1.3
单点、集群
http://blog.csdn.net/zwx19921215/article/details/41820199

0.99 伪分布式报错 升级1.1x

2015-04-29 08:58:14,967 FATAL [main] regionserver.RSRpcServices: The hostname of regionserver cannot be set to localhost in a fully-distributed setup...

hmaster ha
http://blog.csdn.net/u014516601/article/details/50252381

控制台
http://192.168.6.109:50070/explorer.html
http://192.168.6.109:16010/master-status

5.HBase命令行

http://blog.csdn.net/hanlipenghanlipeng/article/details/52742999
http://blog.csdn.net/qq_24908345/article/details/53230169

help
list_namespace
list
create 'student2','info'
put 'student2' ,'1','info:name','zhangsan'
put 'student2' ,'1','info:age',18
scan 'student2'
get 'student2','1'

文档:18 Shell Tricks

6.HBase java api

http://www.2cto.com/database/201503/381955.html

客户端无法连接错误

org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Mon Aug 14 18:08:08 CST 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=75627: row 'user,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=localhost,16201,1502704160313, seqNum=0

参考:http://www.cnblogs.com/jxhd1/p/6528621.html

解决方案:

服务器对应的host :
去掉hostname 对应的127.0.0.1的配置
修改为ip hostname
java客户端增加hostname的配置

文档例子

72. Examples
    package hbasetest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

public class test {
    public static String tableName = "user";
    public static Random ra = new Random();
    public static Configuration conf = null;

    @Before
    public void before() throws IOException {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.6.109");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    @Test
    public void create() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Admin admin = connection.getAdmin()) {
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
            table.addFamily(new HColumnDescriptor("cf1"));
            table.addFamily(new HColumnDescriptor("cf2"));

            System.out.print("Creating table. ");
            if (admin.tableExists(table.getTableName())) {
                admin.disableTable(table.getTableName());
                admin.deleteTable(table.getTableName());
            }
            admin.createTable(table);
            System.out.println(" Done.");
        }
    }

    public String getRowKey(String pre){
        return pre+ ra.nextInt(99999999)+"_2016"+ra.nextInt(12)+ra.nextInt(30)+ra.nextInt(24)+ra.nextInt(60)+ra.nextInt(60);
    }

    @Test
    public void insert() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Admin admin = connection.getAdmin()) {
            Table table = connection.getTable(TableName.valueOf(tableName));
            List<Put> list = new ArrayList<Put>();
            for(int i=0;i<10000;i++){
                Put put = new Put(getRowKey("138").getBytes());
                put.addColumn("cf1".getBytes(),"address".getBytes(),"北京".getBytes());
                put.addColumn("cf1".getBytes(),"type".getBytes(),String.valueOf(ra.nextInt(2)).getBytes());
                list.add(put);
            }
            System.out.println(" start.");
            table.put(list);
            System.out.println(" Done.");
        }
    }

    @Test
    public void search() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Admin admin = connection.getAdmin()) {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get("13820463159_201611812442".getBytes());
            get.addColumn("cf1".getBytes(),"address".getBytes());
            Result result = table.get(get);

            //for(KeyValue kv: result.list()){
                //System.out.println(Bytes.toString(kv.getFamily()));
               // System.out.println(Bytes.toString(kv.getQualifier()));
               // System.out.println(Bytes.toString(kv.getValue()));
               // System.out.println(kv.getTimestamp());
            //}
            Cell cell = result.getColumnLatestCell("cf1".getBytes(),
                "address".getBytes());

            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            System.out.println(" Done.");
        }

    }

    @Test
    public void find() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Admin admin = connection.getAdmin()) {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan("13825555024_201660221143".getBytes(),"13825682370_20165211124".getBytes());
            ResultScanner scanner = table.getScanner(scan);
            Iterator<Result> it = scanner.iterator();
            while(it.hasNext()){
                Result next = it.next();
                byte[] value = next.getValue("cf1".getBytes(), "address".getBytes());
                System.out.println(new String(value,"utf-8"));
            }
        }
    }

    @Test
    public void find2() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Admin admin = connection.getAdmin()) {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            PrefixFilter pf = new PrefixFilter("1389".getBytes());
            SingleColumnValueFilter sf = new SingleColumnValueFilter("cf1".getBytes(),"type".getBytes(), CompareOp.EQUAL,"1".getBytes());
            //过滤器的顺序影响效率
            fl.addFilter(pf);
            fl.addFilter(sf);
            scan.setFilter(fl);
            ResultScanner scanner = table.getScanner(scan);
            Iterator<Result> it = scanner.iterator();
            while(it.hasNext()){
                Result next = it.next();
                byte[] value = next.getValue("cf1".getBytes(), "address".getBytes());
                System.out.println(new String(value,"utf8"));
            }
            System.out.println(" Done.");
        }


    }

}

mapreduce读取写入hbase

8.读写流程

写数据流程

zookeeper中存储了meta表的region信息,从meta表获取相应region信息,然后找到meta表的数据
根据namespace、表名和rowkey根据meta表的数据找到写入数据对应的region信息
找到对应的regionserver
把数据分别写到HLog和MemStore上一份
MemStore达到一个阈值后则把数据刷成一个StoreFile文件。若MemStore中的数据有丢失,则可以总HLog上恢复
当多个StoreFile文件达到一定的大小后,会触发Compact合并操作,合并为一个StoreFile,这里同时进行版本的合并和数据删除。
当Compact后,逐步形成越来越大的StoreFIle后,会触发Split操作,把当前的StoreFile分成两个,这里相当于把一个大的region分割成两个region。如下图:

读数据流程

zookeeper中存储了meta表的region信息,所以先从zookeeper中找到meta表region的位置,然后读取meta表中的数据。meta中又存储了用户表的region信息。
根据namespace、表名和rowkey在meta表中找到对应的region信息
找到这个region对应的regionserver
查找对应的region
先从MemStore找数据,如果没有,再到StoreFile上读(为了读取的效率)。

更多见
http://www.blogjava.net/DLevin/archive/2015/08/22/426877.html

RegionServer的内存,在设置的时候,一般这样配置:
(1)MemStore ,约占40%的内存空间(主要用于写):
写请求会先写入memstore,RegionServer会给每个region提供一个memstore, memstore写满以后,会启动flush刷新到磁盘。当memstore的总大小超过限制时,会强行启动flush进程,从最大的memstore开始flush知道低于限制
(2)BlockCache,约占40%的内存空间(主要用于读):
读请求先到memstore中查数据,查不到就到blockCache中查,再查不到就到磁盘上读,并把读的结果放入blockCache。Blockcache采用lru算法,当blockcache达到上限值时,淘汰掉最近最久未使用的一批数据淘汰掉,每个regionserver只有一个blockcache
(3)其他,约占20%的内存空间。
在注重读响应时间的应用场景下,可以将blockcache设置的大一些,memstore设置的小一些,以加大缓存的命中率。
blockCache分级思想:
(1)首先通过inmemory类型cache,可以由选择地将inmemory的column famlies放到RegionServer内存中,例如meta元数据信息;
(2)通过区分Single和Multi类型的cache,可以防止由于Scan操作带来的频繁颠簸,将最少使用的block加入到淘汰算法中去。
默认配置下。对于整个BlockCache的内存,按照以下百分比分给Single、Multi、InMemory使用:0.25,0.50和0.25
http://blog.csdn.net/jollyjumper/article/details/19566829

Hbase容错和恢复
HLogFile
  HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是“写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
  HLog Sequece File的Value是HBase的KeyValue对象,即对应HFile中的KeyValue.
该机制用于数据的容错和恢复:

每个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操作写入MemStore的同时,也会写一份数据到HLog文件中(HLog文件格式见后续),HLog文件定期会滚动出新的,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知到,HMaster首先会处理遗留的 HLog文件,将其中不同Region的Log数据进行拆分,分别放到相应region的目录下,然后再将失效的region重新分配,领取 到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。
HBase容错性
Master容错:Zookeeper重新选择一个新的Master
*无Master过程中,数据读取仍照常进行;
*无master过程中,region切分、负载均衡等无法进行;
RegionServer容错:定时向Zookeeper汇报心跳,如果一旦时间内未出现心跳,Master将该RegionServer上的Region重新分配到其他RegionServer上,失效服务器上“预写”日志由主服务器进行分割并派送给新的RegionServer
Zookeeper容错:Zookeeper是一个可靠地服务,一般配置3或5个Zookeeper实例

8.HBase sql查询

hive 、phoenix、impala
http://www.cnblogs.com/haoxinyue/p/6832964.html

8.1. 协处理器

http://blog.csdn.net/lifuxiangcaohui/article/details/39991183/

8.2. 二级索引

http://www.cnblogs.com/end/archive/2012/08/21/2648785.html
https://github.com/Huawei-Hadoop/hindex 华为hbase二级索引
360二级索引

Base的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。
常见的二级索引方案有以下几种:
1.MapReduce方案 延时性高
2.ITHBASE方案 不在维护
3.IHBASE方案
http://kabike.iteye.com/blog/2095696
4.Coprocessor方案
http://www.cnblogs.com/liuwei6/p/6837674.html
5.Solr/es+hbase方案
方案描述
ES+Hbase对接大致有两种方式,需要根据当前的业务场景做相应的选择,
方案1:
如果是对写入数据性能要求高的业务场景,那么一份数据先写到Hbase,然后再写到ES中,两个写入流程独立,这样可以达到性能最大,目前某公安厅使用该方案,每天需要写入数据200亿,6T数据,每个记录建20左右的索引。
缺点:可能存在数据的不一致性。
方案2:
这也是目前网上比较流行的方案,使用hbase的协处理监听数据在Hbase中的变动,实时的更新ES中的索引,
缺点是协处理器会影响Hbase的性能

与es
http://blog.csdn.net/xj90314/article/details/52817513
与solr
http://blog.csdn.net/u011462328/article/details/53008434

8.3. hive与hbase结合使用

实现:
http://blog.csdn.net/linlinv3/article/details/46534109

命令:
http://www.cnblogs.com/MOBIN/p/5704001.html
http://www.cnblogs.com/1130136248wlxk/articles/5517726.html

create external table hive_hbase(id int,name string,age int) stored
by ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’ with
serdeproperties (“hbase.columns.mapping” = “:key,f:name,f:age”)
tblproperties (“hbase.table.name” = “t_person”);

8.4. phoenix使用

Phoenix中SQL Query Plan的执行,基本上是通过构建一系列的Hbase scan来完成。

为了尽可能减少数据传输,在Region Server使用Coprocessor来尽可能的执行Aggregate相关工作,基本思想是使用RegionObserver在PostScannerOpen hook中将RegionScanner替换成支持Aggregation工作的定制化的Scanner,具体的Aggregate操作通过custom的scan属性传递给RegionScanner。与基于MapReduce的框架执行Plan的思想比较,基本上就是通过Coprocessor,使用RegionServer自身来在各个节点上执行Aggregation。

http://www.cnblogs.com/laov/p/4137136.html 安装与使用
http://blog.csdn.net/u011491148/article/details/45749807 phoenix二级索引使用

9.hbase优化

分区实现:
http://blog.csdn.net/linuxheik/article/details/52440431 命令
http://blog.csdn.net/javajxz008/article/details/51913471 代码
hbase rowkey设计:
http://blog.csdn.net/iwantknowwhat/article/details/51397815 热点key代码
http://blog.csdn.net/moxiaomomo/article/details/12993663
http://blog.csdn.net/javajxz008/article/details/51892967

问题?hash、salt之后如果利用scan特效
答:当对rowkey进行Hash散列后,rowkey就损失了原先的检索性能了。 对于rowkey中存在时间信息的数据,HBase权威指南是推荐参考OpenTSDB的案例的。 rowkey还是得针对自身数据的特点来设计的

major耗时,关闭自动major:
1、 关闭自动major compaction
2、 手动编程major compaction
contab脚本在低谷时期执行

htable:
HTable是HBase客户端与HBase服务端通讯的Java API对象,客户端可以通过HTable对象与服务端进行CRUD操作(增删改查)。它的创建很简单:
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, “tablename”);
//TODO CRUD Operation……
HTable使用时的一些注意事项:

规避HTable对象的创建开销

因为客户端创建HTable对象后,需要进行一系列的操作:检查.META.表确认指定名称的HBase表是否存在,表是否有效等等,整个时间开销比较重,可能会耗时几秒钟之长,因此最好在程序启动时一次性创建完成需要的HTable对象,如果使用Java API,一般来说是在构造函数中进行创建,程序启动后直接重用。

HTable对象不是线程安全的
HTable对象对于客户端读写数据来说不是线程安全的,因此多线程时,要为每个线程单独创建复用一个HTable对象,不同对象间不要共享HTable对象使用,特别是在客户端auto flash被置为false时,由于存在本地write buffer,可能导致数据不一致。

HTable对象之间共享Configuration
HTable对象共享Configuration对象,这样的好处在于:
共享ZooKeeper的连接:每个客户端需要与ZooKeeper建立连接,查询用户的table regions位置,这些信息可以在连接建立后缓存起来共享使用;
共享公共的资源:客户端需要通过ZooKeeper查找-ROOT-和.META.表,这个需要网络传输开销,客户端缓存这些公共资源后能够减少后续的网络传输开销,加快查找过程速度。
因此,与以下这种方式相比:
HTable table1 = new HTable(“table1”);
HTable table2 = new HTable(“table2”);
下面的方式更有效些:
Configuration conf = HBaseConfiguration.create();
HTable table1 = new HTable(conf, “table1”);
HTable table2 = new HTable(conf, “table2”);
备注:即使是高负载的多线程程序,也并没有发现因为共享Configuration而导致的性能问题;如果你的实际情况中不是如此,那么可以尝试不共享Configuration。

使用POOL:
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, 10);

TablePool的使用很简单:每次进行操作前,通过HTablePool的getTable方法取得一个HTable对象,然后进行put/get/scan/delete等操作,最后通过HTablePool的putTable方法将HTable对象放回到HTablePool中。

待学习
protocol buffer
https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
https://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
hbase宕机恢复


问题:
不能put –》》解决方法:重启 stop-hbase.sh 有时候无法关闭进程,必须手动kill

hbase(main):005:0> put ‘t_person’,’1’,’f:name’,’zhn’ 2017-08-15
11:50:09,359 ERROR [main] client.AsyncProcess: Failed to get region
location org.apache.hadoop.hbase.client.NoServerForRegionException:
No server address listed in hbase:meta for region
t_person,,1502768817681.7f18a0f86a01939abccd0ff5b81c6112. containing
row 1

hbase shell: list命令报错 –》解决方法:重启

hbase日志
2017-08-15 12:32:26,434 ERROR [main] zookeeper.RecoverableZooKeeper:
ZooKeeper exists failed after 4 attempts 2017-08-15 12:32:26,436 WARN
[main] zookeeper.ZKUtil: hconnection-0x2d330b0a0x0,
quorum=hadooptest:2181, baseZNode=/hbase Unable to set watcher on
znode (/hbase/hbaseid)
org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss for /hbase/hbaseid

hive-base 执行: select * from hive_hbase where id =2 卡主 没有where可以
count 可以 order by 一样错误 没解决????

hive日志 =====================================
2017-08-15 12:47:57,813 WARN [main-SendThread(javacheng7:2181)]:
zookeeper.ClientCnxn (ClientCnxn.java:run(1102)) - Session 0x0 for
server javacheng7/192.168.6.109:2181, unexpected error, closing socket
connection and attempting reconnect java.io.IOException: Connection
reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at
sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at
sun.nio.ch.IOUtil.read(IOUtil.java:192) at
sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at
org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐