问题:使用 monotonically_increasing_id() 将行号分配给 pyspark 数据帧

我正在使用 monotonically_increasing_id() 使用以下语法将行号分配给 pyspark 数据帧:

df1 = df1.withColumn("idx", monotonically_increasing_id())

现在 df1 有 26,572,528 条记录。所以我期待 idx 值在 0-26,572,527 之间。

但是当我选择 max(idx) 时,它的值非常大:335,008,054,165。

这个函数是怎么回事?使用此功能与具有相似记录数的另一个数据集合并是否可靠?

我有大约 300 个数据帧,我想将它们组合成一个数据帧。因此,一个数据帧包含 ID,其他数据帧包含与它们对应的不同记录

解答

编辑:可以在此处找到执行此操作的方法和风险的完整示例

来自文档

生成单调递增的 64 位整数的列。

生成的ID保证单调递增且唯一,但不连续。当前实现将分区 ID 放在高 31 位中,将每个分区内的记录号放在低 33 位中。假设数据帧的分区少于10亿,每个分区的记录少于80亿。

因此,它不像 RDB 中的自动增量 id,并且对于合并它可靠。

如果您需要像 RDB 中的自动增量行为并且您的数据是可排序的,那么您可以使用row_number

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by "some_column") as num, * from df')
+---+-----------+
|num|some_column|
+---+-----------+
|  1|   ....... |
|  2|   ....... |
|  3| ..........|
+---+-----------+

如果您的数据不可排序并且您不介意使用 rdds 创建索引然后回退到数据帧,则可以使用rdd.zipWithIndex()

一个例子可以在这里找到

简而言之:

# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()

df.show()

# your data           | indexes
+---------------------+---+
|         _1          | _2| 
+-----------=---------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+

在那之后,您可能需要更多的转换才能使您的数据框达到您需要的状态。注意:不是一个非常高效的解决方案。

希望这可以帮助。祝你好运!

编辑: 想一想,可以结合monotonically_increasing_id来使用row_number:

# create a monotonically increasing id 
df = df.withColumn("idx", monotonically_increasing_id())

# then since the id is increasing but not consecutive, it means you can sort by it, so you can use the `row_number`
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by "idx") as num, * from df')

虽然不确定性能。

Logo

Python社区为您提供最前沿的新闻资讯和知识内容

更多推荐