Skip to content

Commit

Permalink
Merge branch 'main' into validate-index-name
Browse files Browse the repository at this point in the history
  • Loading branch information
fang-xing-esql committed Aug 23, 2024
2 parents 316f204 + 14b7170 commit f675063
Show file tree
Hide file tree
Showing 28 changed files with 247 additions and 75 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/109414.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 109414
summary: Don't fail retention lease sync actions due to capacity constraints
area: CRUD
type: bug
issues:
- 105926
12 changes: 6 additions & 6 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,6 @@ tests:
- class: org.elasticsearch.xpack.esql.qa.mixed.FieldExtractorIT
method: testScaledFloat
issue: https://github.com/elastic/elasticsearch/issues/112003
- class: org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT
method: testForceSleepsProfile {SYNC}
issue: https://github.com/elastic/elasticsearch/issues/112039
- class: org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT
method: testForceSleepsProfile {ASYNC}
issue: https://github.com/elastic/elasticsearch/issues/112049
- class: org.elasticsearch.xpack.inference.InferenceRestIT
method: test {p0=inference/80_random_rerank_retriever/Random rerank retriever predictably shuffles results}
issue: https://github.com/elastic/elasticsearch/issues/111999
Expand All @@ -182,6 +176,12 @@ tests:
- class: org.elasticsearch.xpack.esql.EsqlAsyncSecurityIT
method: testLimitedPrivilege
issue: https://github.com/elastic/elasticsearch/issues/112110
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
method: test {stats.ByTwoCalculatedSecondOverwrites SYNC}
issue: https://github.com/elastic/elasticsearch/issues/112117
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
method: test {stats.ByTwoCalculatedSecondOverwritesReferencingFirst SYNC}
issue: https://github.com/elastic/elasticsearch/issues/112118

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.seqno;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BackgroundRetentionLeaseSyncActionIT extends ESIntegTestCase {

public void testActionCompletesWhenReplicaCircuitBreakersAreAtCapacity() throws Exception {
internalCluster().startMasterOnlyNodes(1);
String primary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate("test").setSettings(
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
);

String replica = internalCluster().startDataOnlyNode();
ensureGreen("test");

try (var ignored = fullyAllocateCircuitBreakerOnNode(replica, CircuitBreaker.IN_FLIGHT_REQUESTS)) {
final ClusterState state = internalCluster().clusterService().state();
final Index testIndex = resolveIndex("test");
final ShardId testIndexShardZero = new ShardId(testIndex, 0);
final String testLeaseId = "test-lease/123";
RetentionLeases newLeases = addTestLeaseToRetentionLeases(primary, testIndex, testLeaseId);
internalCluster().getInstance(RetentionLeaseSyncer.class, primary)
.backgroundSync(
testIndexShardZero,
state.routingTable().shardRoutingTable(testIndexShardZero).primaryShard().allocationId().getId(),
state.term(),
newLeases
);

// Wait for test lease to appear on replica
IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class, replica);
assertBusy(() -> {
RetentionLeases retentionLeases = replicaIndicesService.indexService(testIndex).getShard(0).getRetentionLeases();
assertTrue(retentionLeases.contains(testLeaseId));
});
}
}

