Discretized Streams (DStreams)

A Discretized Stream (DStream) is the fundamental abstraction of Spark Streaming. It is a stream of RDDs whose elements are the data received from input streams over a batch duration (possibly extended in scope by windowed or stateful operators).

Spark Streaming does not distinguish input and output dstreams by type: all dstreams are instances of the DStream abstract class (see DStream Contract in this document). You may safely treat every dstream as an input dstream until it is registered, which marks it as an output dstream.

A DStream is represented by the org.apache.spark.streaming.dstream.DStream abstract class.

Tip

Enable the INFO or DEBUG logging level for the org.apache.spark.streaming.dstream.DStream logger to see what happens inside a DStream.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.dstream.DStream=DEBUG

Refer to Logging.

DStream Contract

A DStream is defined by the following properties (with the names of the corresponding methods that subclasses have to implement):

  • dstream dependencies, i.e. a collection of DStreams that this DStream depends on. They are often referred to as parent dstreams.

    def dependencies: List[DStream[_]]
  • slide duration (aka slide interval), i.e. the time interval after which the stream is requested to generate an RDD out of the input data it consumes.

    def slideDuration: Duration
  • how to compute (generate) an optional RDD for the given batch, if any. validTime is the point in time that marks the end boundary of the slide duration.

    def compute(validTime: Time): Option[RDD[T]]
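The contract can be pictured with a small plain-Scala model. This is only a sketch: SketchStream, FixedSource and Mapped are hypothetical stand-ins (not Spark classes), a Seq stands in for an RDD, and times are plain milliseconds.

```scala
// Simplified stand-ins for illustration only; these are not Spark's classes.
object DStreamContractSketch {
  type Time = Long      // batch time in milliseconds
  type Duration = Long  // interval length in milliseconds

  // The three members named by the contract, with a Seq standing in for an RDD.
  abstract class SketchStream[T] {
    def dependencies: List[SketchStream[_]]
    def slideDuration: Duration
    def compute(validTime: Time): Option[Seq[T]]
  }

  // A source stream: no parents, yields a batch only for times it has data for.
  class FixedSource[T](batches: Map[Time, Seq[T]], val slideDuration: Duration)
      extends SketchStream[T] {
    def dependencies: List[SketchStream[_]] = Nil
    def compute(validTime: Time): Option[Seq[T]] = batches.get(validTime)
  }

  // A derived stream: one parent, same slide duration, transforms the parent's batch.
  class Mapped[T, U](parent: SketchStream[T], f: T => U) extends SketchStream[U] {
    def dependencies: List[SketchStream[_]] = List(parent)
    def slideDuration: Duration = parent.slideDuration
    def compute(validTime: Time): Option[Seq[U]] =
      parent.compute(validTime).map(_.map(f))
  }
}
```

In this model a derived stream simply delegates to its parent, which is why compute returns an Option: a batch with no data upstream yields nothing downstream.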

Creating DStreams

You can create dstreams through the built-in input stream constructors of a streaming context, or through more specialized add-ons for external input data sources, e.g. Apache Kafka.

Note
DStreams can only be created before StreamingContext is started.

Zero Time (aka zeroTime)

Zero time (internally zeroTime) is the time when a dstream was initialized.

It serves as the initialization marker (via the isInitialized method) and helps calculate intervals for RDD checkpointing (when the checkpoint interval is set and the current batch time is a multiple thereof), slicing, and time validation for a batch (when a dstream generates an RDD).
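A rough model of how zero time can anchor these calculations follows. It is a sketch under stated assumptions: a batch time is taken to be valid when it lies after zero time and is a whole number of slide durations away from it, and a checkpoint is taken to be due when the distance from zero time is a multiple of the checkpoint interval. The names isTimeValid and isCheckpointDue are used here for illustration only, and times are plain milliseconds.

```scala
// A plain-Scala model; not Spark code.
object ZeroTimeSketch {
  // Assumed rule: valid batch times are zeroTime + k * slideMs for k >= 1.
  def isTimeValid(zeroTime: Long, slideMs: Long, time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideMs == 0

  // Assumed rule: a checkpoint is due only if an interval is set and the
  // distance from zero time is a multiple of it.
  def isCheckpointDue(zeroTime: Long, checkpointMs: Option[Long], time: Long): Boolean =
    checkpointMs.exists(cp => (time - zeroTime) % cp == 0)
}
```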

Remember Interval (aka rememberDuration)

Remember interval (internally rememberDuration) is the time interval for how long a dstream is supposed to remember (aka cache) RDDs created. This is a mandatory attribute of every dstream which is validated at startup.

Note
It is used for metadata cleanup of a dstream.

Initially, when a dstream is created, the remember interval is not set (i.e. null), but is set when the dstream is initialized.

It can be set to a custom value using remember method.

Note
You may see the current value of remember interval when a dstream is validated at startup and the log level is INFO.

generatedRDDs - Internal Cache of Batch Times and Corresponding RDDs

generatedRDDs is an internal collection of pairs of batch times and the corresponding RDDs that were generated for the batch. It acts as a cache when a dstream is requested to compute an RDD for a batch (i.e. generatedRDDs either already has the RDD or gets a new RDD added).

generatedRDDs is empty initially, i.e. when a dstream is created.

It is a transient data structure so it is not serialized when a dstream is. It is initialized to an empty collection when deserialized. You should see the following DEBUG message in the logs when it happens:

DEBUG [the simple class name of dstream].readObject used

As new RDDs are added, dstreams offer a way to clear the old metadata during which the old RDDs are removed from generatedRDDs collection.

If checkpointing is used, the generatedRDDs collection can be recreated from storage.

Initializing DStreams (initialize method)

initialize(time: Time): Unit

initialize method sets zero time and optionally checkpoint interval (if the dstream must checkpoint and the interval was not set already) and remember duration.

Note
When DStreamGraph is started, it calls the initialize method on output dstreams only. The dstreams they depend on are initialized recursively.

The zero time of a dstream can only be set once or be set again to the same zero time. Otherwise, it throws SparkException as follows:

ZeroTime is already initialized to [zeroTime], cannot initialize it again to [time]

It verifies that checkpoint interval is defined when mustCheckpoint was enabled.

Note
The internal mustCheckpoint flag is disabled by default. It is set by custom dstreams like StateDStreams.

If mustCheckpoint is enabled and the checkpoint interval was not set, it is automatically set to the smallest multiple of the slide interval that is at least 10 seconds (which is the slide interval itself when it is 10 seconds or longer). You should see the following INFO message in the logs when the checkpoint interval was set automatically:

INFO [DStreamType]: Checkpoint interval automatically set to [checkpointDuration]

It then ensures that the remember interval is at least the slide duration or, when a checkpoint interval is defined, at least twice the checkpoint interval.

At the very end, it initializes the parent dstreams (available as dependencies), which recursively initializes the entire graph of dstreams.
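The interval arithmetic of initialize can be sketched in plain Scala. The sketch assumes the automatic checkpoint interval is the smallest multiple of the slide interval that is at least 10 seconds, and that the remember interval's lower bound is twice the checkpoint interval when one is defined (and not shorter than the slide interval), else the slide interval. The function names are hypothetical and durations are plain milliseconds.

```scala
// A plain-Scala model of the interval arithmetic; not Spark code.
object InitializeSketch {
  val TenSeconds: Long = 10000L

  // Assumed rule: round 10 seconds up to a whole number of slide intervals.
  def autoCheckpointInterval(slideMs: Long): Long =
    slideMs * math.ceil(TenSeconds.toDouble / slideMs).toLong

  // Assumed rule: twice the checkpoint interval when it is defined and at
  // least as long as the slide interval, otherwise the slide interval.
  def minRememberInterval(slideMs: Long, checkpointMs: Option[Long]): Long =
    checkpointMs match {
      case Some(cp) if slideMs <= cp => cp * 2
      case _                         => slideMs
    }
}
```

For example, under these assumptions a 3-second slide interval gives a 12-second checkpoint interval and a 24-second minimum remember interval, while a 15-second slide interval gives 15 and 30 seconds respectively.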

remember Method

remember(duration: Duration): Unit

remember sets remember interval for the current dstream and the dstreams it depends on (see dependencies).

If the input duration is specified (i.e. not null), remember sets the remember interval (only when the current value was not set already) or extends it (when the current value is shorter).

You should see the following INFO message in the logs when the remember interval changes:

INFO Duration for remembering RDDs set to [rememberDuration] for [dstream]

At the end, remember always passes the current remember interval (whether it was set, extended, or left unchanged) on to the dstreams it depends on.
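The set-or-extend behaviour can be modelled as follows. This is a sketch, not Spark code: Node is a hypothetical stand-in for a dstream, Option[Long] replaces the nullable Duration, and the printed line only imitates the INFO message.

```scala
// A plain-Scala model of remember; not Spark code.
object RememberSketch {
  class Node(val name: String, val parents: List[Node] = Nil) {
    var rememberMs: Option[Long] = None // None models "not set" (null)

    def remember(duration: Option[Long]): Unit = {
      duration.foreach { d =>
        // Set when unset, extend when the current value is shorter.
        if (rememberMs.forall(d > _)) {
          rememberMs = Some(d)
          println(s"Duration for remembering RDDs set to $d ms for $name")
        }
      }
      // Always hand the current value on to the parent streams.
      parents.foreach(_.remember(rememberMs))
    }
  }
}
```

Note that a shorter duration never shrinks the interval in this model; it can only grow.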

Checkpointing DStreams (checkpoint method)

checkpoint(interval: Duration): DStream[T]

You use the checkpoint(interval: Duration) method to set up periodic checkpointing at every (checkpoint) interval.

You can enable checkpointing and set the checkpoint interval only before StreamingContext is started. Otherwise, UnsupportedOperationException is thrown as follows:

java.lang.UnsupportedOperationException: Cannot change checkpoint interval of an DStream after streaming context has started
  at org.apache.spark.streaming.dstream.DStream.checkpoint(DStream.scala:177)
  ... 43 elided

Internally, the checkpoint method calls persist (which sets the default MEMORY_ONLY_SER storage level).

If the checkpoint interval is set, the checkpoint directory is mandatory. Spark validates it when StreamingContext starts and throws an IllegalArgumentException if it is not set.

java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

You can see the value of the checkpoint interval for a dstream in the logs when it is validated:

INFO Checkpoint interval = [checkpointDuration]

Checkpointing

DStreams can checkpoint input data at specified time intervals.

The following settings are internal to a dstream and define how it checkpoints the input data if any.

  • mustCheckpoint (default: false) is an internal private flag that marks a dstream as being checkpointed (true) or not (false). It is an implementation detail and the author of a DStream implementation sets it.

    Refer to Initializing DStreams (initialize method) to learn how it is used to set the checkpoint interval, i.e. checkpointDuration.

  • checkpointDuration is a configurable property that says how often a dstream checkpoints data. It is often called the checkpoint interval. If it is not set explicitly but the dstream is checkpointed, it is set while the dstream is initialized.

  • checkpointData is an instance of DStreamCheckpointData.

  • restoredFromCheckpointData (default: false) is an internal flag that describes the initial state of a dstream, i.e. whether (true) or not (false) it was started by restoring state from a checkpoint.

Validating Setup at Startup (validateAtStart method)

Caution
FIXME Describe me!

Registering Output Streams (register method)

register(): DStream[T]

DStream by design has no notion of being an output stream. It is DStreamGraph that knows about input and output streams and can tell them apart.

DStream comes with internal register method that registers a DStream as an output stream.

The internal private foreachRDD method uses register to register output streams to DStreamGraph. Whenever called, it creates ForEachDStream and calls register upon it. That is how streams become output streams.

Generating Streaming Jobs (generateJob method)

generateJob(time: Time): Option[Job]

The internal generateJob method generates a streaming job for a batch time for an (output) dstream. It may or may not generate a streaming job for the requested batch time.

It computes an RDD for the batch and, if there is one, returns a streaming job for the batch time and a job function that will run a Spark job (with the generated RDD and the job function) when executed.
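The shape of this logic can be sketched in plain Scala. The sketch is not Spark code: Job here is a hypothetical case class, a Seq stands in for an RDD, and the job function merely walks the batch and discards every element as a stand-in for running a Spark job with an empty function.

```scala
// A plain-Scala model of generateJob; not Spark code.
object GenerateJobSketch {
  // A job pairs the batch time with a function that is run later.
  final case class Job(time: Long, run: () => Unit)

  def generateJob[T](time: Long, getOrCompute: Long => Option[Seq[T]]): Option[Job] =
    getOrCompute(time).map { batch =>
      // Stand-in job function: visit every element and do nothing with it.
      Job(time, () => batch.foreach(_ => ()))
    }
}
```

When there is no batch for the requested time, the model returns None, so no job is scheduled at all.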

Note
The Spark job uses an empty function to calculate partitions of an RDD.

Caution
FIXME What happens when SparkContext.runJob(rdd, emptyFunc) is called with the empty function, i.e. (iterator: Iterator[T]) ⇒ {}?

Computing RDD for Batch (getOrCompute method)

The internal (private final) getOrCompute(time: Time) method returns an optional RDD for a batch (time).

It uses generatedRDDs to return the RDD if it has already been generated for the time. If not, it generates one by computing the input stream (using compute(validTime: Time) method).

If there was anything to process in the input stream, i.e. computing the input stream returned an RDD, the RDD is first persisted (only if storageLevel for the input stream is different from StorageLevel.NONE).

You should see the following DEBUG message in the logs:

DEBUG Persisting RDD [id] for time [time] to [storageLevel]

The generated RDD is checkpointed if checkpointDuration is defined and the time interval between current and zero times is a multiple of checkpointDuration.

You should see the following DEBUG message in the logs:

DEBUG Marking RDD [id] for time [time] for checkpointing

The generated RDD is saved in the internal generatedRDDs registry.
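The caching side of getOrCompute can be modelled in a few lines of plain Scala. This is a simplified sketch: CachingStream is a hypothetical stand-in, a Seq replaces an RDD, and the persist and checkpoint steps described above are deliberately left out.

```scala
import scala.collection.mutable

// A plain-Scala model of the cache lookup; not Spark code.
object GetOrComputeSketch {
  class CachingStream[T](compute: Long => Option[Seq[T]]) {
    // Stands in for generatedRDDs: batch time -> generated batch.
    val generated = mutable.HashMap.empty[Long, Seq[T]]

    def getOrCompute(time: Long): Option[Seq[T]] =
      generated.get(time).orElse {
        val fresh = compute(time)
        // Only batches that were actually produced are remembered.
        fresh.foreach(batch => generated.put(time, batch))
        fresh
      }
  }
}
```

Asking twice for the same batch time invokes compute only once in this model, and a time that produced nothing leaves no entry behind.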

Caching and Persisting

Caution
FIXME

Checkpoint Cleanup

Caution
FIXME

restoreCheckpointData

restoreCheckpointData(): Unit

restoreCheckpointData does its work only when the internal transient restoredFromCheckpointData flag is disabled (i.e. false), which is its initial value.

Note
restoreCheckpointData method is called when DStreamGraph is requested to restore state of output dstreams.

If restoredFromCheckpointData is disabled, you should see the following INFO message in the logs:

INFO ...DStream: Restoring checkpoint data

DStreamCheckpointData.restore() is executed, and then the restoreCheckpointData method is executed for every dstream the current dstream depends on (see DStream Contract).

Once completed, the internal restoredFromCheckpointData flag is enabled (i.e. true) and you should see the following INFO message in the logs:

INFO Restored checkpoint data
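The role of the flag can be illustrated with a plain-Scala model. It is a sketch only: Node is a hypothetical stand-in for a dstream, and a counter replaces the call to DStreamCheckpointData.restore().

```scala
// A plain-Scala model of the restore recursion; not Spark code.
object RestoreSketch {
  class Node(val name: String, val parents: List[Node] = Nil) {
    var restored = false // stands in for restoredFromCheckpointData
    var restoreCalls = 0 // counts how often the restore work actually ran

    def restoreCheckpointData(): Unit =
      if (!restored) {
        restoreCalls += 1 // stands in for DStreamCheckpointData.restore()
        parents.foreach(_.restoreCheckpointData())
        restored = true
      }
  }
}
```

In this model the flag makes the call idempotent: a stream shared by two downstream streams is restored once, however many of them request it.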

Metadata Cleanup

clearMetadata(time: Time) is called to remove old RDDs that have been generated so far (and collected in generatedRDDs). It is a sort of garbage collector.

When clearMetadata(time: Time) is called, it checks spark.streaming.unpersist flag (default enabled).

It collects generated RDDs (from generatedRDDs) that are older than rememberDuration.

You should see the following DEBUG message in the logs:

DEBUG Clearing references to old RDDs: [[time] -> [rddId], ...]

Regardless of spark.streaming.unpersist flag, all the collected RDDs are removed from generatedRDDs.

When spark.streaming.unpersist flag is set (it is by default), you should see the following DEBUG message in the logs:

DEBUG Unpersisting old RDDs: [id1, id2, ...]

It unpersists every RDD in the list (without blocking), one by one, and explicitly removes the blocks of BlockRDDs. You should see the following INFO message in the logs:

INFO Removing blocks of RDD [blockRDD] of time [time]

After RDDs have been removed from generatedRDDs (and perhaps unpersisted), you should see the following DEBUG message in the logs:

DEBUG Cleared [size] RDDs that were older than [time]: [time1, time2, ...]

The stream passes the call to clear metadata to its dependencies.
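The selection of old entries can be sketched in plain Scala. The sketch assumes an entry counts as old when its batch time is at or before the given time minus the remember interval; the String values are hypothetical placeholders for RDDs, and unpersisting and the propagation to dependencies are left out.

```scala
import scala.collection.mutable

// A plain-Scala model of the cleanup; not Spark code.
object ClearMetadataSketch {
  // Drops old entries from the map and returns the batch times removed.
  def clearMetadata(
      generated: mutable.Map[Long, String],
      time: Long,
      rememberMs: Long): Seq[Long] = {
    // Assumed cutoff: everything at or before (time - rememberMs) is old.
    val oldTimes = generated.keys.filter(_ <= time - rememberMs).toSeq.sorted
    generated --= oldTimes
    oldTimes
  }
}
```

With a remember interval of 2 seconds and a cleanup at time 4000 ms, the model drops the batches at 1000 ms and 2000 ms and keeps the one at 3000 ms.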

updateCheckpointData

updateCheckpointData(currentTime: Time): Unit

When updateCheckpointData is called, you should see the following DEBUG message in the logs:

DEBUG Updating checkpoint data for time [currentTime] ms

It then executes DStreamCheckpointData.update(currentTime) and calls updateCheckpointData method on each dstream the dstream depends on.

When updateCheckpointData finishes, you should see the following DEBUG message in the logs:

DEBUG Updated checkpoint data for time [currentTime]: [checkpointData]

Internal Registries

DStream implementations maintain the following internal properties:

  • storageLevel (default: NONE) is the StorageLevel of the RDDs in the DStream.

  • restoredFromCheckpointData is a flag to inform whether it was restored from checkpoint.

  • graph is the reference to DStreamGraph.
