Standard Functions — functions object

The org.apache.spark.sql.functions object offers many functions to work with Columns in Datasets.

Note: The functions object has been an experimental feature of Spark since version 1.3.0.

You can access the functions using the following import statement:

import org.apache.spark.sql.functions._

There are over 300 functions in the functions object. Some functions transform Column objects (or column names) into other Column objects, while others transform a DataFrame into another DataFrame.
The functions are grouped by functional areas:

Functions | Description
---|---
Window functions | Ranking records per window partition (rank, dense_rank, percent_rank)
 | Sequential numbering per window partition (row_number)
 | Cumulative distribution of records across window partitions (cume_dist)

- Aggregate functions
- Non-aggregate functions (aka normal functions)
- Date and time functions
- …and others

Tip: You should read the official documentation of the functions object.
explode

Caution: FIXME

explode creates a new row for every element in the given array (or map) column, as in the following example:
scala> Seq(Array(0,1,2)).toDF("array").withColumn("num", explode('array)).show
+---------+---+
| array|num|
+---------+---+
|[0, 1, 2]| 0|
|[0, 1, 2]| 1|
|[0, 1, 2]| 2|
+---------+---+
Ranking Records per Window Partition — rank functions

rank(): Column
dense_rank(): Column
percent_rank(): Column

The rank functions assign the sequential rank of each distinct value per window partition. They are equivalent to the RANK, DENSE_RANK and PERCENT_RANK functions in good ol' SQL.
val dataset = spark.range(9).withColumn("bucket", 'id % 3)
import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
| 0| 0| 1|
| 3| 0| 2|
| 6| 0| 3|
| 1| 1| 1|
| 4| 1| 2|
| 7| 1| 3|
| 2| 2| 1|
| 5| 2| 2|
| 8| 2| 3|
+---+------+----+
scala> dataset.withColumn("percent_rank", percent_rank over byBucket).show
+---+------+------------+
| id|bucket|percent_rank|
+---+------+------------+
| 0| 0| 0.0|
| 3| 0| 0.5|
| 6| 0| 1.0|
| 1| 1| 0.0|
| 4| 1| 0.5|
| 7| 1| 1.0|
| 2| 2| 0.0|
| 5| 2| 0.5|
| 8| 2| 1.0|
+---+------+------------+
The rank function assigns the same rank to duplicate rows, with a gap in the sequence (similarly to Olympic medal places). dense_rank is like rank for duplicate rows, but compacts the ranks and removes the gaps.
// rank function with duplicates
// Note the missing/sparse ranks, i.e. 2 and 4
scala> dataset.union(dataset).withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
| 0| 0| 1|
| 0| 0| 1|
| 3| 0| 3|
| 3| 0| 3|
| 6| 0| 5|
| 6| 0| 5|
| 1| 1| 1|
| 1| 1| 1|
| 4| 1| 3|
| 4| 1| 3|
| 7| 1| 5|
| 7| 1| 5|
| 2| 2| 1|
| 2| 2| 1|
| 5| 2| 3|
| 5| 2| 3|
| 8| 2| 5|
| 8| 2| 5|
+---+------+----+
// dense_rank function with duplicates
// Note that the missing ranks are now filled in
scala> dataset.union(dataset).withColumn("dense_rank", dense_rank over byBucket).show
+---+------+----------+
| id|bucket|dense_rank|
+---+------+----------+
| 0| 0| 1|
| 0| 0| 1|
| 3| 0| 2|
| 3| 0| 2|
| 6| 0| 3|
| 6| 0| 3|
| 1| 1| 1|
| 1| 1| 1|
| 4| 1| 2|
| 4| 1| 2|
| 7| 1| 3|
| 7| 1| 3|
| 2| 2| 1|
| 2| 2| 1|
| 5| 2| 2|
| 5| 2| 2|
| 8| 2| 3|
| 8| 2| 3|
+---+------+----------+
// percent_rank function with duplicates
scala> dataset.union(dataset).withColumn("percent_rank", percent_rank over byBucket).show
+---+------+------------+
| id|bucket|percent_rank|
+---+------+------------+
| 0| 0| 0.0|
| 0| 0| 0.0|
| 3| 0| 0.4|
| 3| 0| 0.4|
| 6| 0| 0.8|
| 6| 0| 0.8|
| 1| 1| 0.0|
| 1| 1| 0.0|
| 4| 1| 0.4|
| 4| 1| 0.4|
| 7| 1| 0.8|
| 7| 1| 0.8|
| 2| 2| 0.0|
| 2| 2| 0.0|
| 5| 2| 0.4|
| 5| 2| 0.4|
| 8| 2| 0.8|
| 8| 2| 0.8|
+---+------+------------+
Cumulative Distribution of Records Across Window Partitions — cume_dist function

cume_dist(): Column

cume_dist computes the cumulative distribution of the records in window partitions. It is equivalent to SQL's CUME_DIST function.
val dataset = spark.range(9).withColumn("bucket", 'id % 3)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("cume_dist", cume_dist over windowSpec).show
+---+------+------------------+
| id|bucket| cume_dist|
+---+------+------------------+
| 0| 0|0.3333333333333333|
| 3| 0|0.6666666666666666|
| 6| 0| 1.0|
| 1| 1|0.3333333333333333|
| 4| 1|0.6666666666666666|
| 7| 1| 1.0|
| 2| 2|0.3333333333333333|
| 5| 2|0.6666666666666666|
| 8| 2| 1.0|
+---+------+------------------+
lag functions
lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
lag(e: Column, offset: Int, defaultValue: Any): Column
lag returns the value of the e / columnName column that is offset records before the current record, or defaultValue (null if not specified) if there are fewer than offset records before the current record in the window partition.
val dataset = spark.range(9).withColumn("bucket", 'id % 3)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lag", lag('id, 1) over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
| 0| 0|null|
| 3| 0| 0|
| 6| 0| 3|
| 1| 1|null|
| 4| 1| 1|
| 7| 1| 4|
| 2| 2|null|
| 5| 2| 2|
| 8| 2| 5|
+---+------+----+
scala> dataset.withColumn("lag", lag('id, 2, "<default_value>") over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
| 0| 0|null|
| 3| 0|null|
| 6| 0| 0|
| 1| 1|null|
| 4| 1|null|
| 7| 1| 1|
| 2| 2|null|
| 5| 2|null|
| 8| 2| 2|
+---+------+----+
Caution: FIXME It looks like lag with a default value has a bug: the default value is not used at all.
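A possible explanation (an assumption, not confirmed here): the default value is cast to the type of the lagged column, so the String "<default_value>" becomes null for the bigint id column. With a default of a matching type, the value does appear to be used. A minimal sketch, assuming the same dataset and windowSpec as above:

// with a Long default, rows that have fewer than 2 preceding records in the
// partition should show -1 instead of null
dataset.withColumn("lag", lag('id, 2, -1L) over windowSpec).show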
lead functions
lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column
lead returns the value of the e / columnName column that is offset records after the current record, or defaultValue (null if not specified) if there are fewer than offset records after the current record in the window partition.
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lead", lead('id, 1) over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
| 0| 0| 0|
| 0| 0| 3|
| 3| 0| 3|
| 3| 0| 6|
| 6| 0| 6|
| 6| 0|null|
| 1| 1| 1|
| 1| 1| 4|
| 4| 1| 4|
| 4| 1| 7|
| 7| 1| 7|
| 7| 1|null|
| 2| 2| 2|
| 2| 2| 5|
| 5| 2| 5|
| 5| 2| 8|
| 8| 2| 8|
| 8| 2|null|
+---+------+----+
scala> dataset.withColumn("lead", lead('id, 2, "<default_value>") over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
| 0| 0| 3|
| 0| 0| 3|
| 3| 0| 6|
| 3| 0| 6|
| 6| 0|null|
| 6| 0|null|
| 1| 1| 4|
| 1| 1| 4|
| 4| 1| 7|
| 4| 1| 7|
| 7| 1|null|
| 7| 1|null|
| 2| 2| 5|
| 2| 2| 5|
| 5| 2| 8|
| 5| 2| 8|
| 8| 2|null|
| 8| 2|null|
+---+------+----+
Caution: FIXME It looks like lead with a default value has a bug: the default value is not used at all.
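As with lag above, this looks like the same type-coercion effect rather than the default being ignored (an assumption). A minimal sketch, assuming the same dataset and windowSpec as above:

// a Long default that matches the id column type should show up in the output
dataset.withColumn("lead", lead('id, 2, -1L) over windowSpec).show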
Sequential Numbering per Window Partition — row_number function

row_number(): Column

row_number returns a sequential number, starting at 1, within a window partition.
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("row_number", row_number() over windowSpec).show
+---+------+----------+
| id|bucket|row_number|
+---+------+----------+
| 0| 0| 1|
| 0| 0| 2|
| 3| 0| 3|
| 3| 0| 4|
| 6| 0| 5|
| 6| 0| 6|
| 1| 1| 1|
| 1| 1| 2|
| 4| 1| 3|
| 4| 1| 4|
| 7| 1| 5|
| 7| 1| 6|
| 2| 2| 1|
| 2| 2| 2|
| 5| 2| 3|
| 5| 2| 4|
| 8| 2| 5|
| 8| 2| 6|
+---+------+----------+
ntile function

ntile(n: Int): Column

ntile computes the ntile group id (from 1 to n inclusive) in an ordered window partition.
val dataset = spark.range(7).select('*, 'id % 3 as "bucket")
import org.apache.spark.sql.expressions.Window
val byBuckets = Window.partitionBy('bucket).orderBy('id)
scala> dataset.select('*, ntile(3) over byBuckets as "ntile").show
+---+------+-----+
| id|bucket|ntile|
+---+------+-----+
| 0| 0| 1|
| 3| 0| 2|
| 6| 0| 3|
| 1| 1| 1|
| 4| 1| 2|
| 2| 2| 1|
| 5| 2| 2|
+---+------+-----+
Caution: FIXME How is ntile different from rank? What about performance?
Creating Columns — col and column methods

col(colName: String): Column
column(colName: String): Column

The col and column methods create a Column that you can later use to reference a column in a dataset.
import org.apache.spark.sql.functions._
scala> val nameCol = col("name")
nameCol: org.apache.spark.sql.Column = name
scala> val cityCol = column("city")
cityCol: org.apache.spark.sql.Column = city
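A short usage sketch (the people DataFrame below is made up for illustration):

// hypothetical data referenced through the Columns created above
val people = Seq(("Jacek", "Warsaw"), ("Agata", "Krakow")).toDF("name", "city")
people.select(nameCol, upper(cityCol) as "CITY").show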
Defining UDFs (udf factories)

udf(f: FunctionN[...]): UserDefinedFunction

The udf family of functions allows you to create user-defined functions (UDFs) based on Scala functions of 0 to 10 arguments. The input and output types are inferred automatically from the type of the function f.
import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)
// define a dataframe
val df = sc.parallelize(0 to 3).toDF("num")
// apply the user-defined function to "num" column
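// note (an assumption): the Int values in "num" are implicitly cast to String
// to match the UDF's input type, so len is the length of "0", "1", ... (always 1)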
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
| 0| 1|
| 1| 1|
| 2| 1|
| 3| 1|
+---+---+
Since Spark 2.0.0, there is another variant of the udf function:

udf(f: AnyRef, dataType: DataType): UserDefinedFunction

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure as the function argument (f) and to declare the output data type explicitly (as dataType).
// given the dataframe above
import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)
scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
| 0| 0|
| 1| 2|
| 2| 4|
| 3| 6|
+---+---+
String functions

split function

split(str: Column, pattern: String): Column

The split function splits the str column using the pattern regular expression. It returns a new Column.

Note: split uses the java.lang.String.split(String regex, int limit) method.
val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))
scala> withSplit.show
+---+-------------+----------------+
|num| input| split|
+---+-------------+----------------+
| 0| hello|world| [hello, world]|
| 1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note: .$|()[{^?*+\ are regular expression meta characters and are considered special.
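For example, splitting on a literal dot requires escaping it (a small sketch; the data is made up for illustration):

// "\\." escapes the dot so it is not treated as the "any character" pattern
val dotted = Seq((0, "a.b.c")).toDF("num", "input").withColumn("split", split($"input", "\\."))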
upper function

upper(e: Column): Column

The upper function converts a string column to upper case. It returns a new Column.

Note: The following example uses two functions that accept a Column and return another Column to showcase how to chain them.
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))
scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
| 0| 1|hello|OLLEH|
| 2| 3|world|DLROW|
| 2| 4| ala| ALA|
+---+---+-----+-----+
Non-aggregate functions

They are also called normal functions.

struct functions

struct(cols: Column*): Column
struct(colName: String, colNames: String*): Column

The struct family of functions allows you to create a new struct column based on a collection of Columns or their names.

Note: The difference between struct and the similar array function is that the types of the columns can be different (in struct).
scala> df.withColumn("struct", struct($"name", $"val")).show
+---+---+-----+---------+
| id|val| name| struct|
+---+---+-----+---------+
| 0| 1|hello|[hello,1]|
| 2| 3|world|[world,3]|
| 2| 4| ala| [ala,4]|
+---+---+-----+---------+
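For contrast, a minimal array sketch (assuming the same df; both columns are integers, since array expects a common element type):

// id and val are both integers, so they share an array element type
val withArray = df.withColumn("array", array($"id", $"val"))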
broadcast function

broadcast[T](df: Dataset[T]): Dataset[T]

The broadcast function marks the input Dataset as small enough to be used in a broadcast join.

Tip: Consult the Broadcast Join document.
val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]
scala> left.join(broadcast(right), "token").explain(extended = true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('token))
:- Project [_1#42 AS id#45, _2#43 AS token#46]
: +- LocalRelation [_1#42, _2#43]
+- BroadcastHint
+- Project [_1#55 AS token#58, _2#56 AS prob#59]
+- LocalRelation [_1#55, _2#56]
== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
:- Project [_1#42 AS id#45, _2#43 AS token#46]
: +- LocalRelation [_1#42, _2#43]
+- BroadcastHint
+- Project [_1#55 AS token#58, _2#56 AS prob#59]
+- LocalRelation [_1#55, _2#56]
== Optimized Logical Plan ==
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
:- Project [_1#42 AS id#45, _2#43 AS token#46]
: +- Filter isnotnull(_2#43)
: +- LocalRelation [_1#42, _2#43]
+- BroadcastHint
+- Project [_1#55 AS token#58, _2#56 AS prob#59]
+- Filter isnotnull(_1#55)
+- LocalRelation [_1#55, _2#56]
== Physical Plan ==
*Project [token#46, id#45, prob#59]
+- *BroadcastHashJoin [token#46], [token#58], Inner, BuildRight
:- *Project [_1#42 AS id#45, _2#43 AS token#46]
: +- *Filter isnotnull(_2#43)
: +- LocalTableScan [_1#42, _2#43]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [_1#55 AS token#58, _2#56 AS prob#59]
+- *Filter isnotnull(_1#55)
+- LocalTableScan [_1#55, _2#56]
expr function

expr(expr: String): Column

The expr function parses the input expr SQL string into the Column it represents.
val ds = Seq((0, "hello"), (1, "world"))
.toDF("id", "token")
.as[(Long, String)]
scala> ds.show
+---+-----+
| id|token|
+---+-----+
| 0|hello|
| 1|world|
+---+-----+
val filterExpr = expr("token = 'hello'")
scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
| 0|hello|
+---+-----+
Internally, expr uses the active session's sqlParser (or creates a new SparkSqlParser) and calls its parseExpression method.
count

Caution: FIXME
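A minimal sketch of count used as an aggregate function (the data is made up for illustration; count(e) counts only the non-null values of e):

// hypothetical data with a null token in the last row
val tokens = Seq((0, Some("hello")), (1, Some("world")), (2, None)).toDF("id", "token")
tokens.select(count($"token") as "tokens", count($"id") as "rows").show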
window

Caution: FIXME
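A minimal sketch of the time-based window function (available since Spark 2.0; the timestamps and values below are made up for illustration):

import java.sql.Timestamp
// hypothetical event-time data
val events = Seq(
  (Timestamp.valueOf("2017-01-01 00:00:10"), 1),
  (Timestamp.valueOf("2017-01-01 00:00:45"), 2),
  (Timestamp.valueOf("2017-01-01 00:01:15"), 3)
).toDF("time", "value")
// group records into 1-minute tumbling windows over the time column
events.groupBy(window($"time", "1 minute")).sum("value").show(false)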