diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 7bc881c125421..2981dc62d2ff0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -247,8 +247,7 @@ public MetadataCache getMetadataCache(MetadataSerde serde, MetadataCac @Override public CompletableFuture> get(String path) { if (isClosed()) { - return FutureUtil.failedFuture( - new MetadataStoreException.AlreadyClosedException()); + return alreadyClosedFailedFuture(); } long start = System.currentTimeMillis(); if (!isValidPath(path)) { @@ -276,8 +275,7 @@ public CompletableFuture put(String path, byte[] value, Optional exp @Override public final CompletableFuture> getChildren(String path) { if (isClosed()) { - return FutureUtil.failedFuture( - new MetadataStoreException.AlreadyClosedException()); + return alreadyClosedFailedFuture(); } if (!isValidPath(path)) { return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); @@ -288,8 +286,7 @@ public final CompletableFuture> getChildren(String path) { @Override public final CompletableFuture exists(String path) { if (isClosed()) { - return FutureUtil.failedFuture( - new MetadataStoreException.AlreadyClosedException()); + return alreadyClosedFailedFuture(); } if (!isValidPath(path)) { return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); @@ -351,8 +348,7 @@ public void accept(Notification n) { @Override public final CompletableFuture delete(String path, Optional expectedVersion) { if (isClosed()) { - return FutureUtil.failedFuture( - new MetadataStoreException.AlreadyClosedException()); + return alreadyClosedFailedFuture(); } long start = System.currentTimeMillis(); if (!isValidPath(path)) { @@ -401,8 +397,7 @@ private CompletableFuture deleteInternal(String path, Optional expec @Override public CompletableFuture deleteRecursive(String path) { if (isClosed()) { - return FutureUtil.failedFuture( - new MetadataStoreException.AlreadyClosedException()); + return alreadyClosedFailedFuture(); } return getChildren(path) .thenCompose(children -> FutureUtil.waitForAll( @@ -426,8 +421,7 @@ protected abstract CompletableFuture storePut(String path, byte[] data, Op public final CompletableFuture put(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { if (isClosed()) { - return FutureUtil.failedFuture( - new MetadataStoreException.AlreadyClosedException()); + return alreadyClosedFailedFuture(); } long start = System.currentTimeMillis(); if (!isValidPath(path)) { @@ -507,10 +501,15 @@ protected void receivedSessionEvent(SessionEvent event) { } } - private boolean isClosed() { + protected boolean isClosed() { return isClosed.get(); } + protected static CompletableFuture alreadyClosedFailedFuture() { + return FutureUtil.failedFuture( + new MetadataStoreException.AlreadyClosedException()); + } + @Override public void close() throws Exception { executor.shutdownNow(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 39f7edd5ceed5..06f7b26053693 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -375,6 +375,9 @@ public CompletableFuture> storeGet(String path) { } try { dbStateLock.readLock().lock(); + if (isClosed()) { + return alreadyClosedFailedFuture(); + } byte[] value = db.get(optionCache, toBytes(path)); if (value == null) { return CompletableFuture.completedFuture(Optional.empty()); @@ -407,6 +410,9 @@ protected CompletableFuture> getChildrenFromStore(String path) { } try { dbStateLock.readLock().lock(); + if (isClosed()) { + return alreadyClosedFailedFuture(); + } try (RocksIterator iterator = db.newIterator(optionDontCache)) { Set result = new HashSet<>(); String firstKey = path.equals("/") ? path : path + "/"; @@ -449,6 +455,9 @@ protected CompletableFuture existsFromStore(String path) { } try { dbStateLock.readLock().lock(); + if (isClosed()) { + return alreadyClosedFailedFuture(); + } byte[] value = db.get(optionDontCache, toBytes(path)); if (log.isDebugEnabled()) { if (value != null) { @@ -471,6 +480,9 @@ protected CompletableFuture storeDelete(String path, Optional expect } try { dbStateLock.readLock().lock(); + if (isClosed()) { + return alreadyClosedFailedFuture(); + } try (Transaction transaction = db.beginTransaction(writeOptions)) { byte[] pathBytes = toBytes(path); byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true); @@ -507,6 +519,9 @@ protected CompletableFuture storePut(String path, byte[] data, Optional