StreamExecution
is the execution environment of a single continuous query (aka streaming Dataset) that is executed every trigger and in the end adds the results to a sink.
Note
|
Continuous query, streaming query, continuous Dataset, streaming Dataset are synonyms, and StreamExecution uses analyzed logical plan internally to refer to it.
|
Note
|
StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
|
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
readStream.
format("rate").
load.
writeStream.
format("console").
trigger(Trigger.ProcessingTime(10.minutes)).
start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery
// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
Note
|
DataStreamWriter describes how the results of executing batches of a streaming query are written to a streaming sink. |
StreamExecution
starts a thread of execution that runs the streaming query continuously and concurrently (and polls for new records in the streaming data sources to create a batch every trigger).
StreamExecution
can be in one of three states:
- INITIALIZING when the instance was created.
- ACTIVE when batches are pulled from the sources.
- TERMINATED when executing streaming batches has been terminated due to an error, all batches were successfully processed, or StreamExecution has been stopped.
StreamExecution
is a ProgressReporter and reports status of the streaming query (i.e. when it starts, progresses and terminates) by posting StreamingQueryListener
events.
StreamExecution
tracks streaming data sources in uniqueSources internal registry.
StreamExecution
collects durationMs
for the execution units of streaming batches.
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery
scala> println(q.lastProgress)
{
"id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
"runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
"name" : null,
"timestamp" : "2017-08-14T20:30:00.004Z",
"batchId" : 1,
"numInputRows" : 432,
"inputRowsPerSecond" : 0.9993568953312452,
"processedRowsPerSecond" : 1380.1916932907347,
"durationMs" : {
"addBatch" : 237,
"getBatch" : 26,
"getOffset" : 0,
"queryPlanning" : 1,
"triggerExecution" : 313,
"walCommit" : 45
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
"startOffset" : 0,
"endOffset" : 432,
"numInputRows" : 432,
"inputRowsPerSecond" : 0.9993568953312452,
"processedRowsPerSecond" : 1380.1916932907347
} ],
"sink" : {
"description" : "ConsoleSink[numRows=20, truncate=true]"
}
}
StreamExecution
uses two metadata logs: OffsetSeqLog is the write-ahead log that records the offsets to be processed, and BatchCommitLog records the batches that have already been processed and committed to a streaming sink.
Tip
|
Monitor offsets and commits metadata logs to know the progress of a streaming query.
|
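The tip above can be followed with a few lines of plain JVM file access. This is a minimal sketch, assuming the checkpoint directory keeps the two logs in `offsets` and `commits` subdirectories with one file per batch id; that layout is an assumption of this example, and `CheckpointProgress`, `latestBatchId` and `progress` are hypothetical names, not part of Spark.

```scala
import java.nio.file.{Files, Path}
import scala.util.Try

object CheckpointProgress {
  // Latest batch id found in a metadata log directory whose files are named by batch id.
  def latestBatchId(logDir: Path): Option[Long] =
    if (!Files.isDirectory(logDir)) None
    else {
      val stream = Files.list(logDir)
      try {
        val ids = stream.toArray.toSeq
          .map(_.asInstanceOf[Path].getFileName.toString)
          .flatMap(name => Try(name.toLong).toOption) // ignore files that are not batch ids
        if (ids.isEmpty) None else Some(ids.max)
      } finally stream.close()
    }

  // (latest batch with offsets written, latest batch committed to the sink)
  def progress(checkpointRoot: Path): (Option[Long], Option[Long]) =
    (latestBatchId(checkpointRoot.resolve("offsets")),
     latestBatchId(checkpointRoot.resolve("commits")))
}
```

When the first id is ahead of the second, offsets for a batch were written but the batch has not been committed yet.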
StreamExecution
delays polling for new data for 10 milliseconds (when no data was available to process in a batch). Use spark.sql.streaming.pollingDelay Spark property to control the delay.
Name | Description |
---|---|
 | StreamProgress of the streaming sources with their available and unprocessed offsets. Set when (in order): Used when: |
 | Java’s fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention). |
 | BatchCommitLog with Used when |
 | StreamProgress of the streaming sources and the committed offsets (i.e. processed already). |
 | Current batch number |
 | Unique identifier of the streaming query Set as the |
 | Last IncrementalExecution |
 | Lazily-generated logical plan (i.e. Initialized right after Used mainly when While initializing, While initializing, |
 | Thread of execution to run a streaming query concurrently with the name as When started, |
 | Registry of the streaming sources (in logical query plan) that have new data available in the current batch. The new data is a streaming Set exclusively when Used exclusively when |
 | Flag whether there are any new offsets available for processing or not. Turned on (i.e. enabled) when constructing the next streaming batch when no new offsets are available. |
 | OffsetSeqLog with Used when |
 | Time delay before polling new data again when no data was available Set to spark.sql.streaming.pollingDelay Spark property. Used when |
 | Pretty-identified string for identification in logs (with name if defined). |
 | Qualified path of the checkpoint directory (as defined using checkpointRoot when Used when creating the path to the checkpoint directory and when Used for logicalPlan (while transforming analyzedPlan and planning Internally, |
 | Current run id |
 | All streaming Sources in logical query plan (that are the sources from |
 | Java’s java.util.concurrent.CountDownLatch with count Used when |
 | Java’s java.util.concurrent.atomic.AtomicReference for the three different states a streaming query execution can be: |
 | TriggerExecutor per Trigger: Used when |
 | Unique streaming data sources in a streaming Dataset (after being collected as Used when |
Tip
|
Enable Add the following line to
Refer to Logging. |
runBatch(sparkSessionToRunBatch: SparkSession): Unit
runBatch
performs the following steps (aka phases):
-
getBatch Phase — Requesting New (and Hence Unprocessed) Data From Streaming Sources
-
triggerLogicalPlan Phase — Transforming Catalyst Expressions
-
queryPlanning Phase — Creating IncrementalExecution for Current Streaming Batch
-
nextBatch Phase — Creating Dataset (with IncrementalExecution for New Data)
-
awaitBatchLock Phase — Waking Up Threads Waiting For Stream to Progress
Note
|
runBatch is used exclusively when StreamExecution runs streaming batches.
|
Internally, runBatch
first requests the streaming sources for unprocessed data (and stores them as DataFrames
in newData internal registry).
In getBatch time-tracking section, runBatch
goes over the available offsets per source and processes the offsets that have not been committed yet.
runBatch
then requests every source for the data (as DataFrame
with the new records).
Note
|
runBatch requests the streaming sources for new DataFrames sequentially, source by source.
|
You should see the following DEBUG message in the logs:
DEBUG StreamExecution: Retrieving data from [source]: [current] -> [available]
You should then see the following DEBUG message in the logs:
DEBUG StreamExecution: getBatch took [timeTaken] ms
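The selection of offsets in the getBatch phase can be modelled in plain Scala. This is a simplified sketch, not Spark's code: sources are represented by names and offsets by `Long` values, and `GetBatchSketch` and `rangesToFetch` are hypothetical names introduced for illustration.

```scala
object GetBatchSketch {
  // Hypothetical stand-ins: a source is identified by name, an offset is a Long.
  type Source = String
  type Offset = Long

  // For every source whose available offset has not been committed yet,
  // yield the (current, available) pair to request data for.
  // `current` is None when nothing has been committed for the source.
  def rangesToFetch(
      available: Map[Source, Offset],
      committed: Map[Source, Offset]): Map[Source, (Option[Offset], Offset)] =
    available.collect {
      case (source, end) if !committed.get(source).contains(end) =>
        source -> (committed.get(source), end)
    }
}
```

Each resulting pair corresponds to the `[current] -> [available]` part of the DEBUG message above; sources with nothing new are skipped.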
withNewSources Phase — Replacing StreamingExecutionRelations (in Logical Plan) With Relations With New Data or Empty LocalRelation
In withNewSources phase, runBatch
transforms logical query plan and replaces every StreamingExecutionRelation logical operator with the logical plan of the DataFrame
with the input data in a batch for the corresponding streaming source.
Note
|
StreamingExecutionRelation logical operator is used to represent a streaming source in the logical query plan of a streaming Dataset .
|
runBatch
finds the corresponding DataFrame
(with the input data) per streaming source in newData internal registry. If found, runBatch
takes the logical plan of the DataFrame
. If not, runBatch
creates a LocalRelation
logical relation (for the output schema).
Note
|
newData internal registry contains entries for streaming sources that have new data available in the current batch. |
While replacing StreamingExecutionRelation
operators, runBatch
records the output schema of the streaming source (from StreamingExecutionRelation
) and the DataFrame
with the new data (in replacements
temporary internal buffer).
runBatch
makes sure that the output schema of the streaming source with new data in the batch has not changed. If the output schema has changed, runBatch
reports…FIXME
runBatch
transforms Catalyst expressions in withNewSources
new logical plan (using replacements
temporary internal buffer).
- Catalyst Attribute is replaced with one if recorded in replacements internal buffer (that corresponds to the attribute in the DataFrame with the new input data in the batch)
- CurrentTimestamp and CurrentDate Catalyst expressions are replaced with CurrentBatchTimestamp expression (with batchTimestampMs from OffsetSeqMetadata).
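The two replacement rules can be illustrated with a toy expression tree. The types below only borrow the names used in the text and are not the Catalyst classes; `BatchTimestampSketch`, `Add` and `rewrite` are hypothetical and exist only for this sketch.

```scala
object BatchTimestampSketch {
  // Toy expression tree (NOT Catalyst's classes).
  sealed trait Expr
  case object CurrentTimestamp extends Expr
  case object CurrentDate extends Expr
  final case class CurrentBatchTimestamp(batchTimestampMs: Long) extends Expr
  final case class Attribute(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Rewrites an expression for one batch:
  // - attributes recorded in `replacements` are swapped for their new counterparts
  // - time-dependent expressions are pinned to the batch timestamp
  def rewrite(
      e: Expr,
      batchTimestampMs: Long,
      replacements: Map[Attribute, Attribute]): Expr = e match {
    case CurrentTimestamp | CurrentDate => CurrentBatchTimestamp(batchTimestampMs)
    case a: Attribute                   => replacements.getOrElse(a, a)
    case Add(l, r) =>
      Add(rewrite(l, batchTimestampMs, replacements), rewrite(r, batchTimestampMs, replacements))
    case other => other
  }
}
```

Pinning the timestamp this way means every occurrence of a time-dependent expression in a batch evaluates to the same value.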
Note
|
Find more about |
Note
|
Find more about |
In queryPlanning time-tracking section, runBatch
creates a new IncrementalExecution
with the following:
-
Transformed logical query plan with logical relations for every streaming source and corresponding attributes
-
the streaming query’s output mode
-
state
checkpoint directory for managing state
The new IncrementalExecution
is recorded in lastExecution property.
Before leaving queryPlanning section, runBatch
forces preparation of the physical plan for execution (i.e. requesting IncrementalExecution for executedPlan).
Note
|
executedPlan is a physical plan (i.e. SparkPlan ) ready for execution with state optimization rules applied.
|
runBatch
creates a DataFrame
with the new IncrementalExecution (as QueryExecution
) and its analyzed output schema.
Note
|
The new DataFrame represents the result of a streaming query.
|
In addBatch time-tracking section, runBatch
requests the one and only streaming Sink to add the results of a streaming query (as the DataFrame
created in nextBatch Phase).
Note
|
runBatch uses Sink.addBatch method to request the Sink to add the results.
|
Note
|
runBatch uses SQLExecution.withNewExecutionId to execute and track all the Spark actions (under one execution id) that Sink can use when requested to add the results.
|
Note
|
The new DataFrame will only be executed in Sink.addBatch .
|
Note
|
SQLExecution.withNewExecutionId posts a SparkListenerSQLExecutionStart event before executing Sink.addBatch and a SparkListenerSQLExecutionEnd event right afterwards.
|
Tip
|
Register You can find more information on |
In awaitBatchLock code block (it is not a time-tracking section), runBatch
acquires a lock on awaitBatchLock, wakes up all threads waiting on awaitBatchLockCondition and immediately releases the awaitBatchLock lock.
Note
|
awaitBatchLockCondition is used mainly when StreamExecution is requested to processAllAvailable (and also in awaitOffset, but that seems to be mainly for testing).
|
runBatches(): Unit
runBatches
runs streaming batches of data (that are datasets from every streaming source).
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val out = spark.
readStream.
text("server-logs").
writeStream.
format("console").
queryName("debug").
trigger(Trigger.ProcessingTime(10.seconds))
scala> val debugStream = out.start
INFO StreamExecution: Starting debug [id = 8b57b0bd-fc4a-42eb-81a3-777d7ba5e370, runId = 920b227e-6d02-4a03-a271-c62120258cea]. Use file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-274f9ae1-1238-4088-b4a1-5128fc520c1f to store the query checkpoint.
debugStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@58a5b69c
// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG
17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
"id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
"runId" : "920b227e-6d02-4a03-a271-c62120258cea",
"name" : "debug",
"timestamp" : "2017-06-18T19:21:07.693Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 5,
"triggerExecution" : 9
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
"startOffset" : null,
"endOffset" : null,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
}
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
Internally, runBatches
assigns the group id (to all the Spark jobs started by this thread) as runId (with the group description to display in web UI as getBatchDescriptionString and interruptOnCancel
flag enabled).
Note
|
You can find the details on |
runBatches
sets a local property sql.streaming.queryId
as id.
runBatches
registers a metric source when spark.sql.streaming.metricsEnabled property is enabled (which is disabled by default).
Caution
|
FIXME Metrics |
runBatches
notifies StreamingQueryListeners
that a streaming query has been started (by posting a QueryStartedEvent with id, runId and name).
runBatches
unblocks the main starting thread (by decrementing the count of startLatch that goes to 0
and lets the starting thread continue).
Caution
|
FIXME A picture with two parallel lanes for the starting thread and daemon one for the query. |
runBatches
updates the status message to Initializing sources followed by initialization of the logical plan (of the streaming Dataset).
runBatches
disables adaptive query execution (using spark.sql.adaptive.enabled
property which is disabled by default) as it could change the number of shuffle partitions.
runBatches
initializes offsetSeqMetadata internal variable.
runBatches
sets state to ACTIVE
(only when the current state is INITIALIZING,
which prevents the initialization from being repeated).
Note
|
runBatches does the work only when first started (i.e. when state is INITIALIZING ).
|
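The guarded state change can be sketched with an atomic reference. This is a minimal model, assuming the transition is done with `compareAndSet` (an assumption of this example); `QueryStateSketch`, `Query`, `activate` and `stop` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference

object QueryStateSketch {
  sealed trait State
  case object INITIALIZING extends State
  case object ACTIVE extends State
  case object TERMINATED extends State

  final class Query {
    private val state = new AtomicReference[State](INITIALIZING)

    // Returns true only for the single call that moves INITIALIZING to ACTIVE,
    // so the initialization work guarded by it runs at most once.
    def activate(): Boolean = state.compareAndSet(INITIALIZING, ACTIVE)

    def stop(): Unit = state.set(TERMINATED)

    // A query counts as active in every state but TERMINATED.
    def isActive: Boolean = state.get != TERMINATED
  }
}
```

A second call to `activate()`, or a call after `stop()`, returns `false` and leaves the state untouched.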
runBatches
decrements the count of initializationLatch.
Caution
|
FIXME initializationLatch so what?
|
runBatches
requests TriggerExecutor to start executing batches (aka triggers) by executing a batch runner.
Once TriggerExecutor has finished executing batches, runBatches
updates the status message to Stopped.
Note
|
The batch runner returns whether the streaming query is still active (i.e. the internal state is not TERMINATED ). TriggerExecutor finishes executing batches once the batch runner reports that the query is no longer active.
|
Caution
|
FIXME Describe catch block for exception handling
|
Caution
|
FIXME Describe finally block for query termination
|
Note
|
runBatches is used exclusively when StreamExecution starts the execution thread for a streaming query (i.e. the thread that runs the micro-batches of this stream).
|
Batch Runner (aka batchRunner
) is an executable block executed by TriggerExecutor in runBatches.
batchRunner
starts trigger calculation.
As long as the query is not stopped (i.e. state is not TERMINATED
), batchRunner
executes the streaming batch for the trigger.
In triggerExecution time-tracking section, runBatches
branches off per currentBatchId.
currentBatchId < 0 | currentBatchId >= 0 |
---|---|
|
If there is data available in the sources, batchRunner
marks currentStatus with isDataAvailable
enabled.
Note
|
You can check out the status of a streaming query using status method.
scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Waiting for next trigger",
"isDataAvailable" : false,
"isTriggerActive" : false
}
|
batchRunner
then updates the status message to Processing new data and runs the current streaming batch.
After triggerExecution section has finished, batchRunner
finishes the streaming batch for the trigger (and collects query execution statistics).
When there was data available in the sources, batchRunner
updates committed offsets (by adding the current batch id to BatchCommitLog and adding availableOffsets to committedOffsets).
You should see the following DEBUG message in the logs:
DEBUG batch $currentBatchId committed
batchRunner
increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.
When no data was available in the sources to process, batchRunner
does the following:
-
Marks currentStatus with
isDataAvailable
disabled -
Updates the status message to Waiting for data to arrive
-
Sleeps the current thread for pollingDelayMs milliseconds.
batchRunner
updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not).
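The interplay between the trigger executor and the batch runner can be modelled without Spark. This is a simplified sketch: `BatchRunnerSketch`, `execute` and `Query` are hypothetical, the per-trigger data availability is passed in as a sequence, and the query is treated as active only while triggers remain.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchRunnerSketch {
  // Stand-in for a trigger executor: calls the runner until it returns false.
  def execute(batchRunner: () => Boolean): Unit =
    while (batchRunner()) {}

  // `dataPerTrigger` must be non-empty; element i says whether trigger i finds new data.
  final class Query(dataPerTrigger: Seq[Boolean], pollingDelayMs: Long = 10L) {
    val statusLog = ArrayBuffer.empty[String]
    var currentBatchId = 0L
    private var trigger = 0

    def batchRunner(): Boolean = {
      val dataAvailable = dataPerTrigger(trigger)
      trigger += 1
      if (dataAvailable) {
        statusLog += "Processing new data"
        currentBatchId += 1          // batch done, move on to the next batch id
      } else {
        statusLog += "Waiting for data to arrive"
        Thread.sleep(pollingDelayMs) // back off before polling the sources again
      }
      statusLog += "Waiting for next trigger"
      trigger < dataPerTrigger.length // "still active" while triggers remain
    }
  }
}
```

The batch id advances only for triggers that found data, while the status always ends a trigger at Waiting for next trigger.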
populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit
populateStartOffsets
requests OffsetSeqLog for the latest committed batch id with its metadata if available.
Note
|
The batch id may not be available in the metadata log if a streaming query started with a new metadata log or no batch was committed before. |
With the latest committed batch id with the metadata (from OffsetSeqLog) populateStartOffsets
sets current batch id to the latest committed batch id, and availableOffsets to its offsets (considering them unprocessed yet).
Note
|
populateStartOffsets may re-execute the latest committed batch.
|
If the latest batch id is greater than 0, populateStartOffsets
requests OffsetSeqLog for the second latest batch with its metadata (or reports an IllegalStateException
if not found). populateStartOffsets
sets committed offsets to the second latest committed offsets.
populateStartOffsets
updates the offset metadata.
Caution
|
FIXME Why is the update needed? |
populateStartOffsets
requests BatchCommitLog for the latest processed batch id with its metadata if available.
With the latest processed batch id and its metadata (from BatchCommitLog), and only when the latest batch in OffsetSeqLog is also the latest batch in BatchCommitLog, populateStartOffsets
sets current batch id as the next after the latest processed batch. populateStartOffsets
sets committed offsets to availableOffsets.
Caution
|
FIXME Describe what happens with availableOffsets .
|
populateStartOffsets
constructs the next streaming batch.
Caution
|
FIXME Describe the WARN message when latestCommittedBatchId < latestBatchId - 1 .
|
WARN Batch completion log latest batch id is [latestCommittedBatchId], which is not trailing batchid [latestBatchId] by one
You should see the following DEBUG message in the logs:
DEBUG Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]
Caution
|
FIXME Include an example of Resuming at batch |
When the latest committed batch id with the metadata could not be found in BatchCommitLog, populateStartOffsets
prints out the following INFO message to the logs:
INFO no commit log present
Caution
|
FIXME Include an example of the case when no commit log present. |
When the latest committed batch id with the metadata could not be found in OffsetSeqLog, it is assumed that the streaming query is started for the first time. You should see the following INFO message in the logs:
INFO StreamExecution: Starting new streaming query.
populateStartOffsets
sets current batch id to 0
and constructs the next streaming batch.
Note
|
populateStartOffsets is used exclusively when TriggerExecutor executes a batch runner for the first time (i.e. current batch id is negative).
|
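The choice of the batch to resume at can be condensed into a small pure function. This is a sketch of the decision as described in this section, not Spark's implementation; `ResumeSketch` and `startBatchId` are hypothetical names and the two logs are reduced to their latest batch ids.

```scala
object ResumeSketch {
  // latestOffsetBatch: latest batch id in the offset (write-ahead) log, if any
  // latestCommitBatch: latest batch id in the commit log, if any
  // Returns the id of the batch to run next.
  def startBatchId(latestOffsetBatch: Option[Long], latestCommitBatch: Option[Long]): Long =
    latestOffsetBatch match {
      // No offsets logged: the query is started for the first time.
      case None => 0L
      // The latest logged batch was also committed: continue with the next one.
      case Some(latest) if latestCommitBatch.contains(latest) => latest + 1
      // Offsets were logged but the batch was not committed: run it again.
      case Some(latest) => latest
    }
}
```

The last case is the one where a batch may be re-executed after a restart.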
toDebugString(includeLogicalPlan: Boolean): String
toDebugString
…FIXME
Note
|
toDebugString is used exclusively when StreamExecution runs streaming batches (and a streaming query terminated with exception).
|
start(): Unit
When called, start
prints the following INFO message to the logs:
INFO Starting [id]. Use [resolvedCheckpointRoot] to store the query checkpoint.
start
then sets microBatchThread as a daemon thread and starts it.
Note
|
start uses Java’s java.lang.Thread.start to run the streaming query on a separate execution thread.
|
Note
|
When started, a streaming query runs in its own execution thread on JVM. |
In the end, start
waits until startLatch has counted down to zero (which is right after StreamExecution
has started running streaming batches so there is some pause in the main thread’s execution to wait till the streaming query execution thread starts).
Note
|
start is used exclusively when StreamingQueryManager is requested to start a streaming query.
|
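The hand-off between the starting thread and the execution thread can be sketched with a latch. This is a minimal model, not Spark's code: `StartLatchSketch` and `Query` are hypothetical, the thread name is made up for the example, and the body of the execution thread is reduced to the latch countdown.

```scala
import java.util.concurrent.CountDownLatch

object StartLatchSketch {
  final class Query(name: String) {
    private val startLatch = new CountDownLatch(1)
    @volatile var started = false

    // Hypothetical thread name; the real query work is elided.
    private val executionThread = new Thread(s"query-$name") {
      override def run(): Unit = {
        started = true
        startLatch.countDown() // unblocks the thread waiting in start()
        // ... streaming batches would run here ...
      }
    }

    def start(): Unit = {
      executionThread.setDaemon(true)
      executionThread.start()
      startLatch.await()       // returns only after the execution thread is running
    }
  }
}
```

Because `start()` blocks on the latch, the caller can rely on the execution thread being up by the time `start()` returns.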
StreamExecution
takes the following when created:
-
Output mode (that is only used when creating
IncrementalExecution
for a streaming batch in query planning)
StreamExecution
initializes the internal registries and counters.
checkpointFile(name: String): String
checkpointFile
gives the path of a directory with name
in checkpoint directory.
Note
|
checkpointFile uses Hadoop’s org.apache.hadoop.fs.Path .
|
Note
|
checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).
|
constructNextBatch(): Unit
constructNextBatch
is made up of the following three parts:
-
Firstly, checking if there is new data available by requesting new offsets from every streaming source
-
There is some data to process (and so where the next batch is constructed)
constructNextBatch
starts by checking whether new data is available in any of the streaming sources (in the logical query plan).
constructNextBatch
acquires awaitBatchLock and gets the latest offset from every streaming data source.
Note
|
constructNextBatch checks out the latest offset in every streaming data source sequentially, i.e. one data source at a time.
|
Note
|
constructNextBatch uses the Source contract to get the latest offset (using Source.getOffset method).
|
constructNextBatch
updates the status message to Getting offsets from [source] for every streaming data source.
In getOffset time-tracking section, constructNextBatch
gets the offsets.
constructNextBatch
prints out the following DEBUG message to the logs:
DEBUG StreamExecution: getOffset took [time] ms
constructNextBatch
adds the streaming sources that have the available offsets to availableOffsets.
If there is no data available (i.e. no offsets unprocessed in any of the streaming data sources), constructNextBatch
turns noNewData flag on.
In the end (of this checking-data block), constructNextBatch
releases awaitBatchLock.
When new data is available, constructNextBatch
updates the event time watermark (tracked using offsetSeqMetadata) if it finds one in the last IncrementalExecution.
If lastExecution is available (which may not be the case when constructNextBatch
is executed the very first time), constructNextBatch
takes the executed physical plan (i.e. SparkPlan
) and collects all EventTimeWatermarkExec
physical operators with the count of eventTimeStats greater than 0
.
Note
|
The executed physical plan is available as executedPlan property of IncrementalExecution (which is a custom QueryExecution ).
|
You should see the following DEBUG message in the logs:
DEBUG StreamExecution: Observed event time stats: [eventTimeStats]
constructNextBatch
calculates the difference between the maximum value of eventTimeStats
and delayMs for every EventTimeWatermarkExec
physical operator.
Note
|
The maximum value of eventTimeStats is the most recent event time observed, i.e. the time closest to the current time.
|
constructNextBatch
then takes the first difference (if available at all) and uses it as a possible new event time watermark.
If the event time watermark candidate is greater than the current watermark (i.e. later time-wise), constructNextBatch
prints out the following INFO message to the logs:
INFO StreamExecution: Updating eventTime watermark to: [newWatermarkMs] ms
constructNextBatch
creates a new OffsetSeqMetadata with the new event time watermark and the current time.
Otherwise, if the eventTime watermark candidate is not greater than the current watermark, constructNextBatch
simply prints out the following DEBUG message to the logs:
DEBUG StreamExecution: Event time didn't move: [newWatermarkMs] <= [batchWatermarkMs]
constructNextBatch
creates a new OffsetSeqMetadata with just the current time.
Note
|
Although constructNextBatch collects all the EventTimeWatermarkExec physical operators in the executed physical plan of lastExecution, only the first matters if available.
|
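The watermark update described above comes down to a little arithmetic. This is a simplified sketch in plain Scala: `WatermarkSketch`, `WatermarkStats`, `candidate` and `nextWatermark` are hypothetical names, and each watermark operator is reduced to its maximum event time, row count and delay.

```scala
object WatermarkSketch {
  // Hypothetical stand-in for the statistics of one watermark operator.
  final case class WatermarkStats(maxEventTimeMs: Long, count: Long, delayMs: Long)

  // Candidate watermark: max event time minus delay, taken from the first
  // operator that observed any rows (later operators are ignored).
  def candidate(stats: Seq[WatermarkStats]): Option[Long] =
    stats.filter(_.count > 0).map(s => s.maxEventTimeMs - s.delayMs).headOption

  // The watermark only moves forward; otherwise the current value is kept.
  def nextWatermark(currentMs: Long, stats: Seq[WatermarkStats]): Long =
    candidate(stats).filter(_ > currentMs).getOrElse(currentMs)
}
```

For example, a maximum event time of 5000 ms with a delay of 2000 ms gives a candidate of 3000 ms, which replaces a current watermark of 1000 ms but not one of 4000 ms.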
Note
|
A physical plan can have as many EventTimeWatermarkExec physical operators as the number of times withWatermark operator was used to create a streaming query.
|
Note
|
Streaming watermark can be changed between a streaming query’s restarts (and be different between what is checkpointed and the current version of the query). FIXME True? Example? |
constructNextBatch
then adds the offsets to metadata log.
constructNextBatch
updates the status message to Writing offsets to log.
In walCommit time-tracking section,
constructNextBatch
adds the offsets in the batch to OffsetSeqLog.
Note
|
While writing the offsets to the metadata log,
|
constructNextBatch
reports an AssertionError
when writing to the metadata log has failed.
Concurrent update to the log. Multiple streaming jobs detected for [currentBatchId]
Tip
|
Use StreamingQuery.lastProgress to access
scala> :type sq
org.apache.spark.sql.streaming.StreamingQuery
sq.lastProgress.durationMs.get("walCommit")
|
Tip
|
Enable INFO logging level for
|
constructNextBatch
commits the offsets for the batch only when current batch id is not 0
(the id is 0 when the query has just been started and constructNextBatch
is called for the first time).
constructNextBatch
takes the previously-committed batch (from OffsetSeqLog), extracts the stored offsets per source.
Note
|
constructNextBatch uses OffsetSeq.toStreamProgress and sources registry to extract the offsets per source.
|
constructNextBatch
requests every streaming source to commit the offsets.
Note
|
constructNextBatch uses the Source contract to commit the offsets (using Source.commit method).
|
constructNextBatch
reports an IllegalStateException
when the previously-committed batch could not be found in OffsetSeqLog.
batch [currentBatchId] doesn't exist
In the end, constructNextBatch
purges OffsetSeqLog and BatchCommitLog when current batch id is above spark.sql.streaming.minBatchesToRetain Spark property.
If there is no new data available, constructNextBatch
acquires a lock on awaitBatchLock, wakes up all waiting threads that are waiting for the stream to progress (using awaitBatchLockCondition), followed by releasing the lock on awaitBatchLock.
postEvent(event: StreamingQueryListener.Event): Unit
Note
|
postEvent is a part of ProgressReporter Contract.
|
postEvent
simply requests the StreamingQueryManager
to post the input event (to the StreamingQueryListenerBus in the current SparkSession
).
Note
|
postEvent uses SparkSession to access the current StreamingQueryManager .
|
dataAvailable: Boolean
dataAvailable
finds the streaming sources in availableOffsets for which the offsets committed (as recorded in committedOffsets) are different or do not exist at all.
If there are any differences in the number of sources or their committed offsets, dataAvailable
is enabled (i.e. true
).
Note
|
dataAvailable is used when StreamExecution runs streaming batches and constructs the next streaming batch.
|
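The check can be written as a one-line predicate over two maps. This is a sketch in plain Scala with generic source and offset types; `DataAvailableSketch` is a hypothetical name and the maps stand in for availableOffsets and committedOffsets.

```scala
object DataAvailableSketch {
  // True when some source has an available offset that is either different from
  // its committed offset or has no committed offset at all.
  def dataAvailable[S, O](available: Map[S, O], committed: Map[S, O]): Boolean =
    available.exists { case (source, offset) =>
      !committed.get(source).contains(offset)
    }
}
```

A source that appears only in the committed map does not make data available, since the check walks the available offsets.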
Waiting Until No Data Available in Sources or Query Has Been Terminated — processAllAvailable
Method
processAllAvailable(): Unit
Note
|
processAllAvailable is a part of StreamingQuery Contract.
|
processAllAvailable
reports streamDeathCause exception if defined (and returns).
Note
|
streamDeathCause is defined exclusively when StreamExecution runs streaming batches (and terminated with an exception).
|
processAllAvailable
returns when isActive flag is turned off (which is when StreamExecution
is in TERMINATED
state).
processAllAvailable
acquires a lock on awaitBatchLock and turns noNewData flag off.
processAllAvailable
waits on awaitBatchLockCondition (10 seconds at a time) until noNewData flag is turned on or StreamExecution
is no longer active.
Note
|
noNewData flag is turned on exclusively when StreamExecution constructs the next streaming batch (and finds that no data is available).
|
In the end, processAllAvailable
releases awaitBatchLock lock.
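The waiting protocol can be sketched with a fair lock and a condition. This is a simplified model, not Spark's code: `AwaitBatchSketch`, `Progress`, `reportNoNewData` and `terminate` are hypothetical names, and the wait time is a parameter so the example can be exercised quickly.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object AwaitBatchSketch {
  final class Progress {
    private val awaitBatchLock = new ReentrantLock(true) // fair lock
    private val awaitBatchLockCondition = awaitBatchLock.newCondition()
    @volatile private var noNewData = false
    @volatile private var active = true

    def isNoNewData: Boolean = noNewData

    // Applies an update under the lock and wakes up all waiting threads.
    private def signal(update: => Unit): Unit = {
      awaitBatchLock.lock()
      try {
        update
        awaitBatchLockCondition.signalAll()
      } finally awaitBatchLock.unlock()
    }

    // Called by the execution thread when it finds nothing left to process.
    def reportNoNewData(): Unit = signal { noNewData = true }

    // Called when the query stops.
    def terminate(): Unit = signal { active = false }

    // Blocks the caller until no data is left or the query is no longer active.
    def processAllAvailable(waitMs: Long = 10000L): Unit =
      if (active) {
        awaitBatchLock.lock()
        try {
          noNewData = false
          while (!noNewData && active) {
            awaitBatchLockCondition.await(waitMs, TimeUnit.MILLISECONDS)
          }
        } finally awaitBatchLock.unlock()
      }
  }
}
```

The flag is reset before waiting, so a caller returns only after the execution thread reports "no new data" again (or the query terminates), not because of a report made before the call.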