scala> val r1 = sc.parallelize(0 to 9)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:18
scala> val r2 = sc.parallelize(0 to 9)
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:18
scala> val r3 = sc.parallelize(0 to 9)
r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:18
scala> val r4 = sc.union(r1, r2, r3)
r4: org.apache.spark.rdd.RDD[Int] = UnionRDD[23] at union at <console>:24
scala> r4.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@6f2ab3f6, org.apache.spark.RangeDependency@7aa0e351, org.apache.spark.RangeDependency@26468)
scala> r4.toDebugString
res1: String =
(24) UnionRDD[23] at union at <console>:24 []
| ParallelCollectionRDD[20] at parallelize at <console>:18 []
| ParallelCollectionRDD[21] at parallelize at <console>:18 []
| ParallelCollectionRDD[22] at parallelize at <console>:18 []
scala> r4.collect
...
res2: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
Dependencies
A dependency (represented by the Dependency class) is a connection between RDDs after applying a transformation.
You can use the RDD.dependencies method to get the collection of dependencies of an RDD (Seq[Dependency[_]]).
Kinds of Dependencies
Dependency is the base abstract class with a single def rdd: RDD[T] method.
scala> val r = sc.parallelize(0 to 9).groupBy(identity)
r: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[12] at groupBy at <console>:18
scala> r.dependencies.map(_.rdd).foreach(println)
MapPartitionsRDD[11] at groupBy at <console>:18
There are the following more specialized Dependency extensions: ShuffleDependency and NarrowDependency (with its OneToOneDependency, PruneDependency, and RangeDependency subclasses).
ShuffleDependency
A ShuffleDependency represents a dependency on the output of a shuffle map stage.
scala> val r = sc.parallelize(0 to 9).groupBy(identity)
r: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[12] at groupBy at <console>:18
scala> r.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@493b0b09)
A ShuffleDependency belongs to a single pair RDD (available as rdd of type RDD[Product2[K, V]]).
A ShuffleDependency has a shuffleId (assigned using SparkContext.newShuffleId).
It uses partitioner to partition the shuffle output. It also uses ShuffleManager to register itself (using ShuffleManager.registerShuffle) and ContextCleaner to register itself for cleanup (using ContextCleaner.registerShuffleForCleanup).
Every ShuffleDependency is registered with the MapOutputTrackerMaster using the shuffle id and the number of partitions of the RDD.
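You can inspect these properties yourself in spark-shell. A minimal sketch, assuming an active sc (the val names and the n % 3 key function are just for illustration):
import org.apache.spark.ShuffleDependency

val grouped = sc.parallelize(0 to 9).map(n => (n % 3, n)).groupByKey()
// The single dependency of a shuffled RDD is a ShuffleDependency.
val dep = grouped.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int, _]]
println(dep.shuffleId)    // the unique shuffle id, e.g. 0 for the first shuffle in a session
println(dep.partitioner)  // the Partitioner used to partition the shuffle output
println(dep.rdd)          // the parent pair RDD (a MapPartitionsRDD here)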
The places where ShuffleDependency is used:
- CoGroupedRDD and SubtractedRDD when the partitioner differs among RDDs
- ShuffledRDD and ShuffledRowRDD that are RDDs from a shuffle
The RDD operations that may or may not use the above RDDs, and hence may or may not cause shuffling:
- coalesce
  - repartition
- cogroup
- intersection
- subtractByKey
  - subtract
- sortByKey
  - sortBy
- repartitionAndSortWithinPartitions
- combineByKey
  - aggregateByKey
  - foldByKey
  - reduceByKey
  - countApproxDistinctByKey
- groupByKey
- partitionBy
Note: There may be other dependent methods that use the above.
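A quick way to check whether a given operation shuffles is to look for a shuffle boundary (the +- indentation break) in toDebugString. A minimal sketch using reduceByKey, assuming a spark-shell session (the val name and the sample output are illustrative):
val byKey = sc.parallelize(0 to 9).map(n => (n % 2, n)).reduceByKey(_ + _)
println(byKey.toDebugString)
// The +- marks the shuffle boundary between stages, e.g.:
// (8) ShuffledRDD[2] at reduceByKey at <console>:24 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:24 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:24 []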
NarrowDependency
NarrowDependency is an abstract extension of Dependency with a narrow (limited) number of partitions of the parent RDD that are required to compute a partition of the child RDD. Narrow dependencies allow for pipelined execution.
NarrowDependency extends the base class with one additional method:
def getParents(partitionId: Int): Seq[Int]
that gets the parent partitions for a partition partitionId of the child RDD.
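A minimal sketch of getParents in action, assuming a spark-shell session (the val names are illustrative):
import org.apache.spark.NarrowDependency

val mapped = sc.parallelize(0 to 9).map(_ * 2)
val narrow = mapped.dependencies.head.asInstanceOf[NarrowDependency[_]]
// map preserves partitioning, so partition 0 of the child
// depends on exactly partition 0 of the parent.
println(narrow.getParents(0))  // List(0)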
OneToOneDependency
OneToOneDependency is a narrow dependency that represents a one-to-one dependency between partitions of the parent and child RDDs.
scala> val r1 = sc.parallelize(0 to 9)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:18
scala> val r3 = r1.map((_, 1))
r3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at map at <console>:20
scala> r3.dependencies
res32: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@7353a0fb)
scala> r3.toDebugString
res33: String =
(8) MapPartitionsRDD[19] at map at <console>:20 []
| ParallelCollectionRDD[13] at parallelize at <console>:18 []
PruneDependency
PruneDependency is a narrow dependency that represents a dependency between the PartitionPruningRDD and its parent RDD, where the child RDD contains a subset of the parent's partitions.
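PartitionPruningRDD is a developer API, but you can build one in spark-shell to observe the dependency. A minimal sketch (the val names and the partition filter are illustrative):
import org.apache.spark.rdd.PartitionPruningRDD

val parent = sc.parallelize(0 to 9, 4)
// Keep only the first two partitions of the parent.
val pruned = PartitionPruningRDD.create(parent, partitionId => partitionId < 2)
println(pruned.partitions.length)  // 2
println(pruned.dependencies)       // List(org.apache.spark.rdd.PruneDependency@...)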
RangeDependency
RangeDependency is a narrow dependency that represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
It is used in UnionRDD for the SparkContext.union and RDD.union transformations, to name a few.
scala> val r1 = sc.parallelize(0 to 9)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:18
scala> val r2 = sc.parallelize(10 to 19)
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:18
scala> val unioned = sc.union(r1, r2)
unioned: org.apache.spark.rdd.RDD[Int] = UnionRDD[16] at union at <console>:22
scala> unioned.dependencies
res19: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@28408ad7, org.apache.spark.RangeDependency@6e1d2e9f)
scala> unioned.toDebugString
res18: String =
(16) UnionRDD[16] at union at <console>:22 []
| ParallelCollectionRDD[13] at parallelize at <console>:18 []
| ParallelCollectionRDD[14] at parallelize at <console>:18 []
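Since RangeDependency is a NarrowDependency, getParents reveals the range mapping. A sketch continuing the session above (with 8 partitions per parent, as the (16) in the debug output suggests, child partitions 8 to 15 come from r2):
import org.apache.spark.NarrowDependency

// The second dependency covers the range of unioned partitions coming from r2.
val second = unioned.dependencies(1).asInstanceOf[NarrowDependency[_]]
println(second.getParents(8))   // List(0) -- child partition 8 is r2's partition 0
println(second.getParents(15))  // List(7) -- child partition 15 is r2's partition 7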