import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder.getOrCreate()
implicit val ctx = spark.sqlContext
// It uses two implicits: Encoder[Int] and SQLContext
val intsInput = MemoryStream[Int]
scala> val memoryQuery = intsInput.toDF.writeStream.format("memory").queryName("memStream").start
memoryQuery: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - memStream [state = ACTIVE]
scala> val zeroOffset = intsInput.addData(0, 1, 2)
zeroOffset: org.apache.spark.sql.execution.streaming.Offset = #0
memoryQuery.processAllAvailable()
val intsOut = spark.table("memStream").as[Int]
scala> intsOut.show
+-----+
|value|
+-----+
| 0|
| 1|
| 2|
+-----+
memoryQuery.stop()
MemoryStream
MemoryStream
is a streaming source that produces values (of type T) stored in memory.
Caution
|
This source is not for production use due to design constraints, e.g. an infinite in-memory collection of lines read and no fault recovery. It is designed primarily for unit tests, tutorials and debugging. |
Tip
|
Enable DEBUG logging level for the org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.
Refer to Logging. |
Creating MemoryStream Instance
apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A]
The MemoryStream
object defines an apply
method that you use to create instances of the MemoryStream
streaming source.
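As a sketch of using apply (the master URL and the explicit apply call are illustrative assumptions; the two required implicits come from spark.implicits._ and the SQLContext in scope):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// master("local[*]") is an assumption for a local run
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._                // provides Encoder[Int]
implicit val sqlCtx = spark.sqlContext  // provides the implicit SQLContext

// MemoryStream[Int] expands to MemoryStream.apply[Int]
// with both implicits resolved by the compiler
val ints = MemoryStream[Int]
val sameThing = MemoryStream.apply[Int]
```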
Adding Data to Source (addData methods)
addData(data: A*): Offset
addData(data: TraversableOnce[A]): Offset
The addData
methods add the input data
to the internal batches collection.
When executed, addData
adds a Dataset
(created using the toDS implicit method) to batches and increments the internal currentOffset
offset.
You should see the following DEBUG message in the logs:
DEBUG MemoryStream: Adding ds: [ds]
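Both addData variants can be sketched as follows (continuing the intsInput and memoryQuery example from the beginning of the page; the exact Offset values depend on how many batches were added before):

```scala
// Varargs variant: addData(data: A*)
val offsetA = intsInput.addData(3, 4, 5)

// TraversableOnce variant: addData(data: TraversableOnce[A])
val offsetB = intsInput.addData(Seq(6, 7, 8))

// Each call returns the Offset of the batch it created; a running
// memory-sink query picks the new batches up on the next trigger.
memoryQuery.processAllAvailable()
```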
Getting Next Batch (getBatch method)
Note
|
getBatch is a part of Streaming Source contract.
|
When executed, getBatch
uses the internal batches collection to return the data for the requested range of offsets.
You should see the following DEBUG message in the logs:
DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]
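For illustration, getBatch can be called directly on a MemoryStream. This sketch assumes the Spark 2.x Source contract signatures (getOffset: Option[Offset] and getBatch(start: Option[Offset], end: Offset)); getBatch is internal API and not something production code should rely on:

```scala
// start = None means "from the very beginning";
// end is the latest offset the source knows about
intsInput.getOffset.foreach { end =>
  val batchDF = intsInput.getBatch(None, end)
  batchDF.show()
}
```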
StreamingExecutionRelation Logical Plan
MemoryStream
uses the StreamingExecutionRelation
logical plan to build Datasets or DataFrames when requested.
StreamingExecutionRelation
is a leaf logical node that is created for a streaming source and a given output
collection of Attribute. It is a streaming logical plan whose name is the name of the source.
scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]
scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true
scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]