ExternalShuffleService

ExternalShuffleService is an external shuffle service that serves shuffle blocks from outside an Executor process. It runs as a standalone application and manages shuffle output files so they are available for executors at all time. As the shuffle output files are managed externally to the executors it offers an uninterrupted access to the shuffle output files regardless of executors being killed or down.

You start ExternalShuffleService using start-shuffle-service.sh shell script and enable its use by the driver and executors using spark.shuffle.service.enabled.

Note
There is a custom external shuffle service for Spark on YARN — YarnShuffleService.
Tip

Enable INFO logging level for org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.ExternalShuffleService=INFO

Refer to Logging.

start-shuffle-service.sh shell script

start-shuffle-service.sh

start-shuffle-service.sh shell script allows you to launch ExternalShuffleService. The script is under sbin directory.

When executed, it runs sbin/spark-config.sh and bin/load-spark-env.sh shell scripts. It then executes sbin/spark-daemon.sh with start command and the parameters: org.apache.spark.deploy.ExternalShuffleService and 1.

$ ./sbin/start-shuffle-service.sh
starting org.apache.spark.deploy.ExternalShuffleService, logging to ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out

$ tail -f ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
Spark Command: /Library/Java/JavaVirtualMachines/Current/Contents/Home/bin/java -cp /Users/jacek/dev/oss/spark/conf/:/Users/jacek/dev/oss/spark/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.ExternalShuffleService
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/07 08:02:02 INFO ExternalShuffleService: Started daemon with process name: [email protected]
16/06/07 08:02:03 INFO ExternalShuffleService: Starting shuffle service on port 7337 with useSasl = false
Tip

You can also use spark-class to launch ExternalShuffleService.

spark-class org.apache.spark.deploy.ExternalShuffleService

Launching ExternalShuffleService (main method)

When started, it executes Utils.initDaemon(log).

Caution
FIXME Utils.initDaemon(log)? See spark-submit.

It loads default Spark properties and creates a SecurityManager.

A ExternalShuffleService is created and started.

A shutdown hook is registered so when ExternalShuffleService is shut down, it prints the following INFO message to the logs and the stop method is executed.

INFO ExternalShuffleService: Shutting down shuffle service.
Tip

Enable DEBUG logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=DEBUG

Refer to Logging.

You should see the following INFO message in the logs:

INFO ExternalShuffleBlockResolver: Registered executor [AppExecId] with [executorInfo]

You should also see the following messages when a SparkContext is closed:

INFO ExternalShuffleBlockResolver: Application [appId] removed, cleanupLocalDirs = [cleanupLocalDirs]
INFO ExternalShuffleBlockResolver: Cleaning up executor [AppExecId]'s [executor.localDirs.length] local dirs
DEBUG ExternalShuffleBlockResolver: Successfully cleaned up directory: [localDir]

Creating ExternalShuffleService Instance

ExternalShuffleService requires a SparkConf and SecurityManager.

When created, it reads spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration settings. It also checks whether authentication is enabled.

Caution
FIXME Review securityManager.isAuthenticationEnabled()

It then creates a TransportConf (as transportConf).

It creates a ExternalShuffleBlockHandler (as blockHandler) and TransportContext (as transportContext).

Caution
FIXME TransportContext?

No internal TransportServer (as server) is created.

Starting ExternalShuffleService (start method)

start(): Unit

start starts a ExternalShuffleService.

When start is executed, you should see the following INFO message in the logs:

INFO ExternalShuffleService: Starting shuffle service on port [port] with useSasl = [useSasl]

If useSasl is enabled, a SaslServerBootstrap is created.

Caution
FIXME SaslServerBootstrap?

The internal server reference (a TransportServer) is created (which will attempt to bind to port).

Stopping ExternalShuffleService (stop method)

stop(): Unit

stop closes the internal server reference and clears it (i.e. sets it to null).

ExternalShuffleBlockHandler

ExternalShuffleBlockHandler is a RpcHandler (i.e. a handler for sendRPC() messages sent by TransportClients).

When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and TransportConf with a registeredExecutorFile to create a ExternalShuffleBlockResolver.

Tip

Enable TRACE logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockHandler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockHandler=TRACE

Refer to Logging.

handleMessage method

handleMessage(
  BlockTransferMessage msgObj,
  TransportClient client,
  RpcResponseCallback callback)

handleMessage handles two types of BlockTransferMessage messages:

For any other BlockTransferMessage message it throws a UnsupportedOperationException:

Unexpected message: [msgObj]

OpenBlocks

OpenBlocks(String appId, String execId, String[] blockIds)

When OpenBlocks is received, handleMessage authorizes the client.

Caution
FIXME checkAuth?

It then gets block data for each block id in blockIds (using ExternalShuffleBlockResolver).

Finally, it registers a stream and does callback.onSuccess with a serialized byte buffer (for the streamId and the number of blocks in msg).

Caution
FIXME callback.onSuccess?

You should see the following TRACE message in the logs:

TRACE Registered streamId [streamId] with [length] buffers for client [clientId] from host [remoteAddress]

RegisterExecutor

RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo)

RegisterExecutor

ExternalShuffleBlockResolver

Caution
FIXME

getBlockData method

ManagedBuffer getBlockData(String appId, String execId, String blockId)

getBlockData parses blockId (in the format of shuffle_[shuffleId]_[mapId]_[reduceId]) and returns the FileSegmentManagedBuffer that corresponds to shuffle_[shuffleId]_[mapId]_0.data.

getBlockData splits blockId to 4 parts using _ (underscore). It works exclusively with shuffle block ids with the other three parts being shuffleId, mapId, and reduceId.

It looks up an executor (i.e. a ExecutorShuffleInfo in executors private registry) for appId and execId to search for a ManagedBuffer.

The ManagedBuffer is indexed using a binary file shuffle_[shuffleId]_[mapId]_0.index (that contains offset and length of the buffer) with a data file being shuffle_[shuffleId]_[mapId]_0.data (that is returned as FileSegmentManagedBuffer).

It throws a IllegalArgumentException for block ids with less than four parts:

Unexpected block id format: [blockId]

or for non-shuffle block ids:

Expected shuffle block id, got: [blockId]

It throws a RuntimeException when no ExecutorShuffleInfo could be found.

Executor is not registered (appId=[appId], execId=[execId])"

OneForOneStreamManager

Caution
FIXME

registerStream method

long registerStream(String appId, Iterator<ManagedBuffer> buffers)
Caution
FIXME

Settings

spark.shuffle.service.enabled

spark.shuffle.service.enabled flag (default: false) controls whether the External Shuffle Service is used or not. When enabled (true), the driver registers with the shuffle service.

spark.shuffle.service.enabled has to be enabled for dynamic allocation of executors.

It is used in CoarseMesosSchedulerBackend to instantiate MesosExternalShuffleClient.

It is explicitly disabled for LocalSparkCluster (and any attempts to set it will fall short).

spark.shuffle.service.port

spark.shuffle.service.port (default: 7337)

results matching ""

    No results matching ""