Add shard id to remote store logs (opensearch-project#8574)
---------

Signed-off-by: bansvaru <bansvaru@amazon.com>
linuxpi authored and dzane17 committed Jul 12, 2023
1 parent ee685a9 commit 9ad2cf9
Showing 5 changed files with 47 additions and 29 deletions.
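
The four production files below apply the same pattern, and the fifth updates a test for it: a static, class-wide logger is replaced by a per-instance logger built with Loggers.getLogger(Class, ShardId), so every message the component emits carries a shard prefix, and hand-appended shard ids can be dropped from the message text. A minimal sketch of the pattern, assuming the standard "[index-name][shard-number]" prefix rendering (the class and method names here are hypothetical; the Loggers call is the one used in the diff):

import org.apache.logging.log4j.Logger;
import org.opensearch.common.logging.Loggers;
import org.opensearch.index.shard.ShardId;

class ShardAwareComponent {
    private final Logger logger;

    ShardAwareComponent(ShardId shardId) {
        // Per-shard logger: messages are prefixed with "[index-name][shard-number]",
        // so individual log statements no longer need to mention the shard.
        this.logger = Loggers.getLogger(getClass(), shardId);
    }

    void downloadTranslog() {
        logger.trace("Downloading translog files from remote"); // emitted as "[my-index][0] Downloading ..."
    }
}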
IndexShard.java
@@ -4589,7 +4589,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
- RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
+ RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger);
}

/**
RemoteStoreRefreshListener.java
@@ -8,7 +8,6 @@

package org.opensearch.index.shard;

- import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
@@ -22,6 +21,7 @@
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.concurrent.GatedCloseable;
+ import org.opensearch.common.logging.Loggers;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.engine.EngineException;
@@ -60,7 +60,7 @@
*/
public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {

- private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);
+ private final Logger logger;

/**
* The initial retry interval at which the retry job gets scheduled after a failure.
@@ -117,6 +117,7 @@ public RemoteStoreRefreshListener(
SegmentReplicationCheckpointPublisher checkpointPublisher,
RemoteRefreshSegmentTracker segmentTracker
) {
+ logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -155,7 +156,7 @@ public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file));
}
- }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile);
+ }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile, logger);
}

@Override
@@ -470,6 +471,8 @@ private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long
*/
private static class FileUploader {

+ private final Logger logger;
+
private final UploadTracker uploadTracker;

private final RemoteSegmentStoreDirectory remoteDirectory;
@@ -482,12 +485,14 @@ public FileUploader(
UploadTracker uploadTracker,
RemoteSegmentStoreDirectory remoteDirectory,
Directory storeDirectory,
- CheckedFunction<String, String, IOException> checksumProvider
+ CheckedFunction<String, String, IOException> checksumProvider,
+ Logger logger
) {
this.uploadTracker = uploadTracker;
this.remoteDirectory = remoteDirectory;
this.storeDirectory = storeDirectory;
this.checksumProvider = checksumProvider;
+ this.logger = logger;
}

/**
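
A side note inferred from the diff rather than stated by the author: FileUploader is a static nested class, so it cannot reach the enclosing listener's per-shard logger on its own; the commit instead injects the already-scoped logger through the constructor. A stripped-down sketch of that injection:

private static class FileUploader {
    private final Logger logger;

    FileUploader(Logger logger) {
        this.logger = logger; // shard-scoped logger supplied by the owning listener
    }
}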
RemoteFsTranslog.java
@@ -8,9 +8,9 @@

package org.opensearch.index.translog;

- import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
+ import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.util.FileSystemUtils;
@@ -32,6 +32,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
+ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
@@ -49,7 +50,7 @@
*/
public class RemoteFsTranslog extends Translog {

- private static final Logger logger = LogManager.getLogger(RemoteFsTranslog.class);
+ private final Logger logger;
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
@@ -82,16 +83,19 @@ public RemoteFsTranslog(
BooleanSupplier primaryModeSupplier
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
+ logger = Loggers.getLogger(getClass(), shardId);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);
try {
- download(translogTransferManager, location);
+ download(translogTransferManager, location, logger);
Checkpoint checkpoint = readCheckpoint(location);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
- throw new IllegalStateException("at least one reader must be recovered");
+ String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId);
+ logger.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
}
boolean success = false;
current = null;
@@ -120,8 +124,13 @@ public RemoteFsTranslog(
}
}

- public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException {
- assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
+ public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location, Logger logger)
+ throws IOException {
+ assert repository instanceof BlobStoreRepository : String.format(
+ Locale.ROOT,
+ "%s repository should be instance of BlobStoreRepository",
+ shardId
+ );
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
@@ -130,11 +139,11 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
shardId,
fileTransferTracker
);
- RemoteFsTranslog.download(translogTransferManager, location);
+ RemoteFsTranslog.download(translogTransferManager, location, logger);
}

- public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
- logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId());
+ public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
+ logger.trace("Downloading translog files from remote");
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
@@ -156,7 +165,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId());
logger.trace("Downloaded translog files from remote");
}

public static TranslogTransferManager buildTranslogTransferManager(
@@ -321,8 +330,8 @@ public boolean syncNeeded() {

@Override
public void close() throws IOException {
- assert Translog.calledFromOutsideOrViaTragedyClose()
- : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
+ assert Translog.calledFromOutsideOrViaTragedyClose() : shardId
+ + "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
if (closed.compareAndSet(false, true)) {
try (ReleasableLock lock = writeLock.acquire()) {
sync();
@@ -340,12 +349,14 @@ protected long getMinReferencedGen() throws IOException {
minGenerationForSeqNo(minSeqNoToKeep, current, readers)
);

- assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ assert minReferencedGen >= getMinFileGeneration() : shardId
+ + " deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] but the lowest gen available is ["
+ getMinFileGeneration()
+ "]";
- assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ assert minReferencedGen <= currentFileGeneration() : shardId
+ + " deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] which is higher than the current generation ["
+ currentFileGeneration()
@@ -356,7 +367,7 @@ protected long getMinReferencedGen() throws IOException {
protected void setMinSeqNoToKeep(long seqNo) {
if (seqNo < this.minSeqNoToKeep) {
throw new IllegalArgumentException(
"min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
shardId + " min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
);
}
this.minSeqNoToKeep = seqNo;
@@ -416,7 +427,7 @@ private void deleteStaleRemotePrimaryTerms() {
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
// First we delete all stale primary terms folders from remote store
- assert readers.isEmpty() == false : "Expected non-empty readers";
+ assert readers.isEmpty() == false : shardId + " Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
}
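
A reading aid for the strengthened assertion and exception messages above (not part of the diff): ShardId.toString() renders as "[index-name][shard-number]", so String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId) yields a message like:

[my-index][0] at least one reader must be recovered

Locale.ROOT is presumably chosen so the formatting is identical regardless of the JVM's default locale, which is what the new java.util.Locale import supports.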
TranslogTransferManager.java
@@ -8,7 +8,6 @@

package org.opensearch.index.translog.transfer;

- import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
@@ -21,6 +20,7 @@
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.io.stream.BytesStreamOutput;
+ import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
@@ -61,8 +61,7 @@ public class TranslogTransferManager {

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

- private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class);
-
+ private final Logger logger;
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

@@ -84,6 +83,7 @@ public TranslogTransferManager(
this.remoteDataTransferPath = remoteBaseTransferPath.add(DATA_DIR);
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
+ this.logger = Loggers.getLogger(getClass(), shardId);
}

public ShardId getShardId() {
@@ -200,7 +200,7 @@ public TranslogTransferMetadata readMetadata() throws IOException {
exceptionSetOnce.set(e);
}
}, e -> {
- logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e);
+ logger.error(() -> new ParameterizedMessage("Exception while listing metadata files"), e);
exceptionSetOnce.set((IOException) e);
}),
latch
@@ -295,7 +295,7 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
* @param minPrimaryTermToKeep all primary terms below this primary term are deleted.
*/
public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId);
logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep);
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> folders) {
@@ -333,7 +333,7 @@ private void deletePrimaryTermAsync(long primaryTerm) {
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("Deleted primary term {} for {}", primaryTerm, shardId);
logger.info("Deleted primary term {}", primaryTerm);
}

@Override
@@ -349,12 +349,12 @@ public void delete() {
transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("Deleted all remote translog data for {}", shardId);
logger.info("Deleted all remote translog data");
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while cleaning translog ", e);
logger.error("Exception occurred while cleaning translog", e);
}
});
}
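
The message edits in this file follow directly from the logger change: the shard id now arrives via the logger prefix, so the "for {shardId}" suffixes (and a few trailing spaces) are dropped from the message bodies. Assuming the standard prefix rendering, the same event before and after:

Before: Deleted all remote translog data for [my-index][0]
After:  [my-index][0] Deleted all remote translog data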
TranslogTransferManagerTests.java
@@ -17,6 +17,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
+ import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot;
@@ -65,6 +66,7 @@ public void setUp() throws Exception {
primaryTerm = randomNonNegativeLong();
generation = randomNonNegativeLong();
shardId = mock(ShardId.class);
+ when(shardId.getIndex()).thenReturn(new Index("index", "indexUUid"));
minTranslogGeneration = randomLongBetween(0, generation);
remoteBaseTransferPath = new BlobPath().add("base_path");
transferService = mock(TransferService.class);
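
The new stub is most likely required because Loggers.getLogger(Class, ShardId) builds its prefix from shardId.getIndex(); on a bare Mockito mock that call returns null, which would break construction of TranslogTransferManager's per-shard logger. Stubbing getIndex() with a concrete Index keeps logger construction safe in the test.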
