Executors

Executors are distributed agents that execute tasks.

They typically run for the entire lifetime of a Spark application, which is called static allocation of executors (but you can also opt in for dynamic allocation).

Executors send active task metrics to the driver and inform executor backends about task status updates (including task results).

Note
Executors are managed exclusively by executor backends.

Executors provide in-memory storage for RDDs that are cached in Spark applications (via Block Manager).

When executors are started, they register themselves with the driver and communicate directly to execute tasks.

Executor offers are described by executor id and the host on which an executor runs (see Resource Offers in this document).

Executors can run multiple tasks over their lifetime, both in parallel and sequentially. They track running tasks (by their task ids in the runningTasks internal registry). Consult the Launching Tasks section in this document.

Executors use a thread pool for launching tasks and sending metrics.

It is recommended to have as many executors as data nodes and as many cores as you can get from the cluster.

Executors are described by their id, hostname, environment (as SparkEnv), and classpath (and, less importantly and more for internal optimization, whether they run in local or cluster mode).

Tip

Enable INFO or DEBUG logging level for org.apache.spark.executor.Executor logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.Executor=INFO

Refer to Logging.

Stopping Executor — stop Method

Caution
FIXME

Creating Executor Instance

Executor requires executorId, executorHostname, a SparkEnv, userClassPath and whether it runs in local or cluster mode (with cluster as the default).

Note
isLocal is enabled exclusively for LocalEndpoint (for Spark in local mode).

When created, you should see the following INFO message in the logs:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

When in non-local/cluster mode, a BlockManager is initialized.

Note
The BlockManager for an executor is available in SparkEnv passed to the constructor.

A worker requires the additional services (besides the common ones like …):

  • ExecutorSource is created (with executorId) and, only in cluster mode, MetricsSystem is requested to register it.

  • (only in cluster mode) BlockManager is initialized.

Caution
FIXME How many cores are assigned per executor?

Launching Tasks — launchTask Method

launchTask(
  context: ExecutorBackend,
  taskId: Long,
  attemptNumber: Int,
  taskName: String,
  serializedTask: ByteBuffer): Unit

launchTask executes the input serializedTask task concurrently.

Internally, launchTask creates a TaskRunner, registers it in runningTasks internal registry (by taskId), and finally executes it on "Executor task launch worker" thread pool.
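The following is a minimal, self-contained sketch of that flow, using a toy TaskRunner and plain java.util.concurrent primitives rather than Spark's actual classes:

import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Toy sketch (not Spark's source) of how launchTask registers a TaskRunner
// in runningTasks and hands it over to the task-launch thread pool.
object LaunchTaskSketch {

  // stand-in for Spark's TaskRunner; only illustrates the Runnable contract
  final class TaskRunner(taskId: Long, taskName: String, serializedTask: ByteBuffer)
      extends Runnable {
    override def run(): Unit =
      println(s"deserializing and running task $taskId ($taskName)")
  }

  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
  private val threadPool = Executors.newCachedThreadPool() // "Executor task launch worker" pool

  def launchTask(
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val taskRunner = new TaskRunner(taskId, taskName, serializedTask)
    runningTasks.put(taskId, taskRunner) // register by task id
    threadPool.execute(taskRunner)       // execute concurrently on the thread pool
  }
}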

Figure 1. Launching tasks on executor using TaskRunners
Note
launchTask is called by CoarseGrainedExecutorBackend (when it handles LaunchTask message), MesosExecutorBackend, and LocalEndpoint.

Sending Heartbeats and Active Tasks Metrics — startDriverHeartbeater Method

Executors keep sending metrics for active tasks to the driver every spark.executor.heartbeatInterval (defaults to 10s with some random initial delay so the heartbeats from different executors do not pile up on the driver).

Figure 2. Executors use HeartbeatReceiver endpoint to report task metrics

An executor sends heartbeats using the internal heartbeater - Heartbeat Sender Thread.

Figure 3. HeartbeatReceiver’s Heartbeat Message Handler

For each TaskRunner in the runningTasks internal registry, the task’s metrics are computed (i.e. mergeShuffleReadMetrics and setJvmGCTime are called) and become part of the heartbeat (together with accumulators).

Caution
FIXME How do mergeShuffleReadMetrics and setJvmGCTime influence accumulators?
Note
Executors track the TaskRunners that run tasks. A task might not be assigned to a TaskRunner yet when the executor sends a heartbeat.

A blocking Heartbeat message that holds the executor id, all accumulator updates (per task id), and BlockManagerId is sent to HeartbeatReceiver RPC endpoint (with spark.executor.heartbeatInterval timeout).
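The payload could be pictured roughly like this (illustrative stand-ins only, not Spark's actual Heartbeat class, whose accumulator and BlockManagerId types are richer):

// Illustrative shape of the heartbeat payload described above (not Spark's actual classes).
final case class AccumulatorUpdates(taskId: Long, values: Map[String, Long])
final case class HeartbeatSketch(
    executorId: String,
    accumUpdates: Seq[AccumulatorUpdates], // per-task accumulator updates
    blockManagerId: String)                // Spark uses a BlockManagerId; a String stands in here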

Caution
FIXME When is heartbeatReceiverRef created?

If the response requests to reregister BlockManager, you should see the following INFO message in the logs:

INFO Executor: Told to re-register on heartbeat

The internal heartbeatFailures counter is reset (i.e. becomes 0).

If there are any issues with communicating with the driver, you should see the following WARN message in the logs:

WARN Executor: Issue communicating with driver in heartbeater

The internal heartbeatFailures counter is incremented and checked against the acceptable number of failures (spark.executor.heartbeat.maxFailures). If it exceeds that number, the following ERROR message is printed out to the logs:

ERROR Executor: Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times

The executor exits (using System.exit and exit code 56).
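The failure-handling policy described above can be sketched as follows (a simplification, not Spark's actual code; HEARTBEAT_MAX_FAILURES corresponds to spark.executor.heartbeat.maxFailures):

// Sketch of the heartbeat failure-handling policy described above.
object HeartbeatFailurePolicySketch {
  private val HEARTBEAT_MAX_FAILURES = 60 // spark.executor.heartbeat.maxFailures
  private var heartbeatFailures = 0

  def onHeartbeatAttempt(succeeded: Boolean): Unit =
    if (succeeded) {
      heartbeatFailures = 0 // reset on a successful heartbeat
    } else {
      heartbeatFailures += 1
      if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
        System.err.println(
          s"Exit as unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times")
        System.exit(56) // exit code 56
      }
    }
}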

Tip
Read about TaskMetrics in TaskMetrics.

heartbeater - Heartbeat Sender Thread

heartbeater is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is driver-heartbeater.
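A self-contained sketch of such a scheduler (plain java.util.concurrent, with assumed helper names; not Spark's actual implementation) could look like this:

import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import scala.util.Random

// Sketch of a single-thread daemon scheduler named "driver-heartbeater" that
// reports heartbeats at a fixed interval with a random initial delay.
object HeartbeaterSketch {
  private val heartbeatIntervalMs = 10000L // spark.executor.heartbeatInterval = 10s

  private val heartbeater = new ScheduledThreadPoolExecutor(1, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "driver-heartbeater")
      t.setDaemon(true)
      t
    }
  })

  private def reportHeartBeat(): Unit =
    println("sending Heartbeat(executorId, accumulator updates per task, BlockManagerId) to the driver")

  def start(): Unit = {
    // random initial delay so heartbeats from different executors do not pile up on the driver
    val initialDelayMs = heartbeatIntervalMs + Random.nextInt(heartbeatIntervalMs.toInt)
    heartbeater.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = reportHeartBeat() },
      initialDelayMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
  }
}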

Coarse-Grained Executors

Coarse-grained executors are executors that use CoarseGrainedExecutorBackend for task scheduling.

FetchFailedException

Caution
FIXME

A FetchFailedException is thrown when an executor (more specifically its TaskRunner) fails to fetch a shuffle block.

It contains the following:

  • the unique identifier for a BlockManager (as BlockManagerId)

  • shuffleId

  • mapId

  • reduceId

  • message - a short exception message

  • cause - a Throwable object

TaskRunner catches it and informs ExecutorBackend about the case (using statusUpdate with TaskState.FAILED task state).
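A simplified sketch of that handling (with stand-in types for ExecutorBackend, TaskState, and FetchFailedException; not Spark's actual classes) might look like this:

import java.nio.ByteBuffer

// Simplified sketch (not Spark's source) of a task runner reporting a failed
// shuffle-block fetch to its backend via statusUpdate with TaskState.FAILED.
object FetchFailureSketch {

  object TaskState extends Enumeration {
    val RUNNING, FINISHED, FAILED = Value
  }

  // stand-in for org.apache.spark.executor.ExecutorBackend
  trait ExecutorBackend {
    def statusUpdate(taskId: Long, state: TaskState.Value, data: ByteBuffer): Unit
  }

  // stand-in for org.apache.spark.shuffle.FetchFailedException
  final class FetchFailedException(message: String) extends Exception(message)

  def runTask(taskId: Long, backend: ExecutorBackend)(body: => Unit): Unit =
    try body
    catch {
      case ffe: FetchFailedException =>
        // serialize the failure reason and tell the backend that the task FAILED
        val reason = ByteBuffer.wrap(ffe.getMessage.getBytes("UTF-8"))
        backend.statusUpdate(taskId, TaskState.FAILED, reason)
    }
}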

Caution
FIXME Image with the call to ExecutorBackend.

Resource Offers

Read resourceOffers in TaskSchedulerImpl and resourceOffer in TaskSetManager.

"Executor task launch worker" Thread Pool

Executors use a daemon cached thread pool with threads named Executor task launch worker-ID (with ID being the task id) for launching tasks.
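A sketch of such a pool (plain java.util.concurrent rather than Spark's ThreadUtils helper; here a simple thread counter stands in for the ID) could be:

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

// Sketch of a daemon cached thread pool whose threads are named
// "Executor task launch worker-<n>" (not Spark's actual ThreadUtils code).
object TaskLaunchPoolSketch {
  private val threadCount = new AtomicLong(0)

  val threadPool: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"Executor task launch worker-${threadCount.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  })
}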

Executor Memory — spark.executor.memory or SPARK_EXECUTOR_MEMORY settings

You can control the amount of memory per executor using the spark.executor.memory setting. It sets the available memory equally for all executors per application.

Note
The amount of memory per executor is looked up when SparkContext is created.

You can change the assigned memory per executor per node in a standalone cluster using the SPARK_EXECUTOR_MEMORY environment variable.

You can find the value displayed as Memory per Node in web UI for standalone Master (as depicted in the figure below).

Figure 4. Memory per Node in Spark Standalone’s web UI

The above figure shows the result of running Spark shell with the amount of memory per executor defined explicitly (on command line), i.e.

./bin/spark-shell --master spark://localhost:7077 --conf spark.executor.memory=2g
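Equivalently, you could set it programmatically on SparkConf before the SparkContext is created, as in the minimal sketch below (the app name and master URL are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal standalone application that sets the executor memory programmatically.
object ExecutorMemoryDemo extends App {
  val conf = new SparkConf()
    .setAppName("executor-memory-demo")  // placeholder app name
    .setMaster("spark://localhost:7077") // placeholder standalone master URL
    .set("spark.executor.memory", "2g")
  val sc = new SparkContext(conf)
  println(sc.getConf.get("spark.executor.memory")) // 2g
  sc.stop()
}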

Metrics

Every executor registers its own ExecutorSource to report metrics.
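A sketch of what such a metrics source boils down to, using the Dropwizard/Codahale metrics library that Spark's metrics system is built on (the gauge name and constructor are illustrative, not the real ExecutorSource):

import com.codahale.metrics.{Gauge, MetricRegistry}

// Sketch of an executor-style metrics source: a named MetricRegistry exposing a gauge
// (the real ExecutorSource registers thread-pool and filesystem gauges, among others).
class ExecutorSourceSketch(activeTasks: () => Int) {
  val sourceName: String = "executor"
  val metricRegistry: MetricRegistry = new MetricRegistry

  metricRegistry.register(
    MetricRegistry.name("threadpool", "activeTasks"),
    new Gauge[Int] { override def getValue: Int = activeTasks() })
}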

Internal Registries and Counters

Table 1. Internal Registries and Counters
Name Description

runningTasks

Lookup table of TaskRunners per task id for the tasks the executor is currently running (registered in launchTask and used when reporting heartbeats).

heartbeatFailures

Number of consecutive failed heartbeats to the driver. Reset to 0 on a successful heartbeat; when it exceeds spark.executor.heartbeat.maxFailures, the executor exits with exit code 56.

Settings

Table 2. Spark Properties
Spark Property Default Value Description

spark.executor.cores

Number of cores for an executor.

spark.executor.extraClassPath

List of URLs representing a user’s CLASSPATH.

Each entry is separated by system-dependent path separator, i.e. : on Unix/MacOS systems and ; on Microsoft Windows.

spark.executor.extraJavaOptions

Extra Java options for executors.

Used to prepare the command to launch CoarseGrainedExecutorBackend in a YARN container.

spark.executor.extraLibraryPath

List of additional library paths separated by system-dependent path separator, i.e. : on Unix/MacOS systems and ; on Microsoft Windows.

Used to prepare the command to launch CoarseGrainedExecutorBackend in a YARN container.

spark.executor.userClassPathFirst

false

Flag to control whether to load classes in user jars before those in Spark jars.

spark.executor.heartbeatInterval

10s

Interval after which an executor reports heartbeat and metrics for active tasks to the driver.

Refer to Sending heartbeats and partial metrics for active tasks in this document.

spark.executor.heartbeat.maxFailures

60

Number of times an executor will try to send heartbeats to the driver before it gives up and exits (with exit code 56).

NOTE: It was introduced in SPARK-13522 Executor should kill itself when it’s unable to heartbeat to the driver more than N times.

spark.executor.id

spark.executor.instances

0

Number of executors to use.

NOTE: When greater than 0, it disables dynamic allocation.

spark.executor.memory

1g

Amount of memory to use per executor process (equivalent to SPARK_EXECUTOR_MEMORY environment variable).

Refer to Executor Memory — spark.executor.memory or SPARK_EXECUTOR_MEMORY settings in this document.

spark.executor.logs.rolling.maxSize

spark.executor.logs.rolling.maxRetainedFiles

spark.executor.logs.rolling.strategy

spark.executor.logs.rolling.time.interval

spark.executor.port

spark.executor.uri

Equivalent to SPARK_EXECUTOR_URI

spark.task.maxDirectResultSize

1048576B

Maximum size of a serialized task result that is sent directly back to the driver; larger results are made available to the driver through the BlockManager instead.
