diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9d4d30b066f1f..ed686bf9236e5 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -120,7 +120,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.ALLOW_UNMAPPED, IndexSettings.INDEX_CHECK_ON_STARTUP, - LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE, IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD, IndexSettings.MAX_SLICES_PER_SCROLL, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 6a7844057fd00..5380a3b2b7f90 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -34,10 +34,9 @@ public class LocalCheckpointTracker { /** * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on - * demand and cleaning up while completed. This setting controls the size of the arrays. + * demand and cleaning up while completed. This constant controls the size of the arrays. */ - public static Setting SETTINGS_BIT_ARRAYS_SIZE = - Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope); + static final int BIT_ARRAYS_SIZE = 1024; /** * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which @@ -45,11 +44,6 @@ public class LocalCheckpointTracker { */ final LinkedList processedSeqNo = new LinkedList<>(); - /** - * The size of each bit set representing processed sequence numbers. - */ - private final int bitArraysSize; - /** * The sequence number that the first bit in the first array corresponds to. */ @@ -70,11 +64,10 @@ public class LocalCheckpointTracker { * {@link SequenceNumbers#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint, * or {@link SequenceNumbers#NO_OPS_PERFORMED}. * - * @param indexSettings the index settings * @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbers#NO_OPS_PERFORMED} * @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbers#NO_OPS_PERFORMED} */ - public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) { + public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) { if (localCheckpoint < 0 && localCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { throw new IllegalArgumentException( "local checkpoint must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] " @@ -84,7 +77,6 @@ public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxS throw new IllegalArgumentException( "max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); } - bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; checkpoint = localCheckpoint; @@ -183,7 +175,7 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep @SuppressForbidden(reason = "Object#notifyAll") private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 : + assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 : "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; @@ -196,10 +188,10 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) checkpoint++; // the checkpoint always falls in the first bit set or just before. If it falls // on the last bit of the current bit set, we can clean it. - if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) { + if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) { processedSeqNo.removeFirst(); - firstProcessedSeqNo += bitArraysSize; - assert checkpoint - firstProcessedSeqNo < bitArraysSize; + firstProcessedSeqNo += BIT_ARRAYS_SIZE; + assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE; current = processedSeqNo.peekFirst(); } } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); @@ -218,13 +210,13 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) private FixedBitSet getBitSetForSeqNo(final long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; - final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize; + final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE; if (bitSetOffset > Integer.MAX_VALUE) { throw new IndexOutOfBoundsException( "sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); } while (bitSetOffset >= processedSeqNo.size()) { - processedSeqNo.add(new FixedBitSet(bitArraysSize)); + processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE)); } return processedSeqNo.get((int) bitSetOffset); } @@ -239,7 +231,7 @@ private FixedBitSet getBitSetForSeqNo(final long seqNo) { private int seqNoToBitSetOffset(final long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo; - return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; + return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE; } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 1c0b320558400..1b46eedacc457 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -56,7 +56,7 @@ public SequenceNumbersService( final long localCheckpoint, final long globalCheckpoint) { super(shardId, indexSettings); - localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint); + localCheckpointTracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint); globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c717e29353b65..5f692d8e8f5fa 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -243,7 +243,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index ab513c787c3cb..ae167bb59f04b 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -21,10 +21,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; import java.util.ArrayList; @@ -38,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -46,19 +45,8 @@ public class LocalCheckpointTrackerTests extends ESTestCase { private LocalCheckpointTracker tracker; - private static final int SMALL_CHUNK_SIZE = 4; - public static LocalCheckpointTracker createEmptyTracker() { - return new LocalCheckpointTracker( - IndexSettingsModule.newIndexSettings( - "test", - Settings - .builder() - .put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) - .build()), - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED - ); + return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); } @Override @@ -98,7 +86,7 @@ public void testSimpleReplica() { public void testSimpleOverFlow() { List seqNoList = new ArrayList<>(); final boolean aligned = randomBoolean(); - final int maxOps = SMALL_CHUNK_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, SMALL_CHUNK_SIZE - 1)); + final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1)); for (int i = 0; i < maxOps; i++) { seqNoList.add(i); @@ -109,7 +97,7 @@ public void testSimpleOverFlow() { } assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); } public void testConcurrentPrimary() throws InterruptedException { @@ -150,7 +138,7 @@ protected void doRun() throws Exception { tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); } public void testConcurrentReplica() throws InterruptedException { @@ -198,7 +186,7 @@ protected void doRun() throws Exception { assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); } public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {