val dataset = Seq((0, "hello"), (1, "world")).toDF("id", "text")
// Define a regular Scala function
val upper: String => String = _.toUpperCase
// Define a UDF that wraps the upper Scala function defined above
// You could also define the function in place, i.e. inside udf
// but separating Scala functions from Spark SQL's UDFs allows for easier testing
import org.apache.spark.sql.functions.udf
val upperUDF = udf(upper)
// Apply the UDF to change the source dataset
scala> dataset.withColumn("upper", upperUDF('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
UDFs — User-Defined Functions
User-Defined Functions (aka UDF) is a feature of Spark SQL to define new Column-based functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.
Tip
|
Use the higher-level standard Column-based functions with Dataset operators whenever possible before reverting to using your own custom UDF functions since UDFs are a blackbox for Spark and so it does not even try to optimize them. As Reynold once said on Spark’s dev mailing list:
|
You define a new UDF by defining a Scala function as an input parameter of udf
function. It accepts Scala functions of up to 10 input parameters.
You can register UDFs to use in SQL-based query expressions via UDFRegistration
(that is available through SparkSession.udf
attribute).
val spark: SparkSession = ...
scala> spark.udf.register("myUpper", (input: String) => input.toUpperCase)
You can query for available standard and user-defined functions using the Catalog interface (that is available through SparkSession.catalog
attribute).
val spark: SparkSession = ...
scala> spark.catalog.listFunctions.filter('name like "%upper%").show(false)
+-------+--------+-----------+-----------------------------------------------+-----------+
|name |database|description|className |isTemporary|
+-------+--------+-----------+-----------------------------------------------+-----------+
|myupper|null |null |null |true |
|upper |null |null |org.apache.spark.sql.catalyst.expressions.Upper|true |
+-------+--------+-----------+-----------------------------------------------+-----------+
Note
|
UDFs play a vital role in Spark MLlib to define new Transformers that are function objects that transform DataFrames into DataFrames by introducing new columns.
|
udf Functions (in functions object)
udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction
...
udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction
org.apache.spark.sql.functions
object comes with udf
function to let you define a UDF for a Scala function f
.
val df = Seq(
(0, "hello"),
(1, "world")).toDF("id", "text")
// Define a "regular" Scala function
// It's a clone of upper UDF
val toUpper: String => String = _.toUpperCase
import org.apache.spark.sql.functions.udf
val upper = udf(toUpper)
scala> df.withColumn("upper", upper('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
// You could have also defined the UDF this way
val upperUDF = udf { s: String => s.toUpperCase }
// or even this way
val upperUDF = udf[String, String](_.toUpperCase)
scala> df.withColumn("upper", upperUDF('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
Tip
|
Define custom UDFs based on "standalone" Scala functions (e.g. toUpperUDF ) so you can test the Scala functions using Scala way (without Spark SQL’s "noise") and once they are defined reuse the UDFs in UnaryTransformers.
|
UDFs are Blackbox
Let’s review an example with an UDF. This example is converting strings of size 7 characters only and uses the Dataset
standard operators first and then custom UDF to do the same transformation.
scala> spark.conf.get("spark.sql.parquet.filterPushdown")
res0: String = true
You are going to use the following cities
dataset that is based on Parquet file (as used in Predicate Pushdown / Filter Pushdown for Parquet Data Source section). The reason for parquet is that it is an external data source that does support optimization Spark uses to optimize itself like predicate pushdown.
// no optimization as it is a more involved Scala function in filter
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name.length == 6).map(_.name.toUpperCase)
cities6chars.explain(true)
// or simpler when only concerned with PushedFilters attribute in Parquet
scala> cities6chars.queryExecution.optimizedPlan
res33: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#248]
+- MapElements <function1>, class City, [StructField(id,LongType,false), StructField(name,StringType,true)], obj#247: java.lang.String
+- Filter <function1>.apply
+- DeserializeToObject newInstance(class City), obj#246: City
+- Relation[id#236L,name#237] parquet
// no optimization for Dataset[City]?!
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name == "Warsaw").map(_.name.toUpperCase)
cities6chars.explain(true)
// The filter predicate is pushed down fine for Dataset's Column-based query in where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
// Let's define a UDF to do the filtering
val isWarsaw = udf { (s: String) => s == "Warsaw" }
// Use the UDF in where (replacing the Column-based query)
scala> cities.where(isWarsaw('name)).queryExecution.executedPlan
res33: org.apache.spark.sql.execution.SparkPlan =
*Filter UDF(name#129)
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>