val dataset = spark.range(10).withColumn("bucket", 'id % 3)
import org.apache.spark.sql.expressions.Window
val rankCol = rank over Window.partitionBy('bucket).orderBy('id) as "rank"
val ranked = dataset.withColumn("rank", rankCol)
scala> ranked.explain(true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
Project [id#73L, bucket#76L, rank#192] Project [id#73L, bucket#76L, rank#192]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192] +- Project [id#73L, bucket#76L, rank#82 AS rank#192]
+- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC] +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
! +- Project [id#73L, bucket#76L] +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
! +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L] +- Range (0, 10, step=1, splits=Some(8))
! +- Range (0, 10, step=1, splits=Some(8))
...
TRACE SparkOptimizer: Fixed point reached for batch Operator Optimizations after 2 iterations.
DEBUG SparkOptimizer:
=== Result of Batch Operator Optimizations ===
!Project [id#73L, bucket#76L, rank#192] Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192] +- Project [id#73L, (id#73L % 3) AS bucket#76L]
! +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC] +- Range (0, 10, step=1, splits=Some(8))
! +- Project [id#73L, bucket#76L]
! +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
! +- Range (0, 10, step=1, splits=Some(8))
...
Column Pruning Optimization Rule
ColumnPruning
is a LogicalPlan rule in Operator Optimizations
batch in the base Optimizer.
Example 1
Example 2
// the business object
case class Person(id: Long, name: String, city: String)
// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS
// the query
// Note that we work with names only (out of 3 attributes in Person)
val query = dataset.groupBy(upper('name) as 'name).count
scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L] Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127] +- Project [name#126]
! +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]
== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]
== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]
== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
+- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
+- LocalTableScan [name#126]