DAGSchedulerEventProcessLoop — DAGScheduler Event Bus

DAGSchedulerEventProcessLoop (dag-scheduler-event-loop) is an EventLoop, i.e. a single "business logic" thread, for processing DAGSchedulerEvent events.

Table 1. DAGSchedulerEvents and Event Handlers

DAGSchedulerEvent    | Event Handler           | Reason
JobSubmitted         | handleJobSubmitted      | A Spark job was submitted to DAGScheduler using submitJob or runApproximateJob.
MapStageSubmitted    | handleMapStageSubmitted | A ShuffleMapStage was submitted using submitMapStage.
StageCancelled       | handleStageCancellation | DAGScheduler was requested to cancel a stage.
JobCancelled         | handleJobCancellation   | DAGScheduler was requested to cancel a job.
JobGroupCancelled    | handleJobGroupCancelled | DAGScheduler was requested to cancel a job group.
AllJobsCancelled     | doCancelAllJobs         | DAGScheduler was requested to cancel all running or waiting jobs.
BeginEvent           | handleBeginEvent        | TaskSetManager informs DAGScheduler that a task is starting (through taskStarted).
GettingResultEvent   | handleGetTaskResult     | TaskSetManager informs DAGScheduler (through taskGettingResult) that a task has completed and results are being fetched remotely.
CompletionEvent      | handleTaskCompletion    | TaskSetManager informs DAGScheduler (through taskEnded) that a task has completed successfully or failed.
ExecutorAdded        | handleExecutorAdded     | DAGScheduler was informed (through executorAdded) that an executor was spun up on a host.
ExecutorLost         | handleExecutorLost      | DAGScheduler was informed (through executorLost) that an executor was lost.
TaskSetFailed        | handleTaskSetFailed     | DAGScheduler was requested to cancel a TaskSet.
ResubmitFailedStages | resubmitFailedStages    | DAGScheduler was informed (through handleTaskCompletion) that a task finished with a FetchFailed.

When created, DAGSchedulerEventProcessLoop gets the reference to the owning DAGScheduler that it uses to call event handler methods on.
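The dispatch itself is a single pattern match. The following is an abridged sketch of DAGSchedulerEventProcessLoop's doOnReceive (names follow the Spark sources; most cases are elided):

// Abridged sketch of the event dispatch: every event is processed on the
// single event-loop thread and routed to the owning DAGScheduler.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()

  // ...the remaining events are routed the same way (see Table 1)
}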

Note
DAGSchedulerEventProcessLoop uses a java.util.concurrent.LinkedBlockingDeque blocking deque that can grow indefinitely, i.e. up to Integer.MAX_VALUE events.
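To illustrate the contract, here is a minimal, simplified sketch of such an event loop (not Spark's actual org.apache.spark.util.EventLoop, just the shape of it):

import java.util.concurrent.LinkedBlockingDeque

// A single daemon thread drains an unbounded LinkedBlockingDeque and
// invokes onReceive for every posted event, one at a time.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()  // up to Integer.MAX_VALUE events

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (true) onReceive(eventQueue.take())  // blocks until an event is available
      } catch {
        case _: InterruptedException => // interrupting the thread shuts the loop down
      }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)
  protected def onReceive(event: E): Unit
}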

AllJobsCancelled Event and doCancelAllJobs Handler

Caution
FIXME

GettingResultEvent Event and handleGetTaskResult Handler

GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent

GettingResultEvent is a DAGSchedulerEvent that triggers handleGetTaskResult (on a separate thread).

Note
GettingResultEvent is posted to inform DAGScheduler (through taskGettingResult) that a task is fetching its results remotely.

handleGetTaskResult Handler

handleGetTaskResult(taskInfo: TaskInfo): Unit

handleGetTaskResult merely posts SparkListenerTaskGettingResult (to LiveListenerBus Event Bus).
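Since posting SparkListenerTaskGettingResult is the handler's only effect, it can be observed with a custom SparkListener. A minimal sketch (the listener class and its message are illustrative):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskGettingResult}

// Logs whenever DAGScheduler announces that a task's (large) result is
// being fetched remotely, i.e. whenever handleGetTaskResult fired.
class TaskGettingResultLogger extends SparkListener {
  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit =
    println(s"Fetching result of task ${event.taskInfo.taskId} remotely")
}

// Usage: sc.addSparkListener(new TaskGettingResultLogger)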

BeginEvent Event and handleBeginEvent Handler

BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

BeginEvent is a DAGSchedulerEvent that triggers handleBeginEvent (on a separate thread).

Note
BeginEvent is posted to inform DAGScheduler (through taskStarted) that a TaskSetManager starts a task.

handleBeginEvent Handler

handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit

handleBeginEvent looks up the stage of task in the stageIdToStage internal registry to compute the latest stage attempt id (or -1 if the stage is no longer registered) and posts SparkListenerTaskStart (to the listenerBus event bus).
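In code, the handler is short. A sketch based on the Spark sources:

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
  // The task may have been launched after its stage was cancelled, in which
  // case the stage is no longer in stageIdToStage and -1 is reported.
  val stageAttemptId =
    stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
  listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}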

JobGroupCancelled Event and handleJobGroupCancelled Handler

JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

JobGroupCancelled is a DAGSchedulerEvent that triggers handleJobGroupCancelled (on a separate thread).

Note
JobGroupCancelled is posted when DAGScheduler is informed (through cancelJobGroup) that SparkContext was requested to cancel a job group.

handleJobGroupCancelled Handler

handleJobGroupCancelled(groupId: String): Unit

handleJobGroupCancelled finds active jobs in a group and cancels them.

Internally, handleJobGroupCancelled computes all the active jobs (registered in the internal collection of active jobs) that have spark.jobGroup.id scheduling property set to groupId.

handleJobGroupCancelled then cancels every active job in the group one by one, with the cancellation reason: "part of cancelled job group [groupId]".
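From the user's perspective, the event is triggered with SparkContext's job-group API. A minimal sketch (the group id and RDD pipeline are made up):

// Jobs submitted from this thread are tagged with the group id, so they
// can all be cancelled at once with a single JobGroupCancelled event.
sc.setJobGroup("nightly-etl", "Nightly ETL jobs", interruptOnCancel = true)
val result = sc.parallelize(1 to 1000000).map(_ * 2).countAsync()

// ...later, possibly from another thread:
sc.cancelJobGroup("nightly-etl")  // handled by handleJobGroupCancelled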

MapStageSubmitted Event and handleMapStageSubmitted Handler

MapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties = null)
extends DAGSchedulerEvent

MapStageSubmitted is a DAGSchedulerEvent that triggers handleMapStageSubmitted (on a separate thread).

scheduler handlemapstagesubmitted.png
Figure 1. MapStageSubmitted Event Handling
Note
MapStageSubmitted is posted when DAGScheduler is requested to submit a ShuffleMapStage (through submitMapStage), i.e. when SparkContext.submitMapStage is called.

handleMapStageSubmitted Handler

handleMapStageSubmitted(jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit

handleMapStageSubmitted is called with a job id (for a new job to be created), a ShuffleDependency, and a JobListener.

You should see the following INFO messages in the logs:

Got map stage job %s (%s) with %d output partitions
Final stage: [finalStage] ([finalStage.name])
Parents of final stage: [finalStage.parents]
Missing parents: [list of stages]

SparkListenerJobStart event is posted to LiveListenerBus (so other event listeners know about the event - not only DAGScheduler).

The execution procedure of MapStageSubmitted events then follows that of JobSubmitted events.

Tip

The difference between handleMapStageSubmitted and handleJobSubmitted:

  • handleMapStageSubmitted has ShuffleDependency among the input parameters while handleJobSubmitted has finalRDD, func, and partitions.

  • handleMapStageSubmitted initializes finalStage as getShuffleMapStage(dependency, jobId) while handleJobSubmitted initializes it as newResultStage(finalRDD, func, partitions, jobId, callSite).

  • handleMapStageSubmitted logs INFO Got map stage job %s (%s) with %d output partitions with dependency.rdd.partitions.length while handleJobSubmitted logs Got job %s (%s) with %d output partitions with partitions.length.

  • FIXME: Could the above be cut to ActiveJob.numPartitions?

  • handleMapStageSubmitted adds the new job with finalStage.addActiveJob(job) while handleJobSubmitted sets it with finalStage.setActiveJob(job).

  • handleMapStageSubmitted checks if the final stage has already finished and, if so, notifies the listener and removes the job using the code:

    if (finalStage.isAvailable) {
      markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }

TaskSetFailed Event and handleTaskSetFailed Handler

TaskSetFailed(
  taskSet: TaskSet,
  reason: String,
  exception: Option[Throwable])
extends DAGSchedulerEvent

TaskSetFailed is a DAGSchedulerEvent that triggers handleTaskSetFailed (on a separate thread).

Note
TaskSetFailed is posted when DAGScheduler is requested to cancel a TaskSet.

handleTaskSetFailed Handler

handleTaskSetFailed(
  taskSet: TaskSet,
  reason: String,
  exception: Option[Throwable]): Unit

handleTaskSetFailed looks the stage (of the input taskSet) up in the internal stageIdToStage registry and aborts it.
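A sketch of the handler based on the Spark sources (abridged):

private[scheduler] def handleTaskSetFailed(
    taskSet: TaskSet,
    reason: String,
    exception: Option[Throwable]): Unit = {
  // Abort the stage only if it is still known to DAGScheduler.
  stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
}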

ResubmitFailedStages Event and resubmitFailedStages Handler

ResubmitFailedStages extends DAGSchedulerEvent

ResubmitFailedStages is a DAGSchedulerEvent that triggers resubmitFailedStages method.

Note
ResubmitFailedStages is posted for FetchFailed case in handleTaskCompletion.

resubmitFailedStages Handler

resubmitFailedStages(): Unit

resubmitFailedStages iterates over the internal collection of failed stages and submits them.

Note
resubmitFailedStages does nothing when there are no failed stages reported.

You should see the following INFO message in the logs:

INFO Resubmitting failed stages

resubmitFailedStages clears the internal cache of RDD partition locations first. It then makes a copy of the collection of failed stages so DAGScheduler can track failed stages afresh.

Note
At this point DAGScheduler has no failed stages reported.

The previously-reported failed stages are sorted by their job ids in ascending order and resubmitted.
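Put together, the handler is roughly as follows (a sketch based on the Spark sources):

private[scheduler] def resubmitFailedStages(): Unit = {
  if (failedStages.nonEmpty) {
    logInfo("Resubmitting failed stages")
    clearCacheLocs()                          // forget cached RDD partition locations
    val failedStagesCopy = failedStages.toArray
    failedStages.clear()                      // start tracking failed stages afresh
    for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
      submitStage(stage)                      // resubmit in ascending job-id order
    }
  }
}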

ExecutorLost Event and handleExecutorLost Handler — fetchFailed Disabled Case

ExecutorLost(
  execId: String,
  reason: ExecutorLossReason)
extends DAGSchedulerEvent

ExecutorLost is a DAGSchedulerEvent that triggers handleExecutorLost method with fetchFailed disabled, i.e. false.

Note

handleExecutorLost recognizes two cases (by means of fetchFailed):

  • fetch failures (fetchFailed is true) from executors that are indirectly assumed lost. See FetchFailed case in handleTaskCompletion.

  • lost executors (fetchFailed is false) for executors that did not report being alive in a given timeframe

handleExecutorLost Handler

handleExecutorLost(
  execId: String,
  filesLost: Boolean,
  maybeEpoch: Option[Long] = None): Unit

The current epoch could be provided (as the input maybeEpoch) or is requested from MapOutputTrackerMaster.

Caution
FIXME When is maybeEpoch passed in?
dagscheduler handleExecutorLost.png
Figure 2. DAGScheduler.handleExecutorLost

Recurring ExecutorLost events lead to the following repeating DEBUG message in the logs:

DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])
Note
handleExecutorLost handler uses DAGScheduler's failedEpoch and FIXME internal registries.

Otherwise, when the executor execId is not yet registered as lost or its recorded failure epoch is older than the current one, the executor's loss is recorded in the failedEpoch internal registry.

Caution
FIXME Describe the case above in simpler non-technical words. Perhaps change the order, too.

You should see the following INFO message in the logs:

INFO Executor lost: [execId] (epoch [epoch])
Caution
FIXME Review what’s filesLost.

handleExecutorLost exits unless the ExecutorLost event was due to a map output fetch failure (i.e. the input filesLost is true) or the external shuffle service is not used.

In such a case, you should see the following INFO message in the logs:

INFO Shuffle files lost for executor: [execId] (epoch [epoch])

handleExecutorLost then walks over all ShuffleMapStages in DAGScheduler’s shuffleToMapStage internal registry, removes the lost executor’s map outputs from each, and re-registers the remaining outputs with MapOutputTrackerMaster (incrementing the epoch).

JobCancelled Event and handleJobCancellation Handler

JobCancelled(jobId: Int) extends DAGSchedulerEvent

JobCancelled is a DAGSchedulerEvent that triggers handleJobCancellation method (on a separate thread).

Note
JobCancelled is posted when DAGScheduler is requested to cancel a job.

handleJobCancellation Handler

handleJobCancellation(jobId: Int, reason: String = "")

handleJobCancellation first makes sure that the input jobId has been registered earlier (using jobIdToStageIds internal registry).

If the input jobId is not known to DAGScheduler, you should see the following DEBUG message in the logs:

DEBUG DAGScheduler: Trying to cancel unregistered job [jobId]

Otherwise, handleJobCancellation fails the active job and all independent stages (by looking up the active job using jobIdToActiveJob) with failure reason:

Job [jobId] cancelled [reason]
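The event is typically triggered through an asynchronous action whose FutureAction is cancelled. A minimal sketch:

// Kick off a job asynchronously; countAsync returns a FutureAction.
val future = sc.parallelize(1 to 1000000).map(_ * 2).countAsync()

// ...later, possibly from another thread: cancelling the FutureAction asks
// DAGScheduler to cancel the underlying job, which is delivered as a
// JobCancelled event and handled by handleJobCancellation.
future.cancel()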

CompletionEvent Event and handleTaskCompletion Handler

CompletionEvent informs DAGScheduler about task completions. It is handled by the handleTaskCompletion method.

handleTaskCompletion(event: CompletionEvent): Unit
dagscheduler tasksetmanager.png
Figure 3. DAGScheduler and CompletionEvent
Note
CompletionEvent holds contextual information about the completed task.

The task knows about the stage it belongs to (using Task.stageId), the partition it works on (using Task.partitionId), and the stage attempt (using Task.stageAttemptId).

OutputCommitCoordinator.taskCompleted is called.

If the reason for task completion is not Success, SparkListenerTaskEnd is posted to LiveListenerBus. The only difference with TaskEndReason: Success is how the stage attempt id is calculated. Here, it is Task.stageAttemptId (not Stage.latestInfo.attemptId).

Caution
FIXME What is the difference between stage attempt ids?

If the stage the task belongs to has been cancelled, stageIdToStage should not contain it, and the method quits.

The main processing depends on the TaskEndReason - the reason for task completion (using event.reason). The method skips processing TaskEndReasons: TaskCommitDenied, ExceptionFailure, TaskResultLost, ExecutorLostFailure, TaskKilled, and UnknownReason, i.e. it does nothing.

TaskEndReason: Success

The partition the task worked on is removed from pendingPartitions of the stage.

The processing splits per task type - ResultTask or ShuffleMapTask - and DAGScheduler.submitWaitingStages is called.

ResultTask

For ResultTask, the stage is ResultStage. If there is no job active for the stage (using resultStage.activeJob), the following INFO message appears in the logs:

INFO Ignoring result from [task] because its job has finished

Otherwise, handleTaskCompletion checks whether the task has already been marked as finished for the job (using job.finished) and skips further processing if it has.

Caution
FIXME When could a task that has just finished be ignored, i.e. the job has already marked finished? Could it be for stragglers?

The partition is marked as finished (using job.finished) and the number of finished partitions is increased (using job.numFinished).

If the whole job has finished (when job.numFinished == job.numPartitions), the stage is marked as finished, the state for the job and its independent stages is cleaned up, and SparkListenerJobEnd (with JobSucceeded) is posted to LiveListenerBus. job.listener.taskSucceeded is then called with the task’s output id and result.

Caution
FIXME When would job.listener.taskSucceeded throw an exception? How?
ShuffleMapTask

For ShuffleMapTask, the stage is ShuffleMapStage.

event.result is MapStatus that knows the executor id where the task has finished (using status.location.executorId).

You should see the following DEBUG message in the logs:

DEBUG ShuffleMapTask finished on [execId]

If failedEpoch contains the executor and the epoch of the ShuffleMapTask is not greater than that in failedEpoch, you should see the following INFO message in the logs:

INFO Ignoring possibly bogus [task] completion from executor [executorId]

Otherwise, shuffleStage.addOutputLoc(smt.partitionId, status) is called.

The method does more processing only if the internal runningStages contains the ShuffleMapStage with no more pending partitions to compute (using shuffleStage.pendingPartitions).

markStageAsFinished(shuffleStage) is called.

The following INFO messages appear in the logs:

INFO looking for newly runnable stages
INFO running: [runningStages]
INFO waiting: [waitingStages]
INFO failed: [failedStages]

mapOutputTracker.registerMapOutputs is called with changeEpoch enabled.

If the map stage is ready, i.e. all partitions have shuffle outputs, the map-stage jobs waiting on this stage (using shuffleStage.mapStageJobs) are marked as finished: MapOutputTrackerMaster is requested for statistics (for shuffleStage.shuffleDep) and markMapStageJobAsFinished(job, stats) is called for every map-stage job.

Otherwise, if the map stage is not ready, the following INFO message appears in the logs:

INFO Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]

TaskEndReason: Resubmitted

For Resubmitted case, you should see the following INFO message in the logs:

INFO Resubmitted [task], so marking it as still running

The task (by task.partitionId) is added to the collection of pending partitions of the stage (using stage.pendingPartitions).

Tip
A stage knows how many partitions are yet to be calculated. A task knows about the partition id for which it was launched.

TaskEndReason: FetchFailed

FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) comes with BlockManagerId (as bmAddress) and the other self-explanatory values.

Note
A task knows about the id of the stage it belongs to.

When FetchFailed happens, the failed stage is looked up in stageIdToStage (using task.stageId; the task is available through event in handleTaskCompletion(event: CompletionEvent)) and the map stage in shuffleToMapStage (using shuffleId).

If failedStage.latestInfo.attemptId != task.stageAttemptId, you should see the following INFO in the logs:

INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
Caution
FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?

And the case finishes. Otherwise, the case continues.

If the failed stage is in runningStages, the following INFO message shows in the logs:

INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])

markStageAsFinished(failedStage, Some(failureMessage)) is called.

Caution
FIXME What does markStageAsFinished do?

If the failed stage is not in runningStages, the following DEBUG message shows in the logs:

DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running

When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.

Caution
FIXME Describe disallowStageRetryForTest and abortStage.

If the failed stage has already been attempted the maximum allowable number of times, it is aborted with the reason:

[failedStage] ([name]) has failed the maximum allowable number of times: 4. Most recent failure reason: [failureMessage]

If there are no failed stages reported (DAGScheduler.failedStages is empty), the following INFO shows in the logs:

INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure

And the following code is executed:

messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
Note
The above schedules a one-shot task on messageScheduler that posts ResubmitFailedStages after DAGScheduler.RESUBMIT_TIMEOUT (200 ms), so fetch failures from multiple tasks arriving close together are handled by a single resubmit.

For all the cases, the failed stage and map stages are both added to the internal collection of failed stages.

If mapId (in the FetchFailed object for the case) is provided, the map stage output is cleaned up (as it is broken) using mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.

Caution
FIXME What does mapStage.removeOutputLoc do?

If bmAddress (in the FetchFailed object for the case) is provided, handleExecutorLost (with fetchFailed enabled) is called.
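A sketch of that bookkeeping, based on the Spark sources (abridged):

// Both stages are remembered as failed; resubmitFailedStages will pick
// them up later (see above).
failedStages += failedStage
failedStages += mapStage

if (mapId != -1) {
  // The map output that could not be fetched is unregistered as broken.
  mapStage.removeOutputLoc(mapId, bmAddress)
  mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

if (bmAddress != null) {
  // Treat the executor that served the broken output as lost.
  handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
}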

StageCancelled Event and handleStageCancellation Handler

StageCancelled(stageId: Int) extends DAGSchedulerEvent

StageCancelled is a DAGSchedulerEvent that triggers handleStageCancellation (on a separate thread).

handleStageCancellation Handler

handleStageCancellation(stageId: Int): Unit

handleStageCancellation checks whether the input stageId was registered earlier (in the internal stageIdToStage registry) and, if it was, attempts to cancel the associated jobs (with the cancellation reason "because Stage [stageId] was cancelled").

Note
A stage tracks the jobs it belongs to using jobIds property.

If the stage stageId was not registered earlier, you should see the following INFO message in the logs:

INFO No active jobs to kill for Stage [stageId]
Note
handleStageCancellation is the result of executing SparkContext.cancelStage(stageId: Int), which is called from the web UI (controlled by spark.ui.killEnabled).
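A minimal sketch (the stage id is made up):

// Programmatic equivalent of the web UI "kill" link: posts StageCancelled,
// which handleStageCancellation turns into job cancellations.
sc.cancelStage(42)  // 42 is a hypothetical stage id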

JobSubmitted Event and handleJobSubmitted Handler

JobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties = null)
extends DAGSchedulerEvent

JobSubmitted is a DAGSchedulerEvent that triggers handleJobSubmitted method (on a separate thread).

handleJobSubmitted Handler

handleJobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties)

handleJobSubmitted creates a new ResultStage (as finalStage in the figure below) and an ActiveJob.

dagscheduler handleJobSubmitted.png
Figure 4. DAGScheduler.handleJobSubmitted Method

You should see the following INFO messages in the logs:

INFO DAGScheduler: Got job [jobId] ([callSite.shortForm]) with [partitions.length] output partitions
INFO DAGScheduler: Final stage: [finalStage] ([name])
INFO DAGScheduler: Parents of final stage: [parents]
INFO DAGScheduler: Missing parents: [getMissingParentStages(finalStage)]

handleJobSubmitted then registers the job in the internal registries, i.e. jobIdToActiveJob and activeJobs, and sets the job for the stage (using setActiveJob).

Ultimately, handleJobSubmitted posts SparkListenerJobStart message to LiveListenerBus and submits the stage.
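An abridged sketch of the handler, based on the Spark sources (logging and the exact SparkListenerJobStart payload are elided):

var finalStage: ResultStage = null
try {
  // Creating the stage may fail, e.g. when the underlying HadoopRDD
  // refers to non-existent input.
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: Exception =>
    listener.jobFailed(e)  // report back through the JobListener and give up
    return
}

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()

jobIdToActiveJob(jobId) = job   // register the new job
activeJobs += job
finalStage.setActiveJob(job)    // a ResultStage has (at most) one active job

// stageInfos: the latest StageInfos of the job's stages (from jobIdToStageIds)
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)         // start executing the DAG of stages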

ExecutorAdded Event and handleExecutorAdded Handler

ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent

ExecutorAdded is a DAGSchedulerEvent that triggers handleExecutorAdded method (on a separate thread).

Removing Executor From failedEpoch Registry — handleExecutorAdded Handler

handleExecutorAdded(execId: String, host: String)

handleExecutorAdded checks if the input execId executor was registered in failedEpoch and, if it was, removes it from the failedEpoch registry.

You should see the following INFO message in the logs:

INFO Host added was in lost list earlier: [host]
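The handler is a one-liner in spirit. A sketch based on the Spark sources:

private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
  // A "new" executor may in fact be one that was marked lost earlier;
  // forget its failure so its future map outputs are trusted again.
  if (failedEpoch.contains(execId)) {
    logInfo(s"Host added was in lost list earlier: $host")
    failedEpoch -= execId
  }
}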
