MemoryStream

MemoryStream is a streaming source that produces values (of type A) stored in memory.

It uses the internal batches collection of datasets.

Caution

This source is not for production use due to design constraints, e.g. the in-memory collection of batches grows without bound and there is no fault recovery.

It is designed primarily for unit tests, tutorials and debugging.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder.getOrCreate()

// Encoder[Int] is among the implicits brought in by spark.implicits._
import spark.implicits._

implicit val ctx = spark.sqlContext

import org.apache.spark.sql.execution.streaming.MemoryStream
// It uses two implicits: Encoder[Int] and SQLContext
val ints = MemoryStream[Int]

scala> val memoryQuery = ints.toDF.writeStream.format("memory").queryName("memStream").start
memoryQuery: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - memStream [state = ACTIVE]

scala> val zeroOffset = ints.addData(0, 1, 2)
zeroOffset: org.apache.spark.sql.execution.streaming.Offset = #0

memoryQuery.processAllAvailable()
val intsOut = spark.table("memStream").as[Int]
scala> intsOut.show
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
+-----+

memoryQuery.stop()
Tip

Enable DEBUG logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=DEBUG

Refer to Logging.

Creating MemoryStream Instance

apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A]

MemoryStream object defines the apply factory method to create instances of the MemoryStream streaming source. It requires an implicit Encoder for the element type A and an implicit SQLContext in scope.
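For example, the following minimal sketch creates a MemoryStream of Strings (reusing the SparkSession pattern from the first example; apply is what the MemoryStream[String] expression resolves to):

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder.getOrCreate()

import spark.implicits._                 // brings in Encoder[String]
implicit val sqlCtx = spark.sqlContext   // brings in the SQLContext

import org.apache.spark.sql.execution.streaming.MemoryStream
// Resolves to MemoryStream.apply[String]
val strings = MemoryStream[String]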

Adding Data to Source (addData methods)

addData(data: A*): Offset
addData(data: TraversableOnce[A]): Offset

addData methods add the input data to the batches internal collection.

When executed, addData creates a Dataset (using the toDS implicit method), appends it to batches, and increments the internal currentOffset (that it then returns).

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: Adding ds: [ds]
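Both variants can be used interchangeably; the following sketch (reusing the ints MemoryStream from the example above) shows them side by side:

// varargs variant
val offset1 = ints.addData(3, 4, 5)

// TraversableOnce variant (any Scala collection works)
val offset2 = ints.addData(Seq(6, 7, 8))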

Getting Next Batch (getBatch method)

Note
getBatch is a part of Streaming Source contract.

When executed, getBatch uses the internal batches collection to return the data for the requested offset range (between the start and end offsets).

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]
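The following sketch exercises getBatch directly (reusing the ints MemoryStream from the example above; per the Source contract, getBatch takes an optional start offset and an end offset):

// Register some data and remember the offset it was added at
val endOffset = ints.addData(10, 11, 12)

// Request all batches from the beginning (start = None) up to endOffset
val batchDF = ints.getBatch(None, endOffset)

// batchDF is a DataFrame over the in-memory batches
batchDF.printSchema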

StreamingExecutionRelation Logical Plan

MemoryStream uses StreamingExecutionRelation logical plan to build Datasets or DataFrames when requested.

StreamingExecutionRelation is a leaf logical node that is created for a streaming source with its output (a collection of Attribute). It is a streaming logical plan whose name is the name of the source.

scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]

scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true

scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]

Schema (schema method)

MemoryStream uses the schema as described by the Encoder of the Dataset, i.e. the schema of the elements of type A.
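For example, for the ints MemoryStream from above, Encoder[Int] gives a single value column of IntegerType:

scala> ints.toDS.printSchema
root
 |-- value: integer (nullable = false)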
