```
scala> sql("select 1 + 1 + 1").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(((1 + 1) + 1), None)]
+- OneRowRelation$

== Analyzed Logical Plan ==
((1 + 1) + 1): int
Project [((1 + 1) + 1) AS ((1 + 1) + 1)#4]
+- OneRowRelation$

== Optimized Logical Plan ==
Project [3 AS ((1 + 1) + 1)#4]
+- OneRowRelation$

== Physical Plan ==
*Project [3 AS ((1 + 1) + 1)#4]
+- Scan OneRowRelation[]
```
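The four plans printed by `explain(true)` are also available programmatically. A minimal sketch using the QueryExecution attributes (attribute names as in Spark 2.x, with the same query as above):

```scala
// Each phase of explain(true) corresponds to an attribute of QueryExecution
val qe = spark.sql("select 1 + 1 + 1").queryExecution
qe.logical        // parsed logical plan
qe.analyzed       // analyzed logical plan
qe.optimizedPlan  // optimized logical plan (the optimizer's output)
qe.executedPlan   // physical plan
```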
# Logical Query Plan Optimizer
Catalyst is a Spark SQL framework for manipulating trees. It can work with trees of relational operators and expressions in logical plans before they end up as physical execution plans.
Spark 2.0 uses Catalyst’s tree manipulation library to build an extensible query plan optimizer with a number of query optimizations.
Catalyst supports both rule-based and cost-based optimization.
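To make the rule-based side concrete, here is a minimal sketch of a custom Catalyst rule (`FoldIntAddition` is a made-up name for illustration). It pattern-matches on the expression trees embedded in a logical plan and folds an addition of two integer literals into a single literal, much like the built-in constant folding that turned `1 + 1 + 1` into `3` above:

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.IntegerType

// Illustrative rule: rewrite Add(Literal, Literal) into a single Literal
// by transforming every expression tree in the logical plan
object FoldIntAddition extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan =
    plan transformAllExpressions {
      case Add(Literal(a: Int, IntegerType), Literal(b: Int, IntegerType)) =>
        Literal(a + b)
    }
}
```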
## Collection of Batches — batches Method

```scala
batches: Seq[Batch]
```

`batches` returns the optimization batches, i.e. named collections of optimization rules together with an execution strategy.
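Under the covers, an optimizer is a RuleExecutor over logical plans, and `batches` is where concrete rules get grouped. A minimal sketch of a custom rule executor, reusing the `FoldIntAddition` rule sketched above (`MiniOptimizer` and its single batch are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Illustrative rule executor: a Batch couples a name, an execution
// strategy (Once or FixedPoint) and the rules to run
object MiniOptimizer extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] = Seq(
    Batch("Fold Integer Additions", FixedPoint(100), FoldIntAddition)
  )
}

// MiniOptimizer.execute(plan) then applies the batch repeatedly until
// the plan stops changing or 100 iterations are reached
```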
# SparkOptimizer — The Default Logical Query Plan Optimizer
SparkOptimizer is the default logical query plan optimizer. It is available as the optimizer attribute of SessionState:

```scala
sparkSession.sessionState.optimizer
```
> **Note**: SparkOptimizer is merely used to compute the optimized LogicalPlan for a QueryExecution (available as optimizedPlan).
SparkOptimizer requires a SessionCatalog, a SQLConf, and ExperimentalMethods with user-defined experimental methods.
> **Note**: SparkOptimizer's input experimentalMethods serves as an extension point for custom ExperimentalMethods.
SparkOptimizer extends the base Optimizer batches with the following batches:

- Optimize Metadata Only Query (as OptimizeMetadataOnlyQuery)
- Extract Python UDF from Aggregate (as ExtractPythonUDFFromAggregate)
- Prune File Source Table Partitions (as PruneFileSourcePartitions)
- User Provided Optimizers for the input user-defined ExperimentalMethods (see the sketch after this list)
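A minimal sketch of that last extension point (`PassThrough` is a made-up rule; a real rule would transform the plan, e.g. like `FoldIntAddition` above). User-defined rules registered through a SparkSession's experimental methods are picked up by the User Provided Optimizers batch:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Made-up rule that leaves the plan untouched, just to show registration
object PassThrough extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// SparkOptimizer runs the registered rules in its
// "User Provided Optimizers" batch
spark.experimental.extraOptimizations = Seq(PassThrough)
```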
You can see the result of executing SparkOptimizer on a query plan using the optimizedPlan attribute of QueryExecution.
```
// Apply two filters in sequence on purpose
// We want the CombineTypedFilters optimization to kick in
val dataset = spark.range(10).filter(_ % 2 == 0).filter(_ == 0)

// optimizedPlan is a lazy value
// Only the first call triggers optimizations
// Subsequent calls return the cached, already-optimized result
// Use explain to trigger optimizations again
scala> dataset.queryExecution.optimizedPlan
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 10, step=1, splits=Some(8))
```

Note that the optimized plan contains a single TypedFilter: CombineTypedFilters has merged the two filters into one.
> **Tip**: Enable TRACE logging level for the org.apache.spark.sql.execution.SparkOptimizer logger to see what happens inside. Add the following line to conf/log4j.properties: `log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE`. Refer to Logging.
## Further reading or watching

- (video) Modern Spark DataFrame and Dataset (Intermediate Tutorial) by Adam Breindel from Databricks.