DAGScheduler
Note
|
The introduction that follows was highly influenced by the scaladoc of org.apache.spark.scheduler.DAGScheduler. As DAGScheduler is a private class, it does not appear in the official API documentation. You are strongly encouraged to read the sources first and only then this and the related pages. "Reading the sources", I say?! Yes, I am kidding! |
Introduction
DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling, i.e. after an RDD action has been called it becomes a job that is then transformed into a set of stages that are submitted as TaskSets for execution (see Execution Model).
The fundamental concepts of DAGScheduler are jobs and stages (refer to Jobs and Stages respectively) that it tracks through internal registries and counters.
DAGScheduler works solely on the driver and is created as part of SparkContext’s initialization (right after TaskScheduler and SchedulerBackend are ready).
DAGScheduler does three things in Spark (thorough explanations follow):
- Computes an execution DAG, i.e. DAG of stages, for a job.
- Determines the preferred locations to run each task on.
- Handles failures due to shuffle output files being lost.
It computes a directed acyclic graph (DAG) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to TaskScheduler.
In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to TaskScheduler.
Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.
DAGScheduler uses an event queue architecture in which a thread can post DAGSchedulerEvent events, e.g. a new job or stage being submitted, that DAGScheduler reads and executes sequentially. See the section Internal Event Loop - dag-scheduler-event-loop.
DAGScheduler runs stages in topological order.
Tip
|
Enable DEBUG or TRACE logging level for org.apache.spark.scheduler.DAGScheduler logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.DAGScheduler=DEBUG
Refer to Logging. |
DAGScheduler needs SparkContext, Task Scheduler, LiveListenerBus, MapOutputTracker and Block Manager to work. However, at the very minimum, DAGScheduler needs SparkContext only (and asks SparkContext for the other services).
DAGScheduler reports metrics about its execution (refer to the section Metrics).
When DAGScheduler schedules a job as a result of executing an action on an RDD or calling the SparkContext.runJob() method directly, it spawns parallel tasks to compute (partial) results per partition.
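For example, a hypothetical two-stage job (the master, application name and data below are illustrative only): reduceByKey introduces a ShuffleDependency, so the count() action becomes a job with a ShuffleMapStage followed by a ResultStage.

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageJob extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("two-stage-job"))
  val counts = sc.parallelize(Seq("a", "b", "a", "c"))
    .map(word => (word, 1))
    .reduceByKey(_ + _)   // shuffle boundary => parent ShuffleMapStage
  println(counts.count()) // action => SparkContext.runJob => DAGScheduler schedules a job
  sc.stop()
}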
createResultStage Internal Method
createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage
Caution
|
FIXME |
createShuffleMapStage Method
Caution
|
FIXME |
getOrCreateParentStages Internal Method
getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage]
getOrCreateParentStages computes the immediate ShuffleDependency parents of the input rdd and gets or creates a ShuffleMapStage for each ShuffleDependency.
Note
|
getOrCreateParentStages is used when DAGScheduler executes createShuffleMapStage and createResultStage.
|
getShuffleDependencies Method
getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]
getShuffleDependencies
…TK
Caution
|
FIXME |
Note
|
getShuffleDependencies is used when DAGScheduler executes getOrCreateParentStages and getMissingAncestorShuffleDependencies.
|
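Although the exact walk-through is still FIXME above, the underlying idea can be sketched as follows (a simplified sketch, not Spark’s code): traverse the narrow dependencies of the given RDD transitively and collect the first ShuffleDependency found on every path.

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

def directShuffleDependencies(rdd: RDD[_]): mutable.HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new mutable.HashSet[ShuffleDependency[_, _, _]]
  val visited = new mutable.HashSet[RDD[_]]
  val toVisit = new mutable.Stack[RDD[_]]
  toVisit.push(rdd)
  while (toVisit.nonEmpty) {
    val current = toVisit.pop()
    if (!visited(current)) {
      visited += current
      current.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep // stop at the shuffle boundary
        case narrowDep => toVisit.push(narrowDep.rdd)                        // keep walking narrow dependencies
      }
    }
  }
  parents
}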
getOrCreateShuffleMapStage Method
Caution
|
FIXME |
getMissingAncestorShuffleDependencies Internal Method
Caution
|
FIXME |
getMissingAncestorShuffleDependencies(
rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]]
Note
|
getMissingAncestorShuffleDependencies is used when DAGScheduler executes getOrCreateShuffleMapStage.
|
Creating DAGScheduler Instance
DAGScheduler(
sc: SparkContext,
taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
DAGScheduler requires a SparkContext, TaskScheduler, LiveListenerBus, MapOutputTrackerMaster, BlockManagerMaster, SparkEnv, and a Clock.
Note
|
DAGScheduler can reference all the services through a single SparkContext.
|
When created, DAGScheduler does the following (in order):
- Creates a DAGSchedulerSource
- Creates messageScheduler
- Creates eventProcessLoop and immediately sets itself in the current TaskScheduler
- Initializes the internal registries and counters.
At the very end of the initialization, DAGScheduler starts eventProcessLoop.
LiveListenerBus Event Bus for SparkListenerEvents — listenerBus Property
listenerBus: LiveListenerBus
listenerBus is a LiveListenerBus to post scheduling events and is passed in when DAGScheduler is created.
Internal Registries and Counters
DAGScheduler uses the following internal registries and counters for managing active jobs and stages.
Name | Description |
---|---|
nextJobId | The next job id, counting from 0. |
nextStageId | The next stage id, counting from 0. |
stageIdToStage | The lookup table for stages per their ids. |
jobIdToStageIds | The lookup table of all stage ids per job id. |
shuffleIdToMapStage | The lookup table of ShuffleMapStage per shuffle id. |
jobIdToActiveJob | The lookup table of ActiveJob per job id. |
waitingStages | The stages with parents to be computed. |
runningStages | The stages currently running. |
failedStages | The collection of the stages that failed due to fetch failures (as reported by CompletionEvents for FetchFailed end reasons). |
activeJobs | The collection of ActiveJob instances. |
cacheLocs | The internal cache of partition locations per RDD. Refer to Cache Tracking. |
failedEpoch | The lookup table of lost executors and the epoch of the event. |
runApproximateJob Method
Caution
|
FIXME |
executorHeartbeatReceived Method
executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean
executorHeartbeatReceived posts a SparkListenerExecutorMetricsUpdate (to listenerBus) and informs BlockManagerMaster that the blockManagerId block manager is alive (by posting BlockManagerHeartbeat).
Note
|
executorHeartbeatReceived is called when TaskSchedulerImpl handles executorHeartbeatReceived.
|
Cleaning Up Job State and Independent Stages — cleanupStateForJobAndIndependentStages Method
cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit
cleanupStateForJobAndIndependentStages cleans up the state for the job and any stages that are not part of any other job.
cleanupStateForJobAndIndependentStages looks the job up in the internal jobIdToStageIds registry.
If no stages are found, the following ERROR is printed out to the logs:
ERROR No stages registered for job [jobId]
Otherwise, cleanupStateForJobAndIndependentStages uses the stageIdToStage registry to find the stages (the actual objects, not ids!).
For each stage, cleanupStateForJobAndIndependentStages reads the jobs the stage belongs to.
If the job does not belong to the jobs of the stage, the following ERROR is printed out to the logs:
ERROR Job [jobId] not registered for stage [stageId] even though that stage was registered for the job
If the job was the only job for the stage, the stage (and the stage id) gets cleaned up from the registries, i.e. runningStages, shuffleIdToMapStage, waitingStages, failedStages and stageIdToStage.
While removing from runningStages, you should see the following DEBUG message in the logs:
DEBUG Removing running stage [stageId]
While removing from waitingStages, you should see the following DEBUG message in the logs:
DEBUG Removing stage [stageId] from waiting set.
While removing from failedStages, you should see the following DEBUG message in the logs:
DEBUG Removing stage [stageId] from failed set.
After all cleaning (using stageIdToStage as the source registry), if the stage belonged to the one and only job, you should see the following DEBUG message in the logs:
DEBUG After removal of stage [stageId], remaining stages = [stageIdToStage.size]
The job is removed from the jobIdToStageIds, jobIdToActiveJob and activeJobs registries.
The final stage of the job is removed, i.e. ResultStage or ShuffleMapStage.
Note
|
cleanupStateForJobAndIndependentStages is used in handleTaskCompletion when a ResultTask has completed successfully, failJobAndIndependentStages and markMapStageJobAsFinished.
|
Marking MapStage Job Finished — markMapStageJobAsFinished Method
markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): Unit
markMapStageJobAsFinished marks map-stage jobs as finished and notifies Spark listeners.
Internally, markMapStageJobAsFinished marks the zeroth partition finished and increases the number of tasks finished in the job.
Ultimately, SparkListenerJobEnd is posted to LiveListenerBus (as listenerBus) for the job, with the current time (in millis) and a JobSucceeded job result.
Note
|
markMapStageJobAsFinished is used in handleMapStageSubmitted and handleTaskCompletion.
|
Clearing Cache of RDD Partition Locations — clearCacheLocs Internal Method
clearCacheLocs(): Unit
clearCacheLocs clears the internal cache of the partition locations of all RDDs.
Note
|
DAGScheduler clears the cache while resubmitting failed stages, and as a result of JobSubmitted, MapStageSubmitted, CompletionEvent, ExecutorLost events.
|
Failing Job and Single-Job Stages — failJobAndIndependentStages Internal Method
failJobAndIndependentStages(
job: ActiveJob,
failureReason: String,
exception: Option[Throwable] = None): Unit
The internal failJobAndIndependentStages method fails the input job and all the stages that are only used by the job.
Internally, failJobAndIndependentStages uses the jobIdToStageIds internal registry to look up the stages registered for the job.
If no stages could be found, you should see the following ERROR message in the logs:
ERROR No stages registered for job [id]
Otherwise, for every stage, failJobAndIndependentStages finds the job ids the stage belongs to.
If no stages could be found or the job is not referenced by the stages, you should see the following ERROR message in the logs:
ERROR Job [id] not registered for stage [id] even though that stage was registered for the job
Only when there is exactly one job registered for the stage and the stage is in RUNNING state (in the runningStages internal registry) is TaskScheduler requested to cancel the stage’s tasks and the stage marked as finished.
Note
|
failJobAndIndependentStages is called from handleJobCancellation and abortStage .
|
Note
|
failJobAndIndependentStages uses jobIdToStageIds, stageIdToStage, and runningStages internal registries.
|
Posting JobSubmitted Event — submitJob Method
submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U]
submitJob creates a JobWaiter and posts a JobSubmitted event.
Internally, submitJob first validates that the requested partitions exist in the input rdd.
You may see an IllegalArgumentException thrown when the input partitions references partitions not in the input rdd:
Attempting to access a non-existent partition: [p]. Total number of partitions: [maxPartitions]
Note
|
submitJob is called when SparkContext submits a job and DAGScheduler runs a job.
|
Note
|
submitJob assumes that the partitions of an RDD are indexed from 0 onwards in sequential order.
|
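A minimal sketch (not Spark’s code) of that partition validation, where maxPartitions stands for the number of partitions of the input rdd:

def checkPartitions(maxPartitions: Int, partitions: Seq[Int]): Unit =
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    // mirrors the error message quoted above
    throw new IllegalArgumentException(
      s"Attempting to access a non-existent partition: $p. " +
        s"Total number of partitions: $maxPartitions")
  }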
Posting StageCancelled Event — cancelStage Method
cancelStage(stageId: Int)
cancelStage merely posts a StageCancelled event to the DAGScheduler’s Internal Event Bus.
Note
|
cancelStage is executed when a SparkContext is requested to cancel a stage.
|
Posting JobGroupCancelled Event — cancelJobGroup Method
cancelJobGroup(groupId: String): Unit
cancelJobGroup prints the following INFO message to the logs and then posts a JobGroupCancelled event to the DAGScheduler’s Internal Event Bus.
INFO Asked to cancel job group [groupId]
Note
|
cancelJobGroup is executed when a SparkContext is requested to cancel a specified group of jobs.
|
Posting AllJobsCancelled Event — cancelAllJobs Method
cancelAllJobs(): Unit
cancelAllJobs merely posts an AllJobsCancelled event to the DAGScheduler’s Internal Event Bus.
Note
|
cancelAllJobs is executed when a SparkContext is requested to cancel all running and scheduled Spark jobs.
|
Posting BeginEvent Event — taskStarted Method
taskStarted(task: Task[_], taskInfo: TaskInfo)
taskStarted merely posts a BeginEvent event to the DAGScheduler’s Internal Event Bus.
Note
|
taskStarted is executed when a TaskSetManager starts a task.
|
Posting GettingResultEvent Event — taskGettingResult Method
taskGettingResult(taskInfo: TaskInfo)
taskGettingResult merely posts a GettingResultEvent event to the DAGScheduler’s Internal Event Bus.
Note
|
taskGettingResult is executed when a TaskSetManager gets notified about a task fetching result.
|
Posting CompletionEvent Event — taskEnded Method
taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics): Unit
taskEnded merely posts a CompletionEvent event to the DAGScheduler’s Internal Event Bus.
Note
|
taskEnded is called when a TaskSetManager reports task completions, i.e. successes or failures.
|
Tip
|
Read about TaskMetrics in TaskMetrics.
|
Posting MapStageSubmitted Event — submitMapStage Method
submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C],
callback: MapOutputStatistics => Unit,
callSite: CallSite,
properties: Properties): JobWaiter[MapOutputStatistics]
submitMapStage posts a MapStageSubmitted event to the DAGScheduler’s Internal Event Bus and returns a JobWaiter with one task only and a result handler that will call the callback function.
submitMapStage increments nextJobId for the job id.
Note
|
submitMapStage is used when SparkContext submits a map stage for execution.
|
Posting TaskSetFailed Event — taskSetFailed Method
taskSetFailed(
taskSet: TaskSet,
reason: String,
exception: Option[Throwable]): Unit
taskSetFailed simply posts a TaskSetFailed event to the DAGScheduler’s Internal Event Bus.
Note
|
The input arguments of taskSetFailed are exactly the arguments of TaskSetFailed.
|
Note
|
taskSetFailed is executed when a TaskSetManager is aborted.
|
Posting ExecutorLost Event — executorLost Method
executorLost(execId: String, reason: ExecutorLossReason): Unit
executorLost simply posts an ExecutorLost event to the DAGScheduler’s Internal Event Bus.
Posting ExecutorAdded Event — executorAdded Method
executorAdded(execId: String, host: String): Unit
executorAdded simply posts an ExecutorAdded event to the DAGScheduler’s Internal Event Bus.
Posting JobCancelled Event — cancelJob Method
cancelJob(jobId: Int): Unit
cancelJob prints the following INFO message and posts a JobCancelled event to the DAGScheduler’s Internal Event Bus.
INFO DAGScheduler: Asked to cancel job [id]
Note
|
cancelJob is called when SparkContext and JobWaiter are requested to cancel a Spark job.
|
messageScheduler Single-Thread Executor
Caution
|
FIXME |
Submitting Action Job — runJob Method
runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit
runJob submits an action job to the DAGScheduler and waits for a result.
When the job succeeds, you should see the following INFO message in the logs:
INFO Job [jobId] finished: [callSite], took [time] s
When the job fails, you should see the following INFO message in the logs and the exception (that led to the failure) is thrown.
INFO Job [jobId] failed: [callSite], took [time] s
Note
|
runJob is used when SparkContext runs a job.
|
Aborting Stage — abortStage Internal Method
abortStage(
failedStage: Stage,
reason: String,
exception: Option[Throwable]): Unit
abortStage is an internal method that finds all the active jobs that depend on the failedStage stage and fails them.
Internally, abortStage looks the failedStage stage up in the internal stageIdToStage registry and exits if the stage was not registered earlier.
If it was, abortStage finds all the active jobs (in the internal activeJobs registry) with the final stage depending on the failedStage stage.
At this time, the completionTime property (of the failed stage’s StageInfo) is assigned to the current time (millis).
All the active jobs that depend on the failed stage (as calculated above) and the stages that do not belong to other jobs (aka independent stages) are failed (with the failure reason being "Job aborted due to stage failure: [reason]" and the input exception).
If there are no jobs depending on the failed stage, you should see the following INFO message in the logs:
INFO Ignoring failure of [failedStage] because all jobs depending on it are done
Note
|
abortStage is used to handle the TaskSetFailed event and when a stage is submitted with no active job.
|
Checking If Stage Depends on Another Stage — stageDependsOn Method
stageDependsOn(stage: Stage, target: Stage): Boolean
stageDependsOn compares two stages and returns whether the stage depends on the target stage (true) or not (false).
Note
|
A stage A depends on stage B if B is among the ancestors of A .
|
Internally, stageDependsOn walks through the graph of RDDs of the input stage. For every RDD in the RDD’s dependencies (using RDD.dependencies), stageDependsOn adds the RDD of a NarrowDependency to a stack of RDDs to visit, while for a ShuffleDependency it executes getOrCreateShuffleMapStage (for the dependency and the stage’s first job id) and adds the map stage’s RDD to the stack of RDDs to visit only when the map stage is not ready, i.e. not all of its partitions have shuffle outputs yet.
After all the RDDs of the input stage have been visited, stageDependsOn checks if the target’s RDD is among the RDDs of the stage, i.e. whether the stage depends on the target stage.
Marking Stage Finished — markStageAsFinished Internal Method
markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit
Caution
|
FIXME |
dag-scheduler-event-loop — DAGScheduler’s Internal Event Bus
eventProcessLoop is DAGScheduler’s event bus to which Spark (by submitJob) posts jobs to schedule their execution. Later on, TaskSetManager talks back to DAGScheduler to inform it about the status of the tasks using the same "communication channel".
It allows Spark to release the current thread when posting happens and let the event loop handle events on a separate thread, asynchronously.
…IMAGE…FIXME
Caution
|
FIXME statistics? MapOutputStatistics ?
|
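A minimal sketch of the pattern (simplified; the real implementation is DAGSchedulerEventProcessLoop built on Spark’s internal EventLoop, and the class and event names below are illustrative): callers post events and return immediately, while a dedicated daemon thread drains the queue and handles events one at a time.

import java.util.concurrent.LinkedBlockingDeque

sealed trait SchedulerEvent
case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent

class SimpleEventLoop(name: String) {
  private val queue = new LinkedBlockingDeque[SchedulerEvent]()
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (true) onReceive(queue.take()) // events are handled sequentially, one at a time
      } catch {
        case _: InterruptedException => ()   // stop() interrupts the thread
      }
  }
  def start(): Unit = thread.start()
  def post(event: SchedulerEvent): Unit = queue.put(event) // returns immediately
  def stop(): Unit = thread.interrupt()
  protected def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmittedEvent(jobId) => println(s"handling job $jobId")
  }
}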
Submitting Waiting Stages for Execution — submitWaitingStages Method
submitWaitingChildStages(parent: Stage): Unit
submitWaitingStages checks for waiting or failed stages that could now be eligible for submission.
When executed, you should see the following TRACE messages in the logs:
TRACE DAGScheduler: Checking for newly runnable parent stages
TRACE DAGScheduler: running: [runningStages]
TRACE DAGScheduler: waiting: [waitingStages]
TRACE DAGScheduler: failed: [failedStages]
The method clears the internal waitingStages set of stages that wait for their parent stages to finish.
It goes over the waiting stages sorted by job ids in increasing order and calls submitStage method.
Submitting Stage for Execution — submitStage Internal Method
submitStage(stage: Stage)
submitStage is an internal method that DAGScheduler uses to submit the input stage or its missing parents (if there are any).
Note
|
submitStage is also used to resubmit failed stages.
|
submitStage recursively submits any missing parents of the stage.
Internally, submitStage first finds the earliest-created ActiveJob that needs the stage.
You should see the following DEBUG message in the logs:
DEBUG DAGScheduler: submitStage([stage])
submitStage proceeds only when the stage is neither in the waiting (waitingStages), running (runningStages) nor failed states.
The list of missing parent stages of the stage is calculated (sorted by their ids) and the following DEBUG message shows up in the logs:
DEBUG DAGScheduler: missing: [missing]
When the stage has no missing parent stages, you should see the following INFO message in the logs:
INFO DAGScheduler: Submitting [stage] ([stage.rdd]), which has no missing parents
The stage is submitted. That finishes the stage submission.
If however there are missing parent stages for the stage, all parent stages are submitted (by id in increasing order), and the stage is added to the waitingStages set.
In case submitStage could find no active job for the stage, it aborts the stage with the reason:
No active job for stage [id]
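The recursion described above can be sketched as follows (a simplified sketch with a hypothetical SimpleStage type; the active-job lookup and the abort path are omitted):

import scala.collection.mutable

trait SimpleStage { def id: Int; def parents: Seq[SimpleStage]; def isAvailable: Boolean }

val waitingStages = mutable.Set.empty[SimpleStage]
val runningStages = mutable.Set.empty[SimpleStage]

def submitStage(stage: SimpleStage): Unit =
  if (!waitingStages(stage) && !runningStages(stage)) {
    val missing = stage.parents.filterNot(_.isAvailable).sortBy(_.id)
    if (missing.isEmpty) {
      runningStages += stage        // submitMissingTasks(stage, jobId) in the real code
    } else {
      missing.foreach(submitStage)  // submit missing parents first, by id in increasing order
      waitingStages += stage        // revisited once the parents finish
    }
  }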
Calculating Missing Parent Map Stages — getMissingParentStages Internal Method
getMissingParentStages(stage: Stage): List[Stage]
getMissingParentStages calculates missing parent map stages for the input stage.
It starts with the stage’s target RDD (as stage.rdd). If there are uncached partitions, it traverses the dependencies of the RDD (as RDD.dependencies), which can be instances of ShuffleDependency or NarrowDependency.
For each ShuffleDependency, the method searches for the corresponding ShuffleMapStage (using getShuffleMapStage) and, if unavailable, adds it to a set of missing (map) stages.
Caution
|
FIXME Review getShuffleMapStage
|
Caution
|
FIXME…IMAGE with ShuffleDependencies queried |
It continues traversing the chain for each NarrowDependency (using Dependency.rdd).
Fault recovery - stage attempts
A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).
If TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks.
Please note that tasks from the old attempts of a stage could still be running.
A stage object tracks multiple StageInfo objects to pass to Spark listeners or the web UI.
The latest StageInfo for the most recent attempt of a stage is accessible through latestInfo.
Cache Tracking
DAGScheduler tracks which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
DAGScheduler is only interested in cache location coordinates, i.e. host and executor id, per partition of an RDD.
Caution
|
FIXME: A diagram, please |
If the storage level of an RDD is NONE, there is no caching and hence no partition cache locations are available. In such cases, whenever asked, DAGScheduler returns a collection with empty-location elements for each partition. The empty-location elements are to mark uncached partitions.
Otherwise, a collection of RDDBlockId instances for each partition is created and BlockManagerMaster is asked for locations (using BlockManagerMaster.getLocations). The result is then mapped to a collection of TaskLocation for host and executor id.
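A minimal sketch of that lookup under simplified types (lookUpBlockLocations is a hypothetical stand-in for BlockManagerMaster.getLocations, and plain (host, executorId) pairs stand in for TaskLocation):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockManagerId, RDDBlockId, StorageLevel}

def cacheLocations(
    rdd: RDD[_],
    lookUpBlockLocations: IndexedSeq[RDDBlockId] => IndexedSeq[Seq[BlockManagerId]])
  : IndexedSeq[Seq[(String, String)]] =
  if (rdd.getStorageLevel == StorageLevel.NONE) {
    // uncached RDD: empty locations for every partition
    IndexedSeq.fill(rdd.partitions.length)(Seq.empty[(String, String)])
  } else {
    val blockIds = rdd.partitions.indices.map(i => RDDBlockId(rdd.id, i))
    lookUpBlockLocations(blockIds).map(_.map(bm => (bm.host, bm.executorId)))
  }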
Preferred Locations
DAGScheduler computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
Adaptive Query Planning
See SPARK-9850 Adaptive execution in Spark for the design document. The work is currently in progress.
DAGScheduler.submitMapStage method is used for adaptive query planning, to run map stages and look at statistics about their outputs before submitting downstream stages.
ScheduledExecutorService daemon services
DAGScheduler uses the following ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from a work queue at time of cancellation):
- dag-scheduler-message - a daemon thread pool using j.u.c.ScheduledThreadPoolExecutor with core pool size 1. It is used to post a ResubmitFailedStages event when FetchFailed is reported.
They are created using the ThreadUtils.newDaemonSingleThreadScheduledExecutor method that uses Guava DSL to instantiate a ThreadFactory.
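A minimal sketch of such an executor using plain j.u.c. APIs (the 200 ms delay and the scheduled action below are illustrative only):

import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

val threadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "dag-scheduler-message") // named daemon thread
    t.setDaemon(true)
    t
  }
}
val messageScheduler = new ScheduledThreadPoolExecutor(1, threadFactory) // core pool size 1
messageScheduler.setRemoveOnCancelPolicy(true) // cancelled tasks are removed from the work queue

// e.g. schedule an action (such as posting ResubmitFailedStages) after a delay
messageScheduler.schedule(new Runnable {
  override def run(): Unit = println("resubmit failed stages")
}, 200, TimeUnit.MILLISECONDS)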
Submitting Missing Tasks for Stage and Job — submitMissingTasks Internal Method
submitMissingTasks(stage: Stage, jobId: Int): Unit
submitMissingTasks is a private method that…FIXME
When executed, it prints the following DEBUG message out to the logs:
DEBUG DAGScheduler: submitMissingTasks([stage])
The stage’s pendingPartitions internal field is cleared (it is later filled in with the partitions to run tasks for).
The stage is asked for partitions to compute (see findMissingPartitions in Stages).
The method adds the stage to the runningStages internal registry.
OutputCommitCoordinator is informed that the stage has started (using outputCommitCoordinator.stageStart).
Caution
|
FIXME Review outputCommitCoordinator.stageStart
|
The mapping between task ids and task preferred locations is computed (see getPreferredLocs - Computing Preferred Locations for Tasks and Partitions).
A new stage attempt is created (using Stage.makeNewStageAttempt).
SparkListenerStageSubmitted is posted.
The stage is serialized and broadcast to workers using the SparkContext.broadcast method, i.e. Serializer.serialize is used to compute taskBinaryBytes, an array of bytes of (rdd, func) for ResultStage and (rdd, shuffleDep) for ShuffleMapStage.
Caution
|
FIXME Review taskBinaryBytes .
|
When serializing the stage fails, the stage is removed from runningStages internal registry, the stage is aborted and the method stops.
At this point in time, the stage is on workers.
For each partition to compute for the stage, a collection of ShuffleMapTask for ShuffleMapStage or ResultTask for ResultStage is created.
Caution
|
FIXME Image with creating tasks for partitions in the stage. |
Any issue with creating a task leads to aborting the stage and removing the stage from the runningStages internal registry.
If there are tasks to launch (there are missing partitions in the stage), the following INFO and DEBUG messages are in the logs:
INFO DAGScheduler: Submitting [tasks.size] missing tasks from [stage] ([stage.rdd])
DEBUG DAGScheduler: New pending partitions: [stage.pendingPartitions]
All tasks in the collection become a TaskSet for TaskScheduler.submitTasks.
In case of no tasks to be submitted for a stage, a DEBUG message shows up in the logs.
For ShuffleMapStage:
DEBUG DAGScheduler: Stage [stage] is actually done; (available: [stage.isAvailable],available outputs: [stage.numAvailableOutputs],partitions: [stage.numPartitions])
For ResultStage:
DEBUG DAGScheduler: Stage [stage] is actually done; (partitions: [numPartitions])
Note
|
submitMissingTasks is called when…
|
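The "one task per missing partition" idea can be sketched with hypothetical types (TaskSketch stands in for ShuffleMapTask/ResultTask and plain host strings for TaskLocation):

case class TaskSketch(stageId: Int, partitionId: Int, preferredLocations: Seq[String])

def tasksForStage(
    stageId: Int,
    missingPartitions: Seq[Int],
    preferredLocations: Map[Int, Seq[String]]): Seq[TaskSketch] =
  missingPartitions.map { p =>
    TaskSketch(stageId, p, preferredLocations.getOrElse(p, Seq.empty)) // one task per missing partition
  }

// e.g. tasksForStage(1, Seq(0, 2), Map(0 -> Seq("host1"))) yields two tasks, one per missing partition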
Computing Preferred Locations for Tasks and Partitions — getPreferredLocs Method
getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation]
Caution
|
FIXME Review + why does the method return a sequence of TaskLocations? |
Note
|
Task ids correspond to partition ids. |
Stopping DAGScheduler — stop Method
stop(): Unit
stop stops the internal dag-scheduler-message thread pool, dag-scheduler-event-loop, and TaskScheduler.
Metrics
Spark’s DAGScheduler uses Spark Metrics System (via DAGSchedulerSource) to report metrics about internal status.
Caution
|
FIXME What is DAGSchedulerSource ?
|
The name of the source is DAGScheduler.
It emits the following numbers:
- stage.failedStages - the number of failed stages
- stage.runningStages - the number of running stages
- stage.waitingStages - the number of waiting stages
- job.allJobs - the number of all jobs
- job.activeJobs - the number of active jobs
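These are gauges registered with the Dropwizard/Codahale MetricRegistry that Spark Metrics System is built on. A minimal sketch of registering such a gauge (SimpleDAGSchedulerSource below is hypothetical, not the real DAGSchedulerSource):

import com.codahale.metrics.{Gauge, MetricRegistry}

class SimpleDAGSchedulerSource(runningStageCount: () => Int) {
  val sourceName = "DAGScheduler"
  val metricRegistry = new MetricRegistry
  metricRegistry.register(
    MetricRegistry.name("stage", "runningStages"),
    new Gauge[Int] { override def getValue: Int = runningStageCount() }) // sampled on every report
}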
Updating Accumulators with Partial Values from Completed Tasks — updateAccumulators Internal Method
updateAccumulators(event: CompletionEvent): Unit
The private updateAccumulators method merges the partial values of accumulators from a completed task into their "source" accumulators on the driver.
Note
|
It is called by handleTaskCompletion. |
For each AccumulableInfo in the CompletionEvent, a partial value from a task is obtained (from AccumulableInfo.update) and added to the driver’s accumulator (using the Accumulable.++= method).
For named accumulators with the update value being a non-zero value, i.e. not Accumulable.zero:
- stage.latestInfo.accumulables for the AccumulableInfo.id is set
- CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added
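The same merge idea can be illustrated with the public accumulator API (a hypothetical illustration using LongAccumulator rather than the internal Accumulable.++= path):

import org.apache.spark.util.LongAccumulator

val driverAcc = new LongAccumulator    // "source" accumulator on the driver
val taskPartial = new LongAccumulator  // partial value computed by a single task
taskPartial.add(3L)
driverAcc.merge(taskPartial)           // what updateAccumulators does per AccumulableInfo
assert(driverAcc.sum == 3L)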
Caution
|
FIXME Where are Stage.latestInfo.accumulables and CompletionEvent.taskInfo.accumulables used?
|