Logical Query Plan Analyzer

Analyzer is a logical query plan analyzer. It resolves unresolved attributes and relations to typed objects using information in a SessionCatalog and FunctionRegistry.

Analyzer is available at runtime through the analyzer attribute of the SessionState of the current SparkSession.

sparkSession.sessionState.analyzer
Note
The sessionState attribute in SparkSession is a private[sql] value, so your code has to be in the org.apache.spark.sql package to access it.

Analyzer is a RuleExecutor with CheckAnalysis and defines Substitution, Resolution, Nondeterministic, UDF, FixNullability, and Cleanup batches of rules to apply to a logical plan.

Analyzer uses a SessionCatalog, a CatalystConf, and a configurable number of iterations (as maxIterations).

Analyzer defines the extendedResolutionRules attribute, a collection of rules (that process a LogicalPlan), as an extension point that a custom Analyzer can use to extend the Resolution batch. The rules are added at the end of the Resolution batch.

You can access the result of executing Analyzer against the logical plan of a Dataset using the explain method or QueryExecution:

val dataset = spark.range(5).withColumn("new_column", 'id + 5 as "plus5")

scala> dataset.explain(extended = true)
== Parsed Logical Plan ==
'Project [*, ('id + 5) AS plus5#148 AS new_column#149]
+- Range (0, 5, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint, new_column: bigint
Project [id#145L, (id#145L + cast(5 as bigint)) AS new_column#149L]
+- Range (0, 5, step=1, splits=Some(8))

== Optimized Logical Plan ==
Project [id#145L, (id#145L + 5) AS new_column#149L]
+- Range (0, 5, step=1, splits=Some(8))

== Physical Plan ==
*Project [id#145L, (id#145L + 5) AS new_column#149L]
+- *Range (0, 5, step=1, splits=Some(8))

scala> dataset.queryExecution.analyzed
res14: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [id#145L, (id#145L + cast(5 as bigint)) AS new_column#149L]
+- Range (0, 5, step=1, splits=Some(8))
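
The step from 'id in the parsed plan to id#145L in the analyzed plan can be pictured with a small toy model. The sketch below is only an illustration under stated assumptions: the types (UnresolvedAttr, ResolvedAttr, Literal, Add), the resolve function, and the catalog reduced to a Map are all hypothetical stand-ins and are not Spark's Catalyst classes or its SessionCatalog.

```scala
// Toy model only: every name here is hypothetical, not part of Spark.
object ResolutionSketch {
  sealed trait Expr
  case class UnresolvedAttr(name: String) extends Expr                 // like 'id in a parsed plan
  case class ResolvedAttr(name: String, dataType: String) extends Expr // like id#145L of type bigint
  case class Literal(value: Long, dataType: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // A "catalog" reduced to a lookup from column name to data type.
  val catalog: Map[String, String] = Map("id" -> "bigint")

  // Replace every unresolved attribute with a typed one; fail if the name is unknown.
  def resolve(e: Expr): Expr = e match {
    case UnresolvedAttr(n) =>
      val dataType = catalog.getOrElse(n, sys.error(s"cannot resolve '$n'"))
      ResolvedAttr(n, dataType)
    case Add(l, r) => Add(resolve(l), resolve(r))
    case other     => other
  }

  val parsed: Expr   = Add(UnresolvedAttr("id"), Literal(5, "int"))
  val analyzed: Expr = resolve(parsed)
}
```

In this toy, ResolutionSketch.analyzed holds a ResolvedAttr("id", "bigint") where the parsed expression had an UnresolvedAttr("id"). The real Analyzer does considerably more (for example, it also inserts the cast of 5 to bigint seen in the output above).
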
Tip

Enable TRACE or DEBUG logging level for org.apache.spark.sql.hive.HiveSessionState$$anon$1 (when Hive support is enabled) or org.apache.spark.sql.internal.SessionState$$anon$1 logger to see what happens inside Analyzer.

Add one of the following lines to conf/log4j.properties:

# when Hive support is enabled
log4j.logger.org.apache.spark.sql.hive.HiveSessionState$$anon$1=TRACE

# with no Hive support
log4j.logger.org.apache.spark.sql.internal.SessionState$$anon$1=TRACE

Refer to Logging.


The reason for such a weird-looking logger name is that the analyzer attribute is created as an anonymous subclass of the Analyzer class.

RuleExecutor — Abstract Rule Executor

RuleExecutor applies a collection of optimization rules (as batches) to a plan tree.

Note
The plan tree can be a logical plan or a physical plan.

Scala-wise, RuleExecutor is an abstract type constructor parameterized by the TreeType type parameter.

abstract class RuleExecutor[TreeType <: TreeNode[_]]

RuleExecutor declares the protected batches method that implementations have to define with the collection of Batch instances to execute.

protected def batches: Seq[Batch]

Applying Rules to Tree — execute Method

execute(plan: TreeType): TreeType

execute iterates over batches and applies rules sequentially to the input plan.

It tracks the number of iterations and the time of executing each rule (with a plan).

When a rule changes a plan, you should see the following TRACE message in the logs:

TRACE HiveSessionState$$anon$1:
=== Applying Rule [ruleName] ===
[currentAndModifiedPlansSideBySide]

Once the number of iterations reaches the maximum allowed by the batch’s Strategy, execute stops running the batch and prints out the following WARN message to the logs:

WARN HiveSessionState$$anon$1: Max iterations ([iteration]) reached for batch [batchName]

When the plan has not changed (after applying rules), you should see the following TRACE message in the logs and execute moves on to applying the rules in the next batch. This moment is called a fixed point (i.e. when the execution converges).

TRACE HiveSessionState$$anon$1: Fixed point reached for batch [batchName] after [iteration] iterations.

After the batch finishes, if the plan has been changed by the rules, you should see the following DEBUG message in the logs:

DEBUG HiveSessionState$$anon$1:
=== Result of Batch [batchName] ===
[currentAndModifiedPlansSideBySide]

Otherwise, when the rules made no changes to the plan, you should see the following TRACE message in the logs:

TRACE HiveSessionState$$anon$1: Batch [batchName] has no effect.

Batch — Collection of Optimization Rules

A batch in Catalyst is a named collection of optimization rules with a strategy, e.g.

Batch("Substitution", fixedPoint,
  CTESubstitution,
  WindowsSubstitution,
  EliminateUnions,
  new SubstituteUnresolvedOrdinals(conf)),

A Strategy can be Once or FixedPoint (with a number of iterations).

Note
The Once strategy is a FixedPoint strategy with one iteration.
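
The interplay of batches, strategies and the fixed-point loop in execute can be pictured with a simplified, self-contained sketch. It is modelled on the description in this section and is not Spark's source: the object name RuleExecutorSketch, the runBatch helper, the halveEven rule and the use of an Int as a stand-in for a plan tree are all assumptions made for illustration, and logging and timing are left out.

```scala
// Simplified sketch with hypothetical names; not Spark's RuleExecutor.
object RuleExecutorSketch {
  trait Rule[T] {
    def ruleName: String
    def apply(plan: T): T
  }

  // A strategy only says how many passes a batch may take.
  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  // Runs one batch; returns the resulting plan and the number of passes made.
  def runBatch[T](batch: Batch[T], start: T): (T, Int) = {
    var current = start
    var passes = 0
    var done = false
    while (!done) {
      // One pass: apply every rule of the batch in order.
      val next = batch.rules.foldLeft(current)((plan, rule) => rule(plan))
      passes += 1
      // Stop at the fixed point (the pass changed nothing) or when the pass budget is spent.
      done = next == current || passes >= batch.strategy.maxIterations
      current = next
    }
    (current, passes)
  }

  // Batches run one after another, each starting from the previous result.
  def execute[T](batches: Seq[Batch[T]], plan: T): T =
    batches.foldLeft(plan)((p, batch) => runBatch(batch, p)._1)

  // Toy rule over Int "plans": halve even numbers, leave odd ones alone.
  val halveEven: Rule[Int] = new Rule[Int] {
    val ruleName = "HalveEven"
    def apply(n: Int): Int = if (n % 2 == 0) n / 2 else n
  }
}
```

With this toy rule, runBatch(Batch("Halve", FixedPoint(10), halveEven), 40) goes 40, 20, 10, 5 and then sees no change on the fourth pass, so it returns (5, 4). With the Once strategy the same batch stops after a single pass and returns (20, 1).
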

Optimization Rule

An optimization rule in Catalyst is a named optimization that can be applied to a plan tree.

The Rule abstract class defines the ruleName attribute and a single method, apply:

apply(plan: TreeType): TreeType
Note
TreeType is the type of the plan tree that a rule works with, e.g. LogicalPlan, SparkPlan or Expression.
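
To make the shape of a rule concrete, here is a minimal sketch of one rule written against a toy expression tree. Everything in it is hypothetical: the Rule class is a miniature stand-in rather than Catalyst's Rule, the Expr types (Lit, Attr, Add) are invented for the example, and ruleName is set by hand as an assumption of the sketch.

```scala
// Toy model only: hypothetical types, not Catalyst's Rule or Expression.
object RuleSketch {
  abstract class Rule[TreeType] {
    def ruleName: String
    def apply(plan: TreeType): TreeType
  }

  sealed trait Expr
  case class Lit(value: Int) extends Expr
  case class Attr(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Folds an addition of two literals into a single literal, bottom-up.
  object FoldConstants extends Rule[Expr] {
    val ruleName = "FoldConstants"

    def apply(plan: Expr): Expr = plan match {
      case Add(l, r) =>
        (apply(l), apply(r)) match {
          case (Lit(a), Lit(b)) => Lit(a + b)
          case (fl, fr)         => Add(fl, fr)
        }
      case other => other
    }
  }
}
```

Applying RuleSketch.FoldConstants to Add(Attr("id"), Add(Lit(2), Lit(3))) yields Add(Attr("id"), Lit(5)): the rule takes a tree and returns a tree of the same type, which is what lets an executor chain rules and compare the plan before and after.
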
