Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove checkpoint tracker bit sets setting #27191

Merged
merged 2 commits into from
Nov 2, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,16 @@ public class LocalCheckpointTracker {

/**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
* demand and cleaning up while completed. This setting controls the size of the arrays.
* demand and cleaning up while completed. This constant controls the size of the arrays.
*/
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE =
Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope);
private static final int BIT_ARRAYS_SIZE = 1024;

/**
* An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
* marks the sequence number the fist bit in the first array corresponds to.
*/
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();

/**
* The size of each bit set representing processed sequence numbers.
*/
private final int bitArraysSize;

/**
* The sequence number that the first bit in the first array corresponds to.
*/
Expand All @@ -70,11 +64,10 @@ public class LocalCheckpointTracker {
* {@link SequenceNumbers#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint,
* or {@link SequenceNumbers#NO_OPS_PERFORMED}.
*
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbers#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbers#NO_OPS_PERFORMED}
*/
public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] "
Expand All @@ -84,7 +77,6 @@ public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxS
throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
}
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
checkpoint = localCheckpoint;
Expand Down Expand Up @@ -183,7 +175,7 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep
@SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 :
assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 :
"checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
Expand All @@ -196,10 +188,10 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize;
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
firstProcessedSeqNo += BIT_ARRAYS_SIZE;
assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE;
current = processedSeqNo.peekFirst();
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
Expand All @@ -218,13 +210,13 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
private FixedBitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE;
if (bitSetOffset > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException(
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
}
while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(bitArraysSize));
processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE));
}
return processedSeqNo.get((int) bitSetOffset);
}
Expand All @@ -239,7 +231,7 @@ private FixedBitSet getBitSetForSeqNo(final long seqNo) {
private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize;
return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {

logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);

final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
private static final int SMALL_CHUNK_SIZE = 4;

public static LocalCheckpointTracker createEmptyTracker() {
return new LocalCheckpointTracker(
IndexSettingsModule.newIndexSettings(
"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED
);
return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait - I think we need to allow testing it with a small value - to make sure multiple arrays are used? I was thinking of adding a (protected/package private/whatever) constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we had a miscommunication, which we have now clarified via another channel. I have pushed so that we test with a lot of operations to cover the default size of the bit sets we are using now, as opposed to the small size that we were using before.

}

@Override
Expand Down