RDD Caching and Persistence
Caching (or persistence) is an optimisation technique for iterative and interactive Spark computations. It saves interim partial results so they can be reused in subsequent stages. These interim results are kept as RDDs in memory (the default) or on more durable storage such as disk, and can optionally be replicated.
The difference between the cache and persist operations is purely syntactic: cache is a synonym of persist or persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY.
Note: Because the difference between caching and persistence of RDDs is small and purely syntactic, the two terms are often used interchangeably, and I will follow that convention here.
RDDs can also be unpersisted, which removes their blocks from storage (memory and/or disk).
Caching RDD (cache method)
cache(): this.type = persist()
cache is a synonym of persist with the StorageLevel.MEMORY_ONLY storage level.
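The equivalence can be checked directly. The following is a minimal sketch, assuming Spark is on the classpath and a local master is acceptable; the application name and the sample data are made up for illustration (in spark-shell, sc already exists and the context setup can be skipped).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: a throwaway local context; in spark-shell `sc` is already provided.
val conf = new SparkConf().setMaster("local[*]").setAppName("cache-vs-persist")
val sc = SparkContext.getOrCreate(conf)

// Three ways of marking an RDD for in-memory persistence.
val cached    = sc.parallelize(1 to 100).cache()
val persisted = sc.parallelize(1 to 100).persist()
val explicit  = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_ONLY)

// Collect the storage level that each call assigned.
val levels = Seq(cached, persisted, explicit).map(_.getStorageLevel)
println(levels.forall(_ == StorageLevel.MEMORY_ONLY)) // prints "true"

sc.stop()
```

All three calls only mark the RDD; nothing is stored until an action computes it.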
Persisting RDD (persist method)
persist(): this.type
persist(newLevel: StorageLevel): this.type
persist marks an RDD for persistence using the newLevel storage level.
The storage level can be assigned only once. An attempt to change it afterwards throws an UnsupportedOperationException:
Cannot change storage level of an RDD after it was already assigned a level
Note: Calling persist on an RDD that already has a storage level succeeds only if the requested level is the same as the one currently assigned (in which case nothing changes).
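Both cases can be reproduced in a few lines. This is a sketch under the same assumptions as any standalone Spark snippet (a local master, a made-up application name, sample data chosen only for illustration); scala.util.Try is used here only to capture the exception instead of aborting the script.

```scala
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: a throwaway local context; in spark-shell `sc` is already provided.
val conf = new SparkConf().setMaster("local[*]").setAppName("persist-once")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_ONLY)

// Requesting the level that is already assigned is accepted.
val sameLevel = Try(rdd.persist(StorageLevel.MEMORY_ONLY))

// Requesting a different level is rejected.
val differentLevel = Try(rdd.persist(StorageLevel.DISK_ONLY))

println(sameLevel.isSuccess) // prints "true"
println(differentLevel.failed.map(_.getMessage).getOrElse("no exception"))

sc.stop()
```

The second println shows the exception message quoted above.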
When an RDD is marked as persistent for the first time, it is registered with ContextCleaner (if available) and SparkContext.
The internal storageLevel attribute is then set to the input newLevel storage level.
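The registration with SparkContext is observable through the public getPersistentRDDs method. The sketch below assumes a local master and illustrative sample data; it shows only the SparkContext side, since ContextCleaner is internal.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: a throwaway local context; in spark-shell `sc` is already provided.
val conf = new SparkConf().setMaster("local[*]").setAppName("persist-registration")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(1 to 100)

// Not yet marked for persistence, so not tracked by the context.
val registeredBefore = sc.getPersistentRDDs.contains(rdd.id)

rdd.persist(StorageLevel.MEMORY_AND_DISK)

// Marked for persistence: tracked by the context, even though no action has run yet.
val registeredAfter = sc.getPersistentRDDs.contains(rdd.id)
val assignedLevel = rdd.getStorageLevel

println(s"before=$registeredBefore after=$registeredAfter")

sc.stop()
```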
Storage Levels
StorageLevel describes how an RDD is persisted and addresses the following concerns:
- Does the RDD use disk?
- Does the RDD use memory?
- Does the RDD use off-heap memory?
- Should the RDD be serialized (while persisting)?
- How many replicas (default: 1) to use (must be less than 40)?
The following StorageLevel values are available (the _2 suffix in a name denotes 2 replicas):
- NONE (default)
- DISK_ONLY
- DISK_ONLY_2
- MEMORY_ONLY (default for the cache() operation)
- MEMORY_ONLY_2
- MEMORY_ONLY_SER
- MEMORY_ONLY_SER_2
- MEMORY_AND_DISK
- MEMORY_AND_DISK_2
- MEMORY_AND_DISK_SER
- MEMORY_AND_DISK_SER_2
- OFF_HEAP
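Each predefined level is a combination of the flags listed earlier, and the flags can be read back from a StorageLevel instance. The following sketch needs only spark-core on the classpath (no running SparkContext); the custom level at the end is a hypothetical combination built for illustration.

```scala
import org.apache.spark.storage.StorageLevel

// Inspect the flags behind one of the predefined levels.
val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(s"useDisk=${level.useDisk}")           // useDisk=true
println(s"useMemory=${level.useMemory}")       // useMemory=true
println(s"useOffHeap=${level.useOffHeap}")     // useOffHeap=false
println(s"deserialized=${level.deserialized}") // deserialized=false (stored serialized)
println(s"replication=${level.replication}")   // replication=2

// Hypothetical custom level: disk + memory, on-heap, deserialized, 3 replicas.
// Arguments: useDisk, useMemory, useOffHeap, deserialized, replication.
val custom = StorageLevel(true, true, false, true, 3)
println(s"custom replication=${custom.replication}")
```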
You can check the storage level of an RDD using the getStorageLevel operation.
val lines = sc.textFile("README.md")
scala> lines.getStorageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(disk=false, memory=false, offheap=false, deserialized=false, replication=1)
Unpersisting RDDs (Clearing Blocks) (unpersist method)
unpersist(blocking: Boolean = true): this.type
When called, unpersist prints the following INFO message to the logs:
INFO [RddName]: Removing RDD [id] from persistence list
It then calls SparkContext.unpersistRDD(id, blocking) and sets StorageLevel.NONE as the current storage level.
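The effect on the storage level can be seen in a short session. This is a sketch, assuming a local master and illustrative sample data; it checks the resulting storage level and the context's list of persistent RDDs rather than the log output.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: a throwaway local context; in spark-shell `sc` is already provided.
val conf = new SparkConf().setMaster("local[*]").setAppName("unpersist-demo")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_ONLY)
rdd.count() // run an action so that blocks are actually stored

val levelBefore = rdd.getStorageLevel

// Wait until the blocks have been removed.
rdd.unpersist(blocking = true)

val levelAfter = rdd.getStorageLevel
val stillRegistered = sc.getPersistentRDDs.contains(rdd.id)

println(levelBefore == StorageLevel.MEMORY_ONLY) // prints "true"
println(levelAfter == StorageLevel.NONE)         // prints "true"
println(stillRegistered)                         // prints "false"

sc.stop()
```

Because the level is back to StorageLevel.NONE, the RDD can afterwards be persisted again with a different level.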