scala> spark.range(10).select('id as 'asId).where('id === 4).explain
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L AS asId#3L]
:     +- Filter (id#0L = 4)
:        +- Range 0, 1, 8, 10, [id#0L]
Whole-Stage Code Generation (aka Whole-Stage CodeGen)
Note: Review SPARK-12795 Whole stage codegen to learn about the work to support it.
Whole-Stage Code Generation (aka WholeStageCodegen or WholeStageCodegenExec) fuses multiple operators (as a subtree of plans that support codegen) together into a single Java function that is aimed at improving execution performance. It collapses a query into a single optimized function that eliminates virtual function calls and leverages CPU registers for intermediate data.
WholeStageCodegenExec case class works with a SparkPlan to produce a codegened pipeline. It is a unary node in SparkPlan with support for codegen.
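To make the idea of fusing operators concrete, here is a minimal plain-Scala sketch (no Spark involved) of the Range, Filter, Project pipeline from the plan above. The names FusionSketch, interpreted and fused are hypothetical. The first method chains iterators the way an interpreted operator tree would, and the second is a hand-written single loop that roughly resembles what a fused function does. It illustrates the principle only and is not the code Spark generates.

```scala
object FusionSketch {
  // Operator-at-a-time style: each operator wraps its child iterator,
  // so every row passes through a chain of hasNext/next calls.
  def interpreted(n: Long): Iterator[Long] =
    Iterator.iterate(0L)(_ + 1).takeWhile(_ < n) // Range
      .filter(_ == 4L)                           // Filter (id = 4)
      .map(id => id)                             // Project (id AS asId)

  // Hand-fused equivalent: a single loop in which the intermediate
  // value lives in a local variable instead of crossing operators.
  def fused(n: Long): Vector[Long] = {
    val out = Vector.newBuilder[Long]
    var id = 0L
    while (id < n) {
      if (id == 4L) out += id
      id += 1
    }
    out.result()
  }
}
```

Both methods yield the same rows; the difference is only in how much per-row call overhead sits between the operators.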
Tip: Use the Dataset.explain method to see the physical plan of a query and find out whether or not WholeStageCodegen is in use.
Tip: Consider using the Debugging Query Execution facility to take a deep dive into whole-stage codegen.
SparkPlan plans with support for codegen extend CodegenSupport.
Note: Whole stage codegen is used by some modern massively parallel processing (MPP) databases to achieve great performance. See Efficiently Compiling Efficient Query Plans for Modern Hardware (PDF).
Whole stage codegen uses spark.sql.codegen.wholeStage setting to control…FIXME
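As a hedged illustration, the following sketch toggles spark.sql.codegen.wholeStage at runtime and prints the physical plan under each value, on the assumption that the setting is a boolean flag that switches whole-stage codegen on and off. It assumes Spark SQL 2.x or later on the classpath; the object name WholeStageToggle and the local master are illustrative choices only. No output is shown because it differs between Spark versions.

```scala
import org.apache.spark.sql.SparkSession

object WholeStageToggle {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for looking at plans
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("whole-stage-toggle")
      .getOrCreate()

    // Build the query afresh each time so that it is planned
    // under whatever value the setting has at that moment
    def query = spark.range(10).filter("id = 4").selectExpr("id AS asId")

    spark.conf.set("spark.sql.codegen.wholeStage", false)
    query.explain()

    spark.conf.set("spark.sql.codegen.wholeStage", true)
    query.explain()

    spark.stop()
  }
}
```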
Note: Janino is used to compile the generated Java source code into a Java class.
Before a query is executed, CollapseCodegenStages case class is used to find the plans that support codegen and collapse them together as WholeStageCodegen. It is part of the sequence of rules QueryExecution.preparations that will be applied in order to the physical plan before execution.
CodegenSupport Contract
CodegenSupport is a custom SparkPlan for operators that support codegen.
It does, however, allow custom implementations to disable codegen through the supportCodegen predicate (which defaults to true).
It assumes that custom implementations define:
- doProduce(ctx: CodegenContext): String
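The contract can be pictured with a small stand-alone model. The sketch below is not Spark's code: CodegenContext here is a hypothetical stand-in that only hands out fresh variable names, produce is simplified to a single argument, and RangeSketch and NoCodegen are made-up operators. It shows only the shape described above, a supportCodegen predicate that defaults to true plus a doProduce that returns Java source as a String.

```scala
object CodegenContractSketch {
  // Hypothetical stand-in for Spark's CodegenContext
  final class CodegenContext {
    private var counter = 0
    def freshName(prefix: String): String = { counter += 1; s"${prefix}_$counter" }
  }

  // Simplified model of the contract
  trait CodegenSupport {
    // Implementations may override this to opt out of codegen
    def supportCodegen: Boolean = true
    // Implementations return Java source code as a String
    protected def doProduce(ctx: CodegenContext): String
    // Simplified entry point (an assumption of this sketch)
    final def produce(ctx: CodegenContext): String = doProduce(ctx)
  }

  // Made-up operator that emits a Java for-loop over a range
  final case class RangeSketch(start: Long, end: Long) extends CodegenSupport {
    protected def doProduce(ctx: CodegenContext): String = {
      val i = ctx.freshName("range")
      s"for (long $i = ${start}L; $i < ${end}L; $i++) { /* consume($i) */ }"
    }
  }

  // Made-up operator that disables codegen for itself
  final case class NoCodegen() extends CodegenSupport {
    override def supportCodegen: Boolean = false
    protected def doProduce(ctx: CodegenContext): String =
      throw new UnsupportedOperationException("codegen disabled")
  }
}
```

Calling RangeSketch(0, 10).produce(new CodegenContext) returns a one-line Java loop as text, which in the real system would be spliced into the source of the fused function.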
Codegen Operators
SparkPlan plans that support codegen extend CodegenSupport.
- ProjectExec for as
- FilterExec for where or filter
- Range
- SampleExec for sample
- RowDataSourceScanExec
  Caution: FIXME Where is RowDataSourceScanExec used?
- BatchedDataSourceScanExec
- ExpandExec
- BaseLimitExec
- SortExec
- WholeStageCodegenExec and InputAdapter
- TungstenAggregate
- SortMergeJoinExec
BroadcastHashJoinExec
BroadcastHashJoinExec variables are prefixed with bhj (see CodegenSupport.variablePrefix).
val ds = Seq((0,"playing"), (1, "with"), (2, "broadcast")).toDS
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res18: String = 10485760
scala> ds.join(ds).explain(extended=true)
== Parsed Logical Plan ==
'Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#21, _2#22]
== Analyzed Logical Plan ==
_1: int, _2: string, _1: int, _2: string
Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#32, _2#33]
== Optimized Logical Plan ==
Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#32, _2#33]
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, true
:- LocalTableScan [_1#21, _2#22]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [_1#32, _2#33]
// Use broadcast function to mark the right-side Dataset
// eligible for broadcasting explicitly
scala> ds.join(broadcast(ds)).explain(extended=true)
== Parsed Logical Plan ==
'Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
   +- LocalRelation [_1#21, _2#22]
== Analyzed Logical Plan ==
_1: int, _2: string, _1: int, _2: string
Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
   +- LocalRelation [_1#43, _2#44]
== Optimized Logical Plan ==
Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
   +- LocalRelation [_1#43, _2#44]
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, true
:- LocalTableScan [_1#21, _2#22]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [_1#43, _2#44]
CollapseCodegenStages
CollapseCodegenStages is a Rule[SparkPlan], i.e. a transformation of SparkPlan into another SparkPlan.
Note: CollapseCodegenStages is used in the sequence of rules to apply to a SparkPlan before query execution.
It searches for sub-plans (aka stages) that support codegen and collapses them together as a WholeStageCodegen.
Note: Only CodegenSupport SparkPlans for which supportCodegen is enabled (true) support codegen.
It is assumed that all Expression instances except CodegenFallback support codegen.
CollapseCodegenStages uses the internal setting spark.sql.codegen.maxFields (default: 200) to control the number of fields in input and output schemas before deactivating whole-stage codegen. It counts the fields included in complex types, i.e. StructType, MapType, ArrayType, UserDefinedType, and their combinations, recursively. See SPARK-14554.
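The recursive counting can be sketched on a toy type model. The DataType hierarchy below is a hypothetical miniature, not Spark's own classes, and it leaves out UserDefinedType for brevity. The threshold of 200 is taken from the text above. Treat it as an approximation of the idea rather than the rule's actual code.

```scala
object MaxFieldsSketch {
  // Toy type model (assumption: far smaller than Spark's type system)
  sealed trait DataType
  case object LongType extends DataType
  case object StringType extends DataType
  final case class StructType(fields: Seq[DataType]) extends DataType
  final case class ArrayType(elementType: DataType) extends DataType
  final case class MapType(keyType: DataType, valueType: DataType) extends DataType

  // Count leaf fields, descending into complex types recursively
  def numOfNestedFields(dt: DataType): Int = dt match {
    case StructType(fields) => fields.map(numOfNestedFields).sum
    case ArrayType(elem)    => numOfNestedFields(elem)
    case MapType(k, v)      => numOfNestedFields(k) + numOfNestedFields(v)
    case _                  => 1
  }

  // Whole-stage codegen would be skipped for schemas above the threshold
  def tooManyFields(schema: StructType, maxFields: Int = 200): Boolean =
    numOfNestedFields(schema) > maxFields
}
```

For example, a struct holding a long, an array of two-field structs, and a map from string to a two-field struct counts as 1 + 2 + 3 = 6 fields.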
It recursively inserts InputAdapter leaf nodes into a SparkPlan. Each InputAdapter is then used to generate code that consumes an RDD iterator of InternalRow.
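The collapsing step can be approximated on a toy plan tree. In the sketch below, Plan, Op, WholeStageCodegen and InputAdapter are hypothetical simplified classes and the two mutually recursive functions are an assumption about the general shape of the rule, not its actual source. A stage starts at the first operator that supports codegen and is cut by an InputAdapter wherever an operator does not.

```scala
object CollapseSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def supportCodegen: Boolean
  }
  // Generic operator; supportCodegen is supplied by the caller
  final case class Op(name: String, supportCodegen: Boolean,
                      children: Seq[Plan] = Nil) extends Plan
  // Root of a fused stage
  final case class WholeStageCodegen(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
    val supportCodegen: Boolean = false
  }
  // Boundary through which a stage reads rows from a non-fused child
  final case class InputAdapter(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
    val supportCodegen: Boolean = true
  }

  // Inside a stage: keep fusing while operators support codegen,
  // otherwise cut the stage with an InputAdapter
  private def insertInputAdapter(plan: Plan): Plan = plan match {
    case op: Op if op.supportCodegen =>
      op.copy(children = op.children.map(insertInputAdapter))
    case other =>
      InputAdapter(insertWholeStageCodegen(other))
  }

  // Outside a stage: open a new stage at an operator that supports codegen
  def insertWholeStageCodegen(plan: Plan): Plan = plan match {
    case op: Op if op.supportCodegen =>
      WholeStageCodegen(insertInputAdapter(op))
    case op: Op =>
      op.copy(children = op.children.map(insertWholeStageCodegen))
    case other => other
  }
}
```

Applied to Project over Filter over a non-codegen Exchange over Range, the sketch yields two stages: one wrapping Project and Filter, with an InputAdapter above the Exchange, and one wrapping Range.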
BenchmarkWholeStageCodegen - Performance Benchmark
BenchmarkWholeStageCodegen class provides a benchmark to measure whole stage codegen performance.
You can execute it using the command:
build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
Note: You need to un-ignore tests in BenchmarkWholeStageCodegen by replacing ignore with test.
$ build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
...
Running benchmark: range/limit/sum
Running case: range/limit/sum codegen=false
22:55:23.028 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Running case: range/limit/sum codegen=true
Java HotSpot(TM) 64-Bit Server VM 1.8.0_77-b03 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
range/limit/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
range/limit/sum codegen=false             376 /  433       1394.5           0.7       1.0X
range/limit/sum codegen=true              332 /  388       1581.3           0.6       1.1X
[info] - range/limit/sum (10 seconds, 74 milliseconds)
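As a quick sanity check of the Relative column, the speedup can be recomputed from the reported times. The snippet assumes the column is the ratio of the baseline's best time to each case's best time, which is a guess about how the benchmark harness reports it. BenchmarkMath is a hypothetical helper name.

```scala
object BenchmarkMath {
  // Best times in milliseconds, copied from the table above
  val bestCodegenOffMs = 376.0
  val bestCodegenOnMs = 332.0

  // Relative speedup of a candidate over the baseline
  def relative(baselineMs: Double, candidateMs: Double): Double =
    baselineMs / candidateMs
}
```

Here 376 / 332 is roughly 1.13, which agrees with the 1.1X shown for codegen=true, so the gain on this particular benchmark is modest.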