Dependencies

Dependency (represented by Dependency class) is a connection between RDDs after applying a transformation.

You can use RDD.dependencies method to know the collection of dependencies of a RDD (Seq[Dependency[_]]).

scala> val r1 = sc.parallelize(0 to 9)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:18

scala> val r2 = sc.parallelize(0 to 9)
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:18

scala> val r3 = sc.parallelize(0 to 9)
r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:18

scala> val r4 = sc.union(r1, r2, r3)
r4: org.apache.spark.rdd.RDD[Int] = UnionRDD[23] at union at <console>:24

scala> r4.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@6f2ab3f6, org.apache.spark.RangeDependency@7aa0e351, org.apache.spark.RangeDependency@26468)

scala> r4.toDebugString
res1: String =
(24) UnionRDD[23] at union at <console>:24 []
 |   ParallelCollectionRDD[20] at parallelize at <console>:18 []
 |   ParallelCollectionRDD[21] at parallelize at <console>:18 []
 |   ParallelCollectionRDD[22] at parallelize at <console>:18 []

scala> r4.collect
...
res2: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

Kinds of Dependencies

Dependency is the base abstract class with a single def rdd: RDD[T] method.

scala> val r = sc.parallelize(0 to 9).groupBy(identity)
r: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[12] at groupBy at <console>:18

scala> r.dependencies.map(_.rdd).foreach(println)
MapPartitionsRDD[11] at groupBy at <console>:18

There are the following more specialized Dependency extensions:

ShuffleDependency

A ShuffleDependency represents a dependency on the output of a shuffle map stage.

scala> val r = sc.parallelize(0 to 9).groupBy(identity)
r: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[12] at groupBy at <console>:18

scala> r.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@493b0b09)

A ShuffleDependency belongs to a single pair RDD (available as rdd of type RDD[Product2[K, V]]).

A ShuffleDependency has a shuffleId (FIXME from SparkContext.newShuffleId).

It uses partitioner to partition the shuffle output. It also uses ShuffleManager to register itself (using ShuffleManager.registerShuffle) and ContextCleaner to register itself for cleanup (using ContextCleaner.registerShuffleForCleanup).

Every ShuffleDependency is registered to MapOutputTrackerMaster using the shuffle id and the number of the partitions of a RDD.

The places where ShuffleDependency is used:

  • CoGroupedRDD and SubtractedRDD when partitioner differs among RDDs

  • ShuffledRDD and ShuffledRowRDD that are RDDs from a shuffle

The RDD operations that may or may not use the above RDDs and hence shuffling:

  • coalesce

    • repartition

  • cogroup

    • intersection

  • subtractByKey

    • subtract

  • sortByKey

    • sortBy

  • repartitionAndSortWithinPartitions

  • combineByKeyWithClassTag

    • combineByKey

    • aggregateByKey

    • foldByKey

    • reduceByKey

    • countApproxDistinctByKey

    • groupByKey

  • partitionBy

Note
There may be other dependent methods that use the above.

NarrowDependency

NarrowDependency is an abstract extension of Dependency with narrow (limited) number of partitions of the parent RDD that are required to compute a partition of the child RDD. Narrow dependencies allow for pipelined execution.

NarrowDependency extends the base with the additional method:

def getParents(partitionId: Int): Seq[Int]

to get the parent partitions for a partition partitionId of the child RDD.

OneToOneDependency

OneToOneDependency is a narrow dependency that represents a one-to-one dependency between partitions of the parent and child RDDs.

scala> val r1 = sc.parallelize(0 to 9)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:18

scala> val r3 = r1.map((_, 1))
r3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at map at <console>:20

scala> r3.dependencies
res32: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@7353a0fb)

scala> r3.toDebugString
res33: String =
(8) MapPartitionsRDD[19] at map at <console>:20 []
 |  ParallelCollectionRDD[13] at parallelize at <console>:18 []

PruneDependency

PruneDependency is a narrow dependency that represents a dependency between the PartitionPruningRDD and its parent.

RangeDependency

RangeDependency is a narrow dependency that represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.

It is used in UnionRDD for SparkContext.union, RDD.union transformation to list only a few.

scala> val r1 = sc.parallelize(0 to 9)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:18

scala> val r2 = sc.parallelize(10 to 19)
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:18

scala> val unioned = sc.union(r1, r2)
unioned: org.apache.spark.rdd.RDD[Int] = UnionRDD[16] at union at <console>:22

scala> unioned.dependencies
res19: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@28408ad7, org.apache.spark.RangeDependency@6e1d2e9f)

scala> unioned.toDebugString
res18: String =
(16) UnionRDD[16] at union at <console>:22 []
 |   ParallelCollectionRDD[13] at parallelize at <console>:18 []
 |   ParallelCollectionRDD[14] at parallelize at <console>:18 []

results matching ""

    No results matching ""