Shuffle Manager

Spark comes with a pluggable mechanism for shuffle systems.

Shuffle Manager (aka Shuffle Service) is a Spark service that tracks shuffle dependencies for ShuffleMapStage. The driver and executors all have their own Shuffle Service.

The setting spark.shuffle.manager sets up the default shuffle manager.

The driver registers shuffles with a shuffle manager, and executors (or tasks running locally in the driver) can ask to read and write data.

It is network-addressable, i.e. it is available on a host and port.

There can be many shuffle services running simultaneously and a driver registers with all of them when CoarseGrainedSchedulerBackend is used.

The service is available under SparkEnv.get.shuffleManager.

When ShuffledRDD is computed it reads partitions from it.

The name appears here, twice in the build’s output and others.

Review the code in network/shuffle module.

  • When is data eligible for shuffling?

  • Get the gist of "The shuffle files are not currently cleaned up when using Spark on Mesos with the external shuffle service"

ShuffleManager Contract

Note
org.apache.spark.shuffle.ShuffleManager is a private[spark] Scala trait.

Every ShuffleManager offers the following services:

  • Is identified by a short name (as shortName)

  • Registers shuffles so they are addressable by a ShuffleHandle (using registerShuffle)

  • Returns a ShuffleWriter for a partition (using getWriter)

  • Returns a ShuffleReader for a range of partitions (using getReader)

  • Removes shuffles (using unregisterShuffle)

  • Returns a ShuffleBlockResolver (using shuffleBlockResolver)

  • Can be stopped (using stop)

Available Implementations

Spark comes with two implementations of ShuffleManager contract:

Caution
FIXME Exercise for a custom implementation of Shuffle Manager using private[spark] ShuffleManager trait.

SortShuffleManager

SortShuffleManager is a shuffle manager with the short name being sort.

It uses IndexShuffleBlockResolver as the shuffleBlockResolver.

Settings

spark.shuffle.manager

spark.shuffle.manager (default: sort) sets the default shuffle manager by a short name or the fully-qualified class name of a custom implementation.

  1. hash or org.apache.spark.shuffle.hash.HashShuffleManager

  2. sort or org.apache.spark.shuffle.sort.SortShuffleManager

  3. tungsten-sort or org.apache.spark.shuffle.sort.SortShuffleManager

spark.shuffle.spill

spark.shuffle.spill (default: true) - no longer used, and when false the following WARNING shows in the logs:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.

Further reading or watching

  1. (slides) Spark shuffle introduction by Raymond Liu (aka colorant).

results matching ""

    No results matching ""