TaskResultGetter
TaskResultGetter is a helper class of TaskSchedulerImpl for asynchronous deserialization of the task results of tasks that have finished successfully (possibly fetching remote blocks) and of the failure reasons of tasks that have failed.
Tip: Consult Task States in Tasks to learn about the different task states.
Note: The only instance of TaskResultGetter is created while TaskSchedulerImpl is created.
TaskResultGetter requires a SparkEnv and a TaskSchedulerImpl to be created and is stopped when TaskSchedulerImpl stops.
TaskResultGetter uses the task-result-getter asynchronous task executor for its operation.
Tip: Enable DEBUG logging level for the org.apache.spark.scheduler.TaskResultGetter logger to see what happens inside.
Add the following line to your log4j configuration (e.g. conf/log4j.properties):
log4j.logger.org.apache.spark.scheduler.TaskResultGetter=DEBUG
Refer to Logging.
task-result-getter Asynchronous Task Executor
getTaskResultExecutor: ExecutorService
getTaskResultExecutor creates a fixed-size daemon thread pool with spark.resultGetter.threads threads (default: 4) and the task-result-getter thread name prefix.
Tip: Read up on java.util.concurrent.ThreadPoolExecutor, which getTaskResultExecutor uses under the covers.
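The pool can be approximated with plain java.util.concurrent classes. The following is a minimal sketch, not Spark's own helper: ResultGetterPool, daemonThreadFactory and newDaemonFixedThreadPool are hypothetical names, the nThreads argument stands in for spark.resultGetter.threads, and the prefix-N thread naming is an assumption. It only reproduces the properties named above: a fixed number of threads, daemon threads, and the task-result-getter name prefix.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ResultGetterPool {
  // Hypothetical factory: builds daemon threads named "<prefix>-0", "<prefix>-1", and so on.
  def daemonThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true) // daemon threads do not keep the JVM alive on their own
      t
    }
  }

  // Fixed-size pool: core size equals max size, backed by an unbounded work queue.
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      nThreads, nThreads,
      0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable](),
      daemonThreadFactory(prefix))
}
```

Because the work queue is unbounded, such a pool only rejects new work after it has been shut down, and its daemon threads do not prevent the JVM from exiting.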
serializer Attribute
serializer: ThreadLocal[SerializerInstance]
serializer is a thread-local SerializerInstance that TaskResultGetter uses to deserialize byte buffers (with TaskResults or a TaskEndReason).
When created for a new thread, serializer is initialized with a new SerializerInstance (created using SparkEnv.closureSerializer).
Note: TaskResultGetter uses java.lang.ThreadLocal for the thread-local SerializerInstance variable.
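A minimal sketch of the thread-local pattern, assuming plain Java serialization in place of SparkEnv.closureSerializer. JavaCodec and Serializers are hypothetical names; JavaCodec stands in for a SerializerInstance, which is meant to be used by one thread at a time, so wrapping it in a ThreadLocal gives every task-result-getter thread its own instance without any locking.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for a SerializerInstance (plain Java serialization, one thread at a time).
final class JavaCodec {
  def serialize(obj: AnyRef): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  def deserialize[T](buf: ByteBuffer): T = {
    val bytes = new Array[Byte](buf.remaining())
    buf.duplicate().get(bytes) // read through a duplicate so the caller's buffer position is untouched
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}

object Serializers {
  // Mirrors the serializer attribute: each thread lazily gets its own JavaCodec
  // the first time it calls get().
  val serializer: ThreadLocal[JavaCodec] = new ThreadLocal[JavaCodec] {
    override def initialValue(): JavaCodec = new JavaCodec
  }
}
```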
taskResultSerializer Attribute
taskResultSerializer: ThreadLocal[SerializerInstance]
taskResultSerializer is a thread-local SerializerInstance that TaskResultGetter uses to…
When created for a new thread, taskResultSerializer is initialized with a new SerializerInstance (created using SparkEnv.serializer).
Note: TaskResultGetter uses java.lang.ThreadLocal for the thread-local SerializerInstance variable.
Deserializing Task Result and Notifying TaskSchedulerImpl: enqueueSuccessfulTask Method
enqueueSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  serializedData: ByteBuffer): Unit
enqueueSuccessfulTask submits an asynchronous task (to the task-result-getter asynchronous task executor) that first deserializes serializedData to a DirectTaskResult, then updates the internal accumulator (with the size of the task result) and ultimately notifies the TaskSchedulerImpl that the tid task was completed and whether its task result was received successfully or not.
Note: enqueueSuccessfulTask only enqueues the asynchronous task, which the task-result-getter asynchronous task executor runs at some point in the future.
Internally, the enqueued task first deserializes serializedData to a TaskResult (using the internal thread-local serializer).
The TaskResult could be a DirectTaskResult or an IndirectTaskResult.
For a DirectTaskResult, the task checks the available memory for the task result and, when the total size would exceed spark.driver.maxResultSize, simply returns.
Note: The enqueued task is a mere Runnable, so returning from it simply ends it without doing anything else. That is why the quota check itself aborts the TaskSet when there is not enough memory.
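The early return and the self-aborting check can be sketched as follows. ResultQuota and HandleDirectResult are hypothetical, and the abort is reduced to a flag; maxResultSize plays the role of spark.driver.maxResultSize, with 0 assumed to mean no limit.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical quota tracker; maxResultSize stands in for spark.driver.maxResultSize
// (0 is treated as "no limit").
final class ResultQuota(maxResultSize: Long) {
  private var totalResultSize = 0L
  private val aborted = new AtomicBoolean(false)

  def isAborted: Boolean = aborted.get()

  // Adds size to the running total. On overflow the check itself marks the abort,
  // because its caller is a Runnable that can only return.
  def canFetchMoreResults(size: Long): Boolean = synchronized {
    totalResultSize += size
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      aborted.set(true)
      false
    } else {
      true
    }
  }
}

// Hypothetical enqueued work for a direct result with the given serialized size.
final class HandleDirectResult(quota: ResultQuota, size: Long, deliver: () => Unit) extends Runnable {
  override def run(): Unit = {
    if (!quota.canFetchMoreResults(size)) return // nothing else happens after this
    deliver()
  }
}
```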
Otherwise, when there is enough memory to hold the task result, the task deserializes the DirectTaskResult's value (using the internal thread-local taskResultSerializer).
For an IndirectTaskResult, the task checks the available memory for the task result and, when the size could exceed the maximum result size, removes the block and simply returns.
Otherwise, when there is enough memory to hold the task result, you should see the following DEBUG message in the logs:
DEBUG Fetching indirect task result for TID [tid]
The task notifies TaskSchedulerImpl that it is about to fetch a remote block for a task result. It then gets the block from remote block managers (as serialized bytes).
When the block could not be fetched, TaskSchedulerImpl is informed (with the TaskResultLost task failure reason) and the task simply returns.
Note: The enqueued task is a mere Runnable, so returning from it simply ends it; the real handling happens when TaskSchedulerImpl is informed.
The task result (as a serialized byte buffer) is then deserialized to a DirectTaskResult (using the internal thread-local serializer) and its value is deserialized again using the internal thread-local taskResultSerializer (just like in the DirectTaskResult case). Finally, the block is removed from BlockManagerMaster.
Note: An IndirectTaskResult is deserialized twice using serializer (first as the IndirectTaskResult itself and then, once the remote block is fetched, as a DirectTaskResult) to become the final task result. Compare it to a DirectTaskResult that is deserialized (using serializer) only once.
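A simplified model of the DirectTaskResult and IndirectTaskResult handling. Everything here is hypothetical: Result, Direct, Indirect, Wire (a toy byte format), Outcome and ResultFetcher stand in for Spark's task result classes, serializers, block managers and TaskSchedulerImpl callbacks, and a mutable map plays the remote block store. It follows the order of steps described above: first decoding, quota check, DEBUG message, remote fetch (reporting a lost result when the block is missing), second decoding and block removal.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

// Hypothetical, simplified task-result shapes (not Spark's classes).
sealed trait Result
final case class Direct(valueBytes: Array[Byte]) extends Result
final case class Indirect(blockId: String, size: Long) extends Result

// Hypothetical outcomes reported to the scheduler.
sealed trait Outcome
final case class Succeeded(value: String) extends Outcome
case object ResultLost extends Outcome // plays the role of TaskResultLost
case object Dropped extends Outcome    // over the quota: the task just returns

// Toy wire format: tag 'D' + value bytes, or tag 'I' + size (Long) + block id bytes.
object Wire {
  def direct(value: String): Array[Byte] = Array('D'.toByte) ++ value.getBytes(UTF_8)

  def indirect(blockId: String, size: Long): Array[Byte] = {
    val id = blockId.getBytes(UTF_8)
    ByteBuffer.allocate(1 + 8 + id.length).put('I'.toByte).putLong(size).put(id).array()
  }

  def decode(bytes: Array[Byte]): Result = {
    val buf = ByteBuffer.wrap(bytes)
    buf.get().toChar match {
      case 'D' => Direct(bytes.drop(1))
      case 'I' =>
        val size = buf.getLong()
        val id = new Array[Byte](buf.remaining())
        buf.get(id)
        Indirect(new String(id, UTF_8), size)
      case other => throw new IllegalArgumentException(s"unknown tag $other")
    }
  }
}

// remoteBlocks stands in for remote block managers and holds encoded Direct results.
final class ResultFetcher(remoteBlocks: mutable.Map[String, Array[Byte]], maxResultSize: Long) {
  val events: mutable.Buffer[String] = mutable.Buffer.empty[String]
  private var totalResultSize = 0L

  private def canFetchMoreResults(size: Long): Boolean = {
    totalResultSize += size
    maxResultSize <= 0 || totalResultSize <= maxResultSize
  }

  // Second-stage decoding of the user value (the taskResultSerializer step).
  private def value(d: Direct): String = new String(d.valueBytes, UTF_8)

  def handle(tid: Long, serializedData: Array[Byte]): Outcome = Wire.decode(serializedData) match {
    case d: Direct =>
      if (canFetchMoreResults(serializedData.length.toLong)) Succeeded(value(d)) else Dropped
    case Indirect(blockId, size) =>
      if (!canFetchMoreResults(size)) {
        remoteBlocks.remove(blockId) // the oversized block is removed
        Dropped
      } else {
        events += s"DEBUG Fetching indirect task result for TID $tid"
        remoteBlocks.get(blockId) match {
          case None => ResultLost // the block could not be fetched
          case Some(bytes) =>
            val fetched = Wire.decode(bytes).asInstanceOf[Direct] // decoded a second time
            remoteBlocks.remove(blockId)
            Succeeded(value(fetched))
        }
      }
  }
}
```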
With no exceptions thrown, enqueueSuccessfulTask informs the TaskSchedulerImpl that the tid task was completed and the task result was received.
A ClassNotFoundException leads to aborting the TaskSet (with the ClassNotFound with classloader: [loader] error message), while any non-fatal exception shows the following ERROR message in the logs followed by aborting the TaskSet:
ERROR Exception while getting task result
Note: enqueueSuccessfulTask is called when TaskSchedulerImpl is notified about a task that has finished successfully (i.e. in FINISHED state).
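The failure handling can be sketched with scala.util.control.NonFatal. FakeTaskSet and ResultTask.runGuarded are hypothetical, the error log is an injected function, and the abort message for non-fatal exceptions is illustrative only.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for a TaskSetManager that only records abort messages.
final class FakeTaskSet {
  val aborts: mutable.Buffer[String] = mutable.Buffer.empty[String]
  def abort(message: String): Unit = aborts += message
}

object ResultTask {
  // Runs the deserialize-and-notify work and maps failures as described in the text.
  def runGuarded(taskSet: FakeTaskSet, logError: String => Unit)(work: => Unit): Unit =
    try work
    catch {
      case _: ClassNotFoundException =>
        val loader = Thread.currentThread().getContextClassLoader
        taskSet.abort(s"ClassNotFound with classloader: $loader")
      case NonFatal(ex) =>
        logError("ERROR Exception while getting task result")
        taskSet.abort(s"Exception while getting task result: $ex") // illustrative message
    }
}
```

Matching NonFatal instead of Throwable lets fatal errors (for example a StackOverflowError) and Scala control-flow throwables propagate rather than abort the TaskSet.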
Deserializing TaskFailedReason and Notifying TaskSchedulerImpl: enqueueFailedTask Method
enqueueFailedTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskState: TaskState.TaskState,
  serializedData: ByteBuffer): Unit
enqueueFailedTask submits an asynchronous task (to the task-result-getter asynchronous task executor) that first attempts to deserialize a TaskFailedReason from serializedData (using the internal thread-local serializer) and then notifies TaskSchedulerImpl that the task has failed.
Any ClassNotFoundException leads to the following ERROR message in the logs (without breaking the flow of enqueueFailedTask):
ERROR Could not deserialize TaskEndReason: ClassNotFound with classloader [loader]
Note: enqueueFailedTask is called when TaskSchedulerImpl is notified about a task that has failed (and is in FAILED, KILLED or LOST state).
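A minimal sketch of the enqueueFailedTask flow, assuming the scheduler is still notified with a fallback reason when serializedData is empty or cannot be decoded. FailureReason, Decoded, Unknown and FailedTaskHandler are hypothetical; task states are plain strings, and decode, logError and notifyScheduler are injected so the flow can be exercised without Spark.

```scala
import java.util.concurrent.ExecutorService

// Hypothetical failure reasons (not Spark's TaskFailedReason hierarchy).
sealed trait FailureReason
final case class Decoded(message: String) extends FailureReason
case object Unknown extends FailureReason // assumed fallback when nothing can be decoded

final class FailedTaskHandler(
    pool: ExecutorService,
    decode: Array[Byte] => FailureReason,
    logError: String => Unit,
    notifyScheduler: (Long, String, FailureReason) => Unit) {

  def enqueueFailedTask(tid: Long, taskState: String, serializedData: Array[Byte]): Unit =
    pool.execute(new Runnable {
      override def run(): Unit = {
        val loader = Thread.currentThread().getContextClassLoader
        var reason: FailureReason = Unknown
        try {
          if (serializedData != null && serializedData.nonEmpty) reason = decode(serializedData)
        } catch {
          // Only ClassNotFoundException is handled in this sketch: it is logged and the flow continues.
          case _: ClassNotFoundException =>
            logError(s"ERROR Could not deserialize TaskEndReason: ClassNotFound with classloader $loader")
        }
        notifyScheduler(tid, taskState, reason) // reached whether or not decoding succeeded
      }
    })
}
```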