
TaskRunner is a thread of execution that manages a single task. It can be run or killed, which boils down to running or killing the task that the TaskRunner object manages.


A TaskRunner object is created when an executor is requested to launch a task.

It is created with an ExecutorBackend (to send the task’s status updates to), the task and attempt ids, the task name, and a serialized version of the task (as a ByteBuffer).
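The constructor arguments can be pictured with a minimal, dependency-free sketch. StatusSink and SketchTaskRunner are hypothetical stand-ins rather than Spark's own classes, and the task state is modeled as a plain string instead of TaskState.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for ExecutorBackend: only the status callback is modeled.
trait StatusSink {
  def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit
}

// Simplified runner: the parameters mirror the constructor arguments listed above.
class SketchTaskRunner(
    backend: StatusSink,
    val taskId: Long,
    val attemptNumber: Int,
    val taskName: String,
    serializedTask: ByteBuffer) extends Runnable {

  override def run(): Unit = {
    // The real run method does far more; this sketch only reports the RUNNING state.
    backend.statusUpdate(taskId, "RUNNING", ByteBuffer.allocate(0))
  }
}
```

Because the sketch is a plain Runnable, it could be handed to any thread pool, which is presumably how an executor would schedule it.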

Running Task — run Method

run is part of the java.lang.Runnable contract that TaskRunner follows.

When run is executed, it creates a TaskMemoryManager object (using the global MemoryManager and the constructor’s taskId) to manage the memory allocated for the task’s execution.

It starts measuring the time to deserialize a task.

It sets the current context classloader.

It creates a new instance of the global closure Serializer.

You should see the following INFO message in the logs:

INFO Executor: Running [taskName] (TID [taskId])

At this point, the task is considered running and the ExecutorBackend.statusUpdate is executed (with taskId and TaskState.RUNNING state).

run deserializes the task’s environment (from serializedTask bytes using Task.deserializeWithDependencies) to have the task’s files, jars and properties, and the bytes (i.e. the real task’s body).

The target task to run is not deserialized yet, only its environment: the files, jars, and properties.

updateDependencies(taskFiles, taskJars) is called.

This is the moment when the proper Task object is deserialized (from taskBytes) using the earlier-created closure Serializer object. The local properties (as localProperties) are initialized to be the task’s properties (from the earlier call to Task.deserializeWithDependencies) and the TaskMemoryManager (created earlier in the method) is set to the task.

The task’s properties were part of the serialized object passed on to the current TaskRunner object.
Until run deserializes the task object, it is only available as the serializedTask byte buffer.
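The two-step deserialization can be illustrated with a simplified envelope in which the environment precedes the still-serialized task body. packEnvelope and unpackEnvelope are hypothetical helpers, the layout is an assumption made for illustration, and only files are modeled (jars and properties are left out).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

// Hypothetical envelope: file names with timestamps first, then the opaque task body.
def packEnvelope(files: Map[String, Long], taskBody: Array[Byte]): ByteBuffer = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeInt(files.size)
  files.foreach { case (name, timestamp) =>
    out.writeUTF(name)
    out.writeLong(timestamp)
  }
  out.write(taskBody)
  out.flush()
  ByteBuffer.wrap(bytes.toByteArray)
}

// Reads only the environment and returns the task body as still-serialized bytes.
def unpackEnvelope(envelope: ByteBuffer): (Map[String, Long], ByteBuffer) = {
  val raw = new Array[Byte](envelope.remaining())
  envelope.duplicate().get(raw)
  val in = new DataInputStream(new ByteArrayInputStream(raw))
  val count = in.readInt()
  val files = (0 until count).map { _ =>
    val name = in.readUTF()
    val timestamp = in.readLong()
    name -> timestamp
  }.toMap
  val body = new Array[Byte](in.available())
  in.readFully(body)
  (files, ByteBuffer.wrap(body))
}
```

Keeping the body opaque lets the dependencies be fetched before the task itself is deserialized.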

If the kill method has been called in the meantime, execution stops by throwing a TaskKilledException. Otherwise, TaskRunner continues executing the task.

You should see the following DEBUG message in the logs:

DEBUG Executor: Task [taskId]'s epoch is [task.epoch]

TaskRunner sends the task’s epoch to MapOutputTracker.

The current time is recorded as taskStart.

The task runs (with taskId, attemptNumber, and the globally-configured MetricsSystem). It runs inside a "monitored" block (i.e. try-finally block) to clean up after the task’s run finishes regardless of the final outcome - the task’s value or an exception thrown.

After the task’s run finishes (and regardless of an exception thrown or not), run always calls BlockManager.releaseAllLocksForTask (with the current task’s taskId).
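The "monitored" block can be sketched as a try-finally that remembers whether the task body threw. runMonitored is a hypothetical helper, and the cleanup callback stands in for calls such as BlockManager.releaseAllLocksForTask.

```scala
// Runs `body`, records whether it threw, and always hands that flag to `cleanup`.
def runMonitored[T](body: => T)(cleanup: Boolean => Unit): T = {
  // Assume failure until the body has returned normally.
  var threwException = true
  try {
    val value = body
    threwException = false
    value
  } finally {
    cleanup(threwException)
  }
}
```

Starting the flag at true means the cleanup sees a failure unless the body demonstrably completed.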

run then always queries TaskMemoryManager for memory leaks. If a leak is found (i.e. the memory freed by the call is greater than 0), spark.unsafe.exceptionOnMemoryLeak is enabled (it is not by default), and no exception was thrown while the task was running, a SparkException is thrown:

Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]

Otherwise, if spark.unsafe.exceptionOnMemoryLeak is disabled or an exception was thrown by the task, the following ERROR message is displayed in the logs instead:

ERROR Executor: Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]

If a memory leak is detected, it leads to either a SparkException or an ERROR message in the logs.

If there are any releasedLocks (after calling BlockManager.releaseAllLocksForTask earlier), spark.storage.exceptionOnPinLeak is enabled (it is not by default), and no exception was thrown while the task was running, a SparkException is thrown:

[releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]

Otherwise, if spark.storage.exceptionOnPinLeak is disabled or an exception was thrown by the task, the following WARN message is displayed in the logs instead:

WARN Executor: [releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]

If there are any releasedLocks, they lead to either a SparkException or a WARN message in the logs.
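Both leak checks share one decision: throw only when the strict setting is enabled and the task itself did not fail, otherwise log. The sketch below models that decision. reportLeak and the two message builders are hypothetical names, the exact bracket formatting of the lock list is an assumption, and IllegalStateException replaces SparkException to keep the code free of dependencies.

```scala
// Hypothetical helper: returns the message to log, or throws when strict mode applies.
def reportLeak(message: String, failOnLeak: Boolean, threwException: Boolean): String = {
  if (failOnLeak && !threwException) {
    // Spark throws SparkException here; a JDK exception keeps the sketch standalone.
    throw new IllegalStateException(message)
  } else {
    message
  }
}

def memoryLeakMessage(freedMemory: Long, taskId: Long): String =
  s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"

def lockLeakMessage(releasedLocks: Seq[String], taskId: Long): String =
  s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
    releasedLocks.mkString("[", ", ", "]")
```

Not throwing when the task already failed keeps the original exception from being masked by a secondary leak report.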

The current time is recorded as taskFinish.

If the task was killed, a TaskKilledException is thrown (and the TaskRunner exits).

When a task finishes successfully, it returns a value. The value is serialized (using a new instance of Serializer from SparkEnv, i.e. serializer).

There are two Serializer objects in SparkEnv: serializer and closureSerializer.

The time to serialize the task’s value is tracked (using beforeSerialization and afterSerialization).

The task’s metrics are set, i.e. executorDeserializeTime, executorRunTime, jvmGCTime, and resultSerializationTime.
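As a rough illustration, three of these metrics can be derived from the timestamps recorded along the way. TimingMetrics and timings are hypothetical names, deserializeStart is an assumed name for the moment deserialization timing began, and the arithmetic is a simplification: jvmGCTime is omitted, and any deserialization time spent inside the task itself is ignored.

```scala
// Hypothetical container for three of the metrics named above, in milliseconds.
case class TimingMetrics(
    executorDeserializeTime: Long,
    executorRunTime: Long,
    resultSerializationTime: Long)

// Derives each metric as the difference between two recorded timestamps.
def timings(
    deserializeStart: Long,
    taskStart: Long,
    taskFinish: Long,
    beforeSerialization: Long,
    afterSerialization: Long): TimingMetrics =
  TimingMetrics(
    executorDeserializeTime = taskStart - deserializeStart,
    executorRunTime = taskFinish - taskStart,
    resultSerializationTime = afterSerialization - beforeSerialization)
```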

A DirectTaskResult object with the serialized result and the latest values of accumulators is created (as directResult). The object is then serialized (using the global closure Serializer).

The limit of the buffer for the serialized DirectTaskResult object is calculated (as resultSize).

The serializedResult (that will soon be sent to ExecutorBackend) is calculated. It depends on resultSize.

If maxResultSize is set and the size of the serialized DirectTaskResult exceeds it, the following WARN message is displayed in the logs:

WARN Executor: Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.

Read about spark.driver.maxResultSize.

$ ./bin/spark-shell -c spark.driver.maxResultSize=1m

scala> sc.version
res0: String = 2.0.0-SNAPSHOT

scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 1m

scala> sc.range(0, 1024 * 1024 + 10, 1).collect
WARN Executor: Finished task 4.0 in stage 0.0 (TID 4). Result is larger than maxResultSize (1031.4 KB > 1024.0 KB), dropping it.
ERROR TaskSetManager: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1448)

The final serializedResult becomes a serialized IndirectTaskResult with a TaskResultBlockId for the task’s taskId and resultSize.

Otherwise, when maxResultSize is not positive or resultSize does not exceed maxResultSize, but resultSize is greater than maxDirectResultSize, a TaskResultBlockId object for the task’s taskId is created (as blockId) and serializedDirectResult is stored in BlockManager under blockId with the MEMORY_AND_DISK_SER storage level.

The following INFO message is printed out to the logs:

INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)

The final serializedResult becomes a serialized IndirectTaskResult with a TaskResultBlockId for the task’s taskId and resultSize.

The two cases differ only in whether the result is dropped or stored in BlockManager.

When the two cases above do not hold, the following INFO message is printed out to the logs:

INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver

The final serializedResult becomes the serializedDirectResult (that is the serialized DirectTaskResult).

The final serializedResult is either an IndirectTaskResult (with or without BlockManager used) or a DirectTaskResult.
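The three outcomes can be summarized as a small decision function. ResultRoute and routeResult are hypothetical names used for illustration. The conditions follow the description above, with sizes in bytes.

```scala
sealed trait ResultRoute
case object Dropped extends ResultRoute         // IndirectTaskResult, payload discarded
case object ViaBlockManager extends ResultRoute // IndirectTaskResult, payload stored as a block
case object Direct extends ResultRoute          // DirectTaskResult sent as-is

// Picks how a result of `resultSize` bytes travels back to the driver.
def routeResult(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultRoute =
  if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
  else if (resultSize > maxDirectResultSize) ViaBlockManager
  else Direct
```

A non-positive maxResultSize disables the first branch, so large results then always go through BlockManager.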

The serialized result (serializedResult) is sent to the driver through ExecutorBackend with the TaskState.FINISHED state.

When the TaskRunner finishes, taskId is removed from the internal runningTasks map of the owning Executor (that ultimately cleans up any references to the TaskRunner).

TaskRunner is Java’s Runnable and the contract requires that once a TaskRunner has completed execution it may not be restarted.

Killing Task — kill Method

kill(interruptThread: Boolean): Unit

kill marks the current instance of TaskRunner as killed and passes the call to kill a task on to the task itself (if available).

When executed, you should see the following INFO message in the logs:

INFO TaskRunner: Executor is trying to kill [taskName] (TID [taskId])

Internally, kill enables the internal flag killed and executes its Task.kill method if a task is available.

The internal flag killed is checked in run to stop executing the task. Calling Task.kill method allows for task interruptions later on.
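The interplay between kill and run can be sketched with a volatile flag. SketchTask, KillableRunner, failIfKilled, and TaskKilledSketch are hypothetical names. The real code throws TaskKilledException and logs through the executor's logger rather than println.

```scala
// Hypothetical replacement for TaskKilledException so the sketch stays standalone.
class TaskKilledSketch(message: String) extends RuntimeException(message)

// Hypothetical stand-in for the task: it only remembers that kill was requested.
class SketchTask {
  @volatile var killRequested = false
  def kill(interruptThread: Boolean): Unit = { killRequested = true }
}

class KillableRunner(taskName: String, taskId: Long) {
  @volatile private var killed = false
  // Empty until run has deserialized the task.
  @volatile var task: Option[SketchTask] = None

  def kill(interruptThread: Boolean): Unit = {
    println(s"Executor is trying to kill $taskName (TID $taskId)")
    killed = true
    // Forward the request only if the task is already available.
    task.foreach(_.kill(interruptThread))
  }

  // Called from run at the points where the killed flag is checked.
  def failIfKilled(): Unit =
    if (killed) throw new TaskKilledSketch(s"$taskName (TID $taskId) was killed")
}
```

Marking both fields volatile matters because kill is typically called from a different thread than the one executing run.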


