val records = spark.readStream
  .format("text")
  .load("server-logs/*.out")
  .as[String]

import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: String) = println(value)
  override def close(errorOrNull: Throwable) = {}
}

records.writeStream
  .queryName("server-logs processor")
  .foreach(writer)
  .start
ForeachSink
ForeachSink is a typed Sink that passes records (of the type T) to a ForeachWriter (one record at a time per partition). It is used exclusively by the foreach operator.
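For reference, the foreach operator is defined on DataStreamWriter and simply accepts the writer that the sink will call back into; the signature below reflects the Spark 2.x public API and is given here only for orientation.

foreach(writer: ForeachWriter[T]): DataStreamWriter[T]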
addBatch(batchId: Long, data: DataFrame): Unit

Internally, addBatch (the only method from the Sink Contract) takes the records from the input DataFrame (as data), transforms them to the expected type T (of this ForeachSink) and (now as a Dataset) processes each partition.

It then opens the constructor’s ForeachWriter (for the current partition and the input batch) and passes the records to process (one at a time per partition).
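The following is a simplified sketch of that per-partition processing, not the actual Spark source: the addBatchSketch helper, and the use of Dataset.foreachPartition and TaskContext.getPartitionId, are illustration-only assumptions.

import org.apache.spark.TaskContext
import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}

// Simplified sketch of what ForeachSink.addBatch does per partition
// (hypothetical helper, not Spark's own code).
def addBatchSketch[T: Encoder](batchId: Long, data: DataFrame, writer: ForeachWriter[T]): Unit = {
  // Explicitly-typed function value so the Scala overload of foreachPartition is chosen
  val processPartition: Iterator[T] => Unit = { records =>
    val partitionId = TaskContext.getPartitionId().toLong
    // open decides whether this (partition, version) pair should be processed at all
    if (writer.open(partitionId, batchId)) {
      var failure: Throwable = null
      try {
        records.foreach(writer.process)   // one record at a time per partition
      } catch {
        case e: Throwable =>
          failure = e
          throw e
      } finally {
        writer.close(failure)             // close is always called, with the error (if any)
      }
    }
  }
  data.as[T].foreachPartition(processPartition)
}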
Caution: FIXME Why does Spark track whether the writer failed or not? Why couldn't it simply call close in a finally block?
Caution: FIXME Can we have a constant for "foreach" for source in DataStreamWriter?
ForeachWriter
Caution: FIXME
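Until this section is filled in, here is a minimal, illustration-only ForeachWriter implementation that shows when the three callbacks are invoked; the BufferingWriter name and its buffering behaviour are assumptions, not part of Spark.

import org.apache.spark.sql.ForeachWriter
import scala.collection.mutable.ArrayBuffer

// Hypothetical writer, for illustration only: buffers the records of a partition
// and prints them when the partition is closed without an error.
class BufferingWriter extends ForeachWriter[String] {
  private var buffer: ArrayBuffer[String] = _

  // Called once per partition per batch; returning false skips the partition.
  override def open(partitionId: Long, version: Long): Boolean = {
    buffer = ArrayBuffer.empty[String]
    true
  }

  // Called for every record of the partition (only if open returned true).
  override def process(value: String): Unit = buffer += value

  // Called at the end of the partition, with the error (if any) that stopped processing.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) buffer.foreach(println)
    buffer = null
  }
}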