Testing PyFlink WordCount Job Submission to a YARN Cluster
A rundown of the pitfalls hit while submitting PyFlink jobs to YARN.
Deployed Versions
- Flink:1.17.1
- Hadoop:3.1.2
- Python:3.7.16
- Linux:CentOS7.9
- Kafka:2.8.0
Installing a Python 3.7 Environment on Linux
# Download the Python source tarball
[bigdata@master1 pySpark]$ wget https://www.python.org/ftp/python/3.7.16/Python-3.7.16.tgz
# Install the build toolchain
[bigdata@master1 pySpark]$ sudo yum install -y gcc-c++ cpp binutils glibc glibc-kernheaders glibc-common glibc-devel gcc make tcl
# Extract into the current directory
[bigdata@master1 pySpark]$ tar -zxvf Python-3.7.16.tgz
# Configure the build, specifying the Python install directory
[bigdata@master1 Python-3.7.16]$ ./configure --prefix=/home/bigdata/soft/python3.7
# Compile and install Python (note: the install directory must be created manually first)
[bigdata@master1 Python-3.7.16]$ make && sudo make install
# Create symlinks
[bigdata@master1 bin]$ sudo ln -s /home/bigdata/soft/python3.7/bin/python3.7 /usr/bin/python3.7
[bigdata@master1 bin]$ sudo ln -s /home/bigdata/soft/python3.7/bin/pip3.7 /usr/bin/pip3.7
# Set environment variables
export JAVA_HOME=/home/bigdata/soft/java
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
export SCALA_HOME=/home/bigdata/soft/scala
export PATH=${SCALA_HOME}/bin:$PATH
export PYSPARK_PYTHON=python3.7
# Problems encountered
# The Python 3.7 build was missing libffi-devel (note: after installing it, re-run make && sudo make install)
[bigdata@master1 bin]$ sudo yum install libffi-devel -y
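libffi-devel is what the standard-library _ctypes module builds against, so a missing libffi typically surfaces as a failed ctypes import. A quick check after the rebuild (my own sketch, not part of the original steps):

# run with the rebuilt interpreter; _ctypes is the module that needs libffi
import ctypes

print("ctypes OK:", ctypes.__name__)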
Installing Python Dependencies
# Install the Flink dependency
python3.7 -m pip install apache-flink==1.17.1 -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
# Install the Kafka dependency
python3.7 -m pip install pykafka==2.8.0 -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
Note: if apache-flink 1.17.1 is not available, installing apache-flink 1.17.0 also works.
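Before touching YARN, it is worth confirming the install can actually run a job locally. A minimal smoke test, assuming nothing beyond the packages installed above:

# smoke_test.py - start a local embedded mini-cluster and print one element
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection(["hello world"]).print()
env.execute("smoke_test")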
# 1. Package the Python environment with its installed dependencies into a zip archive (zip appends .zip, producing venv.zip)
[bigdata@master1 soft]$ sudo zip -r venv python3.7/
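The -pyexec/-pyclientexec flags used at submit time reference venv.zip/python3.7/bin/python3.7, which assumes python3.7/ stays the top-level directory inside the archive. A small sketch (hypothetical helper, not from the original post) to verify the layout before submitting:

# check_venv.py - confirm venv.zip has the layout the submit flags expect
import zipfile

with zipfile.ZipFile("venv.zip") as zf:
    names = zf.namelist()
# the -pyexec/-pyclientexec flags reference this exact path
assert "python3.7/bin/python3.7" in names, "unexpected archive layout"
print("venv.zip layout OK")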
Downloading the Third-Party JAR
flink-sql-connector-kafka-1.17.1.jar
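The JAR can be pulled from Maven Central; a short download sketch, assuming the standard coordinates org.apache.flink:flink-sql-connector-kafka:1.17.1:

# fetch_jar.py - download the Kafka SQL connector from Maven Central
import urllib.request

url = ("https://repo1.maven.org/maven2/org/apache/flink/"
       "flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar")
urllib.request.urlretrieve(url, "flink-sql-connector-kafka-1.17.1.jar")
print("download complete")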
WordCount Python Code
# coding=utf-8
from pyflink.common import Configuration, SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, CheckpointingMode, \
    ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

# import os
# java11_location = '/home/bigdata/soft/jdk-11.0.19'  # set your own path
# os.environ['JAVA_HOME'] = java11_location


def split(s):
    """
    Custom flatMap function: split a line into (word, 1) pairs.
    :param s: input line
    :return: generator of (word, 1) tuples
    """
    splits = s.split(" ")
    for sp in splits:
        yield (sp, 1)


if __name__ == '__main__':
    # Build the environment; configuration options can be set here or left at their defaults
    config = Configuration()
    config.set_integer("python.fn-execution.bundle.size", 1000)
    config.set_boolean("rest.flamegraph.enabled", True)
    config.set_boolean("pipeline.object-reuse", True)
    config.set_string("env.java.opts", "-Dfile.encoding=UTF-8")
    config.set_string("rest.bind-port", "8081-8089")
    config.set_float("taskmanager.memory.managed.fraction", 0.3)
    config.set_string("python.execution-mode", "process")
    # config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)
    # Start a checkpoint every 5 minutes (the argument is in milliseconds)
    env.enable_checkpointing(1000 * 60 * 5)
    # Set the mode to exactly-once (this is the default)
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
    # Use externalized checkpoints so checkpoints are retained after the job is cancelled
    env.get_checkpoint_config().enable_externalized_checkpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    # Enable experimental unaligned checkpoints
    env.get_checkpoint_config().enable_unaligned_checkpoints()
    # A checkpoint must finish within 5 minutes or it is discarded
    env.get_checkpoint_config().set_checkpoint_timeout(1000 * 60 * 5)
    # env.add_jars("file:///E:/pyWorks/wjxPyTest/lib/flink-sql-connector-kafka-1.17.1.jar")
    # env.add_jars("hdfs://cluster/python3.7/jars/flink-sql-connector-kafka-1.17.1.jar")
    env.add_jars("file:///home/bigdata/qh/pyFlink/wjxPyTest/lib/flink-sql-connector-kafka-1.17.1.jar")
    # Point to the Python virtual environment; required when running Flink with Python
    # env.add_python_archive("file:///home/bigdata/soft/venv.zip", "venv")
    # env.add_python_archive("hdfs://cluster/python3.7/venv.zip", "venv")
    # Path to the Python interpreter used to run Python UDF workers; required when running Flink with Python
    # env.set_python_executable("venv/python3.7/bin/python3.7")
    # Define a JSON deserializer
    # deserialization_schema = JsonRowDeserializationSchema.builder() \
    #     .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    kafka_source = KafkaSource \
        .builder() \
        .set_bootstrap_servers('kafka1.bigdata.com:9092,kafka2.bigdata.com:9092') \
        .set_group_id('MY_GROUP') \
        .set_topics('test_source_topic') \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .build()
    env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "kafka source") \
        .flat_map(split) \
        .key_by(lambda i: i[0]) \
        .reduce(lambda i, j: (i[0], i[1] + j[1])) \
        .print()
    env.execute()
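With the job running, the pykafka package installed earlier can feed a few test lines into the source topic. A minimal sketch, reusing the broker addresses and topic name from the job above:

# produce_test_data.py - push sample lines into test_source_topic
from pykafka import KafkaClient

client = KafkaClient(hosts="kafka1.bigdata.com:9092,kafka2.bigdata.com:9092")
topic = client.topics[b"test_source_topic"]
with topic.get_sync_producer() as producer:
    for line in [b"hello flink", b"hello yarn", b"hello flink on yarn"]:
        producer.produce(line)
print("test messages sent")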
Commands to Submit and Run the PyFlink Job
1. Python Mode
# Python mode: run the script directly; it executes on a local embedded mini-cluster
[bigdata@master1 flink]$ python3.7 pyflink_sc.py
2. Per-Job Mode
# Log in to Kerberos first: if the cluster has Kerberos enabled, authenticate as a user allowed to submit jobs
[bigdata@master1 keytab]$ kinit -kt /etc/security/keytab/spark.service.keytab spark/bigdata@EXAMPLE.COM
# Submit the YARN job. Per-Job mode has been deprecated since Flink 1.15; Application mode is recommended
# In this mode the job is compiled on the client, so the path must be a real path directly reachable on the submitting machine
# The `pyflink_sc.py` script and the JARs the code depends on must be copied to all Hadoop slave nodes in advance
/home/bigdata/soft/flink-1.17.1/bin/flink run --target yarn-per-job \
-Djobmanager.memory.process.size=4096m \
-Dtaskmanager.memory.process.size=6144m \
-Dyarn.application.name=pyflink_sc \
-Dyarn.ship-files=venv.zip \
-pyarch venv.zip \
-pyclientexec venv.zip/python3.7/bin/python3.7 \
-pyexec venv.zip/python3.7/bin/python3.7 \
--python /home/bigdata/qh/pyFlink/wjxPyTest/flink/pyflink_sc.py
Note: Per-Job mode has been deprecated since Flink 1.15; Application mode is recommended instead.
3. Application Mode
# Log in to Kerberos first: if the cluster has Kerberos enabled, authenticate as a user allowed to submit jobs
[bigdata@master1 keytab]$ kinit -kt /etc/security/keytab/spark.service.keytab spark/bigdata@EXAMPLE.COM
# When submitting, `venv.zip` must sit in the directory the job is submitted from (`soft` here); an HDFS path cannot be used,
# since the submission automatically prefixes `file:///home/bigdata/soft`
# The `pyflink_sc.py` script and the JARs the code depends on must be copied to all Hadoop slave nodes in advance
# The job is compiled on the client, so the path must be a real path directly reachable on the submitting machine
/home/bigdata/soft/flink-1.17.1/bin/flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=4096m \
-Dtaskmanager.memory.process.size=6144m \
-Dyarn.application.name=pyflink_sc \
-Dyarn.ship-files=venv.zip \
-pyarch venv.zip \
-pyclientexec venv.zip/python3.7/bin/python3.7 \
-pyexec venv.zip/python3.7/bin/python3.7 \
-py /home/bigdata/qh/pyFlink/wjxPyTest/flink/pyflink_sc.py
Errors Encountered
Error: on CentOS, No module named '_lzma' (the same error widely reported when importing torchvision)
Fix:
1. yum install -y xz-devel
2. pip3.7 install backports.lzma
3. Edit the existing lzma.py under the Python install, e.g. /home/bigdata/soft/python3.7/lib/python3.7/lzma.py, changing line 27 as follows:
try:
    from _lzma import *
    from _lzma import _encode_filter_properties, _decode_filter_properties
except ImportError:
    from backports.lzma import *
    from backports.lzma import _encode_filter_properties, _decode_filter_properties
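A quick check that the patch took effect, run with the patched interpreter (my own addition):

# this import raised ModuleNotFoundError before the fix
import lzma

print("lzma import OK:", lzma.LZMAFile.__name__)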