Question

Table A has many columns, including a datetime column; Table B has a datetime column and a value. The data in both tables is generated sporadically, with no regular interval. Table A is small; Table B is massive.

I need to join B to A under the condition that each element a of A['datetime'] corresponds to

B[B['datetime'] <= a]['datetime'].max()

There are a couple of ways to do this, but I would like the most efficient one.

Option 1

Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each row and merges it with the large dataset using merge_asof.
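
Here is a minimal sketch of what Option 1 might look like in PySpark 3.x, using mapInPandas in place of a hand-rolled UDF. The sample A and B from the edit below are recreated so the snippet runs standalone; the names asof_per_batch, a_bc and B_Datetime are illustrative, not from the original post. Because each Arrow batch of B only sees part of the data, a final aggregation keeps the latest match per A row.

  import pandas as pd
  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.master("local").getOrCreate()

  A = spark.createDataFrame(
      [("A", "2019-02-03"), ("B", "2019-03-14")],
      ("Column1", "Datetime")).withColumn("Datetime", F.to_timestamp("Datetime"))
  B = spark.createDataFrame(
      [(0, "2019-01-01"), (1, "2019-01-15"), (2, "2019-02-01"),
       (3, "2019-02-15"), (4, "2019-03-01"), (5, "2019-03-15")],
      ("Key", "Datetime")).withColumn("Datetime", F.to_timestamp("Datetime"))

  # Collect and broadcast the small table as a pandas DataFrame sorted on the key.
  a_pd = A.toPandas().sort_values("Datetime")
  a_bc = spark.sparkContext.broadcast(a_pd)

  def asof_per_batch(batches):
      # For each pandas batch of B, find the best candidate match per A row.
      for b_pd in batches:
          b_pd = b_pd.rename(columns={"Datetime": "B_Datetime"}).sort_values("B_Datetime")
          merged = pd.merge_asof(a_bc.value, b_pd,
                                 left_on="Datetime", right_on="B_Datetime",
                                 direction="backward")
          # A rows unmatched in this batch may still match in another batch.
          yield merged.dropna(subset=["Key"]).astype({"Key": "int64"})

  candidates = B.mapInPandas(
      asof_per_batch,
      schema="Column1 string, Datetime timestamp, Key long, B_Datetime timestamp")

  # Each batch only saw a slice of B, so keep the latest match per A row.
  result = (candidates
            .groupBy("Column1")
            .agg(F.max(F.struct("B_Datetime", "Key")).alias("best"))
            .select("Column1", F.col("best.Key").alias("Key")))
  result.show()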

Option 2

Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition

B['datetime'] <= A['datetime']

Then eliminate all the superfluous rows.
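
For comparison, here is a minimal sketch of what Option 2 might look like, reusing the A and B DataFrames built in the Option 1 sketch above. The small table is broadcast into a non-equi join, and a window with row_number is one of several ways to eliminate the superfluous rows.

  from pyspark.sql import functions as F, Window

  # Theta (non-equi) join, broadcasting the small table to every executor.
  joined = B.alias("b").join(
      F.broadcast(A.alias("a")),
      F.col("b.Datetime") <= F.col("a.Datetime"))

  # Per A row, keep only the joined B row with the greatest Datetime.
  w = (Window.partitionBy(F.col("a.Column1"), F.col("a.Datetime"))
             .orderBy(F.col("b.Datetime").desc()))
  result = (joined
            .withColumn("rn", F.row_number().over(w))
            .where(F.col("rn") == 1)
            .select(F.col("a.Column1").alias("Column1"), F.col("b.Key").alias("Key")))
  result.show()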

Option 2 seems pretty terrible... but please let me know if the first way is efficient, or if there is another way.

EDIT: Here is the sample input and expected output:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2    |2019-02-01|
|    3    |2019-02-15|
|    4    |2019-03-01|
|    5    |2019-03-15|
+---------+----------+

custom_join(A,B) =
+---------+----------+
| Column1 |   Key    |
+---------+----------+
|    A    |     2    |
|    B    |     4    |
+---------+----------+
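
For reference, the expected custom_join output is exactly what a plain pandas merge_asof with direction='backward' gives. The snippet below (using pandas versions of the A and B DataFrames defined in the sketches above) just pins down the semantics.

  import pandas as pd

  # Pure-pandas reference: for each A row, take the last B row with Datetime <= A.Datetime.
  a_pd = A.toPandas().sort_values("Datetime")
  b_pd = B.toPandas().sort_values("Datetime")
  ref = pd.merge_asof(a_pd, b_pd, on="Datetime", direction="backward")
  # ref[["Column1", "Key"]] gives A -> 2 and B -> 4, matching the expected output above.
  print(ref[["Column1", "Key"]])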

Answers

Anyone trying to do this in PySpark 3.x can use pyspark.sql.PandasCogroupedOps.applyInPandas.
For example:
  import pandas as pd
  from pyspark.sql import SparkSession

  spark = SparkSession.builder.master("local").getOrCreate()

  df1 = spark.createDataFrame(
      [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
      ("time", "id", "v1"))
  df2 = spark.createDataFrame(
      [(20000101, 1, "x"), (20000101, 2, "y")],
      ("time", "id", "v2"))

  def asof_join(l, r):
      # merge_asof expects both frames to be sorted on the "on" key.
      return pd.merge_asof(l, r, on="time", by="id")

  # Cogroup the two DataFrames by id and run the pandas asof join per group.
  df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
      asof_join, schema="time int, id int, v1 double, v2 string"
  ).show()


  Output:
  +--------+---+---+---+
  |    time| id| v1| v2|
  +--------+---+---+---+
  |20000101|  1|1.0|  x|
  |20000102|  1|3.0|  x|
  |20000101|  2|2.0|  y|
  |20000102|  2|4.0|  y|
  +--------+---+---+---+
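
Applied to the A and B tables from the question (reusing the DataFrames built in the sketches above, with Datetime as a timestamp), one possible adaptation (not part of the original answer) adds a constant key k so that cogroup yields a single pandas group per side. Note that this funnels all of B through one group; for a truly massive B you would want a real partitioning key, such as a coarse date bucket.

  import pandas as pd
  from pyspark.sql import functions as F

  # Add a constant grouping key, since the sample tables have no natural one.
  A2 = A.withColumn("k", F.lit(1))
  B2 = B.withColumn("k", F.lit(1))

  def custom_join(left, right):
      # Sort both sides on the asof key; backward direction takes, for each A
      # row, the last B row whose Datetime is <= the A Datetime.
      return pd.merge_asof(left.sort_values("Datetime"),
                           right.sort_values("Datetime"),
                           on="Datetime", by="k", direction="backward")

  A2.groupby("k").cogroup(B2.groupby("k")).applyInPandas(
      custom_join,
      schema="Column1 string, Datetime timestamp, k int, Key long"
  ).select("Column1", "Key").show()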