作为一款非常成熟的大数据工具,Spark已在业界获得了非常广泛的应用。而Python+Spark的结合产物PySpark更是集合了Python的易用和Spark的分布式计算能力,产生了1+1 > 2的效果。本系列文章将从《PySpark DataFrame使用详解》、《Pandas API on Spark使用详解》、《Spark on K8S搭建》、《Structured Streaming》、《Spark性能调优》几个方面分别介绍PySpark的功能。

这里也建议如果是新项目或者短时间内学习使用就直接使用PySpark DataFrame(PySpark SQL)或者Pandas API on Spark,不要再使用PySpark RDD。因为PySpark DataFrame和Pandas API on Spark的功能已经比较成熟,而且PySpark DataFrame的效率要高于PySpark RDD,甚至和Java版程序相差无几,此外Pandas API on Spark可以使用Pandas的API,这无疑极大的丰富了Spark的功能。预计未来几年PySpark的市场使用率将逐步超越Java/Scala,甚至在Spark的官方案例文档中已经把Python作为Spark的第一示例开发语言。

本文涉及的关于PySpark DataFrame的API都可以在官方文档中查阅。

1. 环境配置

组件版本信息如下:

  • Spark 3.4.0
  • Python 3.8,注意Spark 3.4.0不支持Python3.7。可以使用conda管理多个py环境。
  • jdk8,注意jdk版本不低于8u362。如果使用jdk11,需要设置 -Dio.netty.tryReflectionSetAccessible=true,不然可能出现下面的错误。
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available

使用下面pip命令直接安装PySpark:

pip install pyspark[pandas_on_spark]

上述命令包含了pandas_on_spark的依赖包,如果不需要,直接执行pip install pyspark即可。

在windows下面执行PySpark程序如果出现下面的error:

java.io.IOException: Cannot run program "python3": CreateProcess error=2, 系统找不到指定文件

则需要指定python解释器的路径,在程序中设置或者直接设置环境变量:

import os
os.environ['PYSPARK_PYTHON'] = r'D:\...\python.exe'

2. DataFrame API

注意:此处的DataFrame是指PySpark中的DataFrame数据格式,不同于Pandas中的DataFrame。下文的df如不特别说明,也是指PySpark中的DataFrame实例变量。

本文介绍的API都可在官网文档查询。

  • 构建会话环境
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField

spark = SparkSession.builder.appName('pysparkDemo') \
		# .config("spark.some.config.option", "some-value") \
		.getOrCreate()

如果使用jupyter调试pyspark程序,可以配置spark.sql.repl.eagerEval.enabled选项,可以像pandas一样直接执行df即可查看数据(不需要手动调用show函数):

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
  • 读取文件
# pyspark支持读取多种格式文件
spark.read.text()  # 支持读目录
spark.read.csv()
spark.read.json()
spark.read.orc()
spark.read.parquet()
spark.read.jdbc()
spark.read.format()  # 通用读文件API
spark.read.format("jdbc").option("url", jdbcUrl).option("query", "select c1, c2 from t1").load()

关于pyspark写其他数据库的配置项,可参考官方文档

  • 直接创建DataFrame
# 通过Row创建
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# Row是Datasets在PySpark中保存数据的单位

# 显示声明schema创建
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
# 创建DataFrame可以不指定schema,例如上面的程序仅指定字段名:schema=['a', 'b', 'c', 'd', 'e'],此时字段类型将继承自元素类型。

# 基于Pandas DataFrame创建
df = spark.createDataFrame(pd.DataFrame())
# 这也是一种常见的使用方式,例如有些spark不支持的数据渠道,可以使用pandas先读取数据,然后使用pyspark计算。
  • 数据类型
    前面在创建schema的时候是通过str的形式定义字段类型的,pyspark sql也提供了类型class供用户使用。可在pyspark.sql.types中查看。此外,也可直接查看官方文档
schema=StructType([StructField('a', LongType(), True), 
				   StructField('b', DoubleType(), True), 
				   StructField('c', StringType(), True),
				   StructField('e', DateType(), True),
				   StructField('f', TimestampType(), True)])
# StructField的第一参数表示字段名称,第二个参数表示类型,第三个参数表示是否可为空
  • 查看数据
df.show(n=20, truncate=True)  # 打印前n行,truncate表示字符串太长时截断显示
df.head(n)  		# 取df的前n行,不打印,返回值是Row的list结果集(如果n=1,则直接返回Row)
df.take(n)			# 取df的前n行,不打印,返回值是Row的list结果集(即使n=1,也返回list)
df.tail(n)  		# 取df的后n行,不打印,返回值是Row的结果集,如果n为负值,则表示跳过前|n|行
# head和tail的结果会全部返回到driver进程所在的节点,所以使用时需注意数据量,防止节点内存不足。

df.printSchema()  	# 打印df的schema
df.schema  			# df的schema信息,不打印
df.columns  		# 字段名
df.dtypes  			# 字段类型
df.count()  		# 记录数
df.toPandas()		# 转为pandas DataFrame,所有结果全部返回到driver进程所在的节点,注意oom
df.collect()		# 返回所有数据Row的list集合,所有结果全部返回到driver进程所在的节点,注意oom
  • 过滤
