Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve multiple translog generations #24015

Merged
merged 22 commits into from
Apr 17, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ public static Type fromId(byte id) {

long seqNo();

long primaryTerm();

/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
Expand Down Expand Up @@ -953,6 +955,7 @@ public long seqNo() {
return seqNo;
}

@Override
public long primaryTerm() {
return primaryTerm;
}
Expand Down Expand Up @@ -1104,6 +1107,7 @@ public long seqNo() {
return seqNo;
}

@Override
public long primaryTerm() {
return primaryTerm;
}
Expand Down Expand Up @@ -1180,6 +1184,7 @@ public long seqNo() {
return seqNo;
}

@Override
public long primaryTerm() {
return primaryTerm;
}
Expand Down Expand Up @@ -1446,10 +1451,10 @@ public void commit(final long committedGeneration) throws IOException {
}

private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
assert committedGeneration > current.generation
assert committedGeneration <= current.generation
: "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
assert committedGeneration < min
assert committedGeneration >= min
: "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -2185,32 +2186,38 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
public void testMinGenerationForSeqNo() throws IOException {
final long initialGeneration = translog.getGeneration().translogFileGeneration;
final int operations = randomIntBetween(1, 4096);
final List<Long> seqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
Randomness.shuffle(seqNos);

for (int i = 0; i < operations; i++) {
final Long seqNo = seqNos.get(i);
translog.add(new Translog.NoOp(seqNo, 0, "test"));
final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
Randomness.shuffle(shuffledSeqNos);
final List<Tuple<Long, Long>> seqNos = new ArrayList<>();
final Map<Long, Long> terms = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call this highestTermPerSeqNo?

for (final Long seqNo : shuffledSeqNos) {
seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L)));
Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList()));
seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1)));
}

for (final Tuple<Long, Long> tuple : seqNos) {
translog.add(new Translog.NoOp(tuple.v1(), tuple.v2(), "test"));
if (rarely()) {
translog.rollGeneration();
}
}

Map<Long, Set<Long>> generations = new HashMap<>();
Map<Long, Set<Tuple<Long, Long>>> generations = new HashMap<>();

translog.commit(initialGeneration);
for (long seqNo = 0; seqNo < operations; seqNo++) {
final Set<Long> seenSeqNos = new HashSet<>();
final Set<Tuple<Long, Long>> seenSeqNos = new HashSet<>();
final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
for (long g = generation; g < translog.currentFileGeneration(); g++) {
if (!generations.containsKey(g)) {
final Set<Long> generationSeenSeqNos = new HashSet<>();
final Set<Tuple<Long, Long>> generationSeenSeqNos = new HashSet<>();
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
Translog.Snapshot snapshot = reader.newSnapshot();
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
generationSeenSeqNos.add(operation.seqNo());
generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm()));
}
}
generations.put(g, generationSeenSeqNos);
Expand All @@ -2219,7 +2226,8 @@ public void testMinGenerationForSeqNo() throws IOException {
seenSeqNos.addAll(generations.get(g));
}

final Set<Long> expected = LongStream.range(seqNo, operations).boxed().collect(Collectors.toSet());
final long seqNoLowerBound = seqNo;
final Set<Tuple<Long, Long>> expected = seqNos.stream().filter(t -> t.v1() >= seqNoLowerBound).collect(Collectors.toSet());
seenSeqNos.retainAll(expected);
assertThat(seenSeqNos, equalTo(expected));
}
Expand Down