Dynamic Allocation (of Executors)

Dynamic Allocation (of Executors) (aka Elastic Scaling) is a Spark feature that allows for adding or removing Spark executors dynamically to match the workload.

Unlike the "traditional" static allocation, where a Spark application reserves CPU and memory resources upfront irrespective of how much it actually uses at any given time, dynamic allocation gives you as much as you need and no more. It scales the number of executors up and down with the workload: idle executors are removed, and when pending tasks need more executors, more are requested.

Dynamic allocation is enabled using the spark.dynamicAllocation.enabled setting. When it is enabled, Spark assumes that the External Shuffle Service is also in use. The External Shuffle Service is disabled by default, as controlled by spark.shuffle.service.enabled.

ExecutorAllocationManager is the class responsible for dynamic allocation of executors. When dynamic allocation is enabled, it is started while SparkContext is being initialized.

Dynamic allocation reports its current state through the ExecutorAllocationManager metric source.

Dynamic Allocation scales executors up and down according to the following policies:

  1. Scale Up Policy requests new executors when there are pending tasks. The number of executors requested in each round grows exponentially (1, 2, 4, 8, and so on): the application starts cautiously, in case only a few more executors are enough, and then ramps up quickly if it needs many more.

  2. Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.
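The two policies can be modelled in a few lines of Scala. This is a simplified sketch, not Spark's ExecutorAllocationManager: scaleUpTargets and scaleDownCandidates are hypothetical names, a "round" stands for each time the scheduler backlog timeout fires while tasks are still pending, and the sketch does not reset the doubling once the target is reached, which the real implementation may do. It also assumes, as an illustration, that scale-down never drops the pool below spark.dynamicAllocation.minExecutors.

```scala
// Simplified model of the scale-up and scale-down policies (not Spark's source).
object ScalingPolicySketch {

  /** Scale up: the executor target after each round in which tasks stay backlogged.
    * Round i (0-based) adds 2^i executors (1, 2, 4, 8, ...), capped at `cap`,
    * i.e. the smaller of what the pending tasks need and maxExecutors.
    * Keep `rounds` below 31 so that `1 << i` does not overflow an Int. */
  def scaleUpTargets(initial: Int, cap: Int, rounds: Int): List[Int] =
    (0 until rounds)
      .scanLeft(initial)((target, i) => math.min(target + (1 << i), cap))
      .tail // drop the starting value, keep one entry per round
      .toList

  /** Scale down: ids of executors idle for at least `idleTimeoutMs`,
    * removing no more than would drop the pool below `minExecutors`. */
  def scaleDownCandidates(
      idleSinceMs: Map[String, Long],
      nowMs: Long,
      idleTimeoutMs: Long,
      totalExecutors: Int,
      minExecutors: Int): List[String] = {
    val expired = idleSinceMs.collect {
      case (id, since) if nowMs - since >= idleTimeoutMs => id
    }.toList.sorted
    expired.take(math.max(0, totalExecutors - minExecutors))
  }
}
```

For example, starting from no executors with a cap of 10, five backlogged rounds give the targets 1, 3, 7, 10 and 10: the additions double each round until the cap stops further growth.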

Dynamic allocation is available for all the currently supported cluster managers, i.e. Spark Standalone, Hadoop YARN and Apache Mesos.

Tip
Review the excellent slide deck Dynamic Allocation in Spark from Databricks.

Is Dynamic Allocation Enabled? — Utils.isDynamicAllocationEnabled method

isDynamicAllocationEnabled(conf: SparkConf): Boolean

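isDynamicAllocationEnabled returns true if all the following conditions hold:

  • spark.executor.instances is 0 (the default)

  • spark.dynamicAllocation.enabled is enabled

  • Spark does not run in local mode, unless spark.dynamicAllocation.testing is enabled

Otherwise, it returns false.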

Note
In Spark local (pseudo-cluster) mode, isDynamicAllocationEnabled returns true, i.e. dynamic allocation is enabled, only for testing, that is when spark.dynamicAllocation.testing is enabled.

Internally, isDynamicAllocationEnabled reads the spark.executor.instances setting (defaults to 0) and the spark.dynamicAllocation.enabled setting (defaults to false).

If the value of spark.executor.instances is not 0 and spark.dynamicAllocation.enabled is enabled, isDynamicAllocationEnabled prints the following WARN message to the logs:

WARN Utils: Dynamic Allocation and num executors both set, thus dynamic allocation disabled.
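
The decision above can be sketched in Scala as follows. This is a minimal model, not Spark's Utils source: a plain Map stands in for SparkConf, isDynamicAllocationEnabledSketch and isLocalMaster are hypothetical names, and treating a master of local or local[...] as "local mode" is an assumption. The WARN line is printed to standard error instead of going through Spark's logging.

```scala
// Minimal model of the isDynamicAllocationEnabled decision (hypothetical names).
object DynamicAllocationCheck {

  // Assumption: local mode means a master URL of "local" or "local[...]".
  def isLocalMaster(conf: Map[String, String]): Boolean = {
    val master = conf.getOrElse("spark.master", "")
    master == "local" || master.startsWith("local[")
  }

  def isDynamicAllocationEnabledSketch(conf: Map[String, String]): Boolean = {
    // Missing settings fall back to the defaults described above.
    val numExecutors = conf.getOrElse("spark.executor.instances", "0").toInt
    val enabled = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
    val testing = conf.getOrElse("spark.dynamicAllocation.testing", "false").toBoolean

    if (numExecutors != 0 && enabled) {
      // Stand-in for the WARN message logged by Utils.
      Console.err.println(
        "WARN Utils: Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    }
    numExecutors == 0 && enabled && (!isLocalMaster(conf) || testing)
  }
}
```

With spark.dynamicAllocation.enabled set to true and a YARN master the sketch returns true; adding spark.executor.instances=4 makes it print the warning and return false.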
Note
isDynamicAllocationEnabled is used when Spark calculates the initial number of executors for coarse-grained scheduler backends for YARN, Spark Standalone, and Mesos. It is also used for Spark Streaming.
Tip

Enable WARN logging level for org.apache.spark.util.Utils logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.Utils=WARN

Refer to Logging.

Validating Configuration — validateSettings method

validateSettings(): Unit

validateSettings is an internal method to ensure that the settings for dynamic allocation are correct.

It validates the dynamic allocation settings and throws a SparkException if any of them is set incorrectly.
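The exact checks are not listed here. As a rough sketch only, the Scala below assumes checks that follow from the settings described in this chapter: non-negative minimum and maximum executors, a non-zero maxExecutors, minExecutors not above maxExecutors, positive timeouts, and the External Shuffle Service being enabled unless testing. DynAllocSettings and validate are hypothetical names and the messages are made up; Spark itself throws a SparkException on the first failure instead of collecting a list.

```scala
// Hypothetical stand-in for the values that dynamic allocation reads (defaults as documented).
final case class DynAllocSettings(
    minExecutors: Int = 0,
    maxExecutors: Int = Int.MaxValue,
    schedulerBacklogTimeoutS: Long = 1L,
    sustainedSchedulerBacklogTimeoutS: Long = 1L,
    executorIdleTimeoutS: Long = 60L,
    shuffleServiceEnabled: Boolean = false,
    testing: Boolean = false)

object ValidateSketch {
  /** Returns every problem found; an empty list means the settings look valid. */
  def validate(s: DynAllocSettings): List[String] = {
    val checks = List(
      (s.minExecutors < 0 || s.maxExecutors < 0) -> "min/max executors must not be negative",
      (s.maxExecutors == 0) -> "maxExecutors cannot be 0",
      (s.minExecutors > s.maxExecutors) ->
        s"minExecutors (${s.minExecutors}) must be <= maxExecutors (${s.maxExecutors})",
      (s.schedulerBacklogTimeoutS <= 0) -> "schedulerBacklogTimeout must be > 0",
      (s.sustainedSchedulerBacklogTimeoutS <= 0) -> "sustainedSchedulerBacklogTimeout must be > 0",
      (s.executorIdleTimeoutS <= 0) -> "executorIdleTimeout must be > 0",
      // Removing executors without the External Shuffle Service could lose shuffle files.
      (!s.shuffleServiceEnabled && !s.testing) -> "the external shuffle service must be enabled"
    )
    checks.collect { case (true, message) => message }
  }
}
```

Collecting all problems rather than stopping at the first one is a choice made for the sketch, so a single run reports every misconfigured setting.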

Programmable Dynamic Allocation

Settings

spark.dynamicAllocation.enabled

spark.dynamicAllocation.enabled (default: false) controls whether dynamic allocation is enabled or not. It is assumed that spark.executor.instances is not set or is 0 (which is the default value).

spark.dynamicAllocation.minExecutors

spark.dynamicAllocation.minExecutors (default: 0) sets the minimum number of executors for dynamic allocation.

spark.dynamicAllocation.maxExecutors

spark.dynamicAllocation.maxExecutors (default: Integer.MAX_VALUE) sets the maximum number of executors for dynamic allocation.

spark.dynamicAllocation.initialExecutors

spark.dynamicAllocation.initialExecutors (default: spark.dynamicAllocation.minExecutors) sets the initial number of executors for dynamic allocation.

spark.dynamicAllocation.schedulerBacklogTimeout

spark.dynamicAllocation.schedulerBacklogTimeout (default: 1s) sets…FIXME

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (default: spark.dynamicAllocation.schedulerBacklogTimeout) sets…FIXME

spark.dynamicAllocation.executorIdleTimeout

spark.dynamicAllocation.executorIdleTimeout (default: 60s) sets the time for how long an executor can be idle before it gets removed.

spark.dynamicAllocation.cachedExecutorIdleTimeout

spark.dynamicAllocation.cachedExecutorIdleTimeout (default: Integer.MAX_VALUE) sets…FIXME

spark.dynamicAllocation.testing

spark.dynamicAllocation.testing is…FIXME
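
The defaults above can be combined into one resolved view of the settings. The Scala below is a minimal sketch, assuming a plain Map of settings and durations written as whole seconds (for example "30s" or "30"); Spark's own duration parser accepts more units. SettingsSketch, Resolved and resolve are hypothetical names, and the initialExecutors fallback to minExecutors is an assumption based on its documented default.

```scala
// Minimal sketch of resolving the dynamic allocation settings with their defaults.
object SettingsSketch {
  final case class Resolved(
      minExecutors: Int,
      maxExecutors: Int,
      initialExecutors: Int,
      schedulerBacklogTimeoutS: Long,
      sustainedSchedulerBacklogTimeoutS: Long,
      executorIdleTimeoutS: Long,
      cachedExecutorIdleTimeoutS: Long)

  // Simplified duration parser (assumption): accepts "30s" or a bare number of seconds.
  private def seconds(value: String): Long = value.trim.stripSuffix("s").toLong

  def resolve(conf: Map[String, String]): Resolved = {
    def get(key: String): Option[String] = conf.get(s"spark.dynamicAllocation.$key")

    val min = get("minExecutors").map(_.toInt).getOrElse(0)
    val backlog = get("schedulerBacklogTimeout").map(seconds).getOrElse(1L)
    Resolved(
      minExecutors = min,
      maxExecutors = get("maxExecutors").map(_.toInt).getOrElse(Int.MaxValue),
      // Assumption: unset initialExecutors falls back to minExecutors.
      initialExecutors = get("initialExecutors").map(_.toInt).getOrElse(min),
      schedulerBacklogTimeoutS = backlog,
      // Unset sustained timeout falls back to schedulerBacklogTimeout.
      sustainedSchedulerBacklogTimeoutS =
        get("sustainedSchedulerBacklogTimeout").map(seconds).getOrElse(backlog),
      executorIdleTimeoutS = get("executorIdleTimeout").map(seconds).getOrElse(60L),
      cachedExecutorIdleTimeoutS =
        get("cachedExecutorIdleTimeout").map(seconds).getOrElse(Int.MaxValue.toLong))
  }
}
```

Setting only spark.dynamicAllocation.schedulerBacklogTimeout to 5s, for instance, also changes the sustained timeout to 5 seconds, because the latter falls back to the former.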

Future

  • SPARK-4922

  • SPARK-4751

  • SPARK-7955
