CoarseGrainedExecutorBackend

CoarseGrainedExecutorBackend is an ExecutorBackend that manages a single coarse-grained executor (which lives as long as the owning executor backend).

CoarseGrainedExecutorBackend registers itself as a ThreadSafeRpcEndpoint under the name Executor to communicate with the driver.

Note
The internal executor reference is created after a connection to the driver is established.

Figure 1. CoarseGrainedExecutorBackend and Others

When launched, CoarseGrainedExecutorBackend immediately connects to the owning CoarseGrainedSchedulerBackend to announce that it is ready to run tasks. It is launched as a command-line application by:

  1. Spark Standalone’s StandaloneSchedulerBackend

  2. Spark on YARN’s ExecutorRunnable

  3. Spark on Mesos’s MesosCoarseGrainedSchedulerBackend

When it cannot connect to driverUrl, it terminates (with the exit code 1).

Caution
What are SPARK_LOG_URL_ env vars? Who sets them?

When the driver terminates, CoarseGrainedExecutorBackend prints the following ERROR message to the logs and exits (with exit code 1):

ERROR Driver [remoteAddress] disassociated! Shutting down.

All task status updates are sent along to driverRef as StatusUpdate messages.

Tip

Enable the INFO logging level for the org.apache.spark.executor.CoarseGrainedExecutorBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.CoarseGrainedExecutorBackend=INFO

Extracting Log URLs — extractLogUrls Method

Caution
FIXME

Creating CoarseGrainedExecutorBackend Instance

CoarseGrainedExecutorBackend(
  override val rpcEnv: RpcEnv,
  driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  userClassPath: Seq[URL],
  env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend

While being created, CoarseGrainedExecutorBackend initializes the internal properties (e.g. executor and driver) and creates a SerializerInstance (using SparkEnv.closureSerializer).

Note
CoarseGrainedExecutorBackend is created when…​FIXME

Starting RpcEndpoint — onStart Method

Note
onStart is an RpcEndpoint callback method that is executed before an RPC endpoint starts handling messages.

When executed, you should see the following INFO message in the logs:

INFO CoarseGrainedExecutorBackend: Connecting to driver: [driverUrl]

It then asynchronously retrieves the RpcEndpointRef of the driver (using the constructor’s driverUrl), stores it in the internal driver property, and sends a blocking RegisterExecutor message to it.

If there is an issue while registering the executor, you should see the following ERROR message in the logs, and the process exits (with the exit code 1).

ERROR Cannot register with driver: [driverUrl]

Note
The RegisterExecutor message contains executorId, the RpcEndpointRef to itself, cores, and log URLs of the CoarseGrainedExecutorBackend.
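The onStart flow described above can be sketched without any Spark dependency. This is only an illustrative model under stated assumptions: DriverRef, the trimmed-down RegisterExecutor (it omits the endpoint's reference to itself), the lookup function, and the onFailure callback are hypothetical stand-ins, not Spark's own API. The callback marks the place where the real backend logs the ERROR and exits with code 1.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object OnStartSketch {
  // Hypothetical, simplified registration message (no self reference).
  final case class RegisterExecutor(executorId: String, cores: Int, logUrls: Map[String, String])

  // Hypothetical stand-in for the driver's endpoint reference.
  trait DriverRef {
    def ask(msg: RegisterExecutor): Future[Boolean]
  }

  // Resolves the driver asynchronously, then registers with it.
  // Any non-fatal failure is handed to onFailure and yields None.
  def onStart(
      lookup: String => Future[DriverRef],
      driverUrl: String,
      msg: RegisterExecutor,
      onFailure: Throwable => Unit)(implicit ec: ExecutionContext): Future[Option[DriverRef]] = {
    println(s"Connecting to driver: $driverUrl")
    lookup(driverUrl)
      .flatMap(ref => ref.ask(msg).map(_ => Option(ref)))
      .recover { case NonFatal(e) =>
        onFailure(e)
        None
      }
  }

  // Usage: a lookup that always fails triggers the failure callback.
  def demo(): Option[DriverRef] = {
    import ExecutionContext.Implicits.global
    val url = "spark://CoarseGrainedScheduler@localhost:64859" // assumed example URL
    val result = onStart(
      _ => Future.failed(new RuntimeException("driver unreachable")),
      url,
      RegisterExecutor("1", 2, Map.empty),
      _ => println(s"Cannot register with driver: $url"))
    Await.result(result, 5.seconds)
  }
}
```

Returning an Option keeps the "driver is optional until resolved" idea visible in the type, which mirrors how the driver property is described as an optional RpcEndpointRef.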

driver RpcEndpointRef

driver is an optional RpcEndpointRef for the driver.

Tip
See Starting RpcEndpoint — onStart Method for how driver is initialized.

Driver’s URL

The driver’s URL is of the format spark://[RpcEndpoint name]@[hostname]:[port], e.g. spark://CoarseGrainedScheduler@[hostname]:64859.
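To make the URL format concrete, here is a minimal sketch that splits such a URL into its three parts with java.net.URI. EndpointAddress and DriverUrl are hypothetical names introduced for illustration, and the host localhost in the usage note is an assumed value.

```scala
import java.net.{URI, URISyntaxException}

// Hypothetical holder for the three parts of a driver URL.
final case class EndpointAddress(name: String, host: String, port: Int)

object DriverUrl {
  // Parses spark://[RpcEndpoint name]@[hostname]:[port].
  // Returns None when the scheme is not "spark" or any part is missing.
  def parse(url: String): Option[EndpointAddress] =
    try {
      val uri = new URI(url)
      val name = uri.getUserInfo // the part before '@'
      val host = uri.getHost
      val port = uri.getPort     // -1 when no port is given
      if (uri.getScheme != "spark" || name == null || host == null || port < 0) None
      else Some(EndpointAddress(name, host, port))
    } catch {
      case _: URISyntaxException => None
    }
}
```

For example, DriverUrl.parse("spark://CoarseGrainedScheduler@localhost:64859") yields an EndpointAddress with the endpoint name, host, and port separated, while a URL without the name@ part yields None.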

Launching CoarseGrainedExecutorBackend As Standalone Application — main Method

CoarseGrainedExecutorBackend is a command-line application (it comes with a main method).

It accepts the following options:

  • --driver-url (required) - the driver’s URL

  • --executor-id (required) - the executor’s id

  • --hostname (required) - the name of the host

  • --cores (required) - the number of cores (must be more than 0)

  • --app-id (required) - the id of the application

  • --worker-url - the worker’s URL, e.g. spark://Worker@[hostname]:64557

  • --user-class-path - a URL/path to a resource to be added to CLASSPATH; can be specified multiple times.

Unrecognized options or missing required options cause the usage help to be displayed and the application to exit.

$ ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend

Usage: CoarseGrainedExecutorBackend [options]

 Options are:
   --driver-url <driverUrl>
   --executor-id <executorId>
   --hostname <hostname>
   --cores <cores>
   --app-id <appid>
   --worker-url <workerUrl>
   --user-class-path <url>
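The option handling described above can be sketched as a small recursive parser over the argument list. BackendArgs and BackendArgsParser are hypothetical names, and treating --driver-url as required is an assumption. Returning None marks the point where the real application would print the usage help and exit.

```scala
import scala.util.Try

// Hypothetical container for the parsed options; not Spark's own class.
final case class BackendArgs(
    driverUrl: Option[String] = None,
    executorId: Option[String] = None,
    hostname: Option[String] = None,
    cores: Int = 0,
    appId: Option[String] = None,
    workerUrl: Option[String] = None,
    userClassPath: List[String] = Nil)

object BackendArgsParser {
  // Returns None for an unrecognized option, a non-numeric --cores value,
  // or a missing required option.
  def parse(args: List[String]): Option[BackendArgs] = {
    def loop(rest: List[String], acc: BackendArgs): Option[BackendArgs] = rest match {
      case "--driver-url" :: v :: tail => loop(tail, acc.copy(driverUrl = Some(v)))
      case "--executor-id" :: v :: tail => loop(tail, acc.copy(executorId = Some(v)))
      case "--hostname" :: v :: tail => loop(tail, acc.copy(hostname = Some(v)))
      case "--cores" :: v :: tail =>
        Try(v.toInt).toOption.flatMap(n => loop(tail, acc.copy(cores = n)))
      case "--app-id" :: v :: tail => loop(tail, acc.copy(appId = Some(v)))
      case "--worker-url" :: v :: tail => loop(tail, acc.copy(workerUrl = Some(v)))
      case "--user-class-path" :: v :: tail =>
        // May be given several times, so values accumulate in order.
        loop(tail, acc.copy(userClassPath = acc.userClassPath :+ v))
      case Nil =>
        // Assumption: --driver-url is required alongside the documented ones.
        val complete = acc.driverUrl.isDefined && acc.executorId.isDefined &&
          acc.hostname.isDefined && acc.appId.isDefined && acc.cores > 0
        if (complete) Some(acc) else None
      case _ => None
    }
    loop(args, BackendArgs())
  }
}
```

Modelling failure as None instead of calling System.exit inside the parser keeps the sketch testable; the caller decides what "display usage and exit" means.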

It first fetches Spark properties from CoarseGrainedSchedulerBackend (using the driverPropsFetcher RPC Environment and the endpoint reference given in driver’s URL).

For this, it creates a SparkConf, reads the spark.executor.port setting (defaults to 0), and creates the driverPropsFetcher RPC Environment in client mode. The RPC environment is used to resolve the driver’s endpoint and post a RetrieveSparkProps message to it.

It sends a (blocking) RetrieveSparkProps message to the driver (using the value of the --driver-url command-line option). When the response (the driver’s SparkConf) arrives, it adds spark.app.id (using the value of the --app-id command-line option) and creates a brand new SparkConf.

If spark.yarn.credentials.file is set, …​FIXME

A SparkEnv is created using SparkEnv.createExecutorEnv (with isLocal being false).

Caution
FIXME

Setting Up Executor RPC Endpoint (and WorkerWatcher Perhaps) — run Internal Method

run(driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  appId: String,
  workerUrl: Option[String],
  userClassPath: scala.Seq[URL]): Unit

run requests the Spark properties from the driver and sets up the Executor RPC endpoint (with CoarseGrainedExecutorBackend as the RPC endpoint) and, optionally, the WorkerWatcher RPC endpoint. The main thread then blocks until the RpcEnv terminates; in the meantime, only the RPC endpoints process RPC messages.

When executed, you should see the following INFO message in the logs:

INFO Started daemon with process name: [processName]

run then runs in a secured environment as a Spark user.

run first creates a brand new SparkConf to get spark.executor.port from. It then creates an RpcEnv called driverPropsFetcher.

Note
The host name and port for the driverPropsFetcher RpcEnv are the input argument hostname and the spark.executor.port value read from SparkConf, respectively.

Caution
FIXME What’s clientMode in RpcEnv.create?

run uses the driverPropsFetcher RpcEnv to request the Spark properties from the driverUrl endpoint. The Spark properties are then extended with the spark.app.id property set to the value of appId.

run uses the Spark properties to create a SparkEnv for the executor (with isLocal disabled).

Note
The executorId, hostname, and cores used to create the SparkEnv are the input arguments of run.

Caution
FIXME Describe spark.yarn.credentials.file.

After the SparkEnv has been created, run sets up the endpoint under the name Executor with CoarseGrainedExecutorBackend as the RPC endpoint.

If the optional workerUrl is specified, run sets up another endpoint under the name WorkerWatcher with WorkerWatcher as the RPC endpoint.

Caution
FIXME When is workerUrl specified?

run’s thread is blocked until the RpcEnv terminates (so that the threads of the RPC endpoints can run).

Once RpcEnv has terminated, run stops the thread for credential updates.
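The overall shape of run (fetch properties, add spark.app.id, register the endpoints, block until termination) can be modelled with a toy RPC environment. ToyRpcEnv, RunSketch, and the fetchDriverProps function are hypothetical and only mimic the steps described above; they are not Spark's RpcEnv API, and credential handling is left out.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable

object RunSketch {
  // Hypothetical, minimal stand-in for an RPC environment: it records
  // endpoint names and lets a caller block until shutdown() is called.
  final class ToyRpcEnv {
    private val terminated = new CountDownLatch(1)
    val endpoints = mutable.LinkedHashSet.empty[String]
    def setupEndpoint(name: String): Unit = endpoints += name
    def shutdown(): Unit = terminated.countDown()
    def awaitTermination(): Unit = terminated.await()
  }

  def run(
      fetchDriverProps: () => Seq[(String, String)],
      appId: String,
      workerUrl: Option[String],
      env: ToyRpcEnv): Map[String, String] = {
    // Properties from the driver, extended with spark.app.id.
    val props = (fetchDriverProps() ++ Seq("spark.app.id" -> appId)).toMap
    env.setupEndpoint("Executor")
    // The second endpoint is registered only when a worker URL is given.
    workerUrl.foreach(_ => env.setupEndpoint("WorkerWatcher"))
    // The calling thread parks here until the environment shuts down.
    env.awaitTermination()
    props
  }
}
```

In this model another thread (or a message handler) would call shutdown() to release the blocked caller, which is the role the Shutdown message plays for the real backend.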

Caution
FIXME Think of the place for Utils.initDaemon, Utils.getProcessName et al.

start Method

stop Method

requestTotalExecutors

executor Internal Property

executor is the internal reference to a coarse-grained executor…​FIXME

Caution
FIXME

RPC Messages

RegisteredExecutor

RegisteredExecutor
extends CoarseGrainedClusterMessage with RegisterExecutorResponse

When a RegisteredExecutor message arrives, you should see the following INFO in the logs:

INFO CoarseGrainedExecutorBackend: Successfully registered with driver

The internal executor is created (passing in the constructor’s parameters) with isLocal disabled.

RegisterExecutorFailed

RegisterExecutorFailed(message)

When a RegisterExecutorFailed message arrives, the following ERROR is printed out to the logs:

ERROR CoarseGrainedExecutorBackend: Slave registration failed: [message]

CoarseGrainedExecutorBackend then exits with the exit code 1.

LaunchTask

LaunchTask(data: SerializableBuffer)

The LaunchTask handler deserializes a TaskDescription from data (using the global closure Serializer).

Note
The LaunchTask message is sent by CoarseGrainedSchedulerBackend.launchTasks.

You should see the following INFO message in the logs:

INFO CoarseGrainedExecutorBackend: Got assigned task [taskId]

It then launches the task on the executor (using Executor.launchTask method).

If however the internal executor field has not been created yet, it prints out the following ERROR to the logs:

ERROR CoarseGrainedExecutorBackend: Received LaunchTask command but executor was null

It then exits.

KillTask

KillTask(taskId, _, interruptThread) message kills a task (calls Executor.killTask).

If an executor has not been initialized yet (FIXME: why?), the following ERROR message is printed out to the logs and CoarseGrainedExecutorBackend exits:

ERROR Received KillTask command but executor was null

StopExecutor

The StopExecutor message handler is receive-reply and blocking. When received, the handler prints the following INFO message to the logs:

INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown

It then sends a Shutdown message to itself.

Shutdown

Shutdown stops the executor, the CoarseGrainedExecutorBackend itself, and the RPC Environment.
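The message handling described in this section can be summarised as a pattern match over a small set of messages. The sketch below is a simplified model, not Spark's implementation: the message classes carry only a task id, the executor is reduced to a set of running task ids, and Outcome is a hypothetical type that replaces real process exits. Using exit code 1 for the "executor was null" cases is an assumption; the text above only states that the backend exits.

```scala
import scala.collection.mutable

object ReceiveSketch {
  sealed trait Message
  case object RegisteredExecutor extends Message
  final case class RegisterExecutorFailed(message: String) extends Message
  final case class LaunchTask(taskId: Long) extends Message
  final case class KillTask(taskId: Long, interruptThread: Boolean) extends Message
  case object StopExecutor extends Message
  case object Shutdown extends Message

  // Hypothetical result type standing in for "keep running", "exit", "stopped".
  sealed trait Outcome
  case object Continue extends Outcome
  final case class Exit(code: Int) extends Outcome
  case object Stopped extends Outcome

  final class Backend {
    // None until RegisteredExecutor arrives; models the internal executor.
    private var executor: Option[mutable.Set[Long]] = None

    def running: Set[Long] = executor.fold(Set.empty[Long])(_.toSet)

    def receive(msg: Message): Outcome = msg match {
      case RegisteredExecutor =>
        executor = Some(mutable.Set.empty[Long])
        Continue
      case RegisterExecutorFailed(_) => Exit(1)
      case LaunchTask(id) =>
        executor match {
          case Some(tasks) => tasks += id; Continue
          case None => Exit(1) // assumed exit code
        }
      case KillTask(id, _) =>
        executor match {
          case Some(tasks) => tasks -= id; Continue
          case None => Exit(1) // assumed exit code
        }
      case StopExecutor => receive(Shutdown) // "sends Shutdown to itself"
      case Shutdown =>
        executor = None
        Stopped
    }
  }
}
```

Sending LaunchTask before RegisteredExecutor yields Exit(1) in this model, which corresponds to the "executor was null" ERROR path described above.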
