Query Execution

QueryExecution is part of the public Dataset API and represents the query execution that will eventually produce the data in a Dataset.

Note

You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
ds.queryExecution

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a LogicalPlan or use the QueryExecution after executing the LogicalPlan).

Table 1. QueryExecution Lazy Attributes
Attribute Description

analyzed

Result of applying the Analyzer's rules to the LogicalPlan (of the QueryExecution).

withCachedData

The analyzed LogicalPlan after being checked (for unsupported operations) and with segments replaced by their cached equivalents (where available).

optimizedPlan

LogicalPlan (of a structured query) that is the result of applying the session-owned Catalyst Query Optimizer to withCachedData.

sparkPlan

SparkPlan that is the result of requesting SparkPlanner to plan the optimized logical query plan.

NOTE: The resulting SparkPlan is in fact the first physical plan from the collection of possible plans generated by SparkPlanner.

executedPlan

SparkPlan ready for execution, i.e. sparkPlan with all the preparations rules applied (see preparations below).

toRdd

RDD[InternalRow] that is the result of executing the executedPlan, i.e. executedPlan.execute().

TIP: InternalRow is the internal optimized binary row format.

Note

You can access the lazy attributes as follows:

val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
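
The attributes form a pipeline, each computed lazily from the previous one. You can step through all the phases of a simple query yourself (output elided):

val qe = spark.range(5).queryExecution
qe.logical        // the LogicalPlan the QueryExecution was created with
qe.analyzed       // after the Analyzer's rules
qe.withCachedData // after cached segments are substituted in (where available)
qe.optimizedPlan  // after the Catalyst Query Optimizer
qe.sparkPlan      // after SparkPlanner
qe.executedPlan   // after the preparations rules
qe.toRdd          // the RDD[InternalRow] backing the query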

QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState, which could also be a HiveSessionState) when it is created. It then computes a SparkPlan (a physical plan, to be precise) using the planner. The result is available as the sparkPlan attribute.

A streaming variant of QueryExecution is IncrementalExecution.

The debug package object contains methods for debugging query execution that you can use to perform a full analysis of your queries (as Dataset objects).

Caution
FIXME What’s planner? analyzed? Why do we need assertSupported?

QueryExecution belongs to the org.apache.spark.sql.execution package.

Note
QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.

val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]

hiveResultString Method

hiveResultString(): Seq[String]

hiveResultString returns the result as a Hive-compatible sequence of strings.

scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)

scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id	name	age, 0	Jacek	42)
Caution
FIXME

Internally, hiveResultString does…

Note
hiveResultString is executed when…​

Creating QueryExecution Instance

class QueryExecution(
  val sparkSession: SparkSession,
  val logical: LogicalPlan)

QueryExecution requires a SparkSession and a LogicalPlan.
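
Since the constructor is public, you can create a QueryExecution for an arbitrary LogicalPlan yourself, e.g. one borrowed from an existing Dataset. A minimal sketch (QueryExecution is an internal API that may change between Spark releases):

import org.apache.spark.sql.execution.QueryExecution

// Reuse the logical plan of an existing Dataset
val plan = spark.range(3).queryExecution.logical
val qe = new QueryExecution(spark, plan)
qe.executedPlan   // plans the query anew, end to end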

Accessing SparkPlanner — planner Method

planner: SparkPlanner

planner returns the current SparkPlanner.

planner merely exposes the internal planner (of the current SessionState).

preparations — SparkPlan Optimization Rules (to apply before Query Execution)

preparations is a sequence of SparkPlan optimization rules.

Tip
A SparkPlan optimization rule transforms a SparkPlan to another SparkPlan.
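
In code, such a rule is a Rule[SparkPlan]. A minimal no-op sketch, for illustration only:

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// A do-nothing preparation rule: returns the physical plan unchanged
object NoopPreparation extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}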

This collection is an intermediate phase of query execution that developers can use to introduce further optimizations.

The current list of SparkPlan transformations in preparations is as follows:

  1. ExtractPythonUDFs

  2. PlanSubqueries

  3. EnsureRequirements

  4. CollapseCodegenStages

  5. ReuseExchange

  6. ReuseSubquery

Note
The transformation rules are applied in order to the physical plan before execution, i.e. they generate the final SparkPlan when the executedPlan lazy value is first accessed.
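
You can observe their effect by comparing the physical plan before and after the preparation phase, e.g. CollapseCodegenStages adds the WholeStageCodegen wrapper seen in the earlier example:

val qe = spark.range(5).queryExecution
qe.sparkPlan     // Range ... straight from SparkPlanner
qe.executedPlan  // WholeStageCodegen wrapping Range, after preparations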

IncrementalExecution

IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.

It lives in org.apache.spark.sql.execution.streaming package.
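
Going by the description above, its constructor extends QueryExecution along these lines (a sketch only; the exact signature may differ between Spark releases):

class IncrementalExecution(
    sparkSession: SparkSession,
    logicalPlan: LogicalPlan,
    val outputMode: OutputMode,
    val checkpointLocation: String,
    val currentBatchId: Long)
  extends QueryExecution(sparkSession, logicalPlan)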

Caution
FIXME What is stateStrategy?

Stateful operators in the query plan are numbered using operatorId that starts with 0.

IncrementalExecution adds one Rule[SparkPlan], called state, as the first element of the preparations sequence of rules.
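
Conceptually, the override looks like this (a simplified sketch):

// state rewrites stateful operators; it runs before all other preparations rules
override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations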

Caution
FIXME What does IncrementalExecution do? Where is it used?
