Skip to content

Commit

Permalink
[SPARK-48889][SS] testStream to unload state stores before finishing
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In the end of each testStream() call, unload all state stores from the executor

### Why are the changes needed?
Currently, after a test, we don't unload state store or disable maintenance task. So after a test, the maintenance task can run and fail as the checkpoint directory is already deleted. This might cause an issue and fail the next test.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
See existing tests to pass

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47339 from siying/SPARK-48889.

Authored-by: Siying Dong <siying.dong@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
siying authored and HeartSaVioR committed Jul 17, 2024
1 parent b2e0a4d commit 3a24555
Showing 1 changed file with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,13 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
case (key, None) => sparkSession.conf.unset(key)
}
sparkSession.streams.removeListener(listener)
// The state store is stopped here to unload all state stores and terminate all maintenance
// threads. It is necessary because the temp directory used by the checkpoint directory
// may be deleted soon after, and the maintenance thread may see unexpected error and
// cause unexpected behavior. Doing it after a test finishes might be too late because
// sometimes the checkpoint directory is under `withTempDir`, and in this case the temp
// directory is deleted before the test finishes.
StateStore.stop()
}
}

Expand Down

0 comments on commit 3a24555

Please sign in to comment.