Whole-Stage Code Generation (aka Whole-Stage CodeGen)

Note
Review SPARK-12795 Whole stage codegen to learn about the work to support it.

Whole-Stage Code Generation (aka WholeStageCodegen or WholeStageCodegenExec) fuses multiple operators (a subtree of plans that support codegen) into a single Java function to improve execution performance. It collapses a query into a single optimized function that eliminates virtual function calls and keeps intermediate data in CPU registers.
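
The difference between chained operators and a fused function can be sketched in plain Scala. This is only a toy illustration, not the code Spark generates: the object FusionSketch and both method names are made up for this example, and the pipeline (range, filter on even ids, multiply by 10, sum) is an arbitrary stand-in for a query.

```scala
// Toy illustration only: plain Scala, no Spark APIs involved.
object FusionSketch {
  // Operator-at-a-time: every operator is its own Iterator, so each row
  // passes through several hasNext/next calls on its way to the result.
  def volcanoStyle(n: Long): Long = {
    val range: Iterator[Long]     = Iterator.iterate(0L)(_ + 1).takeWhile(_ < n)
    val filtered: Iterator[Long]  = range.filter(_ % 2 == 0)
    val projected: Iterator[Long] = filtered.map(_ * 10)
    projected.sum
  }

  // Fused: the same range -> filter -> project -> sum pipeline written as a
  // single loop, with intermediate values held in local variables.
  def fusedStyle(n: Long): Long = {
    var sum = 0L
    var id = 0L
    while (id < n) {
      if (id % 2 == 0) {
        sum += id * 10
      }
      id += 1
    }
    sum
  }
}
```

Both methods compute the same value; the fused version simply has no per-row calls between operators, which is the effect whole-stage codegen aims for.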

WholeStageCodegenExec case class works with a SparkPlan to produce a codegened pipeline. It is a unary node in SparkPlan with support for codegen.

Tip
Use the Dataset.explain method to see the physical plan of a query and find out whether WholeStageCodegen is in use.

Tip
Consider using the Debugging Query Execution facility to take a deep dive into whole-stage codegen.

scala> spark.range(10).select('id as 'asId).where('id === 4).explain
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L AS asId#3L]
:     +- Filter (id#0L = 4)
:        +- Range 0, 1, 8, 10, [id#0L]

SparkPlan plans with support for codegen extend CodegenSupport.

Note
Whole stage codegen is used by some modern massively parallel processing (MPP) databases to achieve great performance. See Efficiently Compiling Efficient Query Plans for Modern Hardware (PDF).

Whole stage codegen uses spark.sql.codegen.wholeStage setting to control…​FIXME

Note
Janino is used to compile the generated Java source code into a Java class.

Before a query is executed, CollapseCodegenStages case class is used to find the plans that support codegen and collapse them together as WholeStageCodegen. It is part of the sequence of rules QueryExecution.preparations that will be applied in order to the physical plan before execution.

CodegenSupport Contract

CodegenSupport is a custom SparkPlan for operators that support codegen.

It allows custom implementations to opt out of codegen using the supportCodegen predicate (that defaults to true).

It assumes that custom implementations define:

  • doProduce(ctx: CodegenContext): String
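
The shape of this contract can be sketched with a few stand-in types. Everything prefixed with Toy below is hypothetical and heavily simplified; only the names supportCodegen, variablePrefix and doProduce come from the text, and the real CodegenSupport and CodegenContext do considerably more.

```scala
// Hypothetical, heavily simplified stand-ins; not Spark's real classes.
final class ToyCodegenContext {
  private var counter = 0
  // Returns a fresh variable name by appending a running number to the prefix.
  def freshName(prefix: String): String = {
    val name = s"${prefix}${counter}"
    counter += 1
    name
  }
}

trait ToyCodegenSupport {
  // Operators may override this to opt out of codegen; it defaults to true.
  def supportCodegen: Boolean = true
  // Prefix for generated variable names (the text mentions bhj for BroadcastHashJoinExec).
  def variablePrefix: String
  // The method implementations must define: returns Java source as a String.
  protected def doProduce(ctx: ToyCodegenContext): String
  // Entry point that refuses to generate code for operators that opted out.
  final def produce(ctx: ToyCodegenContext): String = {
    require(supportCodegen, "operator opted out of codegen")
    doProduce(ctx)
  }
}

final case class ToyRange(start: Long, end: Long) extends ToyCodegenSupport {
  val variablePrefix = "range"
  protected def doProduce(ctx: ToyCodegenContext): String = {
    val i = ctx.freshName(s"${variablePrefix}_i")
    s"for (long ${i} = ${start}L; ${i} < ${end}L; ${i}++) { /* consume(${i}) */ }"
  }
}
```

Calling ToyRange(0, 10).produce(new ToyCodegenContext) returns a Java for-loop as a string whose loop variable starts with the operator's prefix.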

Codegen Operators

SparkPlan plans that support codegen extend CodegenSupport.

Caution
FIXME Where is RowDataSourceScanExec used?

  • BatchedDataSourceScanExec

  • ExpandExec

  • BaseLimitExec

  • SortExec

  • WholeStageCodegenExec and InputAdapter

  • TungstenAggregate

  • BroadcastHashJoinExec

  • SortMergeJoinExec

BroadcastHashJoinExec

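BroadcastHashJoinExec variables are prefixed with bhj (see CodegenSupport.variablePrefix).

Note that the joins in the example below have no join condition, so their physical plans show BroadcastNestedLoopJoin rather than BroadcastHashJoinExec.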

val ds = Seq((0,"playing"), (1, "with"), (2, "broadcast")).toDS

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res18: String = 10485760

scala> ds.join(ds).explain(extended=true)
== Parsed Logical Plan ==
'Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#21, _2#22]

== Analyzed Logical Plan ==
_1: int, _2: string, _1: int, _2: string
Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#32, _2#33]

== Optimized Logical Plan ==
Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#32, _2#33]

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, true
:- LocalTableScan [_1#21, _2#22]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [_1#32, _2#33]

// Use broadcast function to mark the right-side Dataset
// eligible for broadcasting explicitly

scala> ds.join(broadcast(ds)).explain(extended=true)
== Parsed Logical Plan ==
'Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
   +- LocalRelation [_1#21, _2#22]

== Analyzed Logical Plan ==
_1: int, _2: string, _1: int, _2: string
Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
   +- LocalRelation [_1#43, _2#44]

== Optimized Logical Plan ==
Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
   +- LocalRelation [_1#43, _2#44]

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, true
:- LocalTableScan [_1#21, _2#22]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [_1#43, _2#44]

SampleExec

scala> spark.range(10).sample(false, 0.4).explain
== Physical Plan ==
WholeStageCodegen
:  +- Sample 0.0, 0.4, false, -7634498724724501829
:     +- Range 0, 1, 8, 10, [id#15L]

RangeExec

scala> spark.range(10).explain
== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 10, [id#20L]

CollapseCodegenStages

CollapseCodegenStages is a Rule[SparkPlan], i.e. a transformation of SparkPlan into another SparkPlan.

It searches for sub-plans (aka stages) that support codegen and collapses them together as a WholeStageCodegen.

Note
Only CodegenSupport SparkPlans with supportCodegen enabled (true) support codegen.

It is assumed that all Expression instances except CodegenFallback support codegen.

CollapseCodegenStages uses the internal setting spark.sql.codegen.maxFields (default: 200) to control the number of fields in input and output schemas before deactivating whole-stage codegen. It counts the fields included in complex types, i.e. StructType, MapType, ArrayType, UserDefinedType, and their combinations, recursively. See SPARK-14554.
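
The recursive counting can be sketched as follows. The Toy types are a hypothetical miniature model (Spark's real types live in org.apache.spark.sql.types), UserDefinedType is left out for brevity, and the strict greater-than comparison against the limit is an assumption of this sketch.

```scala
// Hypothetical miniature type model, not Spark's DataType hierarchy.
sealed trait ToyType
case object ToyLong extends ToyType
case object ToyString extends ToyType
final case class ToyArray(element: ToyType) extends ToyType
final case class ToyMap(key: ToyType, value: ToyType) extends ToyType
final case class ToyStruct(fields: List[(String, ToyType)]) extends ToyType

object FieldCount {
  // Counts leaf fields, descending into structs, maps and arrays.
  def numNestedFields(t: ToyType): Int = t match {
    case ToyStruct(fields) => fields.map { case (_, ft) => numNestedFields(ft) }.sum
    case ToyMap(k, v)      => numNestedFields(k) + numNestedFields(v)
    case ToyArray(e)       => numNestedFields(e)
    case _                 => 1
  }

  // Assumption: codegen is deactivated once the count exceeds the limit.
  def tooManyFields(schema: ToyStruct, maxFields: Int = 200): Boolean =
    numNestedFields(schema) > maxFields
}

// A schema with three top-level columns but five leaf fields.
val toySchema = ToyStruct(List(
  "id"    -> ToyLong,
  "tags"  -> ToyArray(ToyString),
  "props" -> ToyMap(ToyString, ToyStruct(List("a" -> ToyLong, "b" -> ToyLong)))
))
```

Here FieldCount.numNestedFields(toySchema) is 5 (1 for id, 1 for tags, 3 for props), so a schema can hit the limit with far fewer top-level columns than the limit suggests.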

It recursively inserts InputAdapter leaf nodes into a SparkPlan. An InputAdapter is then used to generate code that consumes an RDD iterator of InternalRow.
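
How such a rule might walk a plan can be sketched on a toy tree. All types below are hypothetical: Op carries a supportsCodegen flag in place of the CodegenSupport check, WholeStage stands in for WholeStageCodegen, and the traversal is a simplified guess at the idea, not the rule's actual source.

```scala
// Hypothetical toy plan tree; names mirror the text but this is not Spark's code.
sealed trait ToyPlan { def children: List[ToyPlan] }
final case class Op(name: String, supportsCodegen: Boolean, children: List[ToyPlan]) extends ToyPlan
final case class InputAdapter(child: ToyPlan) extends ToyPlan { def children = List(child) }
final case class WholeStage(child: ToyPlan) extends ToyPlan { def children = List(child) }

object ToyCollapse {
  // Outside a stage: open a new stage at each codegen-capable operator,
  // otherwise keep the operator and continue with its children.
  def collapse(plan: ToyPlan): ToyPlan = plan match {
    case op: Op if op.supportsCodegen => WholeStage(insideStage(op))
    case op: Op                       => op.copy(children = op.children.map(collapse))
    case other                        => other
  }

  // Inside a stage: keep codegen-capable operators, and cut the stage with an
  // InputAdapter at the first operator that does not support codegen.
  private def insideStage(plan: ToyPlan): ToyPlan = plan match {
    case op: Op if op.supportsCodegen => op.copy(children = op.children.map(insideStage))
    case other                        => InputAdapter(collapse(other))
  }

  // Renders a plan as nested text, e.g. Project(Filter(Range)).
  def render(plan: ToyPlan): String = plan match {
    case Op(n, _, cs)    => if (cs.isEmpty) n else cs.map(render).mkString(s"${n}(", ", ", ")")
    case InputAdapter(c) => s"InputAdapter(${render(c)})"
    case WholeStage(c)   => s"WholeStage(${render(c)})"
  }
}

// Project and Filter support codegen, the Exchange in the middle does not.
val toyPlan = Op("Project", true, List(
  Op("Filter", true, List(
    Op("Exchange", false, List(
      Op("Range", true, Nil)))))))
```

For this plan, ToyCollapse.render(ToyCollapse.collapse(toyPlan)) gives WholeStage(Project(Filter(InputAdapter(Exchange(WholeStage(Range)))))): two stages, with the InputAdapter marking where the upper stage reads rows from the non-codegen operator.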

BenchmarkWholeStageCodegen - Performance Benchmark

BenchmarkWholeStageCodegen class provides a benchmark to measure whole stage codegen performance.

You can execute it using the command:

build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'

Note
You need to un-ignore tests in BenchmarkWholeStageCodegen by replacing ignore with test.

$ build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
...
Running benchmark: range/limit/sum
  Running case: range/limit/sum codegen=false
22:55:23.028 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  Running case: range/limit/sum codegen=true

Java HotSpot(TM) 64-Bit Server VM 1.8.0_77-b03 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz

range/limit/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
range/limit/sum codegen=false             376 /  433       1394.5           0.7       1.0X
range/limit/sum codegen=true              332 /  388       1581.3           0.6       1.1X

[info] - range/limit/sum (10 seconds, 74 milliseconds)
