TaskRunner

TaskRunner is a thread of execution that manages a single individual task. A TaskRunner can be run or killed, which boils down to running or killing the task the TaskRunner object manages.

Tip
|
Enable DEBUG logging level for the org.apache.spark.executor.Executor logger to see what happens inside a TaskRunner. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.Executor=DEBUG

Refer to Logging. |
Lifecycle
Caution
|
FIXME Image with state changes |
A TaskRunner object is created when an executor is requested to launch a task. It is created with an ExecutorBackend (to send the task’s status updates to), the task and attempt ids, the task name, and the serialized version of the task (as a ByteBuffer).
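To make the shape concrete, here is a hypothetical sketch of the class, with a minimal stand-in for ExecutorBackend so the snippet is self-contained (the parameter names are illustrative, not the verbatim Spark source):

import java.nio.ByteBuffer

// Minimal stand-in so the sketch compiles on its own; the real trait
// lives in org.apache.spark.executor and uses TaskState, not String.
trait ExecutorBackend {
  def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit
}

// Hypothetical sketch of TaskRunner: a Runnable that carries everything
// needed to run one task and report its status.
class TaskRunner(
    execBackend: ExecutorBackend, // receives the task's status updates
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)   // the task in serialized form
  extends Runnable {

  override def run(): Unit = ()   // walked through in the section below
}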
Running Task — run Method
Note
|
run is part of the java.lang.Runnable contract that TaskRunner follows.
|
When run is executed, it creates a TaskMemoryManager object (using the global MemoryManager and the constructor’s taskId) to manage the memory allocated for the task’s execution.
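As a sketch (assuming env is the current SparkEnv, whose memoryManager is the executor-wide MemoryManager):

// One TaskMemoryManager per task, built on the shared MemoryManager
// and keyed by the task's id.
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)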
It starts measuring the time to deserialize the task.
It sets the current context classloader.
Caution
|
FIXME What is part of the classloader? |
It creates a new instance of the global closure Serializer.
You should see the following INFO message in the logs:
INFO Executor: Running [taskName] (TID [taskId])
At this point, the task is considered running and ExecutorBackend.statusUpdate is executed (with taskId and the TaskState.RUNNING state).
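In code this is a single call on the constructor’s ExecutorBackend (a sketch; EMPTY_BYTE_BUFFER stands in for an empty status payload):

// Report the RUNNING state before doing any real work.
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)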
run deserializes the task’s environment (from serializedTask bytes using Task.deserializeWithDependencies) to get the task’s files, jars and properties, and the bytes (i.e. the real task’s body).
Note
|
The target task to run is not deserialized yet, but only its environment - the files, jars, and properties. |
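A sketch of that step, with the four parts of the environment destructured into local values:

// Only the environment is deserialized here; taskBytes still holds
// the serialized Task itself.
val (taskFiles, taskJars, taskProps, taskBytes) =
  Task.deserializeWithDependencies(serializedTask)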
Caution
|
FIXME Describe Task.deserializeWithDependencies.
|
updateDependencies(taskFiles, taskJars) is called.
Caution
|
FIXME What does updateDependencies do?
|
This is the moment when the proper Task object is deserialized (from taskBytes) using the earlier-created closure Serializer object. The local properties (as localProperties) are initialized to be the task’s properties (from the earlier call to Task.deserializeWithDependencies) and the TaskMemoryManager (created earlier in the method) is set on the task.
Note
|
The task’s properties were part of the serialized object passed on to the current TaskRunner object.
|
Note
|
Until run deserializes the task object, it is only available as the serializedTask byte buffer.
|
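A sketch of these three steps, with ser being the closure Serializer instance created earlier:

// Deserialize the real Task, then hand it its properties and
// its TaskMemoryManager.
task = ser.deserialize[Task[Any]](
  taskBytes, Thread.currentThread.getContextClassLoader)
task.localProperties = taskProps
task.setTaskMemoryManager(taskMemoryManager)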
If the kill method has been called in the meantime, the execution stops by throwing a TaskKilledException. Otherwise, TaskRunner continues executing the task.
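As a sketch, the check is a simple guard on the internal killed flag:

// Bail out early if kill arrived while the task was being deserialized.
if (killed) {
  throw new TaskKilledException
}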
You should see the following DEBUG message in the logs:
DEBUG Executor: Task [taskId]'s epoch is [task.epoch]
TaskRunner sends an update with the task’s epoch to MapOutputTracker.
Caution
|
FIXME Why is MapOutputTracker.updateEpoch needed?
|
The taskStart time, which corresponds to the current time, is recorded.
The task runs (with taskId, attemptNumber, and the globally-configured MetricsSystem). It runs inside a "monitored" block (i.e. a try-finally block) so that clean-up happens after the task’s run finishes, regardless of the final outcome: the task’s value or an exception thrown.
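A sketch of the monitored block (threwException is an assumed local flag consulted later by the leak checks):

var threwException = true
val value = try {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = attemptNumber,
    metricsSystem = env.metricsSystem)
  threwException = false
  res
} finally {
  // clean up regardless of the outcome: release block locks and
  // check for memory leaks (both sketched below)
}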
After the task’s run finishes (whether or not an exception was thrown), run always calls BlockManager.releaseAllLocksForTask (with the current task’s taskId).
run then always queries TaskMemoryManager for memory leaks. If there is any (i.e. the memory freed after the call is greater than 0), spark.unsafe.exceptionOnMemoryLeak is enabled (it is not by default), and no exception was thrown while the task was running, a SparkException is thrown:
Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Otherwise, if spark.unsafe.exceptionOnMemoryLeak is disabled or an exception was thrown by the task, the following ERROR message is displayed in the logs instead:
ERROR Executor: Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Note
|
If there is a memory leak detected, it leads to a SparkException or ERROR message in the logs.
|
If there are any releasedLocks (after calling BlockManager.releaseAllLocksForTask earlier), spark.storage.exceptionOnPinLeak is enabled (it is not by default), and no exception was thrown while the task was running, a SparkException is thrown:
[releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Otherwise, if spark.storage.exceptionOnPinLeak is disabled or an exception was thrown by the task, the following WARN message is displayed in the logs instead:
WARN Executor: [releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Note
|
If there are any releasedLocks, they lead to a SparkException or WARN message in the logs.
|
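A sketch of both leak checks, mirroring the description above (threwException comes from the monitored-block sketch earlier; conf, logError and logWarning are assumed helpers on the owning Executor):

val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)

if (freedMemory > 0 && !threwException) {
  val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
  if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
    throw new SparkException(errMsg)
  } else {
    logError(errMsg)
  }
}

if (releasedLocks.nonEmpty && !threwException) {
  val errMsg = s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
    releasedLocks.mkString("[", ", ", "]")
  if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
    throw new SparkException(errMsg)
  } else {
    logWarning(errMsg)
  }
}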
The taskFinish time, which corresponds to the current time, is recorded.
If the task was killed, a TaskKilledException is thrown (and the TaskRunner exits).
Caution
|
FIXME Finish me! |
When a task finishes successfully, it returns a value. The value is serialized (using a new instance of Serializer from SparkEnv, i.e. serializer).
Note
|
There are two Serializer objects in SparkEnv.
|
The time to serialize the task’s value is tracked (using beforeSerialization and afterSerialization).
The task’s metrics are set, i.e. executorDeserializeTime, executorRunTime, jvmGCTime, and resultSerializationTime.
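A sketch of the result serialization and the timestamps around it (resultSer being the fresh Serializer instance from SparkEnv mentioned above; the metrics setter name is an assumption):

val resultSer = env.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)   // the task's result
val afterSerialization = System.currentTimeMillis()

// recorded as the resultSerializationTime metric
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)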
Caution
|
FIXME Describe the metrics in more detail. And include a figure to show the metric points. |
run collects the latest values of accumulators (as accumUpdates).
A DirectTaskResult object with the serialized result and the latest values of accumulators is created (as directResult). The object is then serialized (using the global closure Serializer).
The limit of the buffer for the serialized DirectTaskResult object is calculated (as resultSize).
The serializedResult (that will soon be sent to ExecutorBackend) is then computed. It depends on resultSize.
If maxResultSize is set and the size of the serialized DirectTaskResult exceeds it, the following WARN message is displayed in the logs:
WARN Executor: Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.
Tip
|
Read about spark.driver.maxResultSize. |
$ ./bin/spark-shell -c spark.driver.maxResultSize=1m
scala> sc.version
res0: String = 2.0.0-SNAPSHOT
scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 1m
scala> sc.range(0, 1024 * 1024 + 10, 1).collect
WARN Executor: Finished task 4.0 in stage 0.0 (TID 4). Result is larger than maxResultSize (1031.4 KB > 1024.0 KB), dropping it.
...
ERROR TaskSetManager: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
...
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1448)
...
The final serializedResult becomes a serialized IndirectTaskResult with a TaskResultBlockId for the task’s taskId and resultSize.
Otherwise, when maxResultSize is not positive or resultSize is smaller than maxResultSize but greater than maxDirectResultSize, a TaskResultBlockId object for the task’s taskId is created (as blockId) and serializedDirectResult is stored as a blockId block in BlockManager with the MEMORY_AND_DISK_SER storage level.
Caution
|
FIXME Describe maxDirectResultSize.
|
The following INFO message is printed out to the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)
The final serializedResult becomes a serialized IndirectTaskResult with a TaskResultBlockId for the task’s taskId and resultSize.
Note
|
The difference between the two cases is that the result is dropped or sent via BlockManager. |
When the two cases above do not hold, the following INFO message is printed out to the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver
The final serializedResult becomes the serializedDirectResult (that is, the serialized DirectTaskResult).
Note
|
The final serializedResult is either an IndirectTaskResult (with or without BlockManager used) or a DirectTaskResult.
|
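Putting the three cases together, a simplified sketch of how serializedResult is chosen (maxResultSize, maxDirectResultSize, ser and serializedDirectResult are the values introduced above; the exact BlockManager.putBytes signature varies across Spark versions):

val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    // case 1: too large for the driver at all; drop the result and
    // send only a reference that carries its size
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    // case 2: too large to send directly; park it in BlockManager
    // and send a reference the driver can fetch it by
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    // case 3: small enough to go straight to the driver
    serializedDirectResult
  }
}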
The serializedResult for the task is then sent to the driver using ExecutorBackend, with the TaskState.FINISHED state.
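As a sketch, this is the closing counterpart of the earlier RUNNING update:

// Hand the (direct or indirect) result back to the scheduler backend.
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)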
Caution
|
FIXME Complete catch block.
|
When the TaskRunner finishes, taskId is removed from the internal runningTasks map of the owning Executor (which ultimately cleans up any references to the TaskRunner).
Note
|
TaskRunner is Java’s Runnable and the contract requires that once a TaskRunner has completed execution it may not be restarted.
|
Killing Task — kill Method
kill(interruptThread: Boolean): Unit
kill marks the current instance of TaskRunner as killed and passes the kill request on to the task itself (if available).
When executed, you should see the following INFO message in the logs:
INFO TaskRunner: Executor is trying to kill [taskName] (TID [taskId])
Internally, kill enables the internal killed flag and executes the task’s Task.kill method if a task is available.
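A simplified sketch of kill, based on the description above (the synchronization in the real implementation is omitted):

def kill(interruptThread: Boolean): Unit = {
  logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
  killed = true                   // checked by run before and during execution
  if (task != null) {
    task.kill(interruptThread)    // delegate to the task, if already deserialized
  }
}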