df.filter(df.age > 3)  	# 可接受Column或者字符串sql表达式
df.filter("age > 3")
df.filter(df['age'] > 3)	# 推荐使用这种形式访问列名,不会和属性冲突
df.filter(df.name.contains("Spark"))
# where和filter等价
df.where(df.age > 3)
df.where("age > 3")
  • 字段查询
df.select('a', 'b')  # 查询a、b列
df.select(F.length(df.c).alias('c_len'))  # 单列计算查询。针对PySpark DataFrame Column的操作函数必须使用PySpark内置的操作函数,不能使用python原生函数。此处的alias也可以换成name
  • 聚合查询
df.agg(F.max(F.b).name('max_b'))		# 聚合查询
df.agg(F.max('b').name('max_b'))
df.agg({'b': 'max'}).withColumnRenamed('max(b)', 'max_b')
df.agg(F.max(F.a).name('max_a'), F.max(F.b).name('max_b'))
df.agg(F.max('a').name('max_a'), F.max('b').name('max_b'))
df.agg({'a': 'max', 'b': 'max'}).withColumnsRenamed({'max(a)': 'max_a', 'max(b)': 'max_b'})
# PySpark针对DataFrame Column的计算函数支持显示调用和字符串两种语法格式,可根据习惯选择使用
  • 分组查询
df.groupBy('a', 'c').avg()
df.groupBy('a', 'c').agg()
df.groupBy('a', F.length('c').name('len_c')).agg()
# 使用df.groupby效果相同
  • 新增列
    前面的查询结果往往只有计算的结果,如果需要增加一个新列应该怎么操作呢?
df.withColumn('upper_c', F.upper(df.c))				# 新增一列
df.withColumns({'v1': df.a + 10, 'v2': df.b * 10})	# 新增多列
  • explode
    explode通俗解释就是“压扁”,效果等同于rdd中的flatMap。注意:不是多行转列。
df.select(F.explode(F.split(df.value, "\s+")).alias("word")).groupBy("word").count()
# 先使用空格分隔df中的value字段,然后统计word count

eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt"))
eDF.select(explode(eDF.mapfield).alias("key", "value"))  # mapfield的key、value值分别转为key、value字段
  • 函数
    前面我们已经列举了部分PySpark内置函数,PySpark的内置函数远不止这些,从常见的逻辑函数、字符串处理函数、时间函数、聚合函数、开窗函数等等,可查看官方文档
  • pandas udf
    PySpark DataFrame也支持执行pandas udf,使得数据可以按照pandas中的数据类型执行计算。Spark和Pandas之间的数据传输通过Arrow实现。
##################1如1出udf#################
@pandas_udf('long')		# 返回元素类型
def pandas_plus_one(s: pd.Series) -> pd.Series:	# 必须指定udf的返回值类型
    return s + 1
# 需要注意参数s和udf的返回值类型虽然都是Series,但是不能在udf中使用和前后数据有关的操作,例如rolling。这是因为在spark中数据是分布式存储的,数据分批次调用udf。
df.select(pandas_plus_one(df.a))

##################多入1出udf#################
@pandas_udf('double')
def pandas_series_sum(s1: pd.Series, s2: pd.Series) -> pd.Series:	# 必须指定udf的返回值类型
    return s1 + s2

df.select(pandas_series_sum(df.a, df.b))

##################聚合输出udf################
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()
    
df.groupby("id").agg(mean_udf(df['v']))

##################单到单迭代器#################
# 迭代器形式
from typing import Iterator
@pandas_udf("long")  # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()

##################DataFrame输出#################
@pandas_udf("col1 string, col2 long")  # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")
df.select(func("long_col", "string_col", "struct_col"))

##################DataFrame输出#################
@pandas_udf("col1 string, col2 long")
def func(s: pd.Series) -> pd.DataFrame:
    return pd.DataFrame({'col1': s+1, 'col2': s-1})

df.select(func(df['a']))
# 注意返回的结果是一列
  • 结合pandas api
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]
# pandas_df虽然是pandas.DataFrame类型,但是同样不能执行和前后数据相关的操作。
# 结果使用生成器输出,类似于flink中的process算子,好处是可以灵活地自定义输出

df.mapInPandas(pandas_filter_func, schema=df.schema)

# 结合groupBy分组操作
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema)

注意:前面已经提到spark中的数据和pandas.DataFrame之间传输是通过Arrow实现的,这个过程是分批次传输的,如果每个批次的数据太多,容易导致oom。为了避免出现这种情况,可以设置spark.sql.execution.arrow.maxRecordsPerBatch参数来设置每个批次的最大记录数(默认10000)。

  • 结合sql
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA")

df.selectExpr('add_one(v1)')
df.select(F.expr('count(*)') > 0)
  • 时区设置
    spark内部默认使用UTC时区,在输出展示时,会默认转为会话时区。如果需要设置为指定时区,可以使用spark.sql.session.timeZone配置项设置。在转为pandas数据时,会转为会话时区,并丢失时区属性。此外,spark中的时间戳精度为微妙,pandas.DataFrame中的纳秒时间戳转为PySpark.DataFrame数据时会截断纳秒值。还需要注意,非Pandas标准UDF将把时间戳数据加载为Python datetime对象,这与Pandas时间戳不同。建议在使用pandas_udfs中的时间戳时使用Pandas时间序列功能,如numpy中的datetime64和timedelta64,以获得更好地性能。
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