目录

使用Python依赖

使用自定义的Python虚拟环境

方式一:在集群中的某个节点创建Python虚拟环境

方式二:在本地开发机创建Python虚拟环境

使用JAR包

使用数据文件


使用Python依赖

通过以下场景为您介绍如何使用Python依赖:

  • 使用自定义的Python虚拟环境
  • 使用第三方Python包
  • 使用JAR包
  • 使用数据文件

使用自定义的Python虚拟环境

方式一:在集群中的某个节点创建Python虚拟环境
set -e

# 创建Python的虚拟环境。

python3.6 -m venv venv

# 激活Python虚拟环境。

source venv/bin/activate

# 准备Python虚拟环境。

pip install --upgrade pip

# 安装PyFlink依赖。

pip install "apache-flink==1.13.0"

# 退出Python虚拟环境。

deactivate

该命令执行完成后,会生成一个名为venv的目录,即为Python 3.6的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境。

为了使用该Python虚拟环境,您可以选择将该Python虚拟环境分发到集群的所有节点上,也可以在提交PyFlink作业的时候,指定使用该Python虚拟环境。

以下命令显示了不同的PyFlink作业提交用例:

  • 执行 PyFlink job:
$ ./bin/flink run --python examples/python/table/batch/word_count.py
  • 使用pyFiles和--pyModule中指定的主入口模块运行PyFlink作业:
./bin/flink run \
--pyModule batch.word_count \
--pyFiles examples/python/table/batch
  • 在特定主机<jobmanagerHost>运行的JobManager上提交PyFlink作业(相应地调整命令):
$ ./bin/flink run \
 --jobmanager <jobmanagerHost>:8081 \
 --python examples/python/table/batch/word_count.py
  • 在yarn集群上以Per-Job 模式运行PyFlink job:
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/batch/word_count.py
方式二:在本地开发机创建Python虚拟环境
set -e

# 下载Python 3.7 miniconda.sh脚本。

wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"

# 为Python 3.7 miniconda.sh脚本添加执行权限。

chmod +x miniconda.sh

# 创建Python的虚拟环境。

./miniconda.sh -b -p venv

# 激活Conda Python虚拟环境。

source venv/bin/activate ""

# 安装PyFlink依赖。

pip install "apache-flink==1.13.0"

# 退出Conda Python虚拟环境。

conda deactivate

# 删除缓存的包。

rm -rf venv/pkgs

# 将准备好的Conda Python虚拟环境打包。

zip -r venv.zip venv

该命令执行完成后,会生成一个名为venv.zip的文件,即为Python 3.7的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境,或者在虚拟环境中安装所需的第三方Python包。

使用JAR包

如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,则需要指定Connector或者Java自定义函数所在的JAR包。

引用Java UDF或外部连接器的PyFlink作业。--jarfile中指定的JAR文件将上载到集群。

$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--jarfile <jarFile>

使用数据文件

如果您的Flink Python作业中需要访问数据文件,例如模型文件等,则可以通过Python Archives的方式来访问。

  • 执行 PyFlink job,增加 source和资源文件 ,--pyFiles中指定的文件将被添加到PYTHONPATH中,因此在Python代码中可用。
$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