1 Introduction

HBase is a column-oriented database management system that runs on top of the Hadoop Distributed File System (HDFS).

Web UI: http://master-ip:16010/master-status
From the web UI we can get a lot of useful information, including basic information about the HBase cluster, table details, and so on. From the table details we can see a table's schema and region information, and trigger actions such as compact, split, and merge.

HBase is an ideal big data solution if the application requires random read or random write operations, or both. If the application needs to access some data in real time, that data can be stored in a NoSQL database such as HBase. HBase has its own rich set of APIs that can be used to pull or push data, and it also integrates well with Hadoop MapReduce for bulk operations like analytics and indexing. The best way to use HBase is to make Hadoop the repository for static data and HBase the data store for data that is going to change in real time after some processing.

2 Schema Design

An HBase table can scale to billions of rows and any number of columns, based on your requirements, and can store terabytes of data. HBase tables support high read and write throughput at low latency. A single value in each row is indexed; this value is known as the row key.

2.1 General Concepts

  • Row key: Each HBase table is indexed on its row key, and data is sorted lexicographically by that row key. There are no secondary indexes on HBase tables.
  • Atomicity: Avoid designing tables that require atomicity across rows. All operations on HBase rows are atomic at the row level.
  • Even distribution: Reads and writes should be uniformly distributed across all nodes in the cluster. Design the row key so that related entities are stored in adjacent rows, which increases read efficiency.

2.2 Size Limit

  • Row keys: 4 KB per key
  • Column families: not more than 10 column families per table
  • Column qualifiers: 16 KB per qualifier
  • Individual values: less than 10 MB per cell
  • All values in a single row: max 10 MB

2.3 Row Key Design

2.3.1 Reverse Domain Names

If you are storing data that is identified by domain names, consider using the reverse domain name as the row key for your HBase tables, for example com.company.name.

This technique works well when your data is spread across many different domains. If you have only a few domains, you may end up storing most of the data on a single node, causing hotspotting.
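The reversal itself is trivial; a rough sketch (the helper name here is illustrative, not an HBase API):

// Illustrative helper: keys for the same organization sort next to each other.
def reverseDomain(domain: String): String =
  domain.split('.').reverse.mkString(".")

// reverseDomain("mail.example.com") == "com.example.mail"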

2.3.2 Hashing

When your data is identified by a string identifier, that identifier is a good basis for your HBase row key, but use a hash of the string rather than the raw string. For example, if you are storing user data keyed by user IDs, a hash of the user ID is a better choice for the row key.
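The code in section 4 uses an MD5Encode helper for exactly this purpose. A minimal sketch of such a helper (the hex formatting is one possible choice, not a fixed API):

import java.security.MessageDigest

// Sketch of an MD5-based row key helper (like the MD5Encode used in section 4):
// hashing the identifier spreads keys evenly across regions instead of
// clustering similar IDs together.
def MD5Encode(input: String): String =
  MessageDigest.getInstance("MD5")
    .digest(input.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString

// MD5Encode("user-10001") yields a fixed-length 32-character hex row key.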

2.3.3 Timestamps

When you retrieve data based on the time it was stored, it is best to include a timestamp in your row key. For example, if you are storing machine logs identified by machine number, append the timestamp to the machine number when designing the row key: machine001#1435310751234.

2.3.4 Combined Row Keys

You can combine multiple fields into the row key of your HBase table, based on your requirements.
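For example, combining the machine number from 2.3.3 with a reversed, zero-padded timestamp keeps one machine's logs adjacent while a scan returns the newest entries first (a sketch; the separator and the reversal are design choices, not requirements):

// Illustrative combined key: "<machineId>#<Long.MaxValue - timestamp>".
// Zero-padding keeps lexicographic order consistent with numeric order.
def logRowKey(machineId: String, timestampMs: Long): String =
  f"$machineId#${Long.MaxValue - timestampMs}%019d"

// logRowKey("machine001", 1435310751234L) == "machine001#9223370601544024573"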

2.4 Architecture Components

ZooKeeper provides coordination services for the HBase cluster, HMaster mainly monitors and operates all RegionServers in the cluster, and each RegionServer manages a set of regions (partitions).

2.4.1 Client

  • Uses HBase's RPC mechanism to communicate with HMaster and HRegionServer
  • For management operations, the client performs RPCs with HMaster
  • For data read and write operations, the client performs RPCs with HRegionServer

2.4.2 Zookeeper

HBase ZooKeeper is a centralized coordination service that maintains configuration information and provides distributed synchronization, that is, it coordinates the distributed application nodes running across the cluster. If a client wants to communicate with regions, it has to approach ZooKeeper first.

  • Through leader election, ZooKeeper guarantees that there is only one active Master in the cluster at any time; the Master and RegionServers register with ZooKeeper when they start.
  • Monitors the online and offline status of RegionServers in real time and notifies the Master immediately.
  • Stores the addressing entries of all Regions, i.e., which server each table's regions are located on.
  • Stores the schema of HBase, including which tables exist and which column families each table has.
  • After HMaster and each HRegionServer connect to ZooKeeper, an ephemeral node is created for them, and a heartbeat mechanism maintains the node's liveness. If an ephemeral node disappears, HMaster is notified and takes the corresponding action.
  • HMaster detects the joining and failure of HRegionServers by watching the ephemeral nodes in ZooKeeper (default: /hbase/rs/*).
  • When the first HMaster connects to ZooKeeper, an ephemeral node (default: /hbase/master) is created to represent the active HMaster, and subsequent HMasters watch that node. If the current active HMaster goes down, its ephemeral node disappears, the other HMasters are notified, and one of them converts itself into the active HMaster. Before becoming active, each standby HMaster creates its own ephemeral node under /hbase/backup-masters/.

2.4.3 HMaster

Coordinates all RegionServers:

  • Manages HRegionServers to achieve load balancing.
  • Manages and assigns HRegions, for example assigning new HRegions when an HRegion splits, and migrating HRegions to other HRegionServers when an HRegionServer exits.
  • Monitors the status of all HRegionServers in the cluster via heartbeats and their state in ZooKeeper.

Administrative functions:

  • Provides interfaces for creating and deleting HBase tables (see the sketch below).
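Clients reach these interfaces through the Admin API. A minimal sketch (HBase 1.x client API, matching the style of the code in section 4; the table and family names are examples):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Create a table with one column family, then delete it again.
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val admin = conn.getAdmin

val desc = new HTableDescriptor(TableName.valueOf("test:company"))
desc.addFamily(new HColumnDescriptor("data"))
admin.createTable(desc)

// A table must be disabled before it can be deleted.
admin.disableTable(TableName.valueOf("test:company"))
admin.deleteTable(TableName.valueOf("test:company"))

admin.close()
conn.close()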

2.4.4 HRegionServer

An HBase table is split into multiple Regions according to row key ranges. Each Region contains all rows between its StartKey and EndKey, and each Region is assigned to one RegionServer in the cluster. A single RegionServer can manage on the order of 1,000 Regions.
  • The RegionServer maintains the Regions assigned to it by HMaster and handles IO requests to those Regions.
  • The RegionServer is responsible for splitting Regions that become too large during operation.

2.4.5 HRegion

A Region is the basic unit of HBase data management. Each HRegion consists of multiple Stores, one per column family: if a table has several column families, its regions have the same number of Stores. Each Store consists of one MemStore and many StoreFiles. The MemStore is the in-memory portion of the Store; once its contents are written to a file, the file is a StoreFile, whose underlying storage format is HFile.

3 Read and Write Process

3.1 Write process


  1. The client first queries ZooKeeper for the RegionServer that hosts the META table.
  2. The client accesses that RegionServer and queries the META table to find out which Region of which RegionServer holds the target data, based on the requested namespace:table/rowkey. The Region information and the location of the META table are cached in the client's meta cache to speed up the next access.
  3. The client communicates with the RegionServer that holds the target data.
  4. The data is written to the WAL; the new entry is appended to the end of the WAL file (stored on disk).
  5. The data is written to the corresponding MemStore.
  6. A write-success message is sent to the client.
  7. Once the MemStore's flush conditions are reached, its data is flushed to an HFile (Region Flush).

3.1.1 HBase Meta table

The META table is an HBase table that stores the information of all Regions in the system; it acts like a B-tree index. The META table structure is as follows:

  • Key: table, region start key, region id
  • Value: region server
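A quick way to see this structure is to scan hbase:meta itself; a sketch (info:server is the column holding the RegionServer assignment):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

// Print each META row key (table, start key, region id) and its region server.
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val meta = conn.getTable(TableName.valueOf("hbase:meta"))
val scanner = meta.getScanner(new Scan())
var result = scanner.next()
while (result != null) {
  val server = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("server"))
  val location = if (server == null) "unassigned" else Bytes.toString(server)
  println(s"${Bytes.toString(result.getRow)} -> $location")
  result = scanner.next()
}
scanner.close()
conn.close()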

3.1.2 RegionServer components

  • WAL (HLog): the write-ahead log, a file on the distributed file system. It stores new data that has not yet been persisted, so that it can be recovered in the event of a failure.
  • BlockCache: the read cache, which keeps frequently read data in memory. The least recently used data is evicted when memory runs low.
  • MemStore: the write cache, which holds new data that has not yet been written to disk and sorts it before writing. Each Region has one MemStore per column family.
  • HFile: stores rows on disk as ordered KeyValues.

3.1.3 MemStore

The MemStore caches HBase data updates in memory as ordered KeyValues, the same form in which they are stored in HFiles. Each column family has its own MemStore, so all updates are sorted per column family.

3.1.4 Region Flush

After certain conditions are met (for example, a MemStore exceeding 128 MB, checked every 10 seconds by default, or a periodic flush that fires every hour), the MemStore is written out to a new HFile on HDFS. HBase creates one HFile per column family; it stores the actual cells, that is, the KeyValue data. Over time more and more HFiles are generated, because KeyValues keep being flushed from the MemStores to disk.
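These thresholds come from configuration. A sketch of the relevant settings with their usual defaults (shown in code for illustration; in practice they normally live in hbase-site.xml, so verify against your own deployment):

import org.apache.hadoop.hbase.HBaseConfiguration

// Flush-related settings and their common defaults.
val conf = HBaseConfiguration.create()
conf.setLong("hbase.hregion.memstore.flush.size", 128L * 1024 * 1024)       // flush a MemStore once it exceeds 128 MB
conf.setInt("hbase.server.thread.wakefrequency", 10 * 1000)                 // the flush checker wakes every 10 s
conf.setLong("hbase.regionserver.optionalcacheflushinterval", 3600 * 1000L) // periodic flush after 1 h without one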

Note that this is one reason HBase limits the number of column families: each column family has its own MemStore, and when one MemStore is full, all MemStores of the region are flushed to disk. Each flush also records the maximum sequence number of the last written data, so the system knows how much has been persisted so far.

The maximum sequence number is a piece of meta information stored in each HFile, indicating how far persistence has progressed and where it should resume. When a region starts, these sequence numbers are read and the largest one is taken as the base; subsequent updates increment from this value to generate new sequence numbers.

3.1.5 Region split

When a Region's data grows beyond the configured size (default: hbase.hregion.max.filesize=10G), the Region triggers a split.

If a Region is too large, read efficiency drops because traversals take too long. By splitting big data sets across different machines and querying and aggregating them separately, HBase earns its description as "a database that shards automatically."

3.1.6 HFile Compaction

3.1.6.1 Minor Compaction

Picks several small, adjacent StoreFiles and merges them into one larger StoreFile, without processing deleted or expired cells. The result of a minor compaction is fewer, larger StoreFiles.

3.1.6.2 Major Compaction

Merges all StoreFiles into a single StoreFile. This process also cleans up three kinds of obsolete data: deleted cells, cells whose TTL has expired, and cells whose version count exceeds the configured maximum. A major compaction usually lasts a long time and consumes substantial system resources, which can noticeably impact upper-layer business traffic. Production deployments therefore often disable automatic major compactions and trigger them manually during off-peak hours.

3.1.6.3 Compaction triggers

  • MemStore flush: after every flush, the number of files in the affected Store is checked; once the count exceeds the configured threshold, a compaction is triggered. Note that compaction runs per Store, but under this trigger all Stores in the Region are checked, so multiple compactions may run within a short period.
  • Periodic background check: a background thread periodically checks whether a compaction is needed (the check period is configurable). The thread first checks whether the file count exceeds the threshold and triggers a compaction if so; otherwise it checks whether the major compaction condition is met: if the earliest update time of an HFile in the current Store is earlier than a threshold mcTime, a major compaction is triggered (by default once every 7 days; it can also be configured for manual triggering only).
  • Manual trigger: an administrator requests a compaction explicitly, as sketched below.
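A sketch of such a manual trigger through the Admin API (both calls are asynchronous requests; the table name is an example):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Request compactions by hand, e.g. from a maintenance job run off-peak.
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val admin = conn.getAdmin
admin.compact(TableName.valueOf("test:company"))      // request a minor compaction
admin.majorCompact(TableName.valueOf("test:company")) // request a major compaction
admin.close()
conn.close()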

3.2 Read process

  1. The client first queries ZooKeeper for the RegionServer that hosts the META table.
  2. The client accesses that RegionServer and queries the META table to find out which Region of which RegionServer holds the target data, based on the requested namespace:table/rowkey. The Region information and the location of the META table are cached in the client's meta cache to speed up the next access.
  3. The client communicates with the RegionServer where the target data is located.
  4. The target data is looked up in the BlockCache (read cache), the MemStore, and the StoreFiles, and the results are merged. All data here refers to different versions (timestamps) or different types (Put/Delete) of the same cell.
  5. The data blocks read from files are cached in the BlockCache.
  6. The merged result is returned to the client.
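From the client's perspective, all of this happens behind a single Get call; a minimal sketch (table name is an example, MD5Encode as in section 2.3.2):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// One Get triggers the META lookup, the BlockCache/MemStore/HFile merge,
// and the block caching described in the steps above.
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("test:company"))
val result = table.get(new Get(Bytes.toBytes(MD5Encode("some company name"))))
val entName = Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("ent_name")))
table.close()
conn.close()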

3.2.1 Read load balancing

3.2.2 HDFS data backup

HDFS replicates the WAL (HLog) and HFile data blocks.

3.2.3 Recovery

When HMaster detects that a RegionServer has crashed, it reassigns the crashed server's Regions to active RegionServers. To restore the crashed RegionServer's MemStore contents (which were not yet flushed to disk), HMaster splits the crashed server's WAL into per-region files and stores them on the DataNodes of the new RegionServers. Each RegionServer then replays its portion of the split WAL to rebuild the MemStores.

4 Writing to HBase

4.1 Writing from Spark

There are three common methods for writing to HBase from Spark.

4.1.1 Spark API

This method is simple to write; the main code is shown below. It is suitable for writing small amounts of data.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

// Point the Hadoop JobConf at the target HBase table.
val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test:company2")

val familyName = "data"
// One Put per row, keyed by the MD5 hash of the first column.
df.rdd.map(data => {
  val rowkey = MD5Encode(data.getString(0))
  val put = new Put(rowkey.getBytes())
  put.addColumn(familyName.getBytes(), "ent_name".getBytes(), Bytes.toBytes(data.getString(0)))
  put.addColumn(familyName.getBytes(), "cn_shortname".getBytes(), Bytes.toBytes(data.getString(1)))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)

4.1.2 Table API
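This method writes each Put individually through the client Table API, which round-trips to the RegionServer per row, so it too suits only modest data volumes. Here conf, tableName, familyName, dataList, and MD5Encode are assumed to be defined by the surrounding code.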

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// One RPC per row: open the table, put each row, then close.
val hbaseConn = ConnectionFactory.createConnection(conf)
val hbaseTable: Table = hbaseConn.getTable(TableName.valueOf(tableName))
for (data <- dataList) {
  val rowkey = MD5Encode(data.getString(0))
  val put = new Put(rowkey.getBytes())
  put.addColumn(familyName.getBytes(), "ent_name".getBytes(), Bytes.toBytes(data.getString(0)))
  put.addColumn(familyName.getBytes(), "cn_shortname".getBytes(), Bytes.toBytes(data.getString(1)))
  hbaseTable.put(put)
}
hbaseTable.close()
hbaseConn.close()

4.1.3 HFile Load
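For large batches it is usually fastest to bypass the write path (WAL and MemStore) entirely: use Spark to generate HFiles directly and then bulk-load them into the table. The code below assumes a custom secondary-sort key class HbaseSortKey (exposing the row key as an ImmutableBytesWritable and implementing the sort order) plus the MD5Encode helper used earlier.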

import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

// Turn every DataFrame row into KeyValues, one per column.
val hfileRDD: RDD[(HbaseSortKey, KeyValue)] = df.rdd.repartition(30).mapPartitions(it => {
  val list = new ListBuffer[(HbaseSortKey, KeyValue)]
  it.foreach(f => {
    val rowkey: String = MD5Encode(f.getString(0))
    val w: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
    val kv1: KeyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("data"), Bytes.toBytes("ent_name"), Bytes.toBytes(f.getString(0)))
    val kv2: KeyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("data"), Bytes.toBytes("cn_shortname"), Bytes.toBytes(f.getString(1)))
    list += ((new HbaseSortKey(w, kv1), kv1))
    list += ((new HbaseSortKey(w, kv2), kv2))
  })
  list.iterator
})

// Analysis of PutSortReducer shows that the output (ImmutableBytesWritable, KeyValue)
// pairs must be fully sorted, so a secondary-sort key class (here com.clj.HbaseSortKey)
// implements the secondary-sort logic: rdd[(com.clj.HbaseSortKey, KeyValue)].sortByKey
val writeHfileRdd: RDD[(ImmutableBytesWritable, KeyValue)] = hfileRDD.sortByKey().map(f => (f._1.rowkey, f._2))

// Output directory for the generated HFiles
val outputPath: String = "/test/hbase_bulk_output"
// Create a Configuration object carrying the HBase settings
val hbaseConf: Configuration = HBaseConfiguration.create()
// Use a Job object to set the output parameters
val job: Job = Job.getInstance(hbaseConf)
val conn: Connection = ConnectionFactory.createConnection(hbaseConf)
val hbaseTableName: String = "test:company"
val table: HTable = conn.getTable(TableName.valueOf(hbaseTableName)).asInstanceOf[HTable]
// This call configures the job to generate HFiles matching the table's layout
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor, table.getRegionLocator)
// Write the HFiles
writeHfileRdd.saveAsNewAPIHadoopFile(outputPath,
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  job.getConfiguration)
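Writing the HFiles is only half the job; they still have to be handed over to the RegionServers. A sketch of that final step with LoadIncrementalHFiles (HBase 1.x API, matching the HTable usage above; in HBase 2.x this functionality moved to BulkLoadHFiles):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Move the generated HFiles into the table's regions (bulk load).
val loader = new LoadIncrementalHFiles(hbaseConf)
loader.doBulkLoad(new Path(outputPath), table)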