MapWithStateDStream

MapWithStateDStream is the result of the mapWithState stateful operator.

It extends the DStream Contract with the following additional method:

def stateSnapshots(): DStream[(KeyType, StateType)]
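For context, here is a minimal sketch (untested; the input source, host, port, checkpoint directory and application name are made up) of how mapWithState produces a MapWithStateDStream and how stateSnapshots exposes the per-key state:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("mapWithState-sketch")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("_checkpoints")  // mapWithState requires a checkpoint directory

// hypothetical input: a stream of (word, 1) pairs
val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

// keep a running count per key in the state
val runningCount = (key: String, value: Option[Int], state: State[Int]) => {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}

// mapWithState returns a MapWithStateDStream
val mapped = pairs.mapWithState(StateSpec.function(runningCount))

// the additional method from the contract above: one (key, state) pair per key per batch
mapped.stateSnapshots().print()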
Note: MapWithStateDStream is a Scala sealed abstract class (and hence all the available implementations are in the source file).
Note: MapWithStateDStreamImpl is the only implementation of MapWithStateDStream (see below in this document for more coverage).
MapWithStateDStreamImpl

MapWithStateDStreamImpl is an internal DStream with a dependency on the parent dataStream key-value dstream. It uses a custom internal dstream called internalStream (of type InternalMapWithStateDStream).
slideDuration is exactly the slide duration of the internal stream internalStream.

dependencies returns a single-element collection with the internal stream internalStream.
The compute method may or may not return an RDD[MappedType] by getOrCompute on the internal stream and…TK

Caution: FIXME
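A generic sketch (not the actual Spark source, since MapWithStateDStream is sealed and getOrCompute is package-private) of the wrapper shape just described, where slideDuration, dependencies and compute all delegate to the wrapped stream:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream

// Illustrative wrapper only; MapWithStateDStreamImpl delegates to internalStream
// (and maps the state records to the mapped values) in the same spirit.
class WrapperDStream[T: ClassTag](wrapped: DStream[T])
  extends DStream[T](wrapped.context) {

  // same slide duration as the wrapped stream
  override def slideDuration: Duration = wrapped.slideDuration

  // a single dependency: the wrapped stream
  override def dependencies: List[DStream[_]] = List(wrapped)

  // the real implementation goes through the package-private getOrCompute
  // (which caches and checkpoints); a plain compute call stands in for it here
  override def compute(validTime: Time): Option[RDD[T]] = wrapped.compute(validTime)
}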
InternalMapWithStateDStream

InternalMapWithStateDStream is an internal dstream that supports MapWithStateDStreamImpl. It uses dataStream (as parent, of type DStream[(K, V)]) as well as StateSpecImpl[K, V, S, E] (as spec).
It is a DStream[MapWithStateRDDRecord[K, S, E]].

It uses the StorageLevel.MEMORY_ONLY storage level by default.
It uses the StateSpec’s partitioner if one is defined, or a HashPartitioner (with the SparkContext’s defaultParallelism) otherwise.
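A sketch of that defaulting rule (resolvePartitioner below is a hypothetical helper, not a Spark API; HashPartitioner, defaultParallelism and StateSpec.partitioner are real):

import org.apache.spark.{HashPartitioner, Partitioner, SparkContext}

// Hypothetical helper mirroring the rule above: prefer the partitioner
// configured on the StateSpec, else a HashPartitioner sized by the
// SparkContext's default parallelism.
def resolvePartitioner(specPartitioner: Option[Partitioner], sc: SparkContext): Partitioner =
  specPartitioner.getOrElse(new HashPartitioner(sc.defaultParallelism))

// A StateSpec only carries an explicit partitioner if you set one, e.g.
//   StateSpec.function(mappingFunc).partitioner(new HashPartitioner(8))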
slideDuration is the slide duration of parent.

dependencies is a single-element collection with the parent stream.
It forces checkpointing (i.e. the mustCheckpoint flag is enabled).

When initialized, if the checkpoint interval is not set, it is set to ten times the slide duration of the parent stream (the multiplier is not configurable and is always 10).
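As a sketch of that initialization rule (illustrative only, not the actual source; the multiplier is hard-coded as described above):

import org.apache.spark.streaming.Duration

// Illustrative only: default the checkpoint interval to 10 times the parent's
// slide duration when none has been set explicitly.
val checkpointDurationMultiplier = 10

def defaultCheckpointInterval(configured: Option[Duration], parentSlideDuration: Duration): Duration =
  configured.getOrElse(parentSlideDuration * checkpointDurationMultiplier)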
Computing an RDD[MapWithStateRDDRecord[K, S, E]] (i.e. the compute method) first looks up the previous RDD for the last slideDuration.

If such an RDD is found and the partitioners of the RDD and the stream are equal, it is returned as is. Otherwise, when the partitioners differ, the RDD is "repartitioned" using MapWithStateRDD.createFromRDD.
Caution: FIXME MapWithStateRDD.createFromRDD
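The gist of the partitioner decision above, as a sketch (a hypothetical helper, not the actual source; the recreate function stands in for what MapWithStateRDD.createFromRDD does internally):

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical helper: reuse the previous state RDD when its partitioner matches
// the stream's target partitioner, otherwise re-create it with the target
// partitioner (the role MapWithStateRDD.createFromRDD plays internally).
def reuseOrRecreate[T](previous: RDD[T], target: Partitioner, recreate: RDD[T] => RDD[T]): RDD[T] =
  if (previous.partitioner.contains(target)) previous else recreate(previous)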