CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend is a SchedulerBackend and ExecutorAllocationClient.

It is responsible for requesting resources from a cluster manager so that executors can launch tasks (on coarse-grained executors).

This backend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

When being created, CoarseGrainedSchedulerBackend requires a Task Scheduler and an RPC Environment.

It uses LiveListenerBus.

It registers the CoarseGrainedScheduler RPC endpoint that executors use for RPC communication.

It tracks:

  • the total number of cores in the cluster (using totalCoreCount)

  • the total number of executors that are currently registered

  • executors (ExecutorData)

  • executors to be removed (executorsPendingToRemove)

  • hosts and the number of tasks possibly running on them

  • lost executors with no real exit reason

  • tasks per slave (taskIdsOnSlave)

Known Implementations:

  1. Spark Standalone’s StandaloneSchedulerBackend

  2. Spark on YARN’s YarnSchedulerBackend

  3. Spark on Mesos’s MesosCoarseGrainedSchedulerBackend

Tip

Enable INFO or DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG

Refer to Logging.

Creating CoarseGrainedSchedulerBackend Instance

CoarseGrainedSchedulerBackend requires a task scheduler and an RPC Environment when being created.

It initializes the internal registries (see Internal Registries in this document).

It accesses the current LiveListenerBus and SparkConf through the constructor’s reference to TaskSchedulerImpl.

Getting Executor Ids — getExecutorIds Method

When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.

Note
It is called when SparkContext calculates executor ids.
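A minimal sketch of the lookup (the method simply exposes the keys of the executorDataMap registry; treat this as an approximation of the actual code):

// a minimal sketch: executor ids are the keys of executorDataMap
override def getExecutorIds(): Seq[String] = synchronized {
  executorDataMap.keySet.toSeq
}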

CoarseGrainedSchedulerBackend Contract

Caution
FIXME

doRequestTotalExecutors Method

doRequestTotalExecutors(requestedTotal: Int): Boolean = false

doRequestTotalExecutors requests requestedTotal executors from a cluster manager. It is a protected method that returns false by default (that coarse-grained scheduler backends are supposed to further customize).

Note

It is called when CoarseGrainedSchedulerBackend requests additional executors or an exact total number of executors, and when killing unneeded executors.

In fact, all the aforementioned methods are due to the ExecutorAllocationClient contract that CoarseGrainedSchedulerBackend follows.

Note
It is customized by the coarse-grained scheduler backends for YARN, Spark Standalone, and Mesos.
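As an illustration, a custom coarse-grained scheduler backend could override doRequestTotalExecutors to forward the request to its cluster manager client. The following is a sketch only; MyCoarseGrainedSchedulerBackend and MyClusterManagerClient are made-up names:

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// hypothetical client of a cluster manager (made up for this sketch)
trait MyClusterManagerClient {
  def requestExecutors(total: Int): Boolean
}

// a hypothetical backend that asks its cluster manager for the requested total of executors
class MyCoarseGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    rpcEnv: RpcEnv,
    client: MyClusterManagerClient)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  override protected def doRequestTotalExecutors(requestedTotal: Int): Boolean =
    client.requestExecutors(requestedTotal)
}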

Internal Registries

currentExecutorIdCounter Counter

currentExecutorIdCounter is the last (highest) identifier of all allocated executors.

executorDataMap Registry

executorDataMap = new HashMap[String, ExecutorData]

executorDataMap tracks executor data by executor id.

It uses ExecutorData that holds an executor’s endpoint reference, address, host, the number of free and total CPU cores, and the URLs of execution logs.
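For reference, a simplified sketch of the fields ExecutorData carries (the actual class is private to Spark; treat the field names as an approximation):

import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}

// a simplified sketch of ExecutorData
class ExecutorData(
  val executorEndpoint: RpcEndpointRef,  // executor's RPC endpoint reference (used to send tasks)
  val executorAddress: RpcAddress,       // executor's RPC address
  val executorHost: String,              // host the executor runs on
  var freeCores: Int,                    // CPU cores currently free to run tasks
  val totalCores: Int,                   // total CPU cores of the executor
  val logUrlMap: Map[String, String])    // URLs of the executor's execution logs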

numPendingExecutors

Caution
FIXME

numExistingExecutors

Caution
FIXME

executorsPendingToRemove

Caution
FIXME

localityAwareTasks

Caution
FIXME

hostToLocalTaskCount

Caution
FIXME

Requesting Additional Executors — requestExecutors Method

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

Note
requestExecutors method is a part of ExecutorAllocationClient Contract that SparkContext uses for requesting additional executors (as a part of a developer API for dynamic allocation of executors).

When called, you should see the following INFO message followed by a DEBUG message in the logs:

INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]

The internal numPendingExecutors is increased by the input numAdditionalExecutors.

requestExecutors requests executors from a cluster manager (that reflects the current computation needs). The "new executor total" is a sum of the internal numExistingExecutors and numPendingExecutors decreased by the number of executors pending to be removed.

If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
Note
It is a final method that no other scheduler backends could customize further.
Note
The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
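Putting the steps above together, a condensed sketch of requestExecutors (simplified; it assumes the internal registries described in this document):

// a condensed sketch of requestExecutors
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
  if (numAdditionalExecutors < 0) {
    throw new IllegalArgumentException(
      s"Attempted to request a negative number of additional executor(s) " +
        s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
  }
  logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")

  numPendingExecutors += numAdditionalExecutors
  logDebug(s"Number of pending executors is now $numPendingExecutors")

  // the "new executor total" reflects the current computation needs
  val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
  doRequestTotalExecutors(newTotal)
}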

Requesting Exact Number of Executors — requestTotalExecutors Method

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

It sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then recalculates the number of pending executors, i.e. the input numExecutors plus the number of executors pending removal, decreased by the number of already-registered executors.

If numExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
Note
It is a final method that no other scheduler backends could customize further.
Note
The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
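For illustration, SparkContext exposes a matching developer API that delegates to this method. A usage sketch (it assumes a running SparkContext named sc backed by a coarse-grained scheduler backend):

// ask the cluster manager for exactly 4 executors, with no locality preferences
val acknowledged = sc.requestTotalExecutors(
  numExecutors = 4,
  localityAwareTasks = 0,
  hostToLocalTaskCount = Map.empty[String, Int])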

minRegisteredRatio Property

minRegisteredRatio: Double

minRegisteredRatio returns a ratio between 0 and 1 (inclusive). You can use spark.scheduler.minRegisteredResourcesRatio to control the value.
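A minimal sketch of how the ratio could be derived from the configuration (the value is capped at 1 and defaults to 0 when the property is not set):

// a minimal sketch of minRegisteredRatio
def minRegisteredRatio: Double =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))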

Starting CoarseGrainedSchedulerBackend — start Method

Figure 1. CoarseGrainedScheduler Endpoint
Note
start is part of the SchedulerBackend Contract.
Note
The RPC Environment is passed on as a constructor parameter.

Stopping CoarseGrainedSchedulerBackend — stop Method

Note
stop is part of the SchedulerBackend Contract.
Note
When called with no driverEndpoint initialized, both stop() and stopExecutors() do nothing (driverEndpoint is initialized in start, so the initialization order matters).

You should see the following INFO message in the logs:

INFO Shutting down all executors

It then sends a StopExecutors message to driverEndpoint and disregards the response.

It sends a StopDriver message to driverEndpoint and disregards the response.

Compute Default Level of Parallelism — defaultParallelism Method

The default parallelism is the value of spark.default.parallelism if set, or the greater of totalCoreCount and 2 otherwise.

Note
defaultParallelism is part of the SchedulerBackend Contract.
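A simplified sketch of the calculation (it assumes conf is the SparkConf and totalCoreCount the internal counter):

// spark.default.parallelism wins if set; otherwise the greater of totalCoreCount and 2
override def defaultParallelism(): Int =
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))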

Reviving Offers — reviveOffers Method

Note
reviveOffers is part of the SchedulerBackend Contract.

reviveOffers simply sends a ReviveOffers message to driverEndpoint (so it is processed asynchronously, i.e. on a separate thread, later on).

Figure 2. Reviving Offers by CoarseGrainedExecutorBackend

Killing Task — killTask Method

killTask simply sends a KillTask message to driverEndpoint.

Caution
FIXME Image
Note
killTask is part of the SchedulerBackend Contract.

Delaying Task Launching — isReady Method

isReady is a custom implementation of isReady from the SchedulerBackend Contract that allows delaying task launching until sufficient resources are registered or spark.scheduler.maxRegisteredResourcesWaitingTime elapses.

Note
isReady is used exclusively by TaskSchedulerImpl.waitBackendReady.

It first checks whether sufficient resources are available (using the sufficientResourcesRegistered method).

Note
By default sufficientResourcesRegistered always responds that sufficient resources are available.

If sufficient resources are available, you should see the following INFO message in the logs:

INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]

The method finishes returning true.

Note
minRegisteredRatio in the logs above is in the range 0 to 1 (uses spark.scheduler.minRegisteredResourcesRatio) to denote the minimum ratio of registered resources to total expected resources before submitting tasks.

In case sufficient resources are not yet available (the above requirement does not hold), it checks whether the time since startup (recorded as createTime) has exceeded spark.scheduler.maxRegisteredResourcesWaitingTime, which gives a way to submit tasks despite minRegisteredRatio not being reached yet.

You should see the following INFO message in the logs:

INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)

The method finishes returning true.

Otherwise, when sufficient resources are not available and maxRegisteredWaitingTimeMs has not elapsed yet, the method returns false.
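A simplified sketch of the readiness check described above (it assumes sufficientResourcesRegistered, createTime and maxRegisteredWaitingTimeMs as introduced in this section):

// a simplified sketch of isReady
override def isReady(): Boolean = {
  if (sufficientResourcesRegistered()) {
    logInfo("SchedulerBackend is ready for scheduling beginning after " +
      s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
    return true
  }
  if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
    logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
      s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
    return true
  }
  false
}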

sufficientResourcesRegistered Method

sufficientResourcesRegistered always responds that sufficient resources are available.
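In code, the default check is trivially true (a sketch; concrete coarse-grained scheduler backends may override it to enforce minRegisteredRatio):

def sufficientResourcesRegistered(): Boolean = true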

Stop All Executors — stopExecutors Method

stopExecutors sends a blocking StopExecutors message to driverEndpoint (if already initialized).

Note
It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.

You should see the following INFO message in the logs:

INFO CoarseGrainedSchedulerBackend: Shutting down all executors

Reset State — reset Method

reset resets the internal state:

  1. Sets numPendingExecutors to 0

  2. Clears executorsPendingToRemove

  3. Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message:

    Stale executor after cluster manager re-registered.

reset is defined in CoarseGrainedSchedulerBackend, but is used and overridden exclusively by YarnSchedulerBackend.
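A condensed sketch of the reset sequence described above (simplified; names follow this document):

// a condensed sketch of reset
protected def reset(): Unit = {
  val executors = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()
    executorDataMap.keys.toSet
  }
  // inform the driver endpoint that every known executor is stale
  executors.foreach { eid =>
    removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
  }
}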

Remove Executor — removeExecutor Method

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.

CoarseGrainedScheduler RPC Endpoint — driverEndpoint

When CoarseGrainedSchedulerBackend starts, it registers CoarseGrainedScheduler RPC endpoint to be the driver’s communication endpoint.

Internally, it is a DriverEndpoint object available as the driverEndpoint internal field.

Note
CoarseGrainedSchedulerBackend is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).

It is called standalone scheduler’s driver endpoint internally.

It tracks:

  • Executor addresses (host and port) for executors (addressToExecutorId) - it is set when an executor connects to register itself. See RegisterExecutor RPC message.

  • Total number of cores (totalCoreCount) - the sum of all cores on all executors. See RegisterExecutor RPC message.

  • The number of executors available (totalRegisteredExecutors). See RegisterExecutor RPC message.

  • ExecutorData for each registered executor (executorDataMap). See RegisterExecutor RPC message.

It uses driver-revive-thread daemon single-thread thread pool for …​FIXME

Caution
FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

It uses spark.scheduler.revive.interval (default: 1s) as the time between reviving offers.

RPC Messages

KillTask(taskId, executorId, interruptThread)

RemoveExecutor

RetrieveSparkProps

ReviveOffers

ReviveOffers simply passes the call on to makeOffers.

Caution
FIXME When is an executor alive? What other states can an executor be in?

StatusUpdate

StatusUpdate(
  executorId: String,
  taskId: Long,
  state: TaskState,
  data: SerializableBuffer)
extends CoarseGrainedClusterMessage
Caution
FIXME

StopDriver

StopDriver message stops the RPC endpoint.

StopExecutors

StopExecutors message is receive-reply and blocking. When received, the following INFO message appears in the logs:

INFO Asking each executor to shut down

It then sends a StopExecutor message to every registered executor (from executorDataMap).

RegisterExecutor

RegisterExecutor(
  executorId: String,
  executorRef: RpcEndpointRef,
  hostname: String,
  cores: Int,
  logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
Note
RegisterExecutor is sent when CoarseGrainedExecutorBackend (RPC Endpoint) starts.
Figure 3. Executor registration (RegisterExecutor RPC message flow)

Only one executor can register under a given executorId.

When an executor registers, you should see the following INFO message in the logs:

INFO Registered executor [executorRef] ([executorAddress]) with ID [executorId]

It does internal bookkeeping, i.e. updates addressToExecutorId, totalCoreCount, totalRegisteredExecutors, and executorDataMap.

When numPendingExecutors is more than 0, the following is printed out to the logs:

DEBUG Decremented number of pending executors ([numPendingExecutors] left)

It replies with RegisteredExecutor(executorAddress.host) (consult RPC Messages of CoarseGrainedExecutorBackend).

It then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.

Ultimately, makeOffers is called.

DriverEndpoint

DriverEndpoint is a ThreadSafeRpcEndpoint.

onDisconnected Callback

When called, onDisconnected removes the worker from the internal addressToExecutorId registry (that effectively removes the worker from a cluster).

While removing, it calls removeExecutor with the reason being SlaveLost and message:

Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Note
onDisconnected is called when a remote host is lost.

Making Resource Offers — makeOffers Method

makeOffers(): Unit

makeOffers is a private method that takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor’s id, host and free cores).

Caution
Only free cores are considered in making offers. Memory is not! Why?!

It then requests TaskSchedulerImpl to process the resource offers to create a collection of TaskDescription collections that it in turn uses to launch tasks.
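A simplified sketch of makeOffers (it assumes an internal liveness check named executorIsAlive; only free cores are offered):

// a simplified sketch of makeOffers
private def makeOffers(): Unit = {
  // one WorkerOffer per executor that is still considered alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // TaskSchedulerImpl turns the offers into tasks to launch
  launchTasks(scheduler.resourceOffers(workOffers))
}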

Launching Tasks — launchTasks Method

launchTasks(tasks: Seq[Seq[TaskDescription]])

launchTasks is a private helper method that iterates over TaskDescription objects in the tasks input collection and …​FIXME

Note
launchTasks gets called when CoarseGrainedSchedulerBackend is making resource offers.

Internally, it serializes a TaskDescription (using the global closure Serializer) and checks that the size of the serialized task is below maxRpcMessageSize.

Caution
FIXME Describe maxRpcMessageSize.

If the serialized task’s size is over the maximum RPC message size, the task’s TaskSetManager is aborted.

Caution
FIXME At that point, tasks have their executor assigned. When and how did that happen?

If the serialized task’s size is within the limit, the task’s executor is looked up in the internal executorDataMap registry to record that the task is about to be launched, and the number of free cores of the executor is decremented by the CPUS_PER_TASK constant (i.e. spark.task.cpus).

Note
ExecutorData keeps track of the number of free cores of the executor (as freeCores) as well as the RpcEndpointRef of the executor to send tasks to launch to (as executorEndpoint).

You should see the following INFO in the logs:

INFO DriverEndpoint: Launching task [taskId] on executor id: [executorId] hostname: [executorHost].

Ultimately, launchTasks sends a LaunchTask message to the executor’s RPC endpoint with the serialized task (wrapped in SerializableBuffer).
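A condensed sketch of the launch path described above (simplified; the real abort message is more detailed, and ser stands for the closure Serializer instance):

// a condensed sketch of launchTasks
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      // the task cannot be sent over RPC: abort the owning TaskSetManager
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        taskSetMgr.abort(s"Serialized task ${task.taskId} exceeds maxRpcMessageSize")
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
        s"hostname: ${executorData.executorHost}.")
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}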

Note
Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run on an executor is constrained by the number of cores available only. When submitting Spark application for execution both — memory and cores — can be specified explicitly.

Settings

Table 1. Spark Properties

spark.rpc.message.maxSize (default: 128)
Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map output size (serialized) information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size.

spark.default.parallelism (default: the maximum of totalCoreCount and 2)
Default parallelism for the scheduler backend.

spark.scheduler.minRegisteredResourcesRatio (default: 0)
A double value between 0 and 1 (inclusive) that controls the minimum ratio of registered resources to total expected resources before submitting tasks. See isReady in this document.

spark.scheduler.maxRegisteredResourcesWaitingTime (default: 30s)
Time to wait for sufficient resources to become available. See isReady in this document.
