DAGSchedulerEventProcessLoop (dag-scheduler-event-loop) — DAGScheduler Event Bus
DAGSchedulerEventProcessLoop (dag-scheduler-event-loop) is an EventLoop, i.e. a single "business logic" thread, for processing DAGSchedulerEvent events.
| DAGSchedulerEvent | Event Handler | Reason |
|---|---|---|
| AllJobsCancelled | doCancelAllJobs | DAGScheduler was requested to cancel all running or waiting jobs |
| BeginEvent | handleBeginEvent | TaskSetManager informs DAGScheduler (through taskStarted) that a task is starting |
| CompletionEvent | handleTaskCompletion | TaskSetManager informs DAGScheduler (through taskEnded) that a task has completed |
| ExecutorAdded | handleExecutorAdded | DAGScheduler was informed (through executorAdded) that an executor was added |
| ExecutorLost | handleExecutorLost | DAGScheduler was informed (through executorLost) that an executor was lost |
| GettingResultEvent | handleGetTaskResult | TaskSetManager informs DAGScheduler (through taskGettingResult) that a task is fetching its result |
| JobCancelled | handleJobCancellation | DAGScheduler was requested (through cancelJob) to cancel a job |
| JobGroupCancelled | handleJobGroupCancelled | DAGScheduler was requested (through cancelJobGroup) to cancel a job group |
| JobSubmitted | handleJobSubmitted | A Spark job was submitted to DAGScheduler using submitJob or runApproximateJob |
| MapStageSubmitted | handleMapStageSubmitted | A ShuffleMapStage was submitted using submitMapStage |
| ResubmitFailedStages | resubmitFailedStages | Posted for the FetchFailed case in handleTaskCompletion |
| StageCancelled | handleStageCancellation | DAGScheduler was requested (through cancelStage) to cancel a stage |
| TaskSetFailed | handleTaskSetFailed | DAGScheduler was requested to cancel a TaskSet |
When created, DAGSchedulerEventProcessLoop gets a reference to the owning DAGScheduler, which it uses to call the event handler methods on.
Note: DAGSchedulerEventProcessLoop uses a java.util.concurrent.LinkedBlockingDeque, a blocking deque that is effectively unbounded, i.e. it can grow up to Integer.MAX_VALUE events.
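To make the pattern concrete, here is a toy, self-contained sketch of such a single-threaded event loop over a LinkedBlockingDeque. It is a simplification, not Spark's actual code (which lives in org.apache.spark.util.EventLoop):

```scala
import java.util.concurrent.LinkedBlockingDeque

// A toy single-threaded event loop (a simplification, not Spark's code).
abstract class ToyEventLoop[E](name: String) {
  // Unbounded deque: may hold up to Integer.MAX_VALUE events.
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    override def run(): Unit =
      try {
        while (true) onReceive(eventQueue.take()) // blocks until an event arrives
      } catch {
        case _: InterruptedException => () // stop() interrupts the thread
      }
  }

  def start(): Unit = { eventThread.setDaemon(true); eventThread.start() }
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event)

  // The single "business logic" callback, run on the event thread.
  protected def onReceive(event: E): Unit
}
```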
AllJobsCancelled Event and…
Caution: FIXME
GettingResultEvent Event and handleGetTaskResult Handler
GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent
GettingResultEvent is a DAGSchedulerEvent that triggers handleGetTaskResult (on a separate thread).
Note: GettingResultEvent is posted (through taskGettingResult) to inform DAGScheduler that a task is fetching its result.
handleGetTaskResult Handler
handleGetTaskResult(taskInfo: TaskInfo): Unit
handleGetTaskResult merely posts SparkListenerTaskGettingResult (to LiveListenerBus Event Bus).
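The handler is essentially a one-liner. A sketch, assuming DAGScheduler's listenerBus field (simplified, not a verbatim copy of the source):

```scala
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo): Unit = {
  // Tell the listener bus (and so all registered listeners) that the
  // task's result is being fetched.
  listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}
```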
BeginEvent Event and handleBeginEvent Handler
BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
BeginEvent is a DAGSchedulerEvent that triggers handleBeginEvent (on a separate thread).
Note: BeginEvent is posted (through taskStarted) to inform DAGScheduler that a TaskSetManager has started a task.
handleBeginEvent Handler
handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit
handleBeginEvent looks up the stage of task in the stageIdToStage internal registry to compute the latest stage attempt id (or -1 if the stage is no longer registered) and posts SparkListenerTaskStart (to the listenerBus event bus).
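A sketch of that lookup-then-post flow, assuming the stageIdToStage registry and listenerBus described in this document (simplified, not a verbatim copy):

```scala
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
  // -1 when the stage is no longer registered (e.g. it was cancelled).
  val stageAttemptId =
    stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
  listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
```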
JobGroupCancelled Event and handleJobGroupCancelled Handler
JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
JobGroupCancelled is a DAGSchedulerEvent that triggers handleJobGroupCancelled (on a separate thread).
Note: JobGroupCancelled is posted when DAGScheduler is informed (through cancelJobGroup) that SparkContext was requested to cancel a job group.
handleJobGroupCancelled Handler
handleJobGroupCancelled(groupId: String): Unit
handleJobGroupCancelled finds active jobs in a group and cancels them.
Internally, handleJobGroupCancelled finds all the active jobs (registered in the internal collection of active jobs) whose spark.jobGroup.id scheduling property is set to groupId.
handleJobGroupCancelled then cancels every such job, one by one, with the cancellation reason "part of cancelled job group [groupId]".
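A sketch of the handler, assuming the activeJobs collection and the handleJobCancellation handler described later in this section (simplified):

```scala
private[scheduler] def handleJobGroupCancelled(groupId: String): Unit = {
  // Active jobs whose spark.jobGroup.id scheduling property matches groupId.
  val activeInGroup = activeJobs.filter { job =>
    Option(job.properties).exists(_.getProperty("spark.jobGroup.id") == groupId)
  }
  activeInGroup.map(_.jobId).foreach { jobId =>
    handleJobCancellation(jobId, s"part of cancelled job group $groupId")
  }
}
```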
MapStageSubmitted Event and handleMapStageSubmitted Handler
MapStageSubmitted(
jobId: Int,
dependency: ShuffleDependency[_, _, _],
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent
MapStageSubmitted is a DAGSchedulerEvent that triggers handleMapStageSubmitted (on a separate thread).
Figure: MapStageSubmitted Event Handling
Note: MapStageSubmitted is posted when DAGScheduler is informed (through submitMapStage) that SparkContext.submitMapStage was executed.
handleMapStageSubmitted Handler
handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
handleMapStageSubmitted is called with a job id (of a new job to be created), a ShuffleDependency, and a JobListener.
You should see the following INFO messages in the logs:
Got map stage job %s (%s) with %d output partitions
Final stage: [finalStage] ([finalStage.name])
Parents of final stage: [finalStage.parents]
Missing parents: [list of stages]
SparkListenerJobStart event is posted to LiveListenerBus (so other event listeners know about the event - not only DAGScheduler).
The execution procedure for MapStageSubmitted events is then exactly (FIXME ?) the same as for JobSubmitted.
Tip: The difference between…
TaskSetFailed Event and handleTaskSetFailed Handler
TaskSetFailed(
taskSet: TaskSet,
reason: String,
exception: Option[Throwable])
extends DAGSchedulerEvent
TaskSetFailed is a DAGSchedulerEvent that triggers the handleTaskSetFailed method (on a separate thread).
Note: TaskSetFailed is posted when DAGScheduler is requested to cancel a TaskSet.
handleTaskSetFailed Handler
handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
exception: Option[Throwable]): Unit
handleTaskSetFailed looks up the stage of the input taskSet in the internal stageIdToStage registry and, when found, aborts it.
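A sketch of the handler, assuming the stageIdToStage registry and an abortStage helper (simplified):

```scala
private[scheduler] def handleTaskSetFailed(
    taskSet: TaskSet,
    reason: String,
    exception: Option[Throwable]): Unit = {
  // No-op when the stage is no longer tracked (e.g. the job already finished).
  stageIdToStage.get(taskSet.stageId).foreach { stage =>
    abortStage(stage, reason, exception)
  }
}
```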
ResubmitFailedStages Event and resubmitFailedStages Handler
ResubmitFailedStages extends DAGSchedulerEvent
ResubmitFailedStages is a DAGSchedulerEvent that triggers the resubmitFailedStages method (on a separate thread).
Note: ResubmitFailedStages is posted for the FetchFailed case in handleTaskCompletion.
resubmitFailedStages Handler
resubmitFailedStages(): Unit
resubmitFailedStages iterates over the internal collection of failed stages and submits them.
Note: resubmitFailedStages does nothing when there are no failed stages reported.
You should see the following INFO message in the logs:
INFO Resubmitting failed stages
resubmitFailedStages clears the internal cache of RDD partition locations first. It then makes a copy of the collection of failed stages so DAGScheduler can track failed stages afresh.
Note: At this point DAGScheduler has no failed stages reported.
The previously-reported failed stages are then resubmitted, sorted by their job ids in increasing order.
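A sketch of the whole handler, assuming the failedStages collection, clearCacheLocs and submitStage described in this document, and that a Stage knows the id of the first job that required it (firstJobId):

```scala
private[scheduler] def resubmitFailedStages(): Unit = {
  if (failedStages.nonEmpty) {
    logInfo("Resubmitting failed stages")
    clearCacheLocs() // drop the cached RDD partition locations
    val failedStagesCopy = failedStages.toArray
    failedStages.clear() // track failed stages afresh from here on
    // Resubmit in increasing job-id order.
    for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }
}
```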
ExecutorLost Event and handleExecutorLost Handler — fetchFailed Disabled Case
ExecutorLost(
execId: String,
reason: ExecutorLossReason)
extends DAGSchedulerEvent
ExecutorLost is a DAGSchedulerEvent that triggers the handleExecutorLost method with fetchFailed disabled, i.e. false.
handleExecutorLost Handler
handleExecutorLost(
execId: String,
filesLost: Boolean,
maybeEpoch: Option[Long] = None): Unit
The current epoch is either provided as the input maybeEpoch or requested from MapOutputTrackerMaster.
Caution: FIXME When is maybeEpoch passed in?
Recurring ExecutorLost events lead to the following repeating DEBUG message in the logs:
DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])
Note: The handleExecutorLost handler uses DAGScheduler's failedEpoch and FIXME internal registries.
Otherwise, when the executor execId is not in the list of lost executors or the executor's recorded failure epoch is earlier than the input maybeEpoch, the executor's loss is recorded in the failedEpoch internal registry.
Caution: FIXME Describe the case above in simpler non-technical words. Perhaps change the order, too.
You should see the following INFO message in the logs:
INFO Executor lost: [execId] (epoch [epoch])
Caution: FIXME Review what's filesLost.
handleExecutorLost exits unless the input filesLost is enabled (i.e. the ExecutorLost event was due to a map-output fetch failure) or the external shuffle service is not used.
In such a case, you should see the following INFO message in the logs:
INFO Shuffle files lost for executor: [execId] (epoch [epoch])
handleExecutorLost then walks over all the ShuffleMapStages in DAGScheduler's shuffleToMapStage internal registry and does the following for each (in order; see the sketch below):
-
ShuffleMapStage.removeOutputsOnExecutor(execId) is called
-
MapOutputTrackerMaster.registerMapOutputs(shuffleId, stage.outputLocInMapOutputTrackerFormat(), changeEpoch = true) is called
In case DAGScheduler’s shuffleToMapStage internal registry has no shuffles registered, MapOutputTrackerMaster is requested to increment epoch.
Ultimately, DAGScheduler clears the internal cache of RDD partition locations.
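A sketch of this part of the handler, under the names and assumptions used in this section (simplified, not a verbatim copy of the source):

```scala
// Inside handleExecutorLost, after recording execId in failedEpoch:
if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
  logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
  for ((shuffleId, mapStage) <- shuffleToMapStage) {
    mapStage.removeOutputsOnExecutor(execId)
    mapOutputTracker.registerMapOutputs(
      shuffleId, mapStage.outputLocInMapOutputTrackerFormat(), changeEpoch = true)
  }
  if (shuffleToMapStage.isEmpty) {
    mapOutputTracker.incrementEpoch() // no shuffles registered
  }
  clearCacheLocs() // drop the cached RDD partition locations
}
```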
JobCancelled Event and handleJobCancellation Handler
JobCancelled(jobId: Int) extends DAGSchedulerEvent
JobCancelled is a DAGSchedulerEvent that triggers the handleJobCancellation method (on a separate thread).
Note: JobCancelled is posted when DAGScheduler is requested to cancel a job.
handleJobCancellation Handler
handleJobCancellation(jobId: Int, reason: String = "")
handleJobCancellation first makes sure that the input jobId has been registered earlier (using jobIdToStageIds internal registry).
If the input jobId is not known to DAGScheduler, you should see the following DEBUG message in the logs:
DEBUG DAGScheduler: Trying to cancel unregistered job [jobId]
Otherwise, handleJobCancellation fails the active job and all independent stages (by looking up the active job using jobIdToActiveJob) with failure reason:
Job [jobId] cancelled [reason]
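Putting the two cases together, a sketch of the handler, assuming jobIdToStageIds, jobIdToActiveJob and a failJobAndIndependentStages helper (simplified):

```scala
private[scheduler] def handleJobCancellation(jobId: Int, reason: String = ""): Unit = {
  if (!jobIdToStageIds.contains(jobId)) {
    logDebug(s"Trying to cancel unregistered job $jobId")
  } else {
    // Fails the active job and all stages that only this job depends on.
    failJobAndIndependentStages(
      jobIdToActiveJob(jobId), s"Job $jobId cancelled $reason")
  }
}
```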
CompletionEvent Event and handleTaskCompletion Handler
The CompletionEvent event informs DAGScheduler about task completions. It is handled by the handleTaskCompletion method.
handleTaskCompletion(event: CompletionEvent): Unit
Note: CompletionEvent holds contextual information about the completed task.
The task knows about the stage it belongs to (using Task.stageId), the partition it works on (using Task.partitionId), and the stage attempt (using Task.stageAttemptId).
OutputCommitCoordinator.taskCompleted is called.
If the reason for task completion is not Success, SparkListenerTaskEnd is posted to LiveListenerBus. The only difference from the TaskEndReason: Success case is how the stage attempt id is calculated: here it is Task.stageAttemptId (not Stage.latestInfo.attemptId).
Caution: FIXME What is the difference between stage attempt ids?
If the stage the task belongs to has been cancelled, stageIdToStage should not contain it, and the method quits.
The main processing depends on the TaskEndReason - the reason for task completion (using event.reason). The method skips processing TaskEndReasons: TaskCommitDenied, ExceptionFailure, TaskResultLost, ExecutorLostFailure, TaskKilled, and UnknownReason, i.e. it does nothing.
TaskEndReason: Success
SparkListenerTaskEnd is posted to LiveListenerBus.
The partition the task worked on is removed from pendingPartitions of the stage.
The processing splits per task type - ResultTask or ShuffleMapTask - and DAGScheduler.submitWaitingStages is called.
ResultTask
For ResultTask, the stage is ResultStage. If there is no job active for the stage (using resultStage.activeJob), the following INFO message appears in the logs:
INFO Ignoring result from [task] because its job has finished
Otherwise, handleTaskCompletion checks whether the task has already been marked as completed for the job (using job.finished) and proceeds only if it has not.
Caution: FIXME When could a task that has just finished be ignored, i.e. the job has already marked finished? Could it be for stragglers?
DAGScheduler.updateAccumulators(event) is called.
The partition is marked as finished (using job.finished) and the number of finished partitions is increased (using job.numFinished).
If the whole job has finished (when job.numFinished == job.numPartitions), then:
-
markStageAsFinished is called
-
cleanupStateForJobAndIndependentStages(job) is called
-
SparkListenerJobEnd is posted to LiveListenerBus with JobSucceeded
The JobListener of the job is notified about the task's successful completion. In case that step fails, i.e. throws an exception, the JobListener is notified about the failure instead (see the sketch below).
Caution: FIXME When would job.listener.taskSucceeded throw an exception? How?
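A sketch of the bookkeeping just described, with rt standing for the completed ResultTask and job for resultStage's active job (simplified; names follow this section, not a verbatim copy of the source):

```scala
if (!job.finished(rt.outputId)) {
  updateAccumulators(event)
  job.finished(rt.outputId) = true
  job.numFinished += 1
  // Is the whole job done?
  if (job.numFinished == job.numPartitions) {
    markStageAsFinished(resultStage)
    cleanupStateForJobAndIndependentStages(job)
    listenerBus.post(
      SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
  }
  // Notifying the listener may throw; report the failure back if it does.
  try {
    job.listener.taskSucceeded(rt.outputId, event.result)
  } catch {
    case e: Exception =>
      job.listener.jobFailed(new SparkDriverExecutionException(e))
  }
}
```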
ShuffleMapTask
For ShuffleMapTask, the stage is ShuffleMapStage.
DAGScheduler.updateAccumulators(event) is called.
event.result is a MapStatus that knows the executor id where the task finished (using status.location.executorId).
You should see the following DEBUG message in the logs:
DEBUG ShuffleMapTask finished on [execId]
If failedEpoch contains the executor and the epoch of the ShuffleMapTask is not greater than that in failedEpoch, you should see the following INFO message in the logs:
INFO Ignoring possibly bogus [task] completion from executor [executorId]
Otherwise, shuffleStage.addOutputLoc(smt.partitionId, status) is called.
The method does more processing only if the ShuffleMapStage is in the internal runningStages and has no more pending partitions to compute (using shuffleStage.pendingPartitions).
markStageAsFinished(shuffleStage) is called.
The following INFO logs appear in the logs:
INFO looking for newly runnable stages
INFO running: [runningStages]
INFO waiting: [waitingStages]
INFO failed: [failedStages]
mapOutputTracker.registerMapOutputs with changeEpoch is called.
The internal cache of RDD partition locations is cleared.
If the map stage is ready, i.e. all its partitions have shuffle outputs, the map-stage jobs waiting on this stage (using shuffleStage.mapStageJobs) are marked as finished: MapOutputTrackerMaster is requested for statistics (for shuffleStage.shuffleDep) and every map-stage job is marked as finished using markMapStageJobAsFinished(job, stats).
Otherwise, if the map stage is not ready, the following INFO message appears in the logs:
INFO Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]
shuffleStage is submitted to DAGScheduler for execution.
TaskEndReason: Resubmitted
For Resubmitted case, you should see the following INFO message in the logs:
INFO Resubmitted [task], so marking it as still running
The task (by its task.partitionId) is added back to the collection of pending partitions of the stage (using stage.pendingPartitions), as the sketch after the tip below shows.
Tip: A stage knows how many partitions are yet to be calculated. A task knows about the partition id for which it was launched.
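In code, the case boils down to re-adding the partition. A sketch of the case clause inside the handler's pattern match, with the names used in this section:

```scala
case Resubmitted =>
  logInfo(s"Resubmitted $task, so marking it as still running")
  // The partition still needs computing: keep it among the stage's
  // pending partitions.
  stage.pendingPartitions += task.partitionId
```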
TaskEndReason: FetchFailed
FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) comes with BlockManagerId (as bmAddress) and the other self-explanatory values.
Note: A task knows about the id of the stage it belongs to.
When FetchFailed happens, stageIdToStage is used to look up the failed stage (using task.stageId; the task is available through the event in handleTaskCompletion(event: CompletionEvent)), and shuffleToMapStage is used to look up the map stage (using shuffleId).
If failedStage.latestInfo.attemptId != task.stageAttemptId, you should see the following INFO in the logs:
INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
Caution: FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?
In that case the processing of the event finishes; otherwise, it continues.
If the failed stage is in runningStages, the following INFO message shows in the logs:
INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])
markStageAsFinished(failedStage, Some(failureMessage)) is called.
Caution: FIXME What does markStageAsFinished do?
If the failed stage is not in runningStages, the following DEBUG message shows in the logs:
DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running
When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.
Caution: FIXME Describe disallowStageRetryForTest and abortStage.
If the number of fetch failed attempts for the stage exceeds the allowed number, the failed stage is aborted with the reason:
[failedStage] ([name]) has failed the maximum allowable number of times: 4. Most recent failure reason: [failureMessage]
If there are no failed stages reported (DAGScheduler.failedStages is empty), the following INFO shows in the logs:
INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure
And the following code is executed:
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
Note: The code above delays posting ResubmitFailedStages by RESUBMIT_TIMEOUT (200 ms) rather than posting it immediately, so that fetch failures from multiple tasks arriving around the same time are coalesced into a single round of stage resubmission.
For all the cases, the failed stage and map stages are both added to the internal collection of failed stages.
If mapId (in the FetchFailed object for the case) is provided, the map stage output is cleaned up (as it is broken) using mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.
Caution: FIXME What does mapStage.removeOutputLoc do?
If bmAddress (in the FetchFailed object for the case) is provided, handleExecutorLost (with fetchFailed enabled) is called.
StageCancelled Event and handleStageCancellation Handler
StageCancelled(stageId: Int) extends DAGSchedulerEvent
StageCancelled is a DAGSchedulerEvent that triggers handleStageCancellation (on a separate thread).
handleStageCancellation Handler
handleStageCancellation(stageId: Int): Unit
handleStageCancellation checks whether the input stageId was registered earlier (in the internal stageIdToStage registry) and, if it was, cancels every associated job (with the cancellation reason "because Stage [stageId] was cancelled"); see the sketch below.
Note: A stage tracks the jobs it belongs to using the jobIds property.
If the stage stageId was not registered earlier, you should see the following INFO message in the logs:
INFO No active jobs to kill for Stage [stageId]
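A sketch of the handler, assuming the stageIdToStage registry and the handleJobCancellation handler described earlier (simplified):

```scala
private[scheduler] def handleStageCancellation(stageId: Int): Unit = {
  stageIdToStage.get(stageId) match {
    case Some(stage) =>
      // A stage tracks the jobs it belongs to in its jobIds property.
      stage.jobIds.toArray.foreach { jobId =>
        handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
      }
    case None =>
      logInfo(s"No active jobs to kill for Stage $stageId")
  }
}
```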
Note: handleStageCancellation is the result of executing SparkContext.cancelStage(stageId: Int), which is called from the web UI (controlled by spark.ui.killEnabled).
JobSubmitted Event and handleJobSubmitted Handler
JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent
JobSubmitted is a DAGSchedulerEvent that triggers the handleJobSubmitted method (on a separate thread).
handleJobSubmitted Handler
handleJobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties)
handleJobSubmitted creates a new ResultStage (finalStage) and an ActiveJob.
You should see the following INFO messages in the logs:
INFO DAGScheduler: Got job [jobId] ([callSite.shortForm]) with [partitions.length] output partitions
INFO DAGScheduler: Final stage: [finalStage] ([name])
INFO DAGScheduler: Parents of final stage: [parents]
INFO DAGScheduler: Missing parents: [getMissingParentStages(finalStage)]
handleJobSubmitted then registers the job in the internal registries, i.e. jobIdToActiveJob and activeJobs, and sets the job for the stage (using setActiveJob).
Ultimately, handleJobSubmitted posts a SparkListenerJobStart message to LiveListenerBus and submits the stage (see the sketch below).
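A condensed sketch of the handler's flow, assuming the helpers named in this section (newResultStage stands for the stage-creation step; failure handling and the stage infos of SparkListenerJobStart are elided):

```scala
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // 1. Create the ResultStage for the final RDD.
  val finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  // 2. Create and register the ActiveJob.
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  // 3. Announce the job and submit its final stage.
  listenerBus.post(
    SparkListenerJobStart(job.jobId, clock.getTimeMillis(),
      Seq.empty /* stage infos elided in this sketch */, properties))
  submitStage(finalStage)
}
```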
ExecutorAdded Event and handleExecutorAdded Handler
ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
ExecutorAdded is a DAGSchedulerEvent that triggers the handleExecutorAdded method (on a separate thread).
Removing Executor From failedEpoch Registry — handleExecutorAdded Handler
handleExecutorAdded(execId: String, host: String)
handleExecutorAdded checks if the input execId executor was registered in failedEpoch and, if it was, removes it from the failedEpoch registry.
You should see the following INFO message in the logs:
INFO Host added was in lost list earlier: [host]
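A sketch of the handler, assuming the failedEpoch registry (simplified):

```scala
private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
  // The executor failed before and is being re-registered: forget the failure.
  if (failedEpoch.contains(execId)) {
    logInfo(s"Host added was in lost list earlier: $host")
    failedEpoch -= execId
  }
}
```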