Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup async state when multi-threaded shuffle readers fail #10637

Merged
merged 8 commits into from
Apr 1, 2024

Conversation

abellina
Copy link
Collaborator

Closes #10631

We had some reports of file descriptors left open from the multi-threaded shuffle reader, likely due to a wave of other exceptions. I started to look at it and found that it did not handle cases when tasks were cancelled correctly, nor errors such as stream close. I added code in the task completion callback to close out any accumulated future or to-be-processed batch.

I also fixed an issue where we would leak a batch if a stream closes after we materialize the batch, but before we read the next header. I found that one because I was testing this by introducing exceptions manually.

I am going to add a test in RapidsShuffleThreadedReaderSuite shortly.

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
@@ -770,20 +831,27 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](

private def deserializeTask(blockState: BlockState): Unit = {
val slot = RapidsShuffleInternalManagerBase.getNextReaderSlot
val tc = TaskContext.get()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used and is showing as a compile error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops, thanks


// close any materialized BlockState objects that are holding onto netty buffers or
// file descriptors
pendingIts.foreach(_.close)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

safeClose #10502

@abellina abellina marked this pull request as ready for review March 27, 2024 15:20
@abellina
Copy link
Collaborator Author

build

@abellina
Copy link
Collaborator Author

build

}
futures.clear()
if (fallbackIter != null) {
fallbackIter.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this throws and failedFuture was not empty at the time, we are potentially masking the root cause

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good point, I can add it as suppressed, sec.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gerashegalov should be taken care of here: d58b471

@sameerz sameerz added the bug Something isn't working label Mar 27, 2024
@abellina
Copy link
Collaborator Author

build

Copy link
Collaborator

@gerashegalov gerashegalov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@abellina abellina merged commit b14b01e into NVIDIA:branch-24.04 Apr 1, 2024
43 checks passed
@abellina abellina deleted the mt_shuffle_reader_cleanup branch April 1, 2024 21:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] pending BlockState leaks blocks if the shuffle read doesn't finish successfully
4 participants