问题:Pyspark 使用 AWS Glue 将 JSON 列写入 Postgres

我有以下数据框(df4):

+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|id                                  |exclusion_reason                          |created_at                |updated_at                |
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|4c01d951-2ec5-4ba4-bfe2-8ba9c3029962|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|dac14ca3-bf44-4e3c-80e8-0e2d6d2ff576|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|6d277012-ff6c-4202-bbd7-64cbd467ca28|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|0388163e-2614-4b71-b707-623337d58387|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|01daec52-408c-44e3-965a-b87daa334a1a|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
+------------------------------------+------------------------------------------+--------------------------+--------------------------+

我需要写入 Postgres 数据库。我正在使用 AWS Glue,而 Postgres 数据库位于 VPC 中,因此我需要使用胶水连接和glueContext.write_dynamic_frame.from_jdbc_conf方法来执行此操作。问题是我不断收到错误ERROR: column "matchback_exclusion_reason" is of type jsonb but expression is of type character。数据框中的数据类型为字符串,数据库中的数据类型为JSONB。

我已经看到建议我只需将stringtype: "unspecified"添加到我的 write 语句中,但以下会产生相同的错误:

datasource2 = DynamicFrame.fromDF(df4, glueContext, "ParquetToWrite")

output = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource2, catalog_connection = "MPtest", connection_options = {"stringtype":"unspecified", "database" : "app", "dbtable" : "orders"})

我可以以某种方式将此列转换为 JSON 吗?我尝试创建一个结构类型来解析元素,但这也不起作用(下面的代码):

schema = StructType([StructField("ship_reason", StringType()),StructField("bill_reason", StringType())])
df4Test.select(f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

df4Test = df4Test.withColumn("exclusion_reason", f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

是否可以将列类型修改为 JSONB 类型?理想情况下,我基本上只想“json.load”排除_reason 列,这样我就可以将它写入Postgres。

解答

ResolveChoice可以通过将exclusion_reason转换为json来提供帮助:

datasource3 = datasource2.resolveChoice(specs = [('exclusion_reason','cast:json')])

来自AWS Glue 开发人员指南

使用ResolveChoice指定当列包含多种类型的值时应如何处理。您可以选择将列转换为单一数据类型、丢弃一种或多种类型,或者将所有类型保留在单独的列或结构中。您可以为每列选择不同的解析策略或指定应用于所有列的全局策略。

Logo

PostgreSQL社区为您提供最前沿的新闻资讯和知识内容

更多推荐