ForeachSink

ForeachSink is a typed streaming Sink that passes records (of type T) to a ForeachWriter, one record at a time per partition.

It is used exclusively behind the foreach operator of DataStreamWriter.

import spark.implicits._  // required for the .as[String] conversion below

val records = spark.readStream
  .format("text")
  .load("server-logs/*.out")
  .as[String]

import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(value: String): Unit = println(value)
  override def close(errorOrNull: Throwable): Unit = {}
}

records.writeStream
  .queryName("server-logs processor")
  .foreach(writer)
  .start

Internally, addBatch (the only method of the Sink contract) takes the records of the input DataFrame (as data), converts them to the expected type T of this ForeachSink and, now as a Dataset[T], processes each partition.

addBatch(batchId: Long, data: DataFrame): Unit

It then opens the ForeachWriter (given at construction time) for the current partition and the input batch, and passes the records to process, one at a time per partition. When processing is finished (or fails), close is called with the exception that occurred, or null.
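The per-partition flow above can be sketched as follows. This is an illustrative simplification, not Spark's actual source; the encoder and writer fields are assumed to be available on the sink:

```scala
// Simplified sketch of ForeachSink.addBatch (illustrative only)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
  // Convert the untyped DataFrame back to a typed Dataset[T]
  val ds = data.as[T](encoder)
  ds.foreachPartition { (records: Iterator[T]) =>
    // One open/process/close cycle per partition and batch
    if (writer.open(TaskContext.getPartitionId(), batchId)) {
      var errorOrNull: Throwable = null
      try {
        records.foreach(writer.process)
      } catch {
        case t: Throwable => errorOrNull = t
      } finally {
        // close always runs, with the failure (if any) or null
        writer.close(errorOrNull)
        if (errorOrNull != null) throw errorOrNull
      }
    } else {
      // open returned false: skip the partition, but still close
      writer.close(null)
    }
  }
}
```

Note that open returning false lets a writer skip a partition (e.g. one it has already seen for this batch after a task retry).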

Caution
FIXME Why does Spark track whether the writer failed or not? Why couldn't it use finally to call close?
Caution
FIXME Can we have a constant for "foreach" for source in DataStreamWriter?

ForeachWriter

Caution
FIXME
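For reference, the ForeachWriter contract (org.apache.spark.sql.ForeachWriter) consists of three methods, called in the order open, process (per record), close:

```scala
// The shape of the ForeachWriter contract (comments are informal summaries)
abstract class ForeachWriter[T] extends Serializable {
  // Called when starting to process a partition of a batch;
  // return false to skip processing this partition
  def open(partitionId: Long, version: Long): Boolean

  // Called once per record, only if open returned true
  def process(value: T): Unit

  // Called at the end, with the exception if processing failed
  // (or null if it completed successfully)
  def close(errorOrNull: Throwable): Unit
}
```

A ForeachWriter must be Serializable since it is shipped to executors and used there, one instance per partition per batch.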
