Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

And later in the document:

Explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

sparkcontext broadcast executors.png
Figure 1. Broadcasting a value to executors

To use a broadcast value in a transformation you have to create it first using SparkContext.broadcast() and then use value method to access the shared value. Learn it in Introductory Example section.

The Broadcast feature in Spark uses SparkContext to create broadcast values and BroadcastManager and ContextCleaner to manage their lifecycle.

sparkcontext broadcastmanager contextcleaner.png
Figure 2. SparkContext to broadcast using BroadcastManager and ContextCleaner

Broadcast Variable Contract

The contract of broadcast variables in Spark is described by the abstract Broadcast class.

Table 1. Broadcast API
Method Name Description

toString

The string representation

id

The unique identifier

value

The value

unpersist

Asynchronously deletes cached copies of this broadcast on the executors.

destroy

Destroys all data and metadata related to this broadcast variable.

Lifecycle of Broadcast Variable

You can create a broadcast variable of type T using SparkContext.broadcast method.

scala> val b = sc.broadcast(1)
b: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(0)
Tip

Enable DEBUG logging level for org.apache.spark.storage.BlockManager logger to debug broadcast method.

Read BlockManager to find out how to enable the logging level.

With DEBUG logging level enabled, you should see the following messages in the logs:

DEBUG BlockManager: Put block broadcast_0 locally took  430 ms
DEBUG BlockManager: Putting block broadcast_0 without replication took  431 ms
DEBUG BlockManager: Told master about block broadcast_0_piece0
DEBUG BlockManager: Put block broadcast_0_piece0 locally took  4 ms
DEBUG BlockManager: Putting block broadcast_0_piece0 without replication took  4 ms

After creating an instance of a broadcast variable, you can then reference the value using value method.

scala> b.value
res0: Int = 1
Note
value method is the only way to access the value of a broadcast variable.

With DEBUG logging level enabled, you should see the following messages in the logs:

DEBUG BlockManager: Getting local block broadcast_0
DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)

When you are done with a broadcast variable, you should destroy it to release memory.

scala> b.destroy

With DEBUG logging level enabled, you should see the following messages in the logs:

DEBUG BlockManager: Removing broadcast 0
DEBUG BlockManager: Removing block broadcast_0_piece0
DEBUG BlockManager: Told master about block broadcast_0_piece0
DEBUG BlockManager: Removing block broadcast_0

Before destroying a broadcast variable, you may want to unpersist it.

scala> b.unpersist

Getting the Value of Broadcast Variable (value method)

value: T

value returns the value of the broadcast variable. You can only access the value until it is destroyed after which you will see the following SparkException exception in the logs:

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
  at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:69)
  ... 48 elided

Unpersist Broadcast Variable

unpersist(): Unit
unpersist(blocking: Boolean): Unit

Removing Broadcast Variable From Memory (destroy method)

destroy(): Unit

destroy destroys all data and metadata of a broadcast variable.

Note
Once a broadcast variable has been destroyed, it cannot be used again.

You can only destroy a broadcast variable once or you will see the following SparkException exception in the logs:

scala> b.destroy
org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
  at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
  at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:107)
  at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
  ... 48 elided

Introductory Example

Let’s start with an introductory example to check out how to use broadcast variables and build your initial understanding.

You’re going to use a static mapping of interesting projects with their websites, i.e. Map[String, String] that the tasks, i.e. closures (anonymous functions) in transformations, use.

scala> val pws = Map("Apache Spark" -> "http://spark.apache.org/", "Scala" -> "http://www.scala-lang.org/")
pws: scala.collection.immutable.Map[String,String] = Map(Apache Spark -> http://spark.apache.org/, Scala -> http://www.scala-lang.org/)

scala> val websites = sc.parallelize(Seq("Apache Spark", "Scala")).map(pws).collect
...
websites: Array[String] = Array(http://spark.apache.org/, http://www.scala-lang.org/)

It works, but is very ineffective as the pws map is sent over the wire to executors while it could have been there already. If there were more tasks that need the pws map, you could improve their performance by minimizing the number of bytes that are going to be sent over the network for task execution.

Enter broadcast variables.

val pwsB = sc.broadcast(pws)
val websites = sc.parallelize(Seq("Apache Spark", "Scala")).map(pwsB.value).collect
// websites: Array[String] = Array(http://spark.apache.org/, http://www.scala-lang.org/)

Semantically, the two computations - with and without the broadcast value - are exactly the same, but the broadcast-based one wins performance-wise when there are more executors spawned to execute many tasks that use pws map.

Introduction

Broadcast is part of Spark that is responsible for broadcasting information across nodes in a cluster.

You use broadcast variable to implement map-side join, i.e. a join using a map. For this, lookup tables are distributed across nodes in a cluster using broadcast and then looked up inside map (to do the join implicitly).

When you broadcast a value, it is copied to executors only once (while it is copied multiple times for tasks otherwise). It means that broadcast can help to get your Spark application faster if you have a large value to use in tasks or there are more tasks than executors.

It appears that a Spark idiom emerges that uses broadcast with collectAsMap to create a Map for broadcast. When an RDD is map over to a smaller dataset (column-wise not record-wise), collectAsMap, and broadcast, using the very big RDD to map its elements to the broadcast RDDs is computationally faster.

val acMap = sc.broadcast(myRDD.map { case (a,b,c,b) => (a, c) }.collectAsMap)
val otherMap = sc.broadcast(myOtherRDD.collectAsMap)

myBigRDD.map { case (a, b, c, d) =>
  (acMap.value.get(a).get, otherMap.value.get(c).get)
}.collect

Use large broadcasted HashMaps over RDDs whenever possible and leave RDDs with a key to lookup necessary data as demonstrated above.

Spark comes with a BitTorrent implementation.

It is not enabled by default.

SparkContext.broadcast

Read about SparkContext.broadcast method in Creating broadcast variables.

TorrentBroadcast

The default implementation of Broadcast Contract is org.apache.spark.broadcast.TorrentBroadcast (see spark.broadcast.factory). It uses a BitTorrent-like protocol to do the distribution.

sparkcontext broadcast bittorrent.png
Figure 3. TorrentBroadcast - broadcasting using BitTorrent

TorrentBroadcastFactory is the factory of TorrentBroadcast-based broadcast values.

When a new broadcast value is created using SparkContext.broadcast() method, a new instance of TorrentBroadcast is created. It is divided into blocks that are put in Block Manager.

sparkcontext broadcast bittorrent newBroadcast.png
Figure 4. TorrentBroadcast puts broadcast chunks to driver’s BlockManager

Further Reading

results matching ""

    No results matching ""