Logical Query Plan Optimizer

Catalyst is a Spark SQL framework for manipulating trees. It works with trees of relational operators and expressions in logical plans before they end up as physical execution plans, as in the following spark-shell session:

scala> sql("select 1 + 1 + 1").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(((1 + 1) + 1), None)]
+- OneRowRelation$

== Analyzed Logical Plan ==
((1 + 1) + 1): int
Project [((1 + 1) + 1) AS ((1 + 1) + 1)#4]
+- OneRowRelation$

== Optimized Logical Plan ==
Project [3 AS ((1 + 1) + 1)#4]
+- OneRowRelation$

== Physical Plan ==
*Project [3 AS ((1 + 1) + 1)#4]
+- Scan OneRowRelation[]

Spark 2.0 uses Catalyst’s tree manipulation library to build an extensible query plan optimizer with a number of query optimizations.

Catalyst supports both rule-based and cost-based optimization.
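
On the rule-based side, a logical optimization is a Rule[LogicalPlan], i.e. a transformation of one logical plan into another. The following is a minimal sketch of such a rule; the object name and the rewrite are made up for illustration (Catalyst's built-in arithmetic simplifications already cover cases like this):

import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: rewrite 1 * e (and e * 1) to e
object RemoveIdentityMultiply extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(Literal(1, _), right) => right
    case Multiply(left, Literal(1, _)) => left
  }
}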

Collection of Batches — batches Method

batches: Seq[Batch]

batches returns the collection of rule batches that the optimizer applies, in order, to a logical plan.
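
A batch groups a name, a set of rules, and an execution strategy, i.e. whether to apply the rules Once or repeatedly until the plan reaches a fixed point (bounded by a maximum number of iterations). The following sketch approximates how Catalyst models batches; it is simplified, and the real (generic) definitions live in org.apache.spark.sql.catalyst.rules.RuleExecutor:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Simplified sketch of RuleExecutor's batch model
abstract class Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy
case class Batch(name: String, strategy: Strategy, rules: Rule[LogicalPlan]*)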

Table 1. (A subset of) Catalyst Plan Optimizer’s Logical Plan Optimizations:

  * Constant Folding
  * Predicate Pushdown
  * Nullability (NULL Value) Propagation
  * Vectorized Parquet Decoder
  * Combine Typed Filters
  * Propagate Empty Relation
  * Simplify Casts
  * Column Pruning
  * GetCurrentDatabase / ComputeCurrentTime
  * Eliminate Serialization
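
You can watch any of these optimizations at work using explain(true) or queryExecution.optimizedPlan. For example, Simplify Casts removes a cast to the type a column already has, as in the following spark-shell session (the exact output can differ slightly across Spark versions):

scala> spark.range(1).select($"id".cast("long")).queryExecution.optimizedPlan
res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Range (0, 1, step=1, splits=Some(8))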

SparkOptimizer — The Default Logical Query Plan Optimizer

SparkOptimizer is the default logical query plan optimizer that is available as the optimizer attribute of SessionState.

sparkSession.sessionState.optimizer
Note
SparkOptimizer is merely used to compute the optimized LogicalPlan for a QueryExecution (available as optimizedPlan).

SparkOptimizer requires a SessionCatalog, a SQLConf, and ExperimentalMethods (which carries user-defined experimental methods).

Note
SparkOptimizer's input experimentalMethods serves as an extension point for custom ExperimentalMethods.

SparkOptimizer extends the Optimizer batches with the following batches:

  1. Optimize Metadata Only Query (as OptimizeMetadataOnlyQuery)

  2. Extract Python UDF from Aggregate (as ExtractPythonUDFFromAggregate)

  3. Prune File Source Table Partitions (as PruneFileSourcePartitions)

  4. User Provided Optimizers for the input user-defined ExperimentalMethods (as shown in the example after this list)
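
User-defined rules end up in that last batch after being registered through the experimental methods of a SparkSession. A minimal sketch, reusing the hypothetical RemoveIdentityMultiply rule from above:

// Register a custom optimization rule; SparkOptimizer applies it
// as part of the "User Provided Optimizers" batch
spark.experimental.extraOptimizations = Seq(RemoveIdentityMultiply)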

You can see the result of executing SparkOptimizer on a query plan using the optimizedPlan attribute of QueryExecution.

// Applying two filters in sequence on purpose
// We want to kick in the CombineTypedFilters optimizer
val dataset = spark.range(10).filter(_ % 2 == 0).filter(_ == 0)

// optimizedPlan is a lazy value
// Only the first call triggers the optimizations
// Subsequent calls return the cached, already-optimized result
// Use explain to trigger the optimizations again
scala> dataset.queryExecution.optimizedPlan
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 10, step=1, splits=Some(8))
Tip

Enable DEBUG or TRACE logging levels for org.apache.spark.sql.execution.SparkOptimizer logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE

Refer to Logging.
