Improve host memory spill interfaces #10065

Merged
jbrennan333 merged 5 commits into NVIDIA:branch-24.02 on Dec 19, 2023

Conversation

jbrennan333 (Collaborator):

This fixes #10004

This changes the host memory spill to look more like SpillableColumnarBatch. Instead of using withHostMemoryReadOnly and withHostMemoryWriteLock, callers can simply use SpillableHostBuffer.getHostBuffer().

This also changes RapidsDiskBuffer.getMemoryBuffer to no longer retain a HostMemoryBuffer, and to use HostAlloc.alloc() for obtaining the HostMemoryBuffer it returns. This may require callers to handle retries.

I have also made changes to InternalRowToColumnarBatchIterator to use this new interface.
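
For illustration, a minimal sketch of the new calling pattern in Scala terms (a hedged sketch, assuming getHostBuffer() hands back a HostMemoryBuffer reference that the caller must close; import paths are from memory and may differ):

import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.SpillableHostBuffer

// Hypothetical caller; `spillable` stands in for any SpillableHostBuffer we hold.
def readFirstLong(spillable: SpillableHostBuffer): Long = {
  // getHostBuffer() materializes the data (unspilling from disk if needed) and
  // returns a buffer the caller owns; it stays unspillable while we hold it.
  val hostBuf: HostMemoryBuffer = spillable.getHostBuffer()
  try {
    hostBuf.getLong(0)
  } finally {
    // Dropping our reference lets the underlying buffer become spillable again.
    hostBuf.close()
  }
}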

I am putting this up as a draft so I can get some feedback on the approach.

@jbrennan333 added the bug (Something isn't working) and reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin) labels on Dec 17, 2023
@jbrennan333 self-assigned this on Dec 17, 2023
@jbrennan333 (Collaborator Author):

build

) {
used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate);
}
hBufs = getHostBuffersWithRetry(sdb, sob);
jbrennan333 (Collaborator Author):

I'm not certain there is a good reason to have a second block here. I did it this way as a test, because previously I was hitting problems with the with-write-lock followed by with-read-only blocks. But at this point I think I could just combine these two blocks.

case Some(existingBuffer) => existingBuffer
case None =>
val maybeNewBuffer = hostStorage.copyBuffer(buffer, this, stream)
maybeNewBuffer.map { newBuffer =>
jbrennan333 (Collaborator Author):

This buffer is initially created as spillable. But it changes to unspillable when the caller does a getHostMemoryBuffer on it. Not sure if I should be concerned about this brief window of spillability?

maybeNewBuffer.map { newBuffer =>
logDebug(s"got new RapidsHostMemoryStore buffer ${newBuffer.id}")
newBuffer.addReference() // add a reference since we are about to use it
updateTiers(BufferSpill(buffer, Some(newBuffer)))
jbrennan333 (Collaborator Author):

The debug log output from updateTiers makes this sound like a spill, when in fact this is an unspill. Perhaps I should modify updateTiers to compare the storage tiers and adjust the log message appropriately.

Collaborator:

This was created only with spill in mind, not unspill. Could BufferSpill be part of a hierarchy of classes: a SpillAction, where BufferSpill is a SpillAction and BufferUnspill is also a SpillAction?
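
For illustration, a rough Scala sketch of the suggested shape (the trait and field names here are assumptions for the sketch, not the actual plugin code):

// Hypothetical hierarchy: both actions carry the buffer that moved and where it landed.
sealed trait SpillAction {
  def sourceBuffer: RapidsBuffer
  def newBuffer: Option[RapidsBuffer]
}

case class BufferSpill(sourceBuffer: RapidsBuffer,
    newBuffer: Option[RapidsBuffer]) extends SpillAction

case class BufferUnspill(sourceBuffer: RapidsBuffer,
    newBuffer: Option[RapidsBuffer]) extends SpillAction

// updateTiers could then match on the action type to log "spilled" vs "unspilled"
// instead of inferring the direction from the storage tiers.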

val serializerManager = diskBlockManager.getSerializerManager()
val memBuffer = if (serializerManager.isRapidsSpill(id)) {
// Only go through serializerManager's stream wrapper for spill case
closeOnExcept(HostAlloc.alloc(uncompressedSize)) {
jbrennan333 (Collaborator Author):

Adding this HostAlloc.alloc() is what really increases how much heap memory I can get away with when running queries. This also requires that any code that ends up in here will likely need a retry.
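
For illustration, a hedged sketch of what a caller-side retry might look like (this assumes the plugin's RmmRapidsRetryIterator.withRetryNoSplit helper, a RapidsBuffer handle, and a cast of the returned MemoryBuffer; the exact helper, types, and signatures may differ):

import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.RapidsBuffer
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

// `diskBuffer` stands in for a buffer handle backed by the disk store.
def readBackToHost(diskBuffer: RapidsBuffer): HostMemoryBuffer =
  withRetryNoSplit[HostMemoryBuffer] {
    // getMemoryBuffer now allocates its result via HostAlloc.alloc, so it can throw
    // a retry OOM; the retry helper catches that, waits for host memory to be
    // freed or spilled, and runs this block again.
    diskBuffer.getMemoryBuffer.asInstanceOf[HostMemoryBuffer]
  }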

case _ =>
throw new IllegalStateException("copying from buffer without device memory")
// If the other is from the local disk store, we are unspilling to host memory.
if (other.storageTier == StorageTier.DISK) {
jbrennan333 (Collaborator Author):

I originally tested this without this block for StorageTier.DISK, and it worked.
But in that case we do a HostAlloc here and another one in the disk store, and then copy the buffer to this one. I changed it to just take over the buffer from the disk store to eliminate the extra alloc/copy.
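
For illustration, a rough sketch of that branch (hedged: releaseHostBuffer, sizeInBytes, and copyIntoHostBuffer are made-up names standing in for whatever the store actually exposes; other, StorageTier, HostAlloc, and closeOnExcept are from the surrounding code):

val hostBuffer: HostMemoryBuffer =
  if (other.storageTier == StorageTier.DISK) {
    // Unspill: the disk store already allocated a host buffer via HostAlloc.alloc
    // and read the spilled bytes into it, so take ownership of that buffer instead
    // of allocating a second one and copying.
    other.releaseHostBuffer()  // made-up name for "hand over the existing buffer"
  } else {
    // Spill from device: allocate a fresh host buffer and copy into it, as before.
    closeOnExcept(HostAlloc.alloc(other.sizeInBytes)) { hostBuf =>
      copyIntoHostBuffer(other, hostBuf)  // made-up helper for the existing copy path
      hostBuf
    }
  }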

rapidsBuffer.getHostMemoryBuffer
}
}

/**
jbrennan333 (Collaborator Author):

I did not remove withHostBufferReadOnly/withHostBufferWriteLock yet, but I don't think we need them if we switch to this alternate approach?

Collaborator:

I'd remove them if they are not useful

@abellina self-requested a review on December 18, 2023 at 15:18
@jbrennan333 (Collaborator Author):

I have tested this by running the existing unit and integration tests, and I have also been testing with NDS queries.
By setting these configs:

 --conf spark.rapids.sql.exec.ShuffleExchangeExec=false \
 --conf spark.rapids.memory.host.offHeapLimit.enabled=true \
 --conf spark.rapids.memory.host.offHeapLimit.size=32G \

I am able to force a lot of InternalRowToColumnarBatchIterator activity. I have verified that I can run the full NDS power run and get correct results with host memory restricted to 32G. Ideally we should be able to go lower, but I am going to leave that as a separate investigation.
I have also done a benchmark test on an 8-node A100 cluster to verify that there are no performance impacts from this change (this was done without limiting off-heap memory).

updateTiers(BufferSpill(buffer, Some(newBuffer)))
buffer.safeFree()
newBuffer
}.get // the GPU store has to return a buffer here for now, or throw OOM
Collaborator:

Suggested change:
- }.get // the GPU store has to return a buffer here for now, or throw OOM
+ }.get // the host store has to return a buffer here for now, or throw OOM

@abellina (Collaborator):

This LGTM @jbrennan333

try (
SpillableHostBuffer sdb = bufsAndNumRows._1[0];
int used[];
try (SpillableHostBuffer sdb = bufsAndNumRows._1[0];
Collaborator:

Not introduced by this PR, but since it touches them, can we update these variables to be more mnemonic than sdb and sob to improve code readability?

@jbrennan333 marked this pull request as ready for review on December 18, 2023 at 23:13
@jbrennan333 (Collaborator Author):

build

@gerashegalov (Collaborator) left a comment:

still trying to understand big picture, just nits

@@ -208,6 +209,23 @@ public ColumnarBatch next() {
}
}

private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer sdb, SpillableHostBuffer sob) {
Collaborator:

Suggested change:
- private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer sdb, SpillableHostBuffer sob) {
+ private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer spillableDataBuffer, SpillableHostBuffer spillableOffsetsBuffer) {

Comment on lines 214 to 225:
    HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
    try {
      hBufs[0] = sdb.getHostBuffer();
      hBufs[1] = sob.getHostBuffer();
      return hBufs;
    } finally {
      // If the second buffer is null, we must have thrown, so close the first one.
      if ((hBufs[1] == null) && (hBufs[0] != null)) {
        hBufs[0].close();
        hBufs[0] = null;
      }
    }
Collaborator:

Do we have to test for the exception implicitly? Would this also work?

Suggested change:
-     HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
-     try {
-       hBufs[0] = sdb.getHostBuffer();
-       hBufs[1] = sob.getHostBuffer();
-       return hBufs;
-     } finally {
-       // If the second buffer is null, we must have thrown, so close the first one.
-       if ((hBufs[1] == null) && (hBufs[0] != null)) {
-         hBufs[0].close();
-         hBufs[0] = null;
-       }
-     }
+     HostMemoryBuffer dataBuffer = spillableDataBuffer.getHostBuffer();
+     HostMemoryBuffer offsetsBuffer = null;
+     try {
+       offsetsBuffer = spillableOffsetsBuffer.getHostBuffer();
+     } catch (Throwable t) {
+       dataBuffer.close();
+     }
+     return new HostMemoryBuffer[] {dataBuffer, offsetsBuffer};

jbrennan333 (Collaborator Author):

Thanks @gerashegalov. I have simplified this by just doing a try-with-resources for both buffers and incrementing the refcounts in the body so they are retained if nothing threw.
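
The actual change is on the Java side (a try-with-resources over both buffers), but roughly the same idea in Scala terms, as a hedged sketch (assumes Arm.withResource and HostMemoryBuffer.incRefCount behave as they do elsewhere in the plugin; import paths may differ):

import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.SpillableHostBuffer

def getHostBuffers(sdb: SpillableHostBuffer, sob: SpillableHostBuffer): Array[HostMemoryBuffer] =
  withResource(sdb.getHostBuffer()) { dataBuf =>
    withResource(sob.getHostBuffer()) { offsetsBuf =>
      // Both calls succeeded: take an extra reference on each so the buffers outlive
      // the closes at the end of the withResource blocks. If the second getHostBuffer()
      // throws, the first buffer is closed automatically and nothing leaks.
      dataBuf.incRefCount()
      offsetsBuf.incRefCount()
      Array(dataBuf, offsetsBuf)
    }
  }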

@jbrennan333 (Collaborator Author):

build

@jbrennan333 merged commit 2edf82d into NVIDIA:branch-24.02 on Dec 19, 2023
37 of 38 checks passed
Labels
bug (Something isn't working), reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin)
Development

Successfully merging this pull request may close these issues.

[BUG] If a host memory buffer is spilled, it cannot be unspilled
3 participants