private static RetentionLeases addTestLeaseToRetentionLeases(String primaryNodeName, Index index, String leaseId) {
IndicesService primaryIndicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName);
RetentionLeases currentLeases = primaryIndicesService.indexService(index).getShard(0).getRetentionLeases();
RetentionLease newLease = new RetentionLease(leaseId, 0, System.currentTimeMillis(), "test source");
return new RetentionLeases(
currentLeases.primaryTerm(),
currentLeases.version() + 1,
Stream.concat(currentLeases.leases().stream(), Stream.of(newLease)).toList()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.seqno;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RetentionLeaseSyncActionIT extends ESIntegTestCase {

public void testActionCompletesWhenReplicaCircuitBreakersAreAtCapacity() {
internalCluster().startMasterOnlyNodes(1);
String primary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate("test").setSettings(
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
);

String replica = internalCluster().startDataOnlyNode();
ensureGreen("test");

try (var ignored = fullyAllocateCircuitBreakerOnNode(replica, CircuitBreaker.IN_FLIGHT_REQUESTS)) {
assertThatRetentionLeaseSyncCompletesSuccessfully(primary);
}
}

public void testActionCompletesWhenPrimaryIndexingPressureIsAtCapacity() {
internalCluster().startMasterOnlyNodes(1);
String primary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate("test").setSettings(
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
);

String replica = internalCluster().startDataOnlyNode();
ensureGreen("test");

try (Releasable ignored = fullyAllocatePrimaryIndexingCapacityOnNode(primary)) {
assertThatRetentionLeaseSyncCompletesSuccessfully(primary);
}
}

private static void assertThatRetentionLeaseSyncCompletesSuccessfully(String primaryNodeName) {
RetentionLeaseSyncer instance = internalCluster().getInstance(RetentionLeaseSyncer.class, primaryNodeName);
PlainActionFuture<ReplicationResponse> retentionLeaseSyncResult = new PlainActionFuture<>();
ClusterState state = internalCluster().clusterService().state();
ShardId testIndexShardZero = new ShardId(resolveIndex("test"), 0);
ShardRouting primaryShard = state.routingTable().shardRoutingTable(testIndexShardZero).primaryShard();
instance.sync(
testIndexShardZero,
primaryShard.allocationId().getId(),
state.term(),
RetentionLeases.EMPTY,
retentionLeaseSyncResult
);
safeGet(retentionLeaseSyncResult);
}

/**
* Fully allocate primary indexing capacity on a node
*
* @param targetNode The name of the node on which to allocate
* @return A {@link Releasable} which will release the capacity when closed
*/
private static Releasable fullyAllocatePrimaryIndexingCapacityOnNode(String targetNode) {
return internalCluster().getInstance(IndexingPressure.class, targetNode)
.markPrimaryOperationStarted(
1,
IndexingPressure.MAX_INDEXING_BYTES.get(internalCluster().getInstance(Settings.class, targetNode)).getBytes() + 1,
true
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public RetentionLeaseBackgroundSyncAction(
threadPool.executor(ThreadPool.Names.MANAGEMENT),
SyncGlobalCheckpointAfterOperation.DoNotSync,
PrimaryActionExecution.RejectOnOverload,
ReplicaActionExecution.SubjectToCircuitBreaker
ReplicaActionExecution.BypassCircuitBreaker
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ public RetentionLeaseSyncAction(
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
new ManagementOnlyExecutorFunction(threadPool),
PrimaryActionExecution.RejectOnOverload,
PrimaryActionExecution.Force,
indexingPressure,
systemIndices,
ReplicaActionExecution.SubjectToCircuitBreaker
ReplicaActionExecution.BypassCircuitBreaker
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.hamcrest.StringDescription;
import org.hamcrest.TypeSafeMatcher;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -292,17 +294,24 @@ static void describeEntryUnexepectedButOk(Object value, Description description)
}

static void describeEntryValue(int keyWidth, Matcher<?> matcher, Object v, Description description) {
if (v instanceof Map && matcher instanceof MapMatcher) {
((MapMatcher) matcher).describePotentialMismatch(keyWidth + INDENT, (Map<?, ?>) v, description);
if (v instanceof Map && matcher instanceof MapMatcher mm) {
mm.describePotentialMismatch(keyWidth + INDENT, (Map<?, ?>) v, description);
return;
}
if (v instanceof List && matcher instanceof ListMatcher) {
((ListMatcher) matcher).describePotentialMismatch(keyWidth + INDENT, (List<?>) v, description);
if (v instanceof List && matcher instanceof ListMatcher lm) {
lm.describePotentialMismatch(keyWidth + INDENT, (List<?>) v, description);
return;
}
if (false == matcher.matches(v)) {
description.appendText("expected ").appendDescriptionOf(matcher).appendText(" but ");
matcher.describeMismatch(v, description);
try {
description.appendText("expected ").appendDescriptionOf(matcher).appendText(" but ");
matcher.describeMismatch(v, description);
} catch (Exception e) {
description.appendText("error describing ");
StringWriter trace = new StringWriter();
e.printStackTrace(new PrintWriter(trace));
description.appendValue(trace);
}
return;
}
description.appendValue(v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.StringDescription;
import org.hamcrest.TypeSafeMatcher;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -24,7 +26,9 @@
import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

Expand Down Expand Up @@ -395,6 +399,45 @@ public void testSubMapDescribeTo() {
baz: <0>"""));
}

public void testSubMatcherDescribeFails() {
assertMismatch(Map.of("foo", 2.0, "bar", 2), matchesMap().entry("foo", new TypeSafeMatcher<Object>() {
@Override
public void describeTo(Description description) {
throw new IllegalStateException("intentional failure");
}

@Override
protected boolean matchesSafely(Object o) {
return false;
}
}).entry("bar", 2), both(containsString("""
a map containing
foo: expected error describing <java.lang.IllegalStateException: intentional failure""")).and(containsString("""
bar: <2>""")));
}

public void testSubMatcherMismatchFails() {
assertMismatch(Map.of("foo", 2.0, "bar", 2), matchesMap().entry("foo", new TypeSafeMatcher<Object>() {
@Override
protected void describeMismatchSafely(Object item, Description mismatchDescription) {
throw new IllegalStateException("intentional failure");
}

@Override
public void describeTo(Description description) {
description.appendValue("foo");
}

@Override
protected boolean matchesSafely(Object o) {
return false;
}
}).entry("bar", 2), both(containsString("""
a map containing
foo: expected "foo" but error describing <java.lang.IllegalStateException: intentional failure""")).and(containsString("""
bar: <2>""")));
}

static <T> void assertMismatch(T v, Matcher<? super T> matcher, Matcher<String> mismatchDescriptionMatcher) {
assertMap(v, not(matcher));
StringDescription description = new StringDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,3 @@ template:
index:
default_pipeline: logs-apm.app@default-pipeline
final_pipeline: apm@pipeline
lifecycle:
name: logs-apm.app_logs-default_policy
prefer_ilm: false
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,3 @@ template:
index:
default_pipeline: logs-apm.error@default-pipeline
final_pipeline: apm@pipeline
lifecycle:
name: logs-apm.error_logs-default_policy
prefer_ilm: false
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,3 @@ template:
index:
default_pipeline: metrics-apm.app@default-pipeline
final_pipeline: metrics-apm@pipeline
lifecycle:
name: metrics-apm.app_metrics-default_policy
prefer_ilm: false
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ template:
index:
default_pipeline: metrics-apm.internal@default-pipeline
final_pipeline: metrics-apm@pipeline
lifecycle:
name: metrics-apm.internal_metrics-default_policy
prefer_ilm: false
mappings:
properties:
data_stream.dataset:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ template:
index:
default_pipeline: metrics-apm.service_destination@default-pipeline
final_pipeline: metrics-apm@pipeline
lifecycle:
name: metrics-apm.service_destination_10m_metrics-default_policy
prefer_ilm: false
mappings:
properties:
metricset.interval:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ template:
index:
default_pipeline: metrics-apm.service_destination@default-pipeline
final_pipeline: metrics-apm@pipeline
lifecycle:
name: metrics-apm.service_destination_1m_metrics-default_policy
prefer_ilm: false
mappings:
properties:
metricset.interval:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ template:
index:
default_pipeline: metrics-apm.service_destination@default-pipeline
final_pipeline: metrics-apm@pipeline
lifecycle:
name: metrics-apm.service_destination_60m_metrics-default_policy
prefer_ilm: false
mappings:
properties:
metricset.interval:
Expand Down
Loading

0 comments on commit f675063

Please sign in to comment.