DAGSchedulerEventProcessLoop — dag-scheduler-event-loop DAGScheduler Event Bus
DAGSchedulerEventProcessLoop (dag-scheduler-event-loop) is an EventLoop with a single "business logic" thread for processing DAGSchedulerEvent events.
| DAGSchedulerEvent | Event Handler | Reason |
|---|---|---|
| JobSubmitted | handleJobSubmitted | A Spark job was submitted to DAGScheduler using submitJob or… |
| MapStageSubmitted | handleMapStageSubmitted | A ShuffleMapStage was submitted using submitMapStage. |
| StageCancelled | handleStageCancellation | DAGScheduler was requested to cancel a stage. |
| JobCancelled | handleJobCancellation | DAGScheduler was requested to cancel a job. |
| JobGroupCancelled | handleJobGroupCancelled | DAGScheduler was requested to cancel a job group. |
| AllJobsCancelled | FIXME | DAGScheduler was requested to cancel all jobs. |
| BeginEvent | handleBeginEvent | TaskSetManager informs DAGScheduler that a task is starting (through taskStarted). |
| GettingResultEvent | handleGetTaskResult | TaskSetManager informs DAGScheduler that a task is fetching its results (through taskGettingResult). |
| CompletionEvent | handleTaskCompletion | TaskSetManager informs DAGScheduler that a task has completed. |
| ExecutorAdded | handleExecutorAdded | DAGScheduler was informed that an executor was spun up on a host. |
| ExecutorLost | handleExecutorLost | DAGScheduler was informed that an executor was lost. |
| TaskSetFailed | handleTaskSetFailed | DAGScheduler was requested to cancel a TaskSet. |
| ResubmitFailedStages | resubmitFailedStages | Posted for the FetchFailed case in handleTaskCompletion. |
When created, DAGSchedulerEventProcessLoop gets a reference to the owning DAGScheduler that it uses to invoke the event handler methods.
Note: DAGSchedulerEventProcessLoop uses java.util.concurrent.LinkedBlockingDeque, a blocking deque that grows indefinitely, i.e. can hold up to Integer.MAX_VALUE events.
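For illustration, here is a minimal, self-contained sketch of such a single-threaded event loop over a LinkedBlockingDeque. The event types and handler bodies are hypothetical stand-ins, not Spark's own (Spark's EventLoop is an internal abstract class):

import java.util.concurrent.LinkedBlockingDeque

// Hypothetical stand-ins for the real (Spark-internal) event hierarchy.
sealed trait DAGSchedulerEvent
case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
case object AllJobsCancelled extends DAGSchedulerEvent

class SimpleEventLoop(name: String) {
  // Unbounded blocking deque: can queue up to Integer.MAX_VALUE events.
  private val eventQueue = new LinkedBlockingDeque[DAGSchedulerEvent]()

  // The single "business logic" thread: takes events off the queue and
  // dispatches each to its handler, one at a time.
  private val eventThread = new Thread(name) {
    override def run(): Unit =
      try {
        while (true) onReceive(eventQueue.take())
      } catch {
        case _: InterruptedException => // the loop was stopped
      }
  }
  eventThread.setDaemon(true)

  private def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobCancelled(jobId) => println(s"handleJobCancellation($jobId)")
    case AllJobsCancelled    => println("cancelling all jobs")
  }

  def start(): Unit = eventThread.start()
  def post(event: DAGSchedulerEvent): Unit = eventQueue.put(event)
  def stop(): Unit = eventThread.interrupt()
}

Because a single thread consumes the queue, the handlers described on this page run strictly one after another and need no synchronization among themselves.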
AllJobsCancelled Event and…

Caution: FIXME
GettingResultEvent Event and handleGetTaskResult Handler

GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent

GettingResultEvent is a DAGSchedulerEvent that triggers handleGetTaskResult (on a separate thread).

Note: GettingResultEvent is posted to inform DAGScheduler (through taskGettingResult) that a task is fetching its results.
handleGetTaskResult Handler

handleGetTaskResult(taskInfo: TaskInfo): Unit

handleGetTaskResult merely posts SparkListenerTaskGettingResult (to the LiveListenerBus Event Bus).
BeginEvent Event and handleBeginEvent Handler

BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

BeginEvent is a DAGSchedulerEvent that triggers handleBeginEvent (on a separate thread).

Note: BeginEvent is posted to inform DAGScheduler (through taskStarted) that a TaskSetManager starts a task.
handleBeginEvent Handler

handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit

handleBeginEvent looks up the stage of task in the stageIdToStage internal registry to compute the last attempt id (or -1 if not available) and posts SparkListenerTaskStart (to the listenerBus event bus).
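A rough sketch of that lookup, with hypothetical stand-in types (the real Task carries its stageId itself):

import scala.collection.mutable

// Simplified stand-ins for Spark's Stage/StageInfo/TaskInfo.
case class StageInfo(attemptId: Int)
case class Stage(stageId: Int, latestInfo: StageInfo)
case class TaskInfo(taskId: Long)

val stageIdToStage = mutable.HashMap.empty[Int, Stage]

def handleBeginEvent(taskStageId: Int, taskInfo: TaskInfo): Unit = {
  // The last attempt id of the task's stage, or -1 if the stage is unknown.
  val stageAttemptId =
    stageIdToStage.get(taskStageId).map(_.latestInfo.attemptId).getOrElse(-1)
  // Stand-in for posting SparkListenerTaskStart to the listener bus.
  println(s"SparkListenerTaskStart(stageId=$taskStageId, " +
    s"attemptId=$stageAttemptId, task=${taskInfo.taskId})")
}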
JobGroupCancelled Event and handleJobGroupCancelled Handler

JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

JobGroupCancelled is a DAGSchedulerEvent that triggers handleJobGroupCancelled (on a separate thread).

Note: JobGroupCancelled is posted when DAGScheduler is informed (through cancelJobGroup) that SparkContext was requested to cancel a job group.
handleJobGroupCancelled Handler

handleJobGroupCancelled(groupId: String): Unit

handleJobGroupCancelled finds the active jobs in a group and cancels them.

Internally, handleJobGroupCancelled computes all the active jobs (registered in the internal collection of active jobs) that have the spark.jobGroup.id scheduling property set to groupId.

handleJobGroupCancelled then cancels every active job in the group one by one, with the cancellation reason "part of cancelled job group [groupId]".
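A minimal sketch of this selection-and-cancel loop, assuming stand-in ActiveJob and handler types:

import java.util.Properties
import scala.collection.mutable

// Simplified ActiveJob stand-in carrying the job's scheduling properties.
case class ActiveJob(jobId: Int, properties: Properties)

val activeJobs = mutable.HashSet.empty[ActiveJob]

def handleJobGroupCancelled(groupId: String): Unit = {
  // Select active jobs whose spark.jobGroup.id property matches groupId...
  val jobsInGroup = activeJobs.filter { job =>
    Option(job.properties).exists(_.getProperty("spark.jobGroup.id") == groupId)
  }
  // ...and cancel them one by one with the group-cancellation reason.
  jobsInGroup.map(_.jobId).foreach { jobId =>
    handleJobCancellation(jobId, s"part of cancelled job group $groupId")
  }
}

def handleJobCancellation(jobId: Int, reason: String): Unit =
  println(s"Job $jobId cancelled $reason")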
MapStageSubmitted Event and handleMapStageSubmitted Handler

MapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties = null)
extends DAGSchedulerEvent

MapStageSubmitted is a DAGSchedulerEvent that triggers handleMapStageSubmitted (on a separate thread).

Figure: MapStageSubmitted Event Handling

Note: MapStageSubmitted is posted when DAGScheduler is informed (through submitMapStage) that SparkContext.submitMapStage was executed.
handleMapStageSubmitted Handler

handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit

handleMapStageSubmitted is called with a job id (for a new job to be created), a ShuffleDependency, and a JobListener.

You should see the following INFO messages in the logs:

Got map stage job %s (%s) with %d output partitions
Final stage: [finalStage] ([finalStage.name])
Parents of final stage: [finalStage.parents]
Missing parents: [list of stages]

A SparkListenerJobStart event is posted to LiveListenerBus (so other event listeners, not only DAGScheduler, know about the event).

MapStageSubmitted events are then processed exactly like JobSubmitted events (FIXME ?).

Tip: The difference between…
TaskSetFailed Event and handleTaskSetFailed Handler

TaskSetFailed(
  taskSet: TaskSet,
  reason: String,
  exception: Option[Throwable])
extends DAGSchedulerEvent

TaskSetFailed is a DAGSchedulerEvent that triggers the handleTaskSetFailed method.

Note: TaskSetFailed is posted when DAGScheduler is requested to cancel a TaskSet.
handleTaskSetFailed Handler

handleTaskSetFailed(
  taskSet: TaskSet,
  reason: String,
  exception: Option[Throwable]): Unit

handleTaskSetFailed looks up the stage (of the input taskSet) in the internal stageIdToStage registry and aborts it.
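A minimal sketch, assuming stand-in TaskSet and Stage types and a simplified abortStage:

import scala.collection.mutable

// Simplified stand-ins for the Spark-internal types.
case class TaskSet(stageId: Int)
case class Stage(stageId: Int)

val stageIdToStage = mutable.HashMap.empty[Int, Stage]

def handleTaskSetFailed(
    taskSet: TaskSet,
    reason: String,
    exception: Option[Throwable]): Unit =
  // Abort the stage of the failed TaskSet if it is still registered.
  stageIdToStage.get(taskSet.stageId).foreach { stage =>
    abortStage(stage, reason, exception)
  }

def abortStage(stage: Stage, reason: String, e: Option[Throwable]): Unit =
  println(s"Aborting stage ${stage.stageId}: $reason")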
ResubmitFailedStages Event and resubmitFailedStages Handler

ResubmitFailedStages extends DAGSchedulerEvent

ResubmitFailedStages is a DAGSchedulerEvent that triggers the resubmitFailedStages method.

Note: ResubmitFailedStages is posted for the FetchFailed case in handleTaskCompletion.
resubmitFailedStages Handler

resubmitFailedStages(): Unit

resubmitFailedStages iterates over the internal collection of failed stages and submits them.

Note: resubmitFailedStages does nothing when there are no failed stages reported.

You should see the following INFO message in the logs:

INFO Resubmitting failed stages

resubmitFailedStages clears the internal cache of RDD partition locations first. It then makes a copy of the collection of failed stages so DAGScheduler can track failed stages afresh.

Note: At this point DAGScheduler has no failed stages reported.

The previously-reported failed stages are sorted by the corresponding job ids in incremental order and resubmitted.
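The copy-clear-sort-resubmit sequence can be sketched as follows (assumed names; firstJobId stands in for the id of the job a stage belongs to):

import scala.collection.mutable

// Simplified stand-in: a failed stage tracks the id of the job it serves.
case class Stage(id: Int, firstJobId: Int)

val failedStages = mutable.HashSet.empty[Stage]

def resubmitFailedStages(): Unit =
  if (failedStages.nonEmpty) {
    println("Resubmitting failed stages")
    clearCacheLocs() // drop the cached RDD partition locations
    // Copy, then clear, so new failures are tracked afresh while resubmitting.
    val failedStagesCopy = failedStages.toArray
    failedStages.clear()
    // Resubmit in increasing order of the owning job ids.
    for (stage <- failedStagesCopy.sortBy(_.firstJobId)) submitStage(stage)
  }

def clearCacheLocs(): Unit = ()
def submitStage(stage: Stage): Unit = println(s"submitting stage ${stage.id}")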
ExecutorLost Event and handleExecutorLost Handler — fetchFailed Disabled Case

ExecutorLost(
  execId: String,
  reason: ExecutorLossReason)
extends DAGSchedulerEvent

ExecutorLost is a DAGSchedulerEvent that triggers the handleExecutorLost method with fetchFailed disabled, i.e. false.
handleExecutorLost Handler

handleExecutorLost(
  execId: String,
  filesLost: Boolean,
  maybeEpoch: Option[Long] = None): Unit

The current epoch is either provided (as the input maybeEpoch) or requested from MapOutputTrackerMaster.

Caution: FIXME When is maybeEpoch passed in?

Recurring ExecutorLost events lead to the following repeating DEBUG message in the logs:

DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])

Note: The handleExecutorLost handler uses DAGScheduler's failedEpoch and FIXME internal registries.

Otherwise, when the executor execId is not in the list of lost executors, or the executor failure's epoch is smaller than the input maybeEpoch, the executor's lost event is recorded in the failedEpoch internal registry.

Caution: FIXME Describe the case above in simpler non-technical words. Perhaps change the order, too.

You should see the following INFO message in the logs:

INFO Executor lost: [execId] (epoch [epoch])

Caution: FIXME Review what filesLost is.

handleExecutorLost exits unless the ExecutorLost event was for a map output fetch operation (and the input filesLost is true) or external shuffle service is not used.

In such a case, you should see the following INFO message in the logs:

INFO Shuffle files lost for executor: [execId] (epoch [epoch])

handleExecutorLost walks over all ShuffleMapStages in DAGScheduler's shuffleToMapStage internal registry and does the following (in order):

- ShuffleMapStage.removeOutputsOnExecutor(execId) is called

In case DAGScheduler's shuffleToMapStage internal registry has no shuffles registered, MapOutputTrackerMaster is requested to increment epoch.

Ultimately, DAGScheduler clears the internal cache of RDD partition locations.
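A sketch of the epoch bookkeeping only, under assumed names (the real handler also updates shuffle-stage outputs and MapOutputTrackerMaster):

import scala.collection.mutable

// Executors that failed, keyed by executor id, with the failure epoch.
val failedEpoch = mutable.HashMap.empty[String, Long]
val externalShuffleServiceEnabled = false // assumption for the sketch

def handleExecutorLost(
    execId: String, filesLost: Boolean, currentEpoch: Long): Unit =
  if (failedEpoch.get(execId).exists(_ >= currentEpoch)) {
    // A loss of this executor was already processed at this (or a later) epoch.
    println(s"Additional executor lost message for $execId (epoch $currentEpoch)")
  } else {
    failedEpoch(execId) = currentEpoch
    println(s"Executor lost: $execId (epoch $currentEpoch)")
    if (filesLost || !externalShuffleServiceEnabled) {
      println(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
      // Here the real code removes the executor's outputs from every
      // ShuffleMapStage and clears the cached RDD partition locations.
    }
  }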
JobCancelled Event and handleJobCancellation Handler

JobCancelled(jobId: Int) extends DAGSchedulerEvent

JobCancelled is a DAGSchedulerEvent that triggers the handleJobCancellation method (on a separate thread).

Note: JobCancelled is posted when DAGScheduler is requested to cancel a job.
handleJobCancellation Handler

handleJobCancellation(jobId: Int, reason: String = "")

handleJobCancellation first makes sure that the input jobId has been registered earlier (using the jobIdToStageIds internal registry).

If the input jobId is not known to DAGScheduler, you should see the following DEBUG message in the logs:

DEBUG DAGScheduler: Trying to cancel unregistered job [jobId]

Otherwise, handleJobCancellation fails the active job and all independent stages (by looking up the active job in jobIdToActiveJob) with the failure reason:

Job [jobId] cancelled [reason]
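A minimal sketch, assuming a stand-in registry and a simplified failJobAndIndependentStages:

import scala.collection.mutable

// Stand-in for the jobIdToStageIds internal registry.
val jobIdToStageIds = mutable.HashMap.empty[Int, mutable.HashSet[Int]]

def handleJobCancellation(jobId: Int, reason: String = ""): Unit =
  if (!jobIdToStageIds.contains(jobId)) {
    println(s"Trying to cancel unregistered job $jobId")
  } else {
    // Fail the active job and its independent stages with the reason.
    failJobAndIndependentStages(jobId, s"Job $jobId cancelled $reason")
  }

def failJobAndIndependentStages(jobId: Int, reason: String): Unit =
  println(reason)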
CompletionEvent Event and handleTaskCompletion Handler

CompletionEvent informs DAGScheduler about task completions. It is handled by the handleTaskCompletion method.

handleTaskCompletion(event: CompletionEvent): Unit

Note: CompletionEvent holds contextual information about the completed task.

The task knows about the stage it belongs to (using Task.stageId), the partition it works on (using Task.partitionId), and the stage attempt (using Task.stageAttemptId).

OutputCommitCoordinator.taskCompleted is called.

If the reason for task completion is not Success, SparkListenerTaskEnd is posted to LiveListenerBus. The only difference from the Success case is how the stage attempt id is calculated: here it is Task.stageAttemptId (not Stage.latestInfo.attemptId).
Caution: FIXME What is the difference between stage attempt ids?

If the stage the task belongs to has been cancelled, stageIdToStage should not contain it, and the method quits.
The main processing depends on the TaskEndReason, i.e. the reason for task completion (available as event.reason). The method skips processing, i.e. does nothing, for the following TaskEndReasons: TaskCommitDenied, ExceptionFailure, TaskResultLost, ExecutorLostFailure, TaskKilled, and UnknownReason.
TaskEndReason: Success

SparkListenerTaskEnd is posted to LiveListenerBus.

The partition the task worked on is removed from the pendingPartitions of the stage.

The processing then splits per task type, ResultTask or ShuffleMapTask, and DAGScheduler.submitWaitingStages is called.
ResultTask

For a ResultTask, the stage is a ResultStage. If there is no active job for the stage (using resultStage.activeJob), the following INFO message appears in the logs:

INFO Ignoring result from [task] because its job has finished

Otherwise, the handler checks whether the task is marked as running for the job (using job.finished) and proceeds; it skips execution when the task has already been marked as completed in the job.

Caution: FIXME When could a task that has just finished be ignored, i.e. the job has already been marked finished? Could it be for stragglers?
|
DAGScheduler.updateAccumulators(event) is called.
The partition is marked as finished
(using job.finished
) and the number of partitions calculated increased (using job.numFinished
).
If the whole job has finished (when job.numFinished == job.numPartitions
), then:
-
markStageAsFinished
is called -
cleanupStateForJobAndIndependentStages(job)
-
SparkListenerJobEnd is posted to LiveListenerBus with
JobSucceeded
The JobListener
of the job is notified about the task’s successful completion. In case the step fails, i.e. throws an exception, the JobListener
is notified about the failure.
Caution
|
FIXME When would job.listener.taskSucceeded throw an exception? How?
|
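The per-job accounting above can be sketched with a stand-in ActiveJob (assumed names; the real ActiveJob is Spark-internal):

class SimpleActiveJob(val numPartitions: Int) {
  // Which partitions have produced a result so far (job.finished).
  val finished: Array[Boolean] = Array.fill(numPartitions)(false)
  // How many partitions have finished (job.numFinished).
  var numFinished = 0

  // Marks partitionId as done; returns true when the whole job just finished.
  def taskSucceeded(partitionId: Int): Boolean =
    if (finished(partitionId)) false // duplicate completion: ignored
    else {
      finished(partitionId) = true
      numFinished += 1
      numFinished == numPartitions
    }
}

// Usage: a two-partition job finishes once both partitions report success.
val job = new SimpleActiveJob(numPartitions = 2)
assert(!job.taskSucceeded(0))
assert(job.taskSucceeded(1))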
ShuffleMapTask

For a ShuffleMapTask, the stage is a ShuffleMapStage.

DAGScheduler.updateAccumulators(event) is called.

event.result is a MapStatus that knows the executor id where the task has finished (using status.location.executorId).

You should see the following DEBUG message in the logs:

DEBUG ShuffleMapTask finished on [execId]

If failedEpoch contains the executor and the epoch of the ShuffleMapTask is not greater than that in failedEpoch, you should see the following INFO message in the logs:

INFO Ignoring possibly bogus [task] completion from executor [executorId]

Otherwise, shuffleStage.addOutputLoc(smt.partitionId, status) is called.

The method does more processing only if the internal runningStages contains the ShuffleMapStage and it has no more pending partitions to compute (using shuffleStage.pendingPartitions).
markStageAsFinished(shuffleStage) is called.

The following INFO messages appear in the logs:

INFO looking for newly runnable stages
INFO running: [runningStages]
INFO waiting: [waitingStages]
INFO failed: [failedStages]

mapOutputTracker.registerMapOutputs (with changeEpoch enabled) is called.

The internal cache of RDD partition locations is cleared.

If the map stage is ready, i.e. all partitions have shuffle outputs, the map-stage jobs waiting on this stage (using shuffleStage.mapStageJobs) are marked as finished: MapOutputTrackerMaster is requested for statistics (for shuffleStage.shuffleDep) and every map-stage job is marked finished with markMapStageJobAsFinished(job, stats).

Otherwise, if the map stage is not ready, the following INFO message appears in the logs:

INFO Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]

shuffleStage is then submitted to DAGScheduler for execution.
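The "possibly bogus" guard above reduces to an epoch comparison; a sketch with assumed names:

import scala.collection.mutable

// Executors that failed, keyed by executor id, with the epoch of the failure.
val failedEpoch = mutable.HashMap.empty[String, Long]

// A ShuffleMapTask completion is ignored when its epoch is not greater than
// the epoch at which its executor was marked failed.
def isPossiblyBogus(execId: String, taskEpoch: Long): Boolean =
  failedEpoch.get(execId).exists(taskEpoch <= _)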
TaskEndReason: Resubmitted

For the Resubmitted case, you should see the following INFO message in the logs:

INFO Resubmitted [task], so marking it as still running

The task's partition (task.partitionId) is added back to the collection of pending partitions of the stage (using stage.pendingPartitions).

Tip: A stage knows how many partitions are yet to be calculated. A task knows the id of the partition it was launched for.
TaskEndReason: FetchFailed

FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) comes with a BlockManagerId (as bmAddress) and the other self-explanatory values.

Note: A task knows about the id of the stage it belongs to.
When FetchFailed happens, stageIdToStage is used to access the failed stage (using task.stageId; the task is available in the event in handleTaskCompletion(event: CompletionEvent)). shuffleToMapStage is used to access the map stage (using shuffleId).

If failedStage.latestInfo.attemptId != task.stageAttemptId, you should see the following INFO message in the logs:

INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running

Caution: FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?

And the case finishes. Otherwise, the case continues.
If the failed stage is in runningStages, the following INFO message shows in the logs:

INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])

markStageAsFinished(failedStage, Some(failureMessage)) is called.

Caution: FIXME What does markStageAsFinished do?

If the failed stage is not in runningStages, the following DEBUG message shows in the logs:

DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running

When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.

Caution: FIXME Describe disallowStageRetryForTest and abortStage.
If the number of fetch-failed attempts for the stage exceeds the allowed number, the failed stage is aborted with the reason:

[failedStage] ([name]) has failed the maximum allowable number of times: 4. Most recent failure reason: [failureMessage]

If there are no failed stages reported (DAGScheduler.failedStages is empty), the following INFO message shows in the logs:

INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure

And the following code is executed:

messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)

Caution: FIXME What does the above code do?
For all the cases, the failed stage and map stages are both added to the internal collection of failed stages.

If mapId (in the FetchFailed object for the case) is provided, the map stage output is cleaned up (as it is broken) using the mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.

Caution: FIXME What does mapStage.removeOutputLoc do?

If bmAddress (in the FetchFailed object for the case) is provided, handleExecutorLost (with fetchFailed enabled) is called.
StageCancelled Event and handleStageCancellation Handler

StageCancelled(stageId: Int) extends DAGSchedulerEvent

StageCancelled is a DAGSchedulerEvent that triggers handleStageCancellation (on a separate thread).
handleStageCancellation Handler

handleStageCancellation(stageId: Int): Unit

handleStageCancellation checks whether the input stageId was registered earlier (in the internal stageIdToStage registry) and, if it was, attempts to cancel the associated jobs (with the cancellation reason "because Stage [stageId] was cancelled").

Note: A stage tracks the jobs it belongs to using the jobIds property.

If the stage stageId was not registered earlier, you should see the following INFO message in the logs:

INFO No active jobs to kill for Stage [stageId]

Note: handleStageCancellation is the result of executing SparkContext.cancelStage(stageId: Int), which is called from the web UI (controlled by spark.ui.killEnabled).
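A minimal sketch, assuming a stand-in Stage that tracks its jobIds:

import scala.collection.mutable

// Simplified stand-in: a stage knows the jobs it belongs to (jobIds).
case class Stage(stageId: Int, jobIds: mutable.HashSet[Int])

val stageIdToStage = mutable.HashMap.empty[Int, Stage]

def handleStageCancellation(stageId: Int): Unit =
  stageIdToStage.get(stageId) match {
    case Some(stage) =>
      // Cancel every job the stage belongs to.
      stage.jobIds.toArray.foreach { jobId =>
        handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
      }
    case None =>
      println(s"No active jobs to kill for Stage $stageId")
  }

def handleJobCancellation(jobId: Int, reason: String): Unit =
  println(s"Job $jobId cancelled $reason")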
JobSubmitted Event and handleJobSubmitted Handler

JobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties = null)
extends DAGSchedulerEvent

JobSubmitted is a DAGSchedulerEvent that triggers the handleJobSubmitted method (on a separate thread).
handleJobSubmitted Handler

handleJobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties)

handleJobSubmitted creates a new ResultStage (finalStage) and an ActiveJob.

You should see the following INFO messages in the logs:

INFO DAGScheduler: Got job [jobId] ([callSite.shortForm]) with [partitions.length] output partitions
INFO DAGScheduler: Final stage: [finalStage] ([name])
INFO DAGScheduler: Parents of final stage: [parents]
INFO DAGScheduler: Missing parents: [getMissingParentStages(finalStage)]

handleJobSubmitted then registers the job in the internal registries, i.e. jobIdToActiveJob and activeJobs, and sets the job for the stage (using setActiveJob).

Ultimately, handleJobSubmitted posts a SparkListenerJobStart message to LiveListenerBus and submits the stage.
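A very condensed sketch of the flow, with hypothetical stand-in types in place of Spark's ResultStage and ActiveJob:

import scala.collection.mutable

// Hypothetical stand-ins for the Spark-internal types used below.
class ResultStage(val id: Int) { var activeJob: Option[ActiveJob] = None }
class ActiveJob(val jobId: Int, val finalStage: ResultStage)

val jobIdToActiveJob = mutable.HashMap.empty[Int, ActiveJob]
val activeJobs = mutable.HashSet.empty[ActiveJob]

def handleJobSubmitted(jobId: Int): Unit = {
  // 1. Create the final ResultStage and the ActiveJob for it.
  val finalStage = new ResultStage(jobId)
  val job = new ActiveJob(jobId, finalStage)
  // 2. Register the job and set it as the stage's active job.
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.activeJob = Some(job)
  // 3. Announce the job (stand-in for SparkListenerJobStart) and submit.
  println(s"SparkListenerJobStart($jobId)")
  submitStage(finalStage)
}

def submitStage(stage: ResultStage): Unit =
  println(s"submitting stage ${stage.id}")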
ExecutorAdded Event and handleExecutorAdded Handler

ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent

ExecutorAdded is a DAGSchedulerEvent that triggers the handleExecutorAdded method (on a separate thread).
Removing Executor From failedEpoch Registry — handleExecutorAdded Handler

handleExecutorAdded(execId: String, host: String)

handleExecutorAdded checks whether the input execId executor was registered in failedEpoch and, if it was, removes it from the failedEpoch registry.

You should see the following INFO message in the logs:

INFO Host added was in lost list earlier: [host]
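A minimal sketch of this bookkeeping, with failedEpoch as a plain map:

import scala.collection.mutable

// Executors that failed, keyed by executor id, with the epoch of the failure.
val failedEpoch = mutable.HashMap.empty[String, Long]

def handleExecutorAdded(execId: String, host: String): Unit =
  // Forget the earlier failure record for a re-registered executor.
  if (failedEpoch.contains(execId)) {
    println(s"Host added was in lost list earlier: $host")
    failedEpoch -= execId
  }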