Flink流批一体计算(9):Flink Python
如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,则需要指定Connector或者Java自定义函数所在的JAR包。为了使用该Python虚拟环境,您可以选择将该Python虚拟环境分发到集群的所有节点上,也可以在提交PyFlink作业的时候,指定使用该Python虚拟环境。执行 PyFlink job,增加 source和资源文件
目录
使用Python依赖
通过以下场景为您介绍如何使用Python依赖:
- 使用自定义的Python虚拟环境
- 使用第三方Python包
- 使用JAR包
- 使用数据文件
使用自定义的Python虚拟环境
方式一:在集群中的某个节点创建Python虚拟环境
set -e
# 创建Python的虚拟环境。
python3.6 -m venv venv
# 激活Python虚拟环境。
source venv/bin/activate
# 准备Python虚拟环境。
pip install --upgrade pip
# 安装PyFlink依赖。
pip install "apache-flink==1.13.0"
# 退出Python虚拟环境。
deactivate
该命令执行完成后,会生成一个名为venv的目录,即为Python 3.6的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境。
为了使用该Python虚拟环境,您可以选择将该Python虚拟环境分发到集群的所有节点上,也可以在提交PyFlink作业的时候,指定使用该Python虚拟环境。
以下命令显示了不同的PyFlink作业提交用例:
- 执行 PyFlink job:
$ ./bin/flink run --python examples/python/table/batch/word_count.py
- 使用pyFiles和--pyModule中指定的主入口模块运行PyFlink作业:
./bin/flink run \
--pyModule batch.word_count \
--pyFiles examples/python/table/batch
- 在特定主机<jobmanagerHost>运行的JobManager上提交PyFlink作业(相应地调整命令):
$ ./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/batch/word_count.py
- 在yarn集群上以Per-Job 模式运行PyFlink job:
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/batch/word_count.py
方式二:在本地开发机创建Python虚拟环境
set -e
# 下载Python 3.7 miniconda.sh脚本。
wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"
# 为Python 3.7 miniconda.sh脚本添加执行权限。
chmod +x miniconda.sh
# 创建Python的虚拟环境。
./miniconda.sh -b -p venv
# 激活Conda Python虚拟环境。
source venv/bin/activate ""
# 安装PyFlink依赖。
pip install "apache-flink==1.13.0"
# 退出Conda Python虚拟环境。
conda deactivate
# 删除缓存的包。
rm -rf venv/pkgs
# 将准备好的Conda Python虚拟环境打包。
zip -r venv.zip venv
该命令执行完成后,会生成一个名为venv.zip的文件,即为Python 3.7的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境,或者在虚拟环境中安装所需的第三方Python包。
使用JAR包
如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,则需要指定Connector或者Java自定义函数所在的JAR包。
引用Java UDF或外部连接器的PyFlink作业。--jarfile中指定的JAR文件将上载到集群。
$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--jarfile <jarFile>
使用数据文件
如果您的Flink Python作业中需要访问数据文件,例如模型文件等,则可以通过Python Archives的方式来访问。
- 执行 PyFlink job,增加 source和资源文件 ,--pyFiles中指定的文件将被添加到PYTHONPATH中,因此在Python代码中可用。
$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
更多推荐
所有评论(0)