Intro

    Common Hive string concatenation functions, conversion to JSON, and related operations.

import pyspark
import pyspark.sql.functions
from pyspark.sql import SparkSession
# Create a SparkSession object via the .builder entry point
# .appName("testapp") gives the application a name; .getOrCreate() creates a SparkSession or returns one that already exists
spark = SparkSession.builder.appName("pyspark").getOrCreate()
df = spark.createDataFrame([("id1", "classA", 160, "male", "18", "2019-01-01 11:45:50"),
                            ("id2", "classA", 170, "female", "37", "2019-01-02 11:55:50"),
                            ("id3", "classB", 180, "male", "18", "2019-01-21 11:45:50"),
                            ("id4", "classB", 170, "female", "44", "2019-02-01 12:45:50"),
                            ("id5", "classB", 160, "male", "39", "2019-01-15 10:40:50")],
                           ["id", "class", "height", "sex", "age", "createtime_str"])
df.show(20, truncate=False)
+---+------+------+------+---+-------------------+
|id |class |height|sex   |age|createtime_str     |
+---+------+------+------+---+-------------------+
|id1|classA|160   |male  |18 |2019-01-01 11:45:50|
|id2|classA|170   |female|37 |2019-01-02 11:55:50|
|id3|classB|180   |male  |18 |2019-01-21 11:45:50|
|id4|classB|170   |female|44 |2019-02-01 12:45:50|
|id5|classB|160   |male  |39 |2019-01-15 10:40:50|
+---+------+------+------+---+-------------------+
df.createOrReplaceTempView("temp")

struct

The struct type is the intermediate step when converting to JSON.

df_struct = spark.sql(
    """
select named_struct("id",id) as col_struct
from temp 
"""
)
df_struct.show()
+----------+
|col_struct|
+----------+
|     [id1]|
|     [id2]|
|     [id3]|
|     [id4]|
|     [id5]|
+----------+
df_struct.printSchema()
root
 |-- col_struct: struct (nullable = false)
 |    |-- id: string (nullable = true)
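
For reference, a minimal DataFrame-API sketch of the same struct construction, using pyspark.sql.functions:

from pyspark.sql import functions as F

# DataFrame-API equivalent of named_struct("id", id):
# F.struct builds a struct column whose field keeps the source column name
df.select(F.struct("id").alias("col_struct")).printSchema()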

Converting to JSON

Use to_json to turn the struct above directly into a JSON string.

df_json = spark.sql(
    """
select to_json(named_struct("id",id)) as json
from temp 
"""
)
df_json.show()
+------------+
|        json|
+------------+
|{"id":"id1"}|
|{"id":"id2"}|
|{"id":"id3"}|
|{"id":"id4"}|
|{"id":"id5"}|
+------------+
df_json.printSchema()
root
 |-- json: string (nullable = true)
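
Going the other way, from_json parses a JSON string back into a struct, given a schema; a minimal round-trip sketch:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Parse the JSON string back into a struct (the schema must be supplied)
schema = StructType([StructField("id", StringType())])
df_json.select(F.from_json("json", schema).alias("parsed")).printSchema()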

Converting to an array

df_arr = spark.sql(
    """
select 
cast(struct(sex,age) as string) as arr,
to_json(struct(sex,age)) as json
from temp 
"""
)
df_arr.show(truncate=False)
+------------+---------------------------+
|arr         |json                       |
+------------+---------------------------+
|[male, 18]  |{"sex":"male","age":"18"}  |
|[female, 37]|{"sex":"female","age":"37"}|
|[male, 18]  |{"sex":"male","age":"18"}  |
|[female, 44]|{"sex":"female","age":"44"}|
|[male, 39]  |{"sex":"male","age":"39"}  |
+------------+---------------------------+

The struct-to-array trick is mainly used to join the columns of each row into an array-style string. The approach shown handles numeric values fine, but when strings are involved the cast output is malformed (note the missing quotes in [male, 18] above), while the to_json column is unaffected. A proper fix will be supplemented later; see also the sketch below.
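
In the meantime, if a genuine array column is the goal, the array function builds one directly (all elements must share a type); a minimal sketch:

# array() builds a real ArrayType column; sex and age are both strings here,
# so no cast is needed, and the quoting problem above does not arise
spark.sql("""
select array(sex, age) as arr
from temp
""").printSchema()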

concat

concat is the standard string concatenation function; nothing fancy, here is an example.

spark.sql(
    """
select concat("id",":",id) as col_concat
from temp 
"""
).show()
+----------+
|col_concat|
+----------+
|    id:id1|
|    id:id2|
|    id:id3|
|    id:id4|
|    id:id5|
+----------+

concat_ws

concat_ws(separator, string s1, string s2, ...)

Joins the given fields with a separator.

spark.sql(
    """
select concat_ws(",",id,sex) as col_concat_ws,
concat(id,",",sex) as next
from temp 
"""
).show()
+-------------+----------+
|col_concat_ws|      next|
+-------------+----------+
|     id1,male|  id1,male|
|   id2,female|id2,female|
|     id3,male|  id3,male|
|   id4,female|id4,female|
|     id5,male|  id5,male|
+-------------+----------+
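
One behavioral difference worth knowing: in Spark SQL, concat returns NULL as soon as any argument is NULL, while concat_ws simply skips NULL arguments. A quick check:

# concat propagates NULL; concat_ws drops NULL arguments
spark.sql("select concat('a', null, 'b') as c1, concat_ws('-', 'a', null, 'b') as c2").show()
# c1 is null, c2 is 'a-b'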

collect_list and collect_set

Used together with group by, these turn a column within each group into an array: collect_list keeps duplicates, while collect_set removes them.

df_collect = spark.sql(
    """
select sex
,collect_set(age) as set_col
,collect_list(age) as list_age
from temp 
group by sex
"""
)
df_collect.show()
+------+--------+------------+
|   sex| set_col|    list_age|
+------+--------+------------+
|female|[44, 37]|    [37, 44]|
|  male|[39, 18]|[18, 18, 39]|
+------+--------+------------+
df_collect.printSchema()
root
 |-- sex: string (nullable = true)
 |-- set_col: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- list_age: array (nullable = true)
 |    |-- element: string (containsNull = true)

As shown, the collected values are stored as arrays.
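
The same aggregation can also be written with the DataFrame API; a minimal sketch:

from pyspark.sql import functions as F

# groupBy + agg mirrors the group by / collect_* SQL above
df.groupBy("sex").agg(
    F.collect_set("age").alias("set_col"),
    F.collect_list("age").alias("list_age"),
).show()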

collect_list with concat_ws: array to string

spark.sql(
    """
select sex
,collect_list(age) as list_age
,concat_ws(",",collect_list(age)) as age_str
from temp 
group by sex
""").show(truncate=False)
+------+------------+--------+
|sex   |list_age    |age_str |
+------+------------+--------+
|female|[37, 44]    |37,44   |
|male  |[18, 18, 39]|18,18,39|
+------+------------+--------+

Building nested JSON with collect_list

df_json2 = spark.sql(
    """
select sex
,to_json(named_struct("age_list",list_age)) as json_str1
,to_json(named_struct("sex",sex,"data",to_json(named_struct("age_list",list_age)) )) as json_str2
from 
(select sex
,collect_list(age) as list_age
,concat_ws(",",collect_list(age)) as age_str
from temp 
group by sex ) as t
""")
df_json2.show(truncate=False)
+------+-----------------------------+-------------------------------------------------------------+
|sex   |json_str1                    |json_str2                                                    |
+------+-----------------------------+-------------------------------------------------------------+
|female|{"age_list":["37","44"]}     |{"sex":"female","data":"{\"age_list\":[\"37\",\"44\"]}"}     |
|male  |{"age_list":["18","18","39"]}|{"sex":"male","data":"{\"age_list\":[\"18\",\"18\",\"39\"]}"}|
+------+-----------------------------+-------------------------------------------------------------+
df_json2.printSchema()
root
 |-- sex: string (nullable = true)
 |-- json_str1: string (nullable = true)
 |-- json_str2: string (nullable = true)
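
The escaped quotes in json_str2 come from serializing the inner to_json result, a plain string, a second time. Nesting named_struct directly should produce clean nested JSON; a minimal sketch of the idea:

# Nest the struct itself instead of its to_json string, so the inner object
# is serialized once and no quotes need escaping
spark.sql("""
select sex,
       to_json(named_struct("sex", sex,
                            "data", named_struct("age_list", collect_list(age)))) as json_str
from temp
group by sex
""").show(truncate=False)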

Practical examples

df.show(20, truncate=False)
+---+------+------+------+---+-------------------+
|id |class |height|sex   |age|createtime_str     |
+---+------+------+------+---+-------------------+
|id1|classA|160   |male  |18 |2019-01-01 11:45:50|
|id2|classA|170   |female|37 |2019-01-02 11:55:50|
|id3|classB|180   |male  |18 |2019-01-21 11:45:50|
|id4|classB|170   |female|44 |2019-02-01 12:45:50|
|id5|classB|160   |male  |39 |2019-01-15 10:40:50|
+---+------+------+------+---+-------------------+

Array format

Store each id's features as an array, with all students of the same class grouped together, in the following format:

{"class":"classA",
 "data": {
          "id1": [1, 2, 3 ], 
          "id2": [4, 5, 6 ] }
}
df_json3 = spark.sql(
    """
select 
class
,concat('{"classs":"',class,'","data":{',concat_ws(',',collect_list(id_map)),'}}') AS json_str
from 
(select *,
concat('"',id,'":',cast(struct(height,age) as string) ) as id_map
from temp  ) as t
group by class
""")
df_json3.show(truncate=False)
+------+---------------------------------------------------------------------------+
|class |json_str                                                                   |
+------+---------------------------------------------------------------------------+
|classB|{"class":"classB","data":{"id3":[180, 18],"id4":[170, 44],"id5":[160, 39]}}|
|classA|{"class":"classA","data":{"id1":[160, 18],"id2":[170, 37]}}                |
+------+---------------------------------------------------------------------------+

Key-value format

Store each id's features as key-value pairs, with all students of the same class grouped together, in the following format:

{"class": "classA",
 "data": {
          "id1": {"height": 1, "age": 3},
          "id2": {"height": 1, "age": 3}}
}

df_json4 = spark.sql(
    """
select 
class
,concat('{"classs":"',class,'","data":{',concat_ws(',',collect_list(id_map)),'}}') AS json_str
from 
(select *,
concat('"',id,'":',to_json(struct(height,age)) ) as id_map
from temp  ) as t
group by class
""")
df_json4.show(truncate=False)
+------+---------------------------------------------------------------------------------------------------------------------------+
|class |json_str                                                                                                                   |
+------+---------------------------------------------------------------------------------------------------------------------------+
|classB|{"class":"classB","data":{"id3":{"height":180,"age":"18"},"id4":{"height":170,"age":"44"},"id5":{"height":160,"age":"39"}}}|
|classA|{"class":"classA","data":{"id1":{"height":160,"age":"18"},"id2":{"height":170,"age":"37"}}}                                |
+------+---------------------------------------------------------------------------------------------------------------------------+
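
Hand-assembling JSON with concat is easy to get wrong (quoting, commas). An alternative sketch, assuming Spark 2.4+ where map_from_entries is available, builds a real map of id to struct and lets to_json handle all the quoting:

# map_from_entries expects an array of two-field (key, value) structs;
# here each entry maps an id to its (height, age) struct
spark.sql("""
select to_json(named_struct(
         "class", class,
         "data", map_from_entries(collect_list(struct(id, struct(height, age))))
       )) as json_str
from temp
group by class
""").show(truncate=False)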

Concatenating strings in order

Concatenate strings following a given order.
With the data below, for each user, concatenate value in ascending date order.
The main logic:

  • sort_array sorts ascending by default; concatenating the sort key and value into one string lets the sort key drive the ordering
  • concat_ws joins the sorted array into one long string
  • a regex replacement strips the sort key back out
import pandas as pd
df = pd.DataFrame({'user':['jack','jack','jack'],'date': ['20230725', '20230501', '20230601'], 'value': [1, 2, 3]})
spark_df_sort = spark.createDataFrame(df)
spark_df_sort.show(truncate=False)
+----+--------+-----+
|user|date    |value|
+----+--------+-----+
|jack|20230725|1    |
|jack|20230501|2    |
|jack|20230601|3    |
+----+--------+-----+
spark_df_sort.createOrReplaceTempView('test_sort')
sql = """
SELECT user,
CONCAT_WS(';' , arr) as arr_str,
regexp_replace(CONCAT_WS(';' , arr),'\\d+_','') AS value_str

FROM 

(select user,sort_array(collect_set(concat(date,"_",value))) as arr
from test_sort
group by user
) as t
"""
# Double every backslash so that, after SQL string-literal parsing, the regex still sees \d+_
sql = sql.replace("\\", '\\\\')
ddd1 = spark.sql(sql)
ddd1.printSchema()
ddd1.show(truncate=False)

root
 |-- user: string (nullable = true)
 |-- arr_str: string (nullable = false)
 |-- value_str: string (nullable = false)

+----+--------------------------------+---------+
|user|arr_str                         |value_str|
+----+--------------------------------+---------+
|jack|20230501_2;20230601_3;20230725_1|2;3;1    |
+----+--------------------------------+---------+
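
A DataFrame-API sketch of the same idea that skips the regex cleanup: collect (date, value) structs and sort them, since sort_array orders structs by their first field:

from pyspark.sql import functions as F

# Collect (date, value) pairs as structs; sort_array orders by the first
# field (date), then arr.value extracts the values in that order
(spark_df_sort
 .groupBy("user")
 .agg(F.sort_array(F.collect_list(F.struct("date", "value"))).alias("arr"))
 .select("user",
         F.concat_ws(";", F.col("arr.value").cast("array<string>")).alias("value_str"))
 .show(truncate=False))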

Descending order

sql = """
SELECT user,
CONCAT_WS(';' , arr) as arr_str,
regexp_replace(CONCAT_WS(';' , arr),'\\d+_','') AS value_str

FROM 

(select user,reverse(sort_array(collect_set(concat(date,"_",value)))) as arr
from test_sort
group by user
) as t
"""
sql = sql.replace("\\", '\\\\')
ddd1 = spark.sql(sql)
ddd1.printSchema()
ddd1.show(truncate=False)

root
 |-- user: string (nullable = true)
 |-- arr_str: string (nullable = false)
 |-- value_str: string (nullable = false)

+----+--------------------------------+---------+
|user|arr_str                         |value_str|
+----+--------------------------------+---------+
|jack|20230725_1;20230601_3;20230501_2|1;3;2    |
+----+--------------------------------+---------+
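
sort_array also accepts an ascending flag, so the reverse() wrapper can be dropped; a sketch:

# sort_array(arr, false) sorts descending directly, no reverse() needed
spark.sql("""
select user,
       concat_ws(';', sort_array(collect_set(concat(date, '_', value)), false)) as arr_str
from test_sort
group by user
""").show(truncate=False)
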
spark.stop()

                                2022-02-16, Jiulonghu, Jiangning District, Nanjing
                            update 2023-02-10, Jiulonghu, Jiangning District, Nanjing
                            update 2023-07-25, Jiulonghu, Jiangning District, Nanjing
