pyspark.sql.functions.pandas_udf
- pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)
Creates a pandas user defined function (a.k.a. vectorized user defined function).
Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined by using pandas_udf as a decorator or by wrapping the function, and no additional configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.
New in version 2.3.0.
Changed in version 3.4.0: Supports Spark Connect.
Changed in version 4.0.0: Supports keyword-arguments in SCALAR and GROUPED_AGG type.
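As a minimal, illustrative sketch of the two definition styles mentioned above (using pandas_udf as a decorator versus wrapping a plain function); the DataFrame, column names, and the active SparkSession named spark are assumptions, not part of the official docstring:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Decorator style: the return type is given to the decorator and the
# function type is inferred from the Python type hints.
@pandas_udf("long")
def slen(s: pd.Series) -> pd.Series:
    return s.str.len()

# Wrapping style: pass the function and the return type to pandas_udf
# directly; without type hints the UDF falls back to the default SCALAR type.
slen_wrapped = pandas_udf(lambda s: s.str.len(), returnType="long")

df = spark.createDataFrame([("a string",)], ("name",))
df.select(slen("name"), slen_wrapped("name")).show()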
- Parameters
- f : function, optional
user-defined function. A Python function if used as a standalone function.
- returnType : pyspark.sql.types.DataType or str, optional
the return type of the user-defined function. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.
- functionType : int, optional
an enum value in pyspark.sql.functions.PandasUDFType. Default: SCALAR. This parameter exists for compatibility. Using Python type hints is encouraged.
Notes
The user-defined functions do not support conditional expressions or short-circuiting in boolean expressions, and they end up being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions.
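For instance, a sketch (not part of the official docstring; safe_div and the zero fill value are illustrative) of incorporating the condition into the function instead of guarding the call with a conditional expression:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def safe_div(x: pd.Series, y: pd.Series) -> pd.Series:
    # Handle the special rows (y == 0) inside the UDF itself, since a
    # surrounding conditional expression may still evaluate the UDF on
    # every row.
    return (x / y.replace(0, float("nan"))).fillna(0.0)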
The user-defined functions do not take keyword arguments on the calling side, except for SCALAR and GROUPED_AGG Pandas UDFs in Spark 4.0.0 and later (see the keyword-argument examples below).
The data type of the returned pandas.Series from the user-defined functions should match the defined returnType (see types.to_arrow_type() and types.from_arrow_type()). When there is a mismatch between them, Spark might do a conversion on the returned data. The conversion is not guaranteed to be correct and results should be checked for accuracy by users.
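As an illustrative sketch (not from the official docstring), casting explicitly inside the function keeps the returned dtype aligned with the declared returnType instead of relying on that conversion:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def halve(v: pd.Series) -> pd.Series:
    # v / 2 produces float64; cast back explicitly so the returned dtype
    # matches the declared "long" (int64) return type.
    return (v / 2).astype("int64")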
Currently, pyspark.sql.types.ArrayType of pyspark.sql.types.TimestampType and nested pyspark.sql.types.StructType are not supported as output types.
Examples
In order to use this API, the following are customarily imported:
>>> import pandas as pd
>>> from pyspark.sql.functions import pandas_udf
From Spark 3.0 with Python 3.6+, Python type hints detect the function types as below:
>>> from pyspark.sql.types import IntegerType
>>> @pandas_udf(IntegerType())
... def slen(s: pd.Series) -> pd.Series:
...     return s.str.len()
Prior to Spark 3.0, the pandas UDF used functionType to decide the execution type as below:
>>> from pyspark.sql.functions import PandasUDFType
>>> from pyspark.sql.types import IntegerType
>>> @pandas_udf(IntegerType(), PandasUDFType.SCALAR)
... def slen(s):
...     return s.str.len()
It is preferred to specify type hints for the pandas UDF instead of specifying the pandas UDF type via functionType, which will be deprecated in future releases.
Note that the type hint should use pandas.Series in all cases, with one variant: when the input or output column is of pyspark.sql.types.StructType, pandas.DataFrame should be used as its input or output type hint instead. The following example shows a Pandas UDF which takes a long column, a string column and a struct column, and outputs a struct column. It requires the function to specify the type hints of pandas.Series and pandas.DataFrame as below:

>>> @pandas_udf("col1 string, col2 long")
... def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
...     s3['col2'] = s1 + s2.str.len()
...     return s3
...
>>> # Create a Spark DataFrame that has three columns including a struct column.
... df = spark.createDataFrame(
...     [[1, "a string", ("a nested string",)]],
...     "long_col long, string_col string, struct_col struct<col1:string>")
>>> df.printSchema()
root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)
>>> df.select(func("long_col", "string_col", "struct_col")).printSchema()
root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: long (nullable = true)
The following sections describe the combinations of supported type hints. For simplicity, the pandas.DataFrame variant is omitted.
- Series to Series
pandas.Series, … -> pandas.Series
The function takes one or more pandas.Series and outputs one pandas.Series. The output of the function should always be of the same length as the input.
>>> @pandas_udf("string") ... def to_upper(s: pd.Series) -> pd.Series: ... return s.str.upper() ... >>> df = spark.createDataFrame([("John Doe",)], ("name",)) >>> df.select(to_upper("name")).show() +--------------+ |to_upper(name)| +--------------+ | JOHN DOE| +--------------+
>>> @pandas_udf("first string, last string") ... def split_expand(s: pd.Series) -> pd.DataFrame: ... return s.str.split(expand=True) ... >>> df = spark.createDataFrame([("John Doe",)], ("name",)) >>> df.select(split_expand("name")).show() +------------------+ |split_expand(name)| +------------------+ | {John, Doe}| +------------------+
This type of Pandas UDF can use keyword arguments:
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.types import IntegerType
>>> @pandas_udf(returnType=IntegerType())
... def calc(a: pd.Series, b: pd.Series) -> pd.Series:
...     return a + 10 * b
...
>>> spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
|                            0|
|                          101|
+-----------------------------+
Note
The length of the input is not that of the whole input column, but is the length of an internal batch used for each call to the function.
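The batch size is governed by Spark's Arrow record-batch configuration; as a sketch (assuming the standard spark.sql.execution.arrow.maxRecordsPerBatch setting), it can be tuned like this:

# Cap the number of rows per Arrow batch, and hence the length of each
# pandas.Series passed to the UDF on each call.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")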
- Iterator of Series to Iterator of Series
Iterator[pandas.Series] -> Iterator[pandas.Series]
The function takes an iterator of pandas.Series and outputs an iterator of pandas.Series. In this case, the created pandas UDF instance requires one input column when this is called as a PySpark column. The length of the entire output from the function should be the same as the length of the entire input; therefore, it can prefetch the data from the input iterator as long as the lengths are the same.
It is also useful when the UDF execution requires initializing some state, although internally it works identically to the Series to Series case. The pseudocode below illustrates the example.
@pandas_udf("long") def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: # Do some expensive initialization with a state state = very_expensive_initialization() for x in iterator: # Use that state for whole iterator. yield calculate_with_state(x, state) df.select(calculate("value")).show()
>>> from typing import Iterator
>>> @pandas_udf("long")
... def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
...     for s in iterator:
...         yield s + 1
...
>>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
>>> df.select(plus_one(df.v)).show()
+-----------+
|plus_one(v)|
+-----------+
|          2|
|          3|
|          4|
+-----------+
Note
The length of each series is the length of a batch internally used.
- Iterator of Multiple Series to Iterator of Series
Iterator[Tuple[pandas.Series, …]] -> Iterator[pandas.Series]
The function takes an iterator of a tuple of multiple pandas.Series and outputs an iterator of pandas.Series. In this case, the created pandas UDF instance requires as many input columns as there are series when this is called as a PySpark column. Otherwise, it has the same characteristics and restrictions as the Iterator of Series to Iterator of Series case.
>>> from typing import Iterator, Tuple
>>> from pyspark.sql.functions import struct, col
>>> @pandas_udf("long")
... def multiply(iterator: Iterator[Tuple[pd.Series, pd.DataFrame]]) -> Iterator[pd.Series]:
...     for s1, df in iterator:
...         yield s1 * df.v
...
>>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
>>> df.withColumn('output', multiply(col("v"), struct(col("v")))).show()
+---+------+
|  v|output|
+---+------+
|  1|     1|
|  2|     4|
|  3|     9|
+---+------+
Note
The length of each series is the length of a batch internally used.
- Series to Scalar
pandas.Series, … -> Any
The function takes pandas.Series and returns a scalar value. The returnType should be a primitive data type, and the returned scalar can be either a Python primitive type, e.g., int or float, or a NumPy data type, e.g., numpy.int64 or numpy.float64. Any should ideally be a specific scalar type accordingly.
>>> @pandas_udf("double") ... def mean_udf(v: pd.Series) -> float: ... return v.mean() ... >>> df = spark.createDataFrame( ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) >>> df.groupby("id").agg(mean_udf(df['v'])).show() +---+-----------+ | id|mean_udf(v)| +---+-----------+ | 1| 1.5| | 2| 6.0| +---+-----------+
This type of Pandas UDF can use keyword arguments:
>>> @pandas_udf("double") ... def weighted_mean_udf(v: pd.Series, w: pd.Series) -> float: ... import numpy as np ... return np.average(v, weights=w) ... >>> df = spark.createDataFrame( ... [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)], ... ("id", "v", "w")) >>> df.groupby("id").agg(weighted_mean_udf(w=df["w"], v=df["v"])).show() +---+---------------------------------+ | id|weighted_mean_udf(w => w, v => v)| +---+---------------------------------+ | 1| 1.6666666666666667| | 2| 7.166666666666667| +---+---------------------------------+
This UDF can also be used as a window function as below:
>>> from pyspark.sql import Window
>>> @pandas_udf("double")
... def mean_udf(v: pd.Series) -> float:
...     return v.mean()
...
>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
>>> w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
>>> df.withColumn('mean_v', mean_udf("v").over(w)).show()
+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.0|
|  1| 2.0|   1.5|
|  2| 3.0|   3.0|
|  2| 5.0|   4.0|
|  2|10.0|   7.5|
+---+----+------+
Note
For performance reasons, the input series to window functions are not copied. Therefore, mutating the input series is not allowed and will cause incorrect results. For the same reason, users should also not rely on the index of the input series.
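As a sketch (with illustrative names, not from the official docstring) of respecting both restrictions: copy the series before modifying it and reset the incoming index before any positional logic.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def trimmed_mean(v: pd.Series) -> float:
    # Work on a copy with a fresh positional index; never mutate the
    # input batch or rely on its original index.
    v = v.copy().reset_index(drop=True)
    if len(v) > 2:
        v = v.sort_values().iloc[1:-1]  # drop the min and max from the copy
    return float(v.mean())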