pyspark.sql.PandasCogroupedOps.applyInArrow

PandasCogroupedOps.applyInArrow(func, schema)

Applies a function to each cogroup using Arrow and returns the result as a DataFrame.

The function should take two pyarrow.Tables and return another pyarrow.Table. Alternatively, the user can pass a function that takes a tuple of pyarrow.Scalar grouping key(s) and the two pyarrow.Tables. For each side of the cogroup, all columns are passed together as a single pyarrow.Table to the user function, and the returned pyarrow.Tables are combined into a DataFrame.

The schema should be a StructType describing the schema of the returned pyarrow.Table. The column labels of the returned pyarrow.Table must either match the field names in the defined schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. The returned pyarrow.Table can contain any number of rows.

New in version 4.0.0.

Parameters
func : function

a Python native function that takes two pyarrow.Tables and returns a pyarrow.Table, or one that takes a tuple of grouping keys followed by two pyarrow.Tables and returns a pyarrow.Table.

schema : pyspark.sql.types.DataType or str

the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.

Notes

This function requires a full shuffle. All the data of a cogroup will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.

This API is unstable and intended for developers.

Examples

>>> import pyarrow
>>> df1 = spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
>>> df2 = spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
>>> def summarize(l, r):
...     return pyarrow.Table.from_pydict({
...         "left": [l.num_rows],
...         "right": [r.num_rows]
...     })
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
...     summarize, schema="left long, right long"
... ).show()
+----+-----+
|left|right|
+----+-----+
|   2|    1|
|   2|    1|
+----+-----+

Alternatively, the user can define a function that takes three arguments. In this case, the grouping key(s) are passed as the first argument and the data as the second and third arguments. The grouping key(s) are passed as a tuple of Arrow scalar types, e.g., pyarrow.Int32Scalar and pyarrow.FloatScalar. The data is still passed in as two pyarrow.Tables containing all columns from the original Spark DataFrames.

>>> def summarize(key, l, r):
...     return pyarrow.Table.from_pydict({
...         "key": [key[0].as_py()],
...         "left": [l.num_rows],
...         "right": [r.num_rows]
...     })
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
...     summarize, schema="key long, left long, right long"
... ).show()
+---+----+-----+
|key|left|right|
+---+----+-----+
|  1|   2|    1|
|  2|   2|    1|
+---+----+-----+