How do I call a UDF on a Spark DataFrame using JAVA?

Kai · Feb 11, 2016

Similar question as here, but don't have enough points to comment there.

According to the latest Spark documentation, a UDF can be used in two different ways: with SQL or directly on a DataFrame. I found multiple examples of how to use a UDF with SQL, but have not been able to find any that show how to use a UDF directly on a DataFrame.

The solution provided by the o.p. on the question linked above uses __callUDF()__ which is _deprecated_ and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:

"since it's redundant with udf()"

So this means I should be able to use __udf()__ to call my UDF, but I can't figure out how to do that. I have not stumbled on anything that spells out the syntax for Java-Spark programs. What am I missing?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

Answer

zero323 · Feb 11, 2016

Spark >= 2.3

A Scala-style udf can be invoked directly:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();

Spark < 2.3

Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call, it has an incorrect signature. Array columns are exposed as a Scala WrappedArray, not as plain Java arrays, so you have to adjust the signature:

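UDF1<Seq<String>, String> mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    // headOption() returns an Option<String>, not a String, so take the first element explicitly
    return types.isEmpty() ? null : types.head();
  }
};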

If the UDF is already registered:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

you can simply use callUDF (which is a new function introduced in 1.5) to call it by name:

df.select(callUDF("mode", col("vs"))).show();

You can also use it in selectExpr:

df.selectExpr("mode(vs)").show();
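Putting the pieces above together, here is a minimal end-to-end sketch of registering the UDF and calling it by name. It is not from the original answer: it assumes Spark 2.x or later (so it uses SparkSession rather than sqlContext), a local master, and a small made-up DataFrame with an array column named vs. The class name ModeUdfExample and the method firstElements are hypothetical names chosen for illustration.

```java
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import scala.collection.Seq;

// Hypothetical example class; the data and names are assumptions, not from the question.
public class ModeUdfExample {

    // Builds a tiny DataFrame, registers the UDF, and returns the first element of each array.
    public static List<String> firstElements() {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")              // assumption: run locally with one thread
            .appName("mode-udf-example")
            .getOrCreate();
        try {
            // One column "vs" holding an array of strings.
            StructType schema = new StructType()
                .add("vs", DataTypes.createArrayType(DataTypes.StringType));

            List<Row> rows = Arrays.asList(
                RowFactory.create((Object) new String[] {"a", "b"}),
                RowFactory.create((Object) new String[] {"c"}));
            Dataset<Row> df = spark.createDataFrame(rows, schema);

            // Array columns reach the UDF as a Scala Seq, not as String[].
            UDF1<Seq<String>, String> mode =
                types -> types.isEmpty() ? null : types.head();
            spark.udf().register("mode", mode, DataTypes.StringType);

            // Call the registered UDF by name on the "vs" column.
            Dataset<Row> result = df.select(callUDF("mode", col("vs")).alias("first"));

            return result.collectAsList().stream()
                .map(r -> r.getString(0))
                .collect(Collectors.toList());
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstElements());
    }
}
```

The same registered name should also work through df.selectExpr("mode(vs)"). Returning null for an empty array is a design choice of this sketch; the original UDF does not say what should happen in that case.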