Skip to content

Commit

Permalink
Downgrade writelock to readlock during translog upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 21, 2023
1 parent 3369069 commit 1bec919
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -237,7 +236,7 @@ public static TranslogTransferManager buildTranslogTransferManager(

@Override
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
try {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
Expand Down Expand Up @@ -270,6 +269,11 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1);
current = createWriter(current.getGeneration() + 1);
}
assert writeLock.isHeldByCurrentThread() : "Write lock must be held before we acquire the read lock";
// Here we are downgrading the write lock by acquiring the read lock and releasing the write lock
// This ensures that other threads can still acquire the read locks while also protecting the
// readers and writer to not be mutated any further.
readLock.acquire();
} catch (final Exception e) {
tragedy.setTragicException(e);
closeOnTragicEvent(e);
Expand All @@ -278,7 +282,10 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
} else if (generation < current.getGeneration()) {
return false;
}
}

assert readLock.isHeldByCurrentThread() == true;
try (Releasable ignored = readLock; Releasable ignoredGenLock = deletionPolicy.acquireTranslogGen(getMinFileGeneration())) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
Expand Down Expand Up @@ -317,10 +324,9 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
Translog::getCommitCheckpointFileName
).build()
) {
Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration()));
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(transferReleasable, generation, primaryTerm)
new RemoteFsTranslogTransferListener(generation, primaryTerm)
);
}

Expand Down Expand Up @@ -499,16 +505,17 @@ protected void onDelete() {
translogTransferManager.delete();
}

// Visible for testing
boolean isRemoteGenerationDeletionPermitsAvailable() {
return remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS;
}

/**
* TranslogTransferListener implementation for RemoteFsTranslog
*
* @opensearch.internal
*/
private class RemoteFsTranslogTransferListener implements TranslogTransferListener {
/**
* Releasable instance for the translog
*/
private final Releasable transferReleasable;

/**
* Generation for the translog
Expand All @@ -520,25 +527,20 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
*/
private final Long primaryTerm;

RemoteFsTranslogTransferListener(Releasable transferReleasable, Long generation, Long primaryTerm) {
this.transferReleasable = transferReleasable;
RemoteFsTranslogTransferListener(Long generation, Long primaryTerm) {
this.generation = generation;
this.primaryTerm = primaryTerm;
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")

public class RemoteFsTranslogTests extends OpenSearchTestCase {

protected final ShardId shardId = new ShardId("index", "_na_", 1);
Expand Down Expand Up @@ -621,6 +620,7 @@ public void testSimpleOperationsUpload() throws Exception {

translog.setMinSeqNoToKeep(2);

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.trimUnreferencedReaders();
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
Expand Down

0 comments on commit 1bec919

Please sign in to comment.