Pyspark 使用 AWS Glue 将 JSON 列写入 Postgres
问题:Pyspark 使用 AWS Glue 将 JSON 列写入 Postgres 我有以下数据框(df4): +------------------------------------+------------------------------------------+--------------------------+--------------------------+ |id |ex
问题:Pyspark 使用 AWS Glue 将 JSON 列写入 Postgres
我有以下数据框(df4):
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|id |exclusion_reason |created_at |updated_at |
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|4c01d951-2ec5-4ba4-bfe2-8ba9c3029962|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|dac14ca3-bf44-4e3c-80e8-0e2d6d2ff576|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|6d277012-ff6c-4202-bbd7-64cbd467ca28|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|0388163e-2614-4b71-b707-623337d58387|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|01daec52-408c-44e3-965a-b87daa334a1a|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
我需要写入 Postgres 数据库。我正在使用 AWS Glue,而 Postgres 数据库位于 VPC 中,因此我需要使用胶水连接和glueContext.write_dynamic_frame.from_jdbc_conf
方法来执行此操作。问题是我不断收到错误ERROR: column "matchback_exclusion_reason" is of type jsonb but expression is of type character
。数据框中的数据类型为字符串,数据库中的数据类型为JSONB。
我已经看到建议我只需将stringtype: "unspecified"
添加到我的 write 语句中,但以下会产生相同的错误:
datasource2 = DynamicFrame.fromDF(df4, glueContext, "ParquetToWrite")
output = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource2, catalog_connection = "MPtest", connection_options = {"stringtype":"unspecified", "database" : "app", "dbtable" : "orders"})
我可以以某种方式将此列转换为 JSON 吗?我尝试创建一个结构类型来解析元素,但这也不起作用(下面的代码):
schema = StructType([StructField("ship_reason", StringType()),StructField("bill_reason", StringType())])
df4Test.select(f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))
df4Test = df4Test.withColumn("exclusion_reason", f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))
是否可以将列类型修改为 JSONB 类型?理想情况下,我基本上只想“json.load
”排除_reason 列,这样我就可以将它写入Postgres。
解答
ResolveChoice
可以通过将exclusion_reason
转换为json
来提供帮助:
datasource3 = datasource2.resolveChoice(specs = [('exclusion_reason','cast:json')])
来自AWS Glue 开发人员指南
使用
ResolveChoice
指定当列包含多种类型的值时应如何处理。您可以选择将列转换为单一数据类型、丢弃一种或多种类型,或者将所有类型保留在单独的列或结构中。您可以为每列选择不同的解析策略或指定应用于所有列的全局策略。
更多推荐
所有评论(0)