YarnSchedulerBackend — Coarse-Grained Scheduler Backend for YARN

YarnSchedulerBackend is an abstract CoarseGrainedSchedulerBackend for YARN that contains common logic for the client and cluster YARN scheduler backends, i.e. YarnClientSchedulerBackend and YarnClusterSchedulerBackend respectively.

YarnSchedulerBackend is available in the RPC Environment as YarnScheduler RPC Endpoint (or yarnSchedulerEndpointRef internally).

YarnSchedulerBackend requires a TaskSchedulerImpl and a SparkContext to be created.

It works for a single Spark application (identified by appId of type ApplicationId).

Caution
FIXME It may be a note for scheduler backends in general.

attemptId Internal Attribute

attemptId: Option[ApplicationAttemptId] = None

attemptId is the application attempt ID for this run of a Spark application. It is only available for cluster deploy mode.

It is explicitly set to None when YarnClientSchedulerBackend starts (and bindToYarn is called).

It is set to the current attempt ID (using Spark's ApplicationMaster.getAttemptId) when YarnClusterSchedulerBackend starts (and bindToYarn is called).

Note
attemptId is exposed using applicationAttemptId which is a part of SchedulerBackend Contract.

applicationAttemptId

Note
applicationAttemptId is a part of SchedulerBackend Contract.

applicationAttemptId(): Option[String]

applicationAttemptId returns the application attempt id of a Spark application.
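As a rough illustration of how the internal attemptId could be exposed through applicationAttemptId, the sketch below uses a hypothetical stand-in class (AttemptIdStub) instead of YARN's ApplicationAttemptId. The class name AttemptIdSketch, the bindAttempt helper, and the string conversion are assumptions made for the example, not Spark's actual code.

```scala
// Hypothetical stand-in for YARN's ApplicationAttemptId (not the real class).
final case class AttemptIdStub(attempt: Int)

// Simplified model of how the backend keeps and exposes the attempt ID.
class AttemptIdSketch {
  // None in client deploy mode; defined in cluster deploy mode.
  private var attemptId: Option[AttemptIdStub] = None

  // Hypothetical helper standing in for the attemptId part of bindToYarn.
  def bindAttempt(id: Option[AttemptIdStub]): Unit = {
    attemptId = id
  }

  // Assumption: the attempt number is rendered as a string.
  def applicationAttemptId(): Option[String] =
    attemptId.map(_.attempt.toString)
}
```

In this model, a backend in client deploy mode keeps returning None, while a backend bound to an attempt returns the attempt number as a string.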

Resetting YarnSchedulerBackend

Note
reset is a part of CoarseGrainedSchedulerBackend Contract.

reset resets the parent CoarseGrainedSchedulerBackend scheduler backend and ExecutorAllocationManager (accessible by SparkContext.executorAllocationManager).

doRequestTotalExecutors

def doRequestTotalExecutors(requestedTotal: Int): Boolean

Note
doRequestTotalExecutors is a part of the CoarseGrainedSchedulerBackend Contract.

Figure 1. Requesting Total Executors in YarnSchedulerBackend (doRequestTotalExecutors method)

doRequestTotalExecutors simply sends a blocking RequestExecutors message to YarnScheduler RPC Endpoint with the input requestedTotal and the internal localityAwareTasks and hostToLocalTaskCount attributes.
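The message flow above can be sketched without Spark on the classpath. In the minimal model below, the RequestExecutors case class is a simplified stand-in for the real message, and askEndpoint is a hypothetical function that plays the role of a blocking ask on the YarnScheduler RPC Endpoint. The class name RequestTotalSketch is an assumption as well.

```scala
// Simplified stand-in for the RequestExecutors message described above.
final case class RequestExecutors(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int])

// `askEndpoint` is a hypothetical replacement for a blocking ask
// on yarnSchedulerEndpointRef; it returns the endpoint's Boolean reply.
class RequestTotalSketch(askEndpoint: RequestExecutors => Boolean) {
  // Internal attributes; the text does not say when they are updated.
  var localityAwareTasks: Int = 0
  var hostToLocalTaskCount: Map[String, Int] = Map.empty

  // Packs the requested total and the internal attributes into one message.
  def doRequestTotalExecutors(requestedTotal: Int): Boolean =
    askEndpoint(
      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
```

Passing the endpoint as a function keeps the sketch testable: a caller can supply a function that records the message and returns a canned reply.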

Caution
FIXME The internal attributes are already set. When and how?

Reference to YarnScheduler RPC Endpoint (yarnSchedulerEndpointRef attribute)

yarnSchedulerEndpointRef is the reference to YarnScheduler RPC Endpoint.

totalExpectedExecutors

totalExpectedExecutors is 0 when a YarnSchedulerBackend instance is created and changes later, when Spark on YARN starts (in client or cluster mode).

Note
After Spark on YARN is started, totalExpectedExecutors is initialized to a proper value.
Caution
FIXME Where is this used?

Creating YarnSchedulerBackend Instance

When created, YarnSchedulerBackend sets the internal minRegisteredRatio to 0.8 when spark.scheduler.minRegisteredResourcesRatio is not set, or to the parent's minRegisteredRatio otherwise.
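The choice of minRegisteredRatio can be expressed as a small function. This is a sketch under assumptions: conf is a plain Map standing in for the Spark configuration, parentRatio stands in for the parent's minRegisteredRatio, and the object name MinRegisteredRatioSketch is made up for the example.

```scala
object MinRegisteredRatioSketch {
  val Key = "spark.scheduler.minRegisteredResourcesRatio"

  // `conf` is a plain Map standing in for the Spark configuration;
  // `parentRatio` stands in for the parent's minRegisteredRatio.
  def minRegisteredRatio(conf: Map[String, String], parentRatio: Double): Double =
    if (conf.contains(Key)) parentRatio // setting present: defer to the parent
    else 0.8                            // setting absent: YARN-specific default
}
```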

totalExpectedExecutors is set to 0.

It creates a YarnSchedulerEndpoint (as yarnSchedulerEndpoint) and registers it as YarnScheduler with the RPC Environment.

It sets the internal askTimeout (the Spark timeout for RPC ask operations) using the configuration of the SparkContext constructor parameter.

It initializes the optional appId (of type ApplicationId) and attemptId (of type ApplicationAttemptId, for cluster mode only).

It also creates a SchedulerExtensionServices object (as services).

Caution
FIXME What is SchedulerExtensionServices?

The internal shouldResetOnAmRegister flag is turned off.

sufficientResourcesRegistered

sufficientResourcesRegistered checks whether totalRegisteredExecutors is greater than or equal to totalExpectedExecutors multiplied by minRegisteredRatio.
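The check is plain arithmetic and can be written down directly. In the sketch below the three values are passed as parameters rather than read from the backend's state, and the object name SufficientResourcesSketch is hypothetical.

```scala
object SufficientResourcesSketch {
  // True once enough executors have registered, relative to the expected total.
  def sufficientResourcesRegistered(
      totalRegisteredExecutors: Int,
      totalExpectedExecutors: Int,
      minRegisteredRatio: Double): Boolean =
    totalRegisteredExecutors >= totalExpectedExecutors * minRegisteredRatio
}
```

For example, with 10 expected executors and a ratio of 0.8, the threshold is 8: seven registered executors are not enough, eight are. With totalExpectedExecutors still at its initial 0, the check passes trivially.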

Caution
FIXME Where’s this used?

minRegisteredRatio

minRegisteredRatio is set when YarnSchedulerBackend is created.

Starting the Backend (start method)

start creates a SchedulerExtensionServiceBinding object (using SparkContext, appId, and attemptId) and starts it (using SchedulerExtensionServices.start(binding)).

Note
A SchedulerExtensionServices object is created when YarnSchedulerBackend is initialized and available as services.

Ultimately, it calls the parent’s CoarseGrainedSchedulerBackend.start.

Note
start throws IllegalArgumentException when the internal appId has not been set yet.

java.lang.IllegalArgumentException: requirement failed: application ID unset
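The exception message above has the shape produced by Scala's require. The following sketch reproduces it in isolation, assuming a String in place of ApplicationId and a one-argument bindToYarn; StartSketch is a hypothetical class, and the real start method does more than this check.

```scala
class StartSketch {
  // Stays null until bindToYarn is called; String stands in for ApplicationId.
  private var appId: String = null

  // Simplified: the real method also takes the attempt ID.
  def bindToYarn(id: String): Unit = {
    appId = id
  }

  def start(): Unit = {
    // Throws IllegalArgumentException("requirement failed: application ID unset")
    // when bindToYarn has not been called yet.
    require(appId != null, "application ID unset")
    // ... start the extension services, then the parent backend ...
  }
}
```

Calling start before bindToYarn fails with the message shown above; calling it afterwards passes the check.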

Stopping the Backend (stop method)

stop calls the parent’s CoarseGrainedSchedulerBackend.requestTotalExecutors (using (0, 0, Map.empty) parameters).

Caution
FIXME Explain what 0, 0, Map.empty means after the method’s described for the parent.

It calls the parent’s CoarseGrainedSchedulerBackend.stop.

Ultimately, it stops the internal SchedulerExtensionServices object (using services.stop()).

Caution
FIXME Link the description of services.stop() here.
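The order of the three steps in stop can be modelled with a call log. This is only a sketch: StopSketch and record are hypothetical names, the steps are recorded as strings rather than executed, and the try/finally that guarantees services.stop() runs is an assumption about how the last step is made unconditional.

```scala
import scala.collection.mutable.ListBuffer

class StopSketch {
  // Records the steps in the order they run.
  val calls: ListBuffer[String] = ListBuffer.empty[String]

  private def record(step: String): Unit = {
    calls += step
  }

  def stop(): Unit = {
    try {
      record("requestTotalExecutors(0, 0, Map.empty)") // parent's method
      record("CoarseGrainedSchedulerBackend.stop")     // parent's stop
    } finally {
      record("services.stop()")                        // always runs last
    }
  }
}
```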

Recording Application and Attempt Ids (bindToYarn method)

bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit

bindToYarn sets the internal appId and attemptId to the values of the input parameters appId and attemptId, respectively.

Note
start requires appId.

Internal Registries

shouldResetOnAmRegister flag

When YarnSchedulerBackend is created, shouldResetOnAmRegister is disabled (i.e. false).

It allows resetting internal state after the initial ApplicationMaster failed and a new one was registered.

Note
It can only happen in client deploy mode.
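One plausible reading of the flag, sketched under assumptions: the first ApplicationMaster registration only arms the flag, and any later registration (a replacement ApplicationMaster) triggers a reset. The class AmRegisterSketch, the onAmRegistered hook, and the resets counter are hypothetical names introduced for the example.

```scala
class AmRegisterSketch {
  // Disabled when the backend is created.
  private var shouldResetOnAmRegister = false

  // Counts how often a reset would have been triggered (stands in for reset()).
  var resets: Int = 0

  // Hypothetical hook, called whenever an ApplicationMaster registers.
  def onAmRegistered(): Unit = {
    if (shouldResetOnAmRegister) {
      resets += 1 // a new ApplicationMaster replaced a failed one
    } else {
      shouldResetOnAmRegister = true // first registration: arm the flag only
    }
  }
}
```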

Settings

spark.scheduler.minRegisteredResourcesRatio

spark.scheduler.minRegisteredResourcesRatio (default: 0.8)
