This article recommends two ways to read and write HBase from PySpark:

  • the happybase library + the Thrift interface
  • the newAPIHadoopRDD interface

1. Reading and Writing HBase with newAPIHadoopRDD

1.1 Prerequisites

1. Required dependencies:

Copy the required jar files from HBase's lib directory into Spark; these are the jars that must be on the classpath when programming against HBase. The files to copy are:

  • all jar files whose names start with hbase
  • guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar, and protobuf-java-2.5.0.jar
cd  /usr/local/spark/jars
mkdir  hbase
cd  hbase
cp  /usr/local/hbase/lib/hbase*.jar  ./
cp  /usr/local/hbase/lib/guava-12.0.1.jar  ./
cp  /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar  ./
cp  /usr/local/hbase/lib/protobuf-java-2.5.0.jar  ./

Create a directory hbase/ under spark/jars (if it was not already created in the previous step) and put spark-example-1.6.0.jar into it:

mkdir -p /usr/local/spark/jars/hbase/
mv ~/下载/spark-examples* /usr/local/spark/jars/hbase/

Note: Spark 2.0+ no longer ships the jars that convert HBase data into a form Python can read, so spark-example-1.6.0.jar must be downloaded separately.

2. Edit the spark-env.sh file:

Edit Spark's spark-env.sh file to tell Spark where to find the HBase-related jar files:

# Add the following line at the very top of spark-env.sh
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
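
A quick way to check that the classpath change took effect is to try loading one of the HBase classes through the JVM gateway from a pyspark shell. This is only a hedged sanity check that relies on the private sc._jvm handle; it is not part of the setup itself:

# Sanity check: if the HBase jars are visible to Spark, this prints the class
# instead of raising a py4j error wrapping a ClassNotFoundException.
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("classpath_check"))
print(sc._jvm.java.lang.Class.forName("org.apache.hadoop.hbase.mapreduce.TableInputFormat"))
sc.stop()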

1.2 Reading from HBase

# coding=utf-8


from pyspark import SparkContext, SparkConf


def read_for_hbase(spark_context):
    host = '192.168.131.131'   # ZooKeeper quorum address
    table = 'student'          # HBase table to read from
    conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
    hbase_rdd = spark_context.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                                              "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                                              "org.apache.hadoop.hbase.client.Result",
                                              keyConverter=keyConv, valueConverter=valueConv,
                                              conf=conf)
    hbase_rdd.cache()
    count = hbase_rdd.count()
    print(count)
    output = hbase_rdd.collect()
    for (k, v) in output:
        print(k, v)


def main():
    con = SparkConf().setMaster("local[2]").setAppName("test_spark_hbase")
    sc = SparkContext(conf=con)

    read_for_hbase(sc)
    sc.stop()

    
if __name__ == '__main__':
    main()
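
Each value returned by HBaseResultToStringConverter packs every cell of the row into one string, one JSON object per cell separated by newlines (this is the behaviour of the spark-examples converter used above). A hedged sketch of turning that output into Python dicts, treated as a convenience rather than part of the reference example:

# Sketch: parse the converter's newline-separated JSON cells into Python dicts.
import json

def parse_row(kv):
    rowkey, value = kv
    cells = [json.loads(cell) for cell in value.split('\n')]
    return rowkey, cells

# usage inside read_for_hbase:
# for rowkey, cells in hbase_rdd.map(parse_row).collect():
#     print(rowkey, cells)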

1.3 Writing Data to HBase

# coding=utf-8
# Description: write data to HBase


from pyspark import SparkContext, SparkConf


def write_to_hbase(spark_context):
    host = 'localhost'
    table = 'student'
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    conf = {"hbase.zookeeper.quorum": host, "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    rawData = ['3,info,name,Rongcheng', '4,info,name,Guanhua']
    # each element becomes (row key, [row key, column family, column qualifier, value])
    spark_context.parallelize(rawData) \
        .map(lambda x: (x.split(',')[0], x.split(','))) \
        .saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

def main():
    con = SparkConf().setMaster("local[2]").setAppName("test_spark_hbase")
    sc = SparkContext(conf=con)

    write_to_hbase(sc)
    sc.stop()


if __name__ == '__main__':
    main()

Note: writing to HBase raises pyspark.sql.utils.IllegalArgumentException: 'Can not create a Path from a null string'; this error does not affect the data being written.
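
The two converters expect each RDD element to be a pair (row key, [row key, column family, column qualifier, value]), all strings. If your data starts out as Python dicts rather than pre-joined CSV strings like rawData above, one hedged way to flatten it into that shape (the record fields here are made up for illustration):

# Hypothetical structured records; each field is turned into one
# (rowkey, [rowkey, family, qualifier, value]) element for the converters.
records = [
    {'row': '5', 'name': 'Lihua',    'age': '23'},
    {'row': '6', 'name': 'Zhangsan', 'age': '24'},
]

def to_puts(record):
    row = record['row']
    return [(row, [row, 'info', col, record[col]]) for col in ('name', 'age')]

# usage, reusing conf/keyConv/valueConv from write_to_hbase:
# spark_context.parallelize(records).flatMap(to_puts) \
#     .saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)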

Reference: Spark2.1.0+入门:读写HBase数据(Python版)

2. Reading and Writing HBase with happybase + the Thrift Service

2.1 Installing the Dependencies

Because the third-party library happybase needs to be installed, it has to be packaged and shipped to Spark; Anaconda is used for this here.

1. Package the dependencies:

# Create the virtual environment
conda create --name spark_env python=3.6

# Activate the virtual environment
conda activate spark_env

# Install happybase
pip install happybase

# Leave the virtual environment
conda deactivate

# Show where the virtual environments live
conda env list

# Package the virtual environment as a zip
cd /usr/local/anaconda3/envs
zip -r spark_env.zip spark_env/

2. Upload the dependencies to HDFS:

# After the upload, the HDFS path is /spark/spark_env.zip
hadoop fs -put -f spark_env.zip /spark

3. Make the dependencies available to Spark so that every node can use them, by editing the spark-submit script:

#!/bin/bash

num_executors=20    # number of executors to launch, default 20
executor_memory=10g # memory per executor, default 1 G
executor_cores=20   # cores per executor

# HDFS path of the archive; "spark_pack" after the # is the directory the archive is
# unpacked to when Spark accesses the dependencies (customizable); the # itself is mandatory
anaconda_archive=/spark/spark_env.zip#spark_pack

pyspark_python="./spark_pack/bin/python"    # python path inside the shipped environment


nohup spark-submit --master yarn \
    --deploy-mode cluster \
    --num-executors $num_executors \
    --executor-memory $executor_memory \
    --executor-cores $executor_cores \
    --archives $anaconda_archive \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=$pyspark_python \
    --conf spark.executorEnv.PYSPARK_PYTHON=$pyspark_python \
    save_to_hbase.py &

The above is the configuration for cluster deploy mode; for client mode, configure it as follows:

spark-submit --master yarn \
    --deploy-mode client \
    --conf spark.yarn.dist.archives=$anaconda_archive \
    --conf spark.pyspark.driver.python=/root/anaconda3/envs/spark_env/bin/python \
    --conf spark.executorEnv.PYSPARK_PYTHON=$pyspark_python \
    save_to_hbase.py


# If --master is local
spark-submit --master local \
    --conf spark.yarn.dist.archives=/root/anaconda3/envs/spark_env \
    --conf spark.pyspark.driver.python=/root/anaconda3/envs/spark_env/bin/python \
    --conf spark.pyspark.python=/root/anaconda3/envs/spark_env/bin/python \
    save_to_hbase.py

Note: in client mode, the driver must use a local Python environment, while spark.executorEnv.PYSPARK_PYTHON points the executors at the Python environment shipped from HDFS.
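
To confirm that the executors really pick up the Python interpreter from the shipped archive rather than the system Python, here is a small hedged check you can submit with any of the commands above:

# Prints the interpreter used by the driver and by the executors;
# in cluster mode the output ends up in the driver (application master) log.
import sys
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("python_env_check"))
executor_pythons = sc.parallelize(range(4), 4).map(lambda _: sys.executable).distinct().collect()
print("driver python:  ", sys.executable)
print("executor python:", executor_pythons)
sc.stop()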

Reference: pyspark打包依赖包&使用python虚拟环境

2.2 Reading and Writing HBase

For starting the Thrift service, see: Hbase基础(9):python操作hbase之happybase
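
Once the Thrift service is running (default port 9090), a quick hedged check that happybase can reach it, before submitting the full job below:

# Minimal connectivity check; 'localhost' and port 9090 are assumptions,
# replace them with your Thrift server address.
import happybase

conn = happybase.Connection('localhost', port=9090)
print(conn.tables())   # existing table names, returned as bytes
conn.close()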

# coding=utf-8

import happybase
from pyspark import SparkContext, SparkConf



def main():
    con = SparkConf().setMaster("local[2]").setAppName("test_spark_hbase")
    sc = SparkContext(conf=con)
    stocksRaw = sc.textFile("/user/mapr/stocks")
    # drop the CSV header line
    stocks = stocksRaw.filter(lambda r: not r.startswith("date"))

    # connect to the Thrift server and create the target table
    server = 'localhost'   # Thrift server address; adjust to your environment
    conn = happybase.Connection(server, port=9090)
    table_name = 'mytable'
    families = {
        'info': dict(max_versions=1)   # the puts below write to the 'info' family
    }
    conn.create_table(table_name, families)


    def bulk_insert(batch):
        """Bulk insert: open one connection per partition instead of one per row."""
        table = happybase.Connection(server, port=9090).table(table_name)
        for r in batch:
            tokens = r.split(",")
            # row key: <date>-<symbol>
            key = tokens[0] + "-" + tokens[7]
            value = {"info:date": tokens[0],
                     "info:open": tokens[1],
                     "info:high": tokens[2],
                     "info:low": tokens[3],
                     "info:close": tokens[4],
                     "info:volume": tokens[5],
                     "info:adjclose": tokens[6],
                     "info:symbol": tokens[7]}
            # Look at jupyter console to see the print output
            print(key, value)
            table.put(key, value)

    stocks.foreachPartition(bulk_insert)
    conn.close()
    sc.stop()



if __name__ == '__main__':
    main()
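
The script above only writes; reading back with happybase is symmetric. A minimal hedged sketch that scans the table just created (same Thrift server and table name as above):

# Scan back a few rows written above; keys and values come back as bytes.
import happybase

conn = happybase.Connection('localhost', port=9090)   # same Thrift server as above
table = conn.table('mytable')
for key, data in table.scan(limit=10):
    print(key.decode(), {k.decode(): v.decode() for k, v in data.items()})
conn.close()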
