CoarseGrainedSchedulerBackend
CoarseGrainedSchedulerBackend is a SchedulerBackend and ExecutorAllocationClient.
It is responsible for requesting resources from a cluster manager for executors to be able to launch tasks (on coarse-grained executors).
This backend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.
When created, CoarseGrainedSchedulerBackend requires a Task Scheduler and an RPC Environment.
It uses LiveListenerBus.
It registers CoarseGrainedScheduler RPC Endpoint that executors use for RPC communication.
It tracks:
- the total number of cores in the cluster (using totalCoreCount)
- the total number of executors that are currently registered
- executors (ExecutorData)
- executors to be removed (executorsPendingToRemove)
- hosts and the number of tasks possibly running on them
- lost executors with no real exit reason
- tasks per slave (taskIdsOnSlave)
Known Implementations:
- Spark Standalone’s StandaloneSchedulerBackend
- Spark on YARN’s YarnSchedulerBackend
- Spark on Mesos’s MesosCoarseGrainedSchedulerBackend
Tip
|
Enable DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG. Refer to Logging. |
Creating CoarseGrainedSchedulerBackend
Instance
CoarseGrainedSchedulerBackend requires a task scheduler and an RPC Environment when being created.
It initializes the following registries:
- totalCoreCount to 0
- totalRegisteredExecutors to 0
- maxRpcMessageSize to spark.rpc.message.maxSize.
- _minRegisteredRatio to spark.scheduler.minRegisteredResourcesRatio (between 0 and 1 inclusive).
- maxRegisteredWaitingTimeMs to spark.scheduler.maxRegisteredResourcesWaitingTime.
- createTime to the current time.
- executorDataMap to an empty collection.
- numPendingExecutors to 0
- executorsPendingToRemove to an empty collection.
- hostToLocalTaskCount to an empty collection.
- localityAwareTasks to 0
It accesses the current LiveListenerBus and SparkConf through the constructor’s reference to TaskSchedulerImpl.
Getting Executor Ids — getExecutorIds
Method
When called, getExecutorIds
simply returns executor ids from the internal executorDataMap registry.
Note
|
It is called when SparkContext calculates executor ids. |
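A minimal sketch of that behaviour, with the registry modelled as a plain mutable HashMap (the value type and sample entries are assumptions used only for illustration, not the actual Spark source):

import scala.collection.mutable

// Simplified model: executorDataMap keyed by executor id (values elided here).
object GetExecutorIdsSketch {
  private val executorDataMap = mutable.HashMap("0" -> "data-0", "1" -> "data-1")

  // getExecutorIds simply returns the ids of the registered executors.
  def getExecutorIds(): Seq[String] = executorDataMap.keySet.toSeq

  def main(args: Array[String]): Unit =
    println(getExecutorIds()) // e.g. List(0, 1)
}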
CoarseGrainedSchedulerBackend Contract
Caution
|
FIXME |
doRequestTotalExecutors
Method
doRequestTotalExecutors(requestedTotal: Int): Boolean = false
doRequestTotalExecutors requests requestedTotal executors from a cluster manager. It is a protected method that returns false by default (concrete coarse-grained scheduler backends are expected to override it).
Note
|
It is called when CoarseGrainedSchedulerBackend requests additional executors (requestExecutors) or an exact number of executors (requestTotalExecutors). In fact, all the aforementioned methods are due to the ExecutorAllocationClient contract that CoarseGrainedSchedulerBackend follows. |
Note
|
It is customized by the coarse-grained scheduler backends for YARN, Spark Standalone, and Mesos. |
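As a hedged illustration of that customization (not actual Spark code; the backend stub and MyClusterClient below are assumptions), a cluster-specific backend would override the protected hook to talk to its cluster manager:

// Stand-in for the coarse-grained backend's protected hook (default: not acknowledged).
abstract class SchedulerBackendStub {
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
}

// Hypothetical cluster-manager client used only for this sketch.
trait MyClusterClient {
  def requestTotalExecutors(total: Int): Boolean
}

// A cluster-specific backend overrides the hook to forward the request.
class MyClusterSchedulerBackend(client: MyClusterClient) extends SchedulerBackendStub {
  override protected def doRequestTotalExecutors(requestedTotal: Int): Boolean =
    client.requestTotalExecutors(requestedTotal)
}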
Internal Registries
currentExecutorIdCounter
Counter
currentExecutorIdCounter
is the last (highest) identifier of all allocated executors.
Note
|
It is exclusively used in YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message.
|
executorDataMap
Registry
executorDataMap = new HashMap[String, ExecutorData]
executorDataMap
tracks executor data by executor id.
It uses ExecutorData that holds an executor’s endpoint reference, address, host, the number of free and total CPU cores, and the URLs of execution logs.
Note
|
A new executor (id, data) pair is added when DriverEndpoint receives RegisterExecutor message and removed when DriverEndpoint receives RemoveExecutor message or a remote host (with one or many executors) disconnects.
|
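A simplified sketch of what an entry holds, following the description above (the field names and the plain-String stand-ins for the RPC types are assumptions for illustration, not the actual Spark class):

import scala.collection.mutable

// Simplified stand-in for ExecutorData: endpoint reference and address are
// modelled as Strings; freeCores is mutable because launching tasks decrements it.
case class ExecutorDataSketch(
    executorEndpoint: String,           // executor's RPC endpoint reference
    executorAddress: String,            // executor's RPC address
    executorHost: String,               // host the executor runs on
    var freeCores: Int,                 // CPU cores currently free
    totalCores: Int,                    // total CPU cores of the executor
    logUrlMap: Map[String, String])     // URLs of execution logs

object ExecutorDataMapSketch {
  // executor id -> ExecutorData
  val executorDataMap = mutable.HashMap.empty[String, ExecutorDataSketch]
}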
numPendingExecutors
Caution
|
FIXME |
numExistingExecutors
Caution
|
FIXME |
executorsPendingToRemove
Caution
|
FIXME |
localityAwareTasks
Caution
|
FIXME |
hostToLocalTaskCount
Caution
|
FIXME |
Requesting Additional Executors — requestExecutors
Method
requestExecutors(numAdditionalExecutors: Int): Boolean
requestExecutors
is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false
by default).
Note
|
requestExecutors method is a part of ExecutorAllocationClient Contract that SparkContext uses for requesting additional executors (as a part of a developer API for dynamic allocation of executors).
|
When called, you should see the following INFO message followed by a DEBUG message in the logs:
INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]
The internal numPendingExecutors is increased by the input numAdditionalExecutors.
requestExecutors
requests executors from a cluster manager (that reflects the current computation needs). The "new executor total" is a sum of the internal numExistingExecutors and numPendingExecutors decreased by the number of executors pending to be removed.
If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
Note
|
It is a final method that no scheduler backend can customize further. |
Note
|
The method is a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one. |
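A simplified sketch of that bookkeeping (the state fields and the cluster-specific hook are modelled locally; this is an illustration, not the actual Spark source):

import scala.collection.mutable

class RequestExecutorsSketch {
  private var numPendingExecutors = 0
  private val executorDataMap = mutable.HashMap.empty[String, AnyRef]
  private val executorsPendingToRemove = mutable.HashSet.empty[String]

  private def numExistingExecutors: Int = executorDataMap.size

  // Stand-in for the cluster-specific hook; false means "not acknowledged".
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false

  def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
    require(numAdditionalExecutors >= 0,
      s"Attempted to request a negative number of additional executor(s) " +
        s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    numPendingExecutors += numAdditionalExecutors
    // "New executor total" = existing + pending - pending to be removed.
    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
    doRequestTotalExecutors(newTotal)
  }
}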
Requesting Exact Number of Executors — requestTotalExecutors
Method
requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors
is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false
by default).
Note
|
requestTotalExecutors is a part of ExecutorAllocationClient Contract that SparkContext uses for requesting the exact number of executors.
|
It sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then calculates the exact number of executors, which is the input numExecutors plus the executors pending removal, decreased by the number of already-registered executors.
If numExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
Note
|
It is a final method that no scheduler backend can customize further. |
Note
|
The method is a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one. |
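A corresponding sketch for requestTotalExecutors (again a simplified local model, not the Spark source; the pending-executors arithmetic follows the description above):

import scala.collection.mutable

class RequestTotalExecutorsSketch {
  private var localityAwareTasks = 0
  private var hostToLocalTaskCount = Map.empty[String, Int]
  private var numPendingExecutors = 0
  private val executorDataMap = mutable.HashMap.empty[String, AnyRef]
  private val executorsPendingToRemove = mutable.HashSet.empty[String]

  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false

  def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
    require(numExecutors >= 0,
      s"Attempted to request a negative number of executor(s) $numExecutors " +
        "from the cluster manager. Please specify a positive number!")
    this.localityAwareTasks = localityAwareTasks
    this.hostToLocalTaskCount = hostToLocalTaskCount
    // Pending = requested total + pending-to-remove - already registered (never negative).
    numPendingExecutors = math.max(
      numExecutors + executorsPendingToRemove.size - executorDataMap.size, 0)
    doRequestTotalExecutors(numExecutors)
  }
}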
minRegisteredRatio
Property
minRegisteredRatio: Double
minRegisteredRatio
returns a ratio between 0
and 1
(inclusive). You can use spark.scheduler.minRegisteredResourcesRatio to control the value.
Starting CoarseGrainedSchedulerBackend
— start
Method
start
initializes CoarseGrainedScheduler RPC Endpoint.
Note
|
start is part of the SchedulerBackend Contract.
|
Note
|
The RPC Environment is passed on as a constructor parameter. |
Stopping CoarseGrainedSchedulerBackend
— stop
Method
stop
method stops executors and CoarseGrainedScheduler RPC endpoint.
Note
|
stop is part of the SchedulerBackend Contract.
|
Note
|
When called with no driverEndpoint, both stop() and stopExecutors() do nothing. driverEndpoint is initialized in start, so the initialization order matters.
|
It prints the following INFO message to the logs:
INFO Shutting down all executors
It then sends a StopExecutors message to driverEndpoint and disregards the response. It then sends a StopDriver message to driverEndpoint, again disregarding the response.
Compute Default Level of Parallelism — defaultParallelism
Method
The default parallelism is spark.default.parallelism, if set, or the maximum of totalCoreCount and 2 otherwise.
Note
|
defaultParallelism is part of the SchedulerBackend Contract.
|
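A one-line sketch of that rule (the configuration lookup is modelled as an Option; an illustration, not the actual Spark source):

object DefaultParallelismSketch {
  // spark.default.parallelism if set, otherwise max(totalCoreCount, 2).
  def defaultParallelism(configured: Option[Int], totalCoreCount: Int): Int =
    configured.getOrElse(math.max(totalCoreCount, 2))

  def main(args: Array[String]): Unit = {
    println(defaultParallelism(None, totalCoreCount = 1))      // 2
    println(defaultParallelism(None, totalCoreCount = 16))     // 16
    println(defaultParallelism(Some(8), totalCoreCount = 16))  // 8
  }
}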
Reviving Offers — reviveOffers
Method
Note
|
reviveOffers is part of the SchedulerBackend Contract.
|
reviveOffers
simply sends a ReviveOffers message to driverEndpoint (so it is processed asynchronously, i.e. on a separate thread, later on).
Killing Task — killTask
Method
killTask
simply sends a KillTask message to driverEndpoint.
Caution
|
FIXME Image |
Note
|
killTask is part of the SchedulerBackend Contract.
|
Delaying Task Launching — isReady
Method
isReady is a custom implementation of isReady from the SchedulerBackend Contract that allows delaying task launching until sufficient resources are registered or spark.scheduler.maxRegisteredResourcesWaitingTime passes.
Note
|
isReady is used exclusively by TaskSchedulerImpl.waitBackendReady.
|
It starts by checking whether sufficient resources are available (using the sufficientResourcesRegistered method).
Note
|
By default sufficientResourcesRegistered always responds that sufficient resources are available.
|
If sufficient resources are available, you should see the following INFO message in the logs:
INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
The method finishes returning true.
Note
|
minRegisteredRatio in the logs above is in the range 0 to 1 (uses spark.scheduler.minRegisteredResourcesRatio) to denote the minimum ratio of registered resources to total expected resources before submitting tasks.
|
If sufficient resources are not yet available (the above requirement does not hold), it checks whether the time since startup (as createTime) has exceeded spark.scheduler.maxRegisteredResourcesWaitingTime, which gives a way to submit tasks even though minRegisteredRatio has not been reached yet.
You should see the following INFO message in the logs:
INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)
The method finishes returning true.
Otherwise, when sufficient resources are not available and maxRegisteredWaitingTimeMs has not elapsed, it finishes returning false.
sufficientResourcesRegistered
Method
sufficientResourcesRegistered
always responds that sufficient resources are available.
Stop All Executors — stopExecutors
Method
stopExecutors
sends a blocking StopExecutors message to driverEndpoint (if already initialized).
Note
|
It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.
|
You should see the following INFO message in the logs:
INFO CoarseGrainedSchedulerBackend: Shutting down all executors
Reset State — reset
Method
reset
resets the internal state:
- Sets numPendingExecutors to 0
- Clears executorsPendingToRemove
- Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message: "Stale executor after cluster manager re-registered."
reset is a method defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.
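A simplified sketch of those steps (the blocking RemoveExecutor send is modelled as a callback taking the executor id and the SlaveLost message; the sample state is made up for illustration):

import scala.collection.mutable

class ResetSketch(removeExecutor: (String, String) => Unit) {
  private var numPendingExecutors = 3
  private val executorsPendingToRemove = mutable.HashSet("2")
  private val executorDataMap = mutable.HashMap("0" -> "data-0", "1" -> "data-1")

  def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()
    // Inform the driver endpoint that every known executor is now stale.
    executorDataMap.keys.foreach { executorId =>
      removeExecutor(executorId, "Stale executor after cluster manager re-registered.")
    }
  }
}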
Remove Executor — removeExecutor
Method
removeExecutor(executorId: String, reason: ExecutorLossReason)
removeExecutor
sends a blocking RemoveExecutor message to driverEndpoint.
Note
|
It is called by subclasses SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend. |
CoarseGrainedScheduler RPC Endpoint — driverEndpoint
When CoarseGrainedSchedulerBackend starts, it registers CoarseGrainedScheduler RPC endpoint to be the driver’s communication endpoint.
Internally, it is a DriverEndpoint object available as the driverEndpoint
internal field.
Note
|
CoarseGrainedSchedulerBackend is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).
|
It is called the standalone scheduler’s driver endpoint internally.
It tracks:
- Executor addresses (host and port) for executors (addressToExecutorId) - set when an executor connects to register itself. See RegisterExecutor RPC message.
- Total number of cores (totalCoreCount) - the sum of all cores on all executors. See RegisterExecutor RPC message.
- The number of executors available (totalRegisteredExecutors). See RegisterExecutor RPC message.
- ExecutorData for each registered executor (executorDataMap). See RegisterExecutor RPC message.
It uses driver-revive-thread
daemon single-thread thread pool for …FIXME
Caution
|
FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.
|
- spark.scheduler.revive.interval (default: 1s) - time between reviving offers.
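A sketch of the idea behind driver-revive-thread: a single-threaded daemon scheduler that periodically triggers reviving offers at spark.scheduler.revive.interval (the ReviveOffers send is modelled as a plain callback; an illustration, not the Spark source):

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object ReviveThreadSketch {
  def start(reviveIntervalMs: Long)(revive: () => Unit): ScheduledExecutorService = {
    val pool = Executors.newSingleThreadScheduledExecutor { (r: Runnable) =>
      val thread = new Thread(r, "driver-revive-thread")
      thread.setDaemon(true)
      thread
    }
    // Periodically "send ReviveOffers", i.e. invoke the callback.
    pool.scheduleAtFixedRate(() => revive(), 0L, reviveIntervalMs, TimeUnit.MILLISECONDS)
    pool
  }
}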
RPC Messages
KillTask(taskId, executorId, interruptThread)
RemoveExecutor
ReviveOffers
ReviveOffers
simply passes the call on to makeOffers.
Caution
|
FIXME When is an executor alive? What other states can an executor be in? |
StatusUpdate
StatusUpdate(
executorId: String,
taskId: Long,
state: TaskState,
data: SerializableBuffer)
extends CoarseGrainedClusterMessage
Caution
|
FIXME |
StopExecutors
StopExecutors
message is receive-reply and blocking. When received, the following INFO message appears in the logs:
INFO Asking each executor to shut down
It then sends a StopExecutor message to every registered executor (from executorDataMap
).
RegisterExecutor
RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
hostname: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
Note
|
RegisterExecutor is sent when CoarseGrainedExecutorBackend (RPC Endpoint) starts.
|
Only one executor can register under executorId
.
INFO Registered executor [executorRef] ([executorAddress]) with ID [executorId]
It does internal bookkeeping like updating addressToExecutorId, totalCoreCount, totalRegisteredExecutors, and executorDataMap.
When numPendingExecutors
is more than 0
, the following is printed out to the logs:
DEBUG Decremented number of pending executors ([numPendingExecutors] left)
It replies with RegisteredExecutor(executorAddress.host)
(consult RPC Messages of CoarseGrainedExecutorBackend).
It then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.
Ultimately, makeOffers is called.
DriverEndpoint
DriverEndpoint
is a ThreadSafeRpcEndpoint.
onDisconnected Callback
When called, onDisconnected
removes the worker from the internal addressToExecutorId registry (that effectively removes the worker from a cluster).
While removing, it calls removeExecutor with the reason being SlaveLost
and message:
Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Note
|
onDisconnected is called when a remote host is lost.
|
Making Resource Offers — makeOffers
Method
makeOffers(): Unit
makeOffers
is a private method that takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer
resource offers for each (one per executor with the executor’s id, host and free cores).
Caution
|
Only free cores are considered in making offers. Memory is not! Why?! |
It then requests TaskSchedulerImpl
to process the resource offers to create a collection of TaskDescription
collections that it in turn uses to launch tasks.
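A simplified sketch of that flow (the task scheduler and the launch step are modelled as functions, and WorkerOffer is reduced to the three fields mentioned above; an illustration, not the actual Spark source):

import scala.collection.mutable

// One resource offer per executor: id, host and currently free cores (no memory).
case class WorkerOfferSketch(executorId: String, host: String, cores: Int)

class MakeOffersSketch(
    // executor id -> (host, freeCores); stand-in for executorDataMap
    executorDataMap: mutable.Map[String, (String, Int)],
    // stand-in for TaskSchedulerImpl: offers -> task descriptions per offer
    resourceOffers: Seq[WorkerOfferSketch] => Seq[Seq[String]],
    launchTasks: Seq[Seq[String]] => Unit) {

  def makeOffers(): Unit = {
    val workOffers = executorDataMap.toSeq.map { case (id, (host, freeCores)) =>
      WorkerOfferSketch(id, host, freeCores)
    }
    launchTasks(resourceOffers(workOffers))
  }
}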
Launching Tasks — launchTasks
Method
launchTasks(tasks: Seq[Seq[TaskDescription]])
launchTasks
is a private helper method that iterates over TaskDescription
objects in the tasks
input collection and …FIXME
Note
|
launchTasks gets called when CoarseGrainedSchedulerBackend is making resource offers.
|
Internally, it serializes a TaskDescription (using the global closure Serializer) and checks that the size of the serialized task is less than maxRpcMessageSize.
Caution
|
FIXME Describe maxRpcMessageSize .
|
If the serialized task’s size is over the maximum RPC message size, the task’s TaskSetManager
is aborted.
Caution
|
FIXME At that point, tasks have their executor assigned. When and how did that happen? |
If the serialized task’s size is correct, the task’s executor is looked up in the internal executorDataMap registry to record that the task is about to be launched and the number of free cores of the executor is decremented by the CPUS_PER_TASK
constant (i.e. spark.task.cpus).
Note
|
ExecutorData keeps track of the number of free cores of the executor (as freeCores ) as well as the RpcEndpointRef of the executor to send tasks to launch to (as executorEndpoint ).
|
You should see the following INFO in the logs:
INFO DriverEndpoint: Launching task [taskId] on executor id: [executorId] hostname: [executorHost].
Ultimately, launchTasks
sends a LaunchTask message to the executor’s RPC endpoint with the serialized task (wrapped in SerializableBuffer
).
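A simplified sketch of that loop (serialization, the task-set abort, and the LaunchTask send are modelled as callbacks, and a task is reduced to a (description, executorId) pair; an illustration, not the actual Spark source):

import scala.collection.mutable

class LaunchTasksSketch(
    maxRpcMessageSize: Int,                          // derived from spark.rpc.message.maxSize
    cpusPerTask: Int,                                // spark.task.cpus
    freeCores: mutable.Map[String, Int],             // per-executor free cores (see ExecutorData)
    serialize: String => Array[Byte],                // closure-Serializer stand-in
    abortTaskSet: String => Unit,                    // aborts the task's TaskSetManager
    sendLaunchTask: (String, Array[Byte]) => Unit) { // sends LaunchTask to the executor

  // tasks: (task description, assigned executor id) pairs
  def launchTasks(tasks: Seq[(String, String)]): Unit = {
    for ((task, executorId) <- tasks) {
      val serializedTask = serialize(task)
      if (serializedTask.length > maxRpcMessageSize) {
        abortTaskSet(task)                   // serialized task is too large: abort its task set
      } else {
        freeCores(executorId) -= cpusPerTask // record the launch on the chosen executor
        sendLaunchTask(executorId, serializedTask)
      }
    }
  }
}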
Note
|
Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run on an executor is constrained by the number of cores available only. When submitting Spark application for execution both — memory and cores — can be specified explicitly. |
Settings
Spark Property | Default Value | Description |
---|---|---|
spark.rpc.message.maxSize | 128 | Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map output size (serialized) information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size. |
spark.default.parallelism | Maximum of totalCoreCount and 2 | Default parallelism for the scheduler backend. |
spark.scheduler.minRegisteredResourcesRatio | 0 | Double number between 0 and 1 (inclusive) that controls the minimum ratio of (registered resources / total expected resources) before submitting tasks. See isReady in this document. |
spark.scheduler.maxRegisteredResourcesWaitingTime | 30s | Time to wait for sufficient resources available. See isReady in this document. |