Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing unblock condition for index create block #9437

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))
- Fix range reads in respository-s3 ([9512](https://github.com/opensearch-project/OpenSearch/issues/9512))
- Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291))
- Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,21 @@ public void onNewInfo(ClusterInfo info) {
if ((state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()) == false)
&& nodes.size() > 0
&& nodesOverHighThreshold.size() == nodes.size()) {
logger.warn(
"Putting index create block on cluster as all nodes are breaching high disk watermark. "
+ "Number of nodes above high watermark: {}.",
nodesOverHighThreshold.size()
);
setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()) {
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()
&& nodesOverHighThreshold.size() < nodes.size()) {
logger.warn(
"Removing index create block on cluster as all nodes are no longer breaching high disk watermark. "
+ "Number of nodes above high watermark: {}. Total numbers of nodes: {}.",
nodesOverHighThreshold.size(),
nodes.size()
);
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -67,6 +69,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -581,12 +584,16 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
);

advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(
monitor,
aboveHighWatermark,
final List<String> messages = new ArrayList<>();
messages.add(
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
);
messages.add(
"Putting index create block on cluster as all nodes are breaching high disk watermark. "
+ "Number of nodes above high watermark: 1."
);
assertMultipleWarningMessages(monitor, aboveHighWatermark, messages);

advanceTime.set(true);
assertRepeatedWarningMessages(
Expand All @@ -605,22 +612,11 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC

relocatingShardSizeRef.set(-5L);
advanceTime.set(true);
assertSingleInfoMessage(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to be below the high disk watermark when these relocations are complete"
);

relocatingShardSizeRef.set(0L);
timeSupplier.getAsLong(); // advance time long enough to do another reroute
advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
);
assertMultipleWarningMessages(monitor, aboveHighWatermark, messages);

advanceTime.set(true);
assertRepeatedWarningMessages(
Expand Down Expand Up @@ -722,6 +718,113 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
assertTrue(countBlocksCalled.get() == 0);
}

public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() {
AllocationService allocation = createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.blocks.create_index.enabled", false)
.build()
);
Metadata metadata = Metadata.builder()
.put(
RS146BIJAY marked this conversation as resolved.
Show resolved Hide resolved
IndexMetadata.builder("test")
.settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node2"))
.numberOfShards(1)
.numberOfReplicas(0)
)
.put(
IndexMetadata.builder("test_1")
.settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1"))
.numberOfShards(1)
.numberOfReplicas(0)
)
.put(
IndexMetadata.builder("test_2")
.settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1"))
.numberOfShards(1)
.numberOfReplicas(0)
)
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metadata.index("test"))
.addAsNew(metadata.index("test_1"))
.addAsNew(metadata.index("test_2"))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.blocks(ClusterBlocks.builder().addGlobalBlock(Metadata.CLUSTER_CREATE_INDEX_BLOCK).build())
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build(),
allocation
);
AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicInteger countBlocksCalled = new AtomicInteger();
AtomicInteger countUnblockBlocksCalled = new AtomicInteger();
AtomicLong currentTime = new AtomicLong();
Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
settings,
() -> clusterState,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null,
currentTime::get,
(reason, priority, listener) -> {
listener.onResponse(null);
}
) {

@Override
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
assertTrue(readOnly);
listener.onResponse(null);
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
if (indexCreateBlock == true) {
countBlocksCalled.set(countBlocksCalled.get() + 1);
} else {
countUnblockBlocksCalled.set(countUnblockBlocksCalled.get() + 1);
}

listener.onResponse(null);
}
};

Map<String, DiskUsage> builder = new HashMap<>();

// Initially all the nodes are breaching high watermark and IndexCreateBlock is already present on the cluster.
// Since block is already present, DiskThresholdMonitor should not again try to apply block.
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 9));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 9));
monitor.onNewInfo(clusterInfo(builder));
// Since Block is already present and nodes are below high watermark so neither block nor unblock will be called.
assertEquals(countBlocksCalled.get(), 0);
assertEquals(countUnblockBlocksCalled.get(), 0);

// Ensure DiskThresholdMonitor does not try to remove block in the next iteration if all nodes are breaching high watermark.
monitor.onNewInfo(clusterInfo(builder));
assertEquals(countBlocksCalled.get(), 0);
assertEquals(countUnblockBlocksCalled.get(), 0);

builder = new HashMap<>();

// If any node is no longer breaching high watermark, DiskThresholdMonitor should remove IndexCreateBlock.
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 19));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 1));
// Need to add delay in current time to allow nodes to be removed high watermark list.
currentTime.addAndGet(randomLongBetween(60001, 120000));

monitor.onNewInfo(clusterInfo(builder));
// Block will be removed if any nodes is no longer breaching high watermark.
assertEquals(countBlocksCalled.get(), 0);
assertEquals(countUnblockBlocksCalled.get(), 1);
}

private void assertNoLogging(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages) throws IllegalAccessException {
try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) {
mockAppender.addExpectation(
Expand Down Expand Up @@ -756,10 +859,11 @@ private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, final M
}
}

private void assertSingleWarningMessage(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, String message)
private void assertMultipleWarningMessages(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, List<String> messages)
throws IllegalAccessException {
assertLogging(monitor, diskUsages, Level.WARN, message);
assertNoLogging(monitor, diskUsages);
for (int index = 0; index < messages.size(); index++) {
assertLogging(monitor, diskUsages, Level.WARN, messages.get(index));
}
}

private void assertSingleInfoMessage(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, String message)
Expand Down
Loading