Preserve multiple translog generations #24015

Merged
merged 22 commits into from
Apr 17, 2017
Changes from 4 commits
@@ -298,7 +298,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Lon
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation.translogUUID == null) {
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1179,12 +1179,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translog, null);
final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
Contributor:
I so wish we could come up with a better name for this one - I find it confusing every time. How about translogGenForRecovery?

Member Author:
I don't understand this comment, this is a new local variable that was never there before. Did you maybe mean to comment on Translog#lastCommittedTranslogFileGeneration which we have discussed the confusing nature of before?

Contributor:
I think all of them suffer from the same problem. This is just the first one I ran into while reviewing. I think I get why it's named like this now (the translog id that was committed into lucene's metadata) but there is much room for ambiguity - is it the maximum generation for which all ops are in lucene (i.e., committed)? I wonder every time and end up double checking the code. If people find this name better, I'm good with keeping it - it's a subjective matter.

Member:
One naming thing that gets to me is prepareCommit and commit in Translog. I don't think commit commits anything. It trims segments based on a commit in permanent storage. I wonder if it'd make the naming easier to stop using the word "commit" for the thing that the Translog is doing. Then any time you see the word "commit" it is about a lucene commit. If you renamed Translog#commit into Translog#updatePersistedGeneration or something then it'd be more clear that it is reacting to a commit in Lucene. Then maybe the other variable names just fall out of that?

Member Author:
I would like to focus on the naming in a small follow-up PR immediately after this PR.

logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
// after refresh documents can be retrieved from the index so we can now commit the translog
translog.commit();
translog.commit(committedGeneration);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
@@ -1680,55 +1680,62 @@ protected void doRun() throws Exception {
}
}

private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
/**
* Commits the specified index writer.
*
* @param writer the index writer to commit
* @param translog the translog
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @return the local checkpoint committed with the specified index writer
Contributor:
nit: I don't think this is correct anymore

Member Author:
I pushed 0763e29c6486a566344a0e8b5e0868db4932623d.

* @throws IOException if an I/O exception occurs committing the specified writer
*/
private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
Contributor:
Can we bring these local vars back? It's easier to reason about the code when it's not inline.

Member Author:
I pushed 5f3d2d8.


final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
* segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
* of the commit data iterator (which occurs after all documents have been flushed to Lucene).
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
final Map<String, String> commitData = new HashMap<>(5);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});

writer.commit();
} catch (Exception ex) {
return translogGeneration.translogFileGeneration;
} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
} catch (Exception inner) {
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
} catch (AssertionError e) {
// IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
// randomly throw FNFE/NSFE can also hit this:
} catch (final AssertionError e) {
/*
* If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
* throw FileNotFoundException or NoSuchFileException can also hit this.
*/
if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
try {
failEngine("lucene commit failed", engineException);
} catch (Exception inner) {
} catch (final Exception inner) {
engineException.addSuppressed(inner);
}
throw engineException;
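
The comment inside `setLiveCommitData` in this hunk describes two evaluation times: the local checkpoint is captured before the commit, while the maximum sequence number is read only when the commit data iterator is invoked. A minimal sketch of that pattern follows, assuming plain string keys (`local_checkpoint`, `max_seq_no`) and an `AtomicLong` as a stand-in for the sequence number service; none of these names come from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: the class name, keys, and AtomicLong stand-in are assumptions.
final class DeferredCommitDataSketch {

    static Iterable<Map.Entry<String, String>> commitData(final long localCheckpoint, final AtomicLong maxSeqNo) {
        // Captured eagerly, before any flush happens.
        final String capturedCheckpoint = Long.toString(localCheckpoint);
        // The lambda body runs each time iterator() is called, so the
        // maximum sequence number is read at that later point in time.
        return () -> {
            final Map<String, String> data = new HashMap<>();
            data.put("local_checkpoint", capturedCheckpoint);
            data.put("max_seq_no", Long.toString(maxSeqNo.get()));
            return data.entrySet().iterator();
        };
    }
}
```

If the counter advances between building the iterable and iterating it, the iterated map reflects the later value for `max_seq_no` but the earlier value for `local_checkpoint`.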
@@ -1812,7 +1819,7 @@ public boolean isRecovering() {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(6);
Map<String, String> commitData = new HashMap<>(5);
Contributor:
good eye.

for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
85 changes: 57 additions & 28 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -85,14 +85,14 @@
* When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
* the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
* generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
* generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
* the currently being committed translog file will not be deleted since it's commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
* one translog file present. Such an uncommitted translog file always has a <tt>translog-${gen}.ckp</tt> associated with it which is an fsynced copy of the it's last <tt>translog.ckp</tt> such that in
* disaster recovery last fsynced offsets, number of operation etc. are still preserved.
* </p>
*/
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {

/*
* TODO
@@ -1347,6 +1347,28 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
out.writeInt((int) checksum);
}

/**
* Gets the minimum generation that could contain the sequence number, or the current generation if there is no generation with the
* specified sequence number between the minimum and maximum sequence numbers.
*
* @param seqNo the sequence number
* @return the minimum generation for the sequence number, or the current generation
*/
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = writeLock.acquire()) {
final long minTranslogFileGeneration = readers
Contributor:
I wonder if we can just use a for loop and Math.min this is much harder for me to read instead of:

long minTranslogFileGeneration = currentFileGeneration();
for (TranslogReader r : readers) {
  if (seqNo <= r.getCheckpoint().maxSeqNo) {
     minTranslogFileGeneration = Math.min(minTranslogFileGeneration, r.getGeneration());
  }
}

Member Author:
I pushed 435fe25.

.stream()
.filter(r -> {
final Checkpoint checkpoint = r.getCheckpoint();
return checkpoint.minSeqNo <= seqNo && seqNo <= checkpoint.maxSeqNo;
Contributor:
hmm... why do we need to check the lower bound? Isn't it simpler to check seqNo <= checkpoint.maxSeqNo alone? I read the logic to be "min generation that has ops that are required if we recover from seqNo onwards", which maps to "has ops above this point".

Member Author:
I think that was addressed by c63543b which I think I pushed while you were reviewing.

})
.mapToLong(TranslogReader::getGeneration)
.min()
.orElseGet(this::currentFileGeneration);
Contributor:
hmm. When does this orElse actually trigger? I guess it can only happen if you potentially have bwc readers and one or more empty current ones. Should we assert for this?

Member Author:
It can happen because we commit when we open the translog.

Contributor:
clear. This is maybe worth a comment?

Member Author:
I think another point is that when we are flushing the engine, we ask the translog for the minimum generation for the local checkpoint plus one which might not yet be in the translog.

Contributor:
That's another very good point

Member Author:
I pushed 435fe25.

return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
}
}
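
The review thread above converges on a loop that checks only the upper bound and falls back to the current generation. A self-contained sketch of that logic follows, assuming a hypothetical `ReaderInfo` holder in place of the real `TranslogReader` and `Checkpoint` classes; it is an illustration of the discussion, not the code that was pushed in 435fe25.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a translog reader and its checkpoint; not the Elasticsearch class.
final class ReaderInfo {
    final long generation;
    final long maxSeqNo;

    ReaderInfo(final long generation, final long maxSeqNo) {
        this.generation = generation;
        this.maxSeqNo = maxSeqNo;
    }
}

final class MinGenerationSketch {

    // Returns the smallest generation whose maximum sequence number is >= seqNo.
    // Falls back to the current generation when no reader qualifies, for example
    // when seqNo is the local checkpoint plus one and has not been written yet.
    static long minGenerationForSeqNo(final List<ReaderInfo> readers, final long currentGeneration, final long seqNo) {
        long minGeneration = currentGeneration;
        for (final ReaderInfo reader : readers) {
            if (seqNo <= reader.maxSeqNo) {
                minGeneration = Math.min(minGeneration, reader.generation);
            }
        }
        return minGeneration;
    }

    public static void main(final String[] args) {
        final List<ReaderInfo> readers =
            Arrays.asList(new ReaderInfo(1, 9), new ReaderInfo(2, 19), new ReaderInfo(3, 29));
        System.out.println(minGenerationForSeqNo(readers, 4, 15)); // prints 2
        System.out.println(minGenerationForSeqNo(readers, 4, 30)); // prints 4
    }
}
```

The second call shows the fallback case raised in the thread: a sequence number beyond every reader's maximum resolves to the current generation.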

/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
@@ -1375,25 +1397,35 @@ public void rollGeneration() throws IOException {
}
}

@Override
public long prepareCommit() throws IOException {
/**
* Prepares a translog commit by setting the current committing generation and rolling the translog generation.
*
* @throws IOException if an I/O exception occurred while rolling the translog generation
*/
public void prepareCommit() throws IOException {
Contributor:
now that we don't use the 2phase commit interface anymore we can rename these methods IMO

Member Author:
I agree, let's do this in a small PR that immediately follows this PR.

try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
final String message = String.format(
Locale.ROOT,
"already committing a translog with generation [%d]",
currentCommittingGeneration);
final String message =
String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
throw new IllegalStateException(message);
}
currentCommittingGeneration = current.getGeneration();
rollGeneration();
}
return 0;
}

@Override
public long commit() throws IOException {
/**
* Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
* will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved.
*
* If {@link Translog#prepareCommit()} was not called before calling commit, this method will be invoked too causing the translog
* generation to be rolled.
*
* @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
* @throws IOException if an I/O exception occurred preparing the translog commit
*/
public void commit(final long committedGeneration) throws IOException {
Contributor:
I think this parameter name is confusing given that we have currentCommittingGeneration. See previous suggestion for a name.

Member Author:
Let's focus on naming in a follow-up PR.

try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration == NOT_SET_GENERATION) {
Contributor:
I'm not sure this makes sense anymore? Should we just forbid it with an exception? Or better yet, require that the generation given to us is <= current and be done with it?

Member Author:
Are you saying that we should require preparing a commit (invoking Translog#prepareCommit) before committing (invoking Translog#commit)?

@@ -1403,26 +1435,30 @@ public long commit() throws IOException {
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
: "readers missing committing generation [" + currentCommittingGeneration + "]";
// set the last committed generation otherwise old files will not be cleaned up
lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
lastCommittedTranslogFileGeneration = committedGeneration;
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}
return 0;
}
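
The behavioural change in `commit` is that the last committed generation is now supplied by the caller instead of being derived as `currentCommittingGeneration + 1`. A toy model of the prepare/commit protocol may help show the effect; `ToyTranslog` and its members are hypothetical and track generations as plain numbers rather than files.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: generations are longs in a list, not translog files on disk.
final class ToyTranslog {
    private static final long NOT_SET_GENERATION = -1;

    private final List<Long> generations = new ArrayList<>();
    private long currentGeneration = 1;
    private long currentCommittingGeneration = NOT_SET_GENERATION;
    private long lastCommittedGeneration = 1;

    ToyTranslog() {
        generations.add(currentGeneration);
    }

    // Marks the current generation as committing and rolls to a new one.
    void prepareCommit() {
        if (currentCommittingGeneration != NOT_SET_GENERATION) {
            throw new IllegalStateException(
                "already committing a translog with generation [" + currentCommittingGeneration + "]");
        }
        currentCommittingGeneration = currentGeneration;
        currentGeneration++;
        generations.add(currentGeneration);
    }

    // Records the caller-supplied generation and drops everything older than it.
    void commit(final long committedGeneration) {
        if (currentCommittingGeneration == NOT_SET_GENERATION) {
            prepareCommit();
        }
        lastCommittedGeneration = committedGeneration;
        currentCommittingGeneration = NOT_SET_GENERATION;
        generations.removeIf(generation -> generation < lastCommittedGeneration);
    }

    List<Long> retainedGenerations() {
        return new ArrayList<>(generations);
    }
}
```

In this model, `prepareCommit()` followed by `commit(1)` retains generations 1 and 2, because the caller asked to preserve everything from generation 1 onwards. A later `prepareCommit()` and `commit(3)` leaves only generation 3.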

/**
* Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
* and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
*/
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
// we're shutdown potentially on some tragic event - don't delete anything
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
final long finalMinReferencedGen = minReferencedGen;
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
long minReferencedGen = Math.min(
Contributor:
Did something change here that is related? It's hard to tell. Maybe we can revert this unnecessary modification?

Member Author:
I see a small improvement here which is removing the need for the extra local variable finalMinReferencedGen and so avoiding overwriting minReferencedGen. I find it easier to read this way.

Contributor:
fair enough

lastCommittedTranslogFileGeneration,
outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
final List<TranslogReader> unreferenced =
readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
Path translogPath = unreferencedReader.path();
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
final Path translogPath = unreferencedReader.path();
logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
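
The trimming rule in this hunk keeps every generation at or above the smaller of the last committed generation and the minimum generation held by any outstanding view. A small sketch of just that selection step follows, assuming generations and view minimums are passed in as plain lists; `TrimSketch` and its method names are illustrative and not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the selection of unreferenced generations, not file deletion.
final class TrimSketch {

    // Generations strictly below the returned value are no longer referenced.
    static long minReferencedGeneration(final long lastCommittedGeneration, final List<Long> viewMinGenerations) {
        final long minViewGeneration =
            viewMinGenerations.stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        return Math.min(lastCommittedGeneration, minViewGeneration);
    }

    static List<Long> unreferencedGenerations(
            final List<Long> readerGenerations,
            final long lastCommittedGeneration,
            final List<Long> viewMinGenerations) {
        final long minReferenced = minReferencedGeneration(lastCommittedGeneration, viewMinGenerations);
        final List<Long> unreferenced = new ArrayList<>();
        for (final long generation : readerGenerations) {
            if (generation < minReferenced) {
                unreferenced.add(generation);
            }
        }
        return unreferenced;
    }
}
```

With reader generations 1 through 5 and a last committed generation of 4, an outstanding view pinned at generation 3 leaves only generations 1 and 2 eligible for deletion; with no views, generations 1 through 3 are eligible.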
@@ -1442,13 +1478,6 @@ void closeFilesIfNoPendingViews() throws IOException {
}
}


@Override
public void rollback() throws IOException {
ensureOpen();
close();
}

/**
* References a transaction log generation
*/