pyspark 读取 hbase
本文推荐两种读写 HBase 的方式:1、相关依赖:将的目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,需要拷贝的jar文件包括:在目录中新建一个目录 ,并将拷贝进去:2、修改文件:设置的 文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:1.2 读取 HBase1.3 向 HBase 写入数据2. happybase + Thrift 服务
本文推荐两种读写 HBase 的方式:
happybase
库 +thrift
接口newAPIHadoopRDD
接口
1. newAPIHadoopRDD 读写 HBase
1.1 前置准备
1、相关依赖:
将 HBase
的 lib
目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,需要拷贝的jar文件包括:
- 所有hbase开头的jar文件
guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar
cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
在 spark/jars
目录中新建一个目录 hbase/
,并将 spark-example-1.6.0.jar
拷贝进去:
mkdir -p /usr/local/spark/jars/hbase/
mv ~/下载/spark-examples* /usr/local/spark/jars/hbase/
注意:在
Spark 2.0
版本上缺少相关把 hbase 的数据转换 python 可读取的 jar 包,需另行下载 spark-example-1.6.0.jar
2、修改 spark-env.sh
文件:
设置 Spark
的 spark-env.sh
文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:
# 在 spark-env.sh 最前面增加下面一行内容
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
1.2 读取 HBase
# coding=utf-8
from pyspark import SparkContext, SparkConf
def read_for_hbase(spark_context):
host = '192.168.131.131'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = spark_context.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result", keyConverter=keyConv, valueConverter=valueConv,
conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print(k, v)
def main():
con = SparkConf().setMaster("local[2]").setAppName("test_spark_hbase")
sc = SparkContext(conf=con)
read_for_hbase(sc)
write_to_hbase(sc)
sc.stop()
if __name__ == '__main__':
main()
1.3 向 HBase 写入数据
# coding=utf-8
# Description:读取、保存数据到 HBase
from pyspark import SparkContext, SparkConf
def write_to_hbase(spark_context):
host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": host, "hbase.mapred.outputtable": table,
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
rawData = ['3,info,name,Rongcheng', '4,info,name,Guanhua']
# (rowkey, [row key, column family, column name, value])
spark_context.parallelize(rawData).map(lambda x: (x[0], x.split(','))).saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
def main():
con = SparkConf().setMaster("local[2]").setAppName("test_spark_hbase")
sc = SparkContext(conf=con)
read_for_hbase(sc)
write_to_hbase(sc)
sc.stop()
if __name__ == '__main__':
main()
注意:向
HBase
写入数据会报错:pyspark.sql.utils.IllegalArgumentException: 'Can not create a Path from a null string'
,不影响数据写入
2. happybase + Thrift 服务读写 HBase
2.1 依赖安装
由于需要安装第三方库 happybase
,所以需要将 happybase
打包上传到 spark
,这里采用 Anaconda
:
1、打包依赖:
# 虚拟环境创建
conda create --name spark_env python=3.6
# 进入虚拟环境
conda activate spark_env
# 安装 happybase
pip install happybase
# 退出虚拟环境
conda deactivate spark_env
# 查看虚拟环境所在目录
conda env list
# 打包虚拟环境为 zip
cd /usr/local/anaconda3/envs
zip -r spark_env.zip spark_env/
2、将依赖上传到 HDFS:
# 上传后的 hdfs 路径为:/spark/spark_env.zip
hadoop fs -put -f spark_env.zip /spark
3、将依赖上传到 spark
,使得每个节点都能使用,编辑 spark-submit
:
#!/bin/bash
num_executors=20 # 启动的 executor 数量,默认 20
executor_memory=10g # 每个 executor 的内存,默认 1 G
executor_cores=20 # 每个 executor 的核数
# hdfs 路径 #spark_pack 为 spark 访问依赖时解压缩的路径,可自定义, # 必不可少
anaconda_archive=/spark/spark_env.zip#spark_pack
pyspark_python="./spark_pack/bin/python" # 依赖中 python 路径
nohup spark-submit --master yarn
--deploy-mode cluster
--archives $anaconda_archive \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=$pyspark_python \
--conf spark.executorEnv.PYSPARK_PYTHON=$pyspark_python \
上面是 cluster
集群模式的配置方式,client
模式配置为:
spark-submit --master yarn
--deploy-mode client \
--conf spark.yarn.dist.archives=$anaconda_archive \
--conf spark.pyspark.driver.python=/root/anaconda3/envs/spark_env/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=$pyspark_python
save_to_hbase.py
# 若是 --master 为 local
spark-submit --master local
--conf spark.yarn.dist.archives=/root/anaconda3/envs/spark_env \
--conf spark.pyspark.driver.python=/root/anaconda3/envs/spark_env/bin/python \
--conf spark.pyspark.python=/root/anaconda3/envs/spark_env/bin/python
save_to_hbase.py
注意:
client
模式,driver
端需要使用本地的Python
环境,spark.executorEnv.PYSPARK_PYTHON
使用hdfs
上的Python
环境
2.2 读写 HBase
启动 Thrift
服务可参考:Hbase基础(9):python操作hbase之happybase
# coding=utf-8
import happybase
from pyspark import SparkContext, SparkConf
def main():
con = SparkConf().setMaster("local[2]").setAppName("test_spark_hbase")
sc = SparkContext(conf=con)
stocksRaw = sc.textFile("/user/mapr/stocks")
stocks = stocksRaw.filter(lambda r: not r.startswith("date"))
# 创建表
conn = happybase.Connection(server, port=9090)
table_name = 'mytable'
families = {
'cf': dict(max_versions=1)
}
# 创建表
conn.create_table(table_name, families)
def bulk_insert(batch):
"""批量插入,每个 partition 新建一个链接,避免没插入一条就要连接一次"""
table = happybase.Connection(server, port=9090).table(table_name)
for r in batch:
tokens = r.split(",")
key = tokens[0] + "-" + tokens[7]
value = {"info:date": tokens[0]
, "info:open": tokens[1]
, "info:high": tokens[2]
, "info:low": tokens[3]
, "info:close": tokens[4]
, "info:volume": tokens[5]
, "info:adjclose": tokens[6]
, "info:symbol": tokens[0]
}
# Look at jupyter console to see the print output
print(key, value)
table.put(key, value)
stocks.foreachPartition(bulk_insert)
conn.close()
sc.stop()
if __name__ == '__main__':
main()
参考:
更多推荐
所有评论(0)