diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index c93286802244c..33d4cb1f2f465 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -47,6 +47,7 @@ import java.lang.invoke.VarHandle; import java.lang.reflect.Array; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -708,7 +709,7 @@ int getFreq(CacheFileRegion cacheFileRegion) { @Override public void close() { - sharedBytes.decRef(); + sharedBytes.close(); } // used by tests @@ -884,7 +885,7 @@ private static void throwAlreadyEvicted() { */ boolean tryRead(ByteBuffer buf, long offset) throws IOException { SharedBytes.IO ioRef = this.io; - if (ioRef != null && ioRef.tryIncRef()) { + if (ioRef != null) { try { int readBytes = ioRef.read(buf, getRegionRelativePosition(offset)); if (isEvicted()) { @@ -892,8 +893,9 @@ boolean tryRead(ByteBuffer buf, long offset) throws IOException { return false; } return true; - } finally { - ioRef.decRef(); + } catch (ClosedChannelException e) { + // the cache file channel has been closed + return false; } } else { // taken by someone else diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java index 46e7b2c7d3889..2e4653cb92266 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java @@ -9,13 +9,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.blobcache.BlobCacheUtils; import org.elasticsearch.blobcache.common.ByteBufferReference; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Streams; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.env.Environment; @@ -32,7 +31,7 @@ import java.nio.file.StandardOpenOption; import java.util.function.IntConsumer; -public class SharedBytes extends AbstractRefCounted { +public class SharedBytes extends AbstractRefCounted implements Releasable { /** * Thread local direct byte buffer to aggregate multiple positional writes to the cache file. @@ -118,6 +117,16 @@ public class SharedBytes extends AbstractRefCounted { this.readBytes = readBytes; } + @Override + public void close() { + if (ios != null) { + for (var io : ios) { + io.decRef(); + } + } + decRef(); + } + /** * Tries to find a suitable path to a searchable snapshots shared cache file in the data paths founds in the environment. * @@ -287,10 +296,9 @@ public IO getFileChannel(int sharedBytesPos) { return ios[sharedBytesPos]; } - public final class IO implements RefCounted { + public final class IO extends AbstractRefCounted { private final long pageStart; - private final MappedByteBuffer mappedByteBuffer; private IO(final int sharedBytesPos, MappedByteBuffer mappedByteBuffer) { @@ -298,28 +306,12 @@ private IO(final int sharedBytesPos, MappedByteBuffer mappedByteBuffer) { assert physicalOffset <= (long) numRegions * regionSize; this.pageStart = physicalOffset; this.mappedByteBuffer = mappedByteBuffer; + SharedBytes.this.incRef(); } @Override - public boolean tryIncRef() { - return SharedBytes.this.tryIncRef(); - } - - @Override - public void incRef() { - if (tryIncRef() == false) { - throw new AlreadyClosedException("File channel is closed"); - } - } - - @Override - public boolean decRef() { - return SharedBytes.this.decRef(); - } - - @Override - public boolean hasReferences() { - return SharedBytes.this.hasReferences(); + protected void closeInternal() { + SharedBytes.this.decRef(); } @SuppressForbidden(reason = "Use positional reads on purpose")