I am trying to apply a user-defined function to a Window in PySpark. I have read that a UDAF might be the way to go, but I was not able to find anything concrete.
To give an example (taken from here: Xinh's Tech Blog and modified for PySpark):
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                   ["Alice", "2016-05-03", 45.00],
                                   ["Alice", "2016-05-04", 55.00],
                                   ["Bob", "2016-05-01", 25.00],
                                   ["Bob", "2016-05-04", 29.00],
                                   ["Bob", "2016-05-06", 27.00]],
                                  ["name", "date", "amountSpent"])

customers.show()

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
result = customers.withColumn("movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()
I am applying avg, which is already built into pyspark.sql.functions, but if instead of avg I wanted to use something more complicated and write my own function, how would I do that?
Spark >= 3.0:
SPARK-24561 - User-defined window functions with pandas udf (bounded window) is a work in progress. Please follow the related JIRA for details.
Spark >= 2.4:
SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for Pandas-based window functions with unbounded windows. The general structure is:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

return_type: DataType  # the return type of your aggregate, e.g. DoubleType()

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
    return ...

w = (Window
     .partitionBy(grouping_column)
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))
Please see the doctests and the unit tests for detailed examples.
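For instance, here is a minimal concrete sketch applied to the customers DataFrame from the question (assuming pandas and PyArrow are installed; mean_udf is an illustrative name, not part of the API):

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

# An aggregate written as a pandas UDF: it receives a pandas Series
# for the window frame and returns a single scalar.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

w = (Window
     .partitionBy("name")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

customers.withColumn("meanSpent", mean_udf("amountSpent").over(w)).show()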
Spark < 2.4:
You cannot. Window functions require a UserDefinedAggregateFunction or an equivalent object, not a UserDefinedFunction, and it is not possible to define one in PySpark.
However, in PySpark 2.3 or later, you can define a vectorized pandas_udf, which can be applied on grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While Pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling. Furthermore, a function used with GroupedData.apply can return an arbitrary number of rows.
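As a sketch of that approach, the moving average from the question can be reproduced with a grouped-map pandas_udf and pandas.DataFrame.rolling (again assuming pandas and PyArrow are installed; moving_avg and its schema are illustrative):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# The schema of the returned pandas DataFrame: the original columns
# plus the computed moving average.
schema = "name string, date string, amountSpent double, movingAvg double"

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def moving_avg(pdf):
    # pdf is a pandas DataFrame holding all rows for one "name" group.
    pdf = pdf.sort_values("date")
    # A centered 3-row window mimics rowsBetween(-1, 1).
    pdf["movingAvg"] = (pdf["amountSpent"]
                        .rolling(window=3, min_periods=1, center=True)
                        .mean())
    return pdf

customers.groupBy("name").apply(moving_avg).show()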
You can also call a Scala UDAF from PySpark, as described in Spark: How to map Python with Scala or Java User Defined Functions?
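In outline, that looks like the following sketch (assuming a Scala UserDefinedAggregateFunction compiled as the hypothetical class com.example.MyUDAF and available on the classpath; registerJavaUDAF requires Spark 2.3+):

# Hypothetical: com.example.MyUDAF is a Scala UserDefinedAggregateFunction
# packaged in a JAR that is on the driver and executor classpath.
spark.udf.registerJavaUDAF("my_udaf", "com.example.MyUDAF")

customers.createOrReplaceTempView("customers")
spark.sql("""
    SELECT name, date, amountSpent,
           my_udaf(amountSpent) OVER (
               PARTITION BY name
               ORDER BY date
               ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
           ) AS movingAvg
    FROM customers
""").show()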