Skip to content

Commit

Permalink
[fix][meta] Check if metadata store is closed in RocksdbMetadataStore (
Browse files Browse the repository at this point in the history
…apache#22852)

(cherry picked from commit 7419287)
  • Loading branch information
lhotari committed Jun 6, 2024
1 parent bbc4b4c commit 54ad6cd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCac
@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
Expand Down Expand Up @@ -276,8 +275,7 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp
@Override
public final CompletableFuture<List<String>> getChildren(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
Expand All @@ -288,8 +286,7 @@ public final CompletableFuture<List<String>> getChildren(String path) {
@Override
public final CompletableFuture<Boolean> exists(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
Expand Down Expand Up @@ -351,8 +348,7 @@ public void accept(Notification n) {
@Override
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
Expand Down Expand Up @@ -401,8 +397,7 @@ private CompletableFuture<Void> deleteInternal(String path, Optional<Long> expec
@Override
public CompletableFuture<Void> deleteRecursive(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
return getChildren(path)
.thenCompose(children -> FutureUtil.waitForAll(
Expand All @@ -426,8 +421,7 @@ protected abstract CompletableFuture<Stat> storePut(String path, byte[] data, Op
public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
Expand Down Expand Up @@ -507,10 +501,15 @@ protected void receivedSessionEvent(SessionEvent event) {
}
}

private boolean isClosed() {
protected boolean isClosed() {
return isClosed.get();
}

protected static <T> CompletableFuture<T> alreadyClosedFailedFuture() {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}

@Override
public void close() throws Exception {
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
byte[] value = db.get(optionCache, toBytes(path));
if (value == null) {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down Expand Up @@ -407,6 +410,9 @@ protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
try (RocksIterator iterator = db.newIterator(optionDontCache)) {
Set<String> result = new HashSet<>();
String firstKey = path.equals("/") ? path : path + "/";
Expand Down Expand Up @@ -449,6 +455,9 @@ protected CompletableFuture<Boolean> existsFromStore(String path) {
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
byte[] value = db.get(optionDontCache, toBytes(path));
if (log.isDebugEnabled()) {
if (value != null) {
Expand All @@ -471,6 +480,9 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
Expand Down Expand Up @@ -507,6 +519,9 @@ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Lo
}
try {
dbStateLock.readLock().lock();
if (isClosed()) {
return alreadyClosedFailedFuture();
}
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
Expand Down

0 comments on commit 54ad6cd

Please sign in to comment.