From f048225d6241579e05681c74d2674b69e24db67e Mon Sep 17 00:00:00 2001 From: Shan Swanlow <40004835+shans96@users.noreply.github.com> Date: Thu, 1 Feb 2024 10:14:06 +0000 Subject: [PATCH 01/86] Share field mapper tests with other modules (#99142) --- docs/changelog/99142.yaml | 6 +++ .../extras/ScaledFloatFieldMapperTests.java | 43 +++++++++++++++++-- .../index/mapper/ByteFieldMapperTests.java | 11 +++-- .../index/mapper/DoubleFieldMapperTests.java | 29 ++++++++++--- .../index/mapper/FloatFieldMapperTests.java | 13 +++--- .../mapper/HalfFloatFieldMapperTests.java | 15 +++---- .../index/mapper/IntegerFieldMapperTests.java | 11 +++-- .../index/mapper/LongFieldMapperTests.java | 15 +++---- .../index/mapper/ShortFieldMapperTests.java | 11 +++-- .../index/mapper/NumberFieldMapperTests.java | 7 ++- .../mapper/NumberTypeOutOfRangeSpec.java | 41 ++++++++++++++++++ .../mapper/WholeNumberFieldMapperTests.java | 0 .../UnsignedLongFieldMapperTests.java | 39 ++++++++++++++++- 13 files changed, 184 insertions(+), 57 deletions(-) create mode 100644 docs/changelog/99142.yaml rename {server/src/test => test/framework/src/main}/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java (98%) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/mapper/NumberTypeOutOfRangeSpec.java rename {server/src/test => test/framework/src/main}/java/org/elasticsearch/index/mapper/WholeNumberFieldMapperTests.java (100%) diff --git a/docs/changelog/99142.yaml b/docs/changelog/99142.yaml new file mode 100644 index 0000000000000..885946cec909b --- /dev/null +++ b/docs/changelog/99142.yaml @@ -0,0 +1,6 @@ +pr: 99142 +summary: Reuse number field mapper tests in other modules +area: Search +type: enhancement +issues: + - 92947 diff --git a/modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldMapperTests.java b/modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldMapperTests.java index efdf3c09bbe92..d6eb55dfb23e4 100644 --- a/modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldMapperTests.java +++ b/modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldMapperTests.java @@ -21,7 +21,8 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.MapperTestCase; +import org.elasticsearch.index.mapper.NumberFieldMapperTests; +import org.elasticsearch.index.mapper.NumberTypeOutOfRangeSpec; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.TimeSeriesParams; @@ -35,6 +36,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.function.Function; @@ -45,7 +47,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notANumber; -public class ScaledFloatFieldMapperTests extends MapperTestCase { +public class ScaledFloatFieldMapperTests extends NumberFieldMapperTests { @Override protected Collection getPlugins() { @@ -199,7 +201,7 @@ public void testStore() throws Exception { assertEquals(1230, storedField.numericValue().longValue()); } - public void testCoerce() throws Exception { + public void testCoerce() throws IOException { DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); 
ParsedDocument doc = mapper.parse( new SourceToParse( @@ -452,6 +454,41 @@ protected IngestScriptSupport ingestScriptSupport() { throw new AssumptionViolatedException("not supported"); } + @Override + protected List outOfRangeSpecs() { + // No outOfRangeSpecs are specified because ScaledFloatFieldMapper doesn't extend NumberFieldMapper and doesn't use a + // NumberFieldMapper.NumberType that is present in OutOfRangeSpecs + return Collections.emptyList(); + } + + @Override + public void testIgnoreMalformedWithObject() {} // TODO: either implement this, remove it, or update ScaledFloatFieldMapper's behaviour + + @Override + public void testAllowMultipleValuesField() {} // TODO: either implement this, remove it, or update ScaledFloatFieldMapper's behaviour + + @Override + public void testScriptableTypes() {} // TODO: either implement this, remove it, or update ScaledFloatFieldMapper's behaviour + + @Override + public void testDimension() {} // TODO: either implement this, remove it, or update ScaledFloatFieldMapper's behaviour + + @Override + protected Number missingValue() { + return 0.123; + } + + @Override + protected Number randomNumber() { + /* + * The source parser and doc values round trip will both reduce + * the precision to 32 bits if the value is more precise. + * randomDoubleBetween will smear the values out across a wide + * range of valid values. + */ + return randomBoolean() ? randomDoubleBetween(-Float.MAX_VALUE, Float.MAX_VALUE, true) : randomFloat(); + } + public void testEncodeDecodeExactScalingFactor() { double v = randomValue(); assertThat(encodeDecode(1 / v, v), equalTo(1 / v)); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/ByteFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/ByteFieldMapperTests.java index e1c4043f42963..883fdfb132b80 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/ByteFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/ByteFieldMapperTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.AssumptionViolatedException; @@ -23,12 +22,12 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberType.BYTE, "128", "is out of range for a byte"), - OutOfRangeSpec.of(NumberType.BYTE, "-129", "is out of range for a byte"), - OutOfRangeSpec.of(NumberType.BYTE, 128, "is out of range for a byte"), - OutOfRangeSpec.of(NumberType.BYTE, -129, "is out of range for a byte") + NumberTypeOutOfRangeSpec.of(NumberType.BYTE, "128", "is out of range for a byte"), + NumberTypeOutOfRangeSpec.of(NumberType.BYTE, "-129", "is out of range for a byte"), + NumberTypeOutOfRangeSpec.of(NumberType.BYTE, 128, "is out of range for a byte"), + NumberTypeOutOfRangeSpec.of(NumberType.BYTE, -129, "is out of range for a byte") ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DoubleFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DoubleFieldMapperTests.java index b6b9dfcfea9ff..a04712bd1b16b 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DoubleFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DoubleFieldMapperTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.index.mapper; -import 
org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.script.DoubleFieldScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContext; @@ -27,13 +26,29 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberFieldMapper.NumberType.DOUBLE, "1.7976931348623157E309", "[double] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.DOUBLE, "-1.7976931348623157E309", "[double] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.DOUBLE, Double.NaN, "[double] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.DOUBLE, Double.POSITIVE_INFINITY, "[double] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.DOUBLE, Double.NEGATIVE_INFINITY, "[double] supports only finite values") + NumberTypeOutOfRangeSpec.of( + NumberFieldMapper.NumberType.DOUBLE, + "1.7976931348623157E309", + "[double] supports only finite values" + ), + NumberTypeOutOfRangeSpec.of( + NumberFieldMapper.NumberType.DOUBLE, + "-1.7976931348623157E309", + "[double] supports only finite values" + ), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.DOUBLE, Double.NaN, "[double] supports only finite values"), + NumberTypeOutOfRangeSpec.of( + NumberFieldMapper.NumberType.DOUBLE, + Double.POSITIVE_INFINITY, + "[double] supports only finite values" + ), + NumberTypeOutOfRangeSpec.of( + NumberFieldMapper.NumberType.DOUBLE, + Double.NEGATIVE_INFINITY, + "[double] supports only finite values" + ) ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/FloatFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/FloatFieldMapperTests.java index 3798129ccff29..556d4312bedca 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/FloatFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/FloatFieldMapperTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.index.mapper; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.AssumptionViolatedException; @@ -24,13 +23,13 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, "3.4028235E39", "[float] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, "-3.4028235E39", "[float] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, Float.NaN, "[float] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, Float.POSITIVE_INFINITY, "[float] supports only finite values"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, Float.NEGATIVE_INFINITY, "[float] supports only finite values") + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, "3.4028235E39", "[float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, "-3.4028235E39", "[float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, Float.NaN, "[float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, Float.POSITIVE_INFINITY, "[float] supports only finite values"), + 
NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.FLOAT, Float.NEGATIVE_INFINITY, "[float] supports only finite values") ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/HalfFloatFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/HalfFloatFieldMapperTests.java index cc024efb5f307..8e3cd5d1b3202 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/HalfFloatFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/HalfFloatFieldMapperTests.java @@ -10,7 +10,6 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.AssumptionViolatedException; @@ -26,14 +25,14 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberType.HALF_FLOAT, "65520", "[half_float] supports only finite values"), - OutOfRangeSpec.of(NumberType.HALF_FLOAT, "-65520", "[half_float] supports only finite values"), - OutOfRangeSpec.of(NumberType.HALF_FLOAT, "-65520", "[half_float] supports only finite values"), - OutOfRangeSpec.of(NumberType.HALF_FLOAT, Float.NaN, "[half_float] supports only finite values"), - OutOfRangeSpec.of(NumberType.HALF_FLOAT, Float.POSITIVE_INFINITY, "[half_float] supports only finite values"), - OutOfRangeSpec.of(NumberType.HALF_FLOAT, Float.NEGATIVE_INFINITY, "[half_float] supports only finite values") + NumberTypeOutOfRangeSpec.of(NumberType.HALF_FLOAT, "65520", "[half_float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberType.HALF_FLOAT, "-65520", "[half_float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberType.HALF_FLOAT, "-65520", "[half_float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberType.HALF_FLOAT, Float.NaN, "[half_float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberType.HALF_FLOAT, Float.POSITIVE_INFINITY, "[half_float] supports only finite values"), + NumberTypeOutOfRangeSpec.of(NumberType.HALF_FLOAT, Float.NEGATIVE_INFINITY, "[half_float] supports only finite values") ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/IntegerFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/IntegerFieldMapperTests.java index 994b74a25743c..ac29ff52dee43 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/IntegerFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/IntegerFieldMapperTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.AssumptionViolatedException; @@ -24,12 +23,12 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberType.INTEGER, "2147483648", "is out of range for an integer"), - OutOfRangeSpec.of(NumberType.INTEGER, "-2147483649", "is out of range for an integer"), - OutOfRangeSpec.of(NumberType.INTEGER, 2147483648L, " out of range of int"), - OutOfRangeSpec.of(NumberType.INTEGER, -2147483649L, " out of range of int") + NumberTypeOutOfRangeSpec.of(NumberType.INTEGER, "2147483648", "is 
out of range for an integer"), + NumberTypeOutOfRangeSpec.of(NumberType.INTEGER, "-2147483649", "is out of range for an integer"), + NumberTypeOutOfRangeSpec.of(NumberType.INTEGER, 2147483648L, " out of range of int"), + NumberTypeOutOfRangeSpec.of(NumberType.INTEGER, -2147483649L, " out of range of int") ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java index f2d4431e5c79f..79c89f425c8fe 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/LongFieldMapperTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.script.LongFieldScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContext; @@ -33,14 +32,14 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "9223372036854775808", "out of range for a long"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "1e999999999", "out of range for a long"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "-9223372036854775809", "out of range for a long"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "-1e999999999", "out of range for a long"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, new BigInteger("9223372036854775808"), "out of range of long"), - OutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, new BigInteger("-9223372036854775809"), "out of range of long") + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "9223372036854775808", "out of range for a long"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "1e999999999", "out of range for a long"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "-9223372036854775809", "out of range for a long"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, "-1e999999999", "out of range for a long"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, new BigInteger("9223372036854775808"), "out of range of long"), + NumberTypeOutOfRangeSpec.of(NumberFieldMapper.NumberType.LONG, new BigInteger("-9223372036854775809"), "out of range of long") ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/ShortFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/ShortFieldMapperTests.java index b78cdbb8f2bfb..71cfe8b6bb50a 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/ShortFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/ShortFieldMapperTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.AssumptionViolatedException; @@ -24,12 +23,12 @@ protected Number missingValue() { } @Override - protected List outOfRangeSpecs() { + protected List outOfRangeSpecs() { return List.of( - OutOfRangeSpec.of(NumberType.SHORT, "32768", "is out of range for a short"), - OutOfRangeSpec.of(NumberType.SHORT, "-32769", "is out of range for a short"), - 
OutOfRangeSpec.of(NumberType.SHORT, 32768, "out of range of Java short"), - OutOfRangeSpec.of(NumberType.SHORT, -32769, "out of range of Java short") + NumberTypeOutOfRangeSpec.of(NumberType.SHORT, "32768", "is out of range for a short"), + NumberTypeOutOfRangeSpec.of(NumberType.SHORT, "-32769", "is out of range for a short"), + NumberTypeOutOfRangeSpec.of(NumberType.SHORT, 32768, "out of range of Java short"), + NumberTypeOutOfRangeSpec.of(NumberType.SHORT, -32769, "out of range of Java short") ); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java similarity index 98% rename from server/src/test/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java rename to test/framework/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java index 7b91c84a05c53..81848b5a50114 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapperTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.mapper.NumberFieldTypeTests.OutOfRangeSpec; import org.elasticsearch.script.DoubleFieldScript; import org.elasticsearch.script.LongFieldScript; import org.elasticsearch.script.Script; @@ -45,7 +44,7 @@ public abstract class NumberFieldMapperTests extends MapperTestCase { /** * @return a List of OutOfRangeSpec to test for this number type */ - protected abstract List outOfRangeSpecs(); + protected abstract List outOfRangeSpecs(); /** * @return an appropriate value to use for a missing value for this number type @@ -234,7 +233,7 @@ public void testNullValue() throws IOException { } public void testOutOfRangeValues() throws IOException { - for (OutOfRangeSpec item : outOfRangeSpecs()) { + for (NumberTypeOutOfRangeSpec item : outOfRangeSpecs()) { DocumentMapper mapper = createDocumentMapper(fieldMapping(b -> b.field("type", item.type.typeName()))); Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source(item::write))); assertThat( @@ -317,7 +316,7 @@ public void testMetricAndDocvalues() { } @Override - protected final Object generateRandomInputValue(MappedFieldType ft) { + protected Object generateRandomInputValue(MappedFieldType ft) { Number n = randomNumber(); return randomBoolean() ? n : n.toString(); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/NumberTypeOutOfRangeSpec.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/NumberTypeOutOfRangeSpec.java new file mode 100644 index 0000000000000..cf8c9a06088db --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/NumberTypeOutOfRangeSpec.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.index.mapper; + +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.math.BigInteger; + +public class NumberTypeOutOfRangeSpec { + + final NumberFieldMapper.NumberType type; + final Object value; + final String message; + + public static NumberTypeOutOfRangeSpec of(NumberFieldMapper.NumberType t, Object v, String m) { + return new NumberTypeOutOfRangeSpec(t, v, m); + } + + NumberTypeOutOfRangeSpec(NumberFieldMapper.NumberType t, Object v, String m) { + type = t; + value = v; + message = m; + } + + public void write(XContentBuilder b) throws IOException { + if (value instanceof BigInteger) { + b.rawField("field", new ByteArrayInputStream(value.toString().getBytes("UTF-8")), XContentType.JSON); + } else { + b.field("field", value); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/mapper/WholeNumberFieldMapperTests.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/WholeNumberFieldMapperTests.java similarity index 100% rename from server/src/test/java/org/elasticsearch/index/mapper/WholeNumberFieldMapperTests.java rename to test/framework/src/main/java/org/elasticsearch/index/mapper/WholeNumberFieldMapperTests.java diff --git a/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java b/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java index 95fe8f0a530ba..fc783ef92a112 100644 --- a/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java +++ b/x-pack/plugin/mapper-unsigned-long/src/test/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapperTests.java @@ -19,9 +19,10 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.MapperTestCase; +import org.elasticsearch.index.mapper.NumberTypeOutOfRangeSpec; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.index.mapper.WholeNumberFieldMapperTests; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.AssumptionViolatedException; @@ -29,6 +30,7 @@ import java.io.IOException; import java.math.BigInteger; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.function.Function; @@ -38,7 +40,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.matchesPattern; -public class UnsignedLongFieldMapperTests extends MapperTestCase { +public class UnsignedLongFieldMapperTests extends WholeNumberFieldMapperTests { @Override protected Collection getPlugins() { @@ -161,6 +163,9 @@ public void testNullValue() throws IOException { } } + @Override + public void testCoerce() {} // coerce is unimplemented + @Override protected boolean supportsIgnoreMalformed() { return true; @@ -367,6 +372,36 @@ protected IngestScriptSupport ingestScriptSupport() { } @Override + protected List outOfRangeSpecs() { + return Collections.emptyList(); // unimplemented + } + + @Override + public void testIgnoreMalformedWithObject() {} // unimplemented + + @Override + public void testAllowMultipleValuesField() {} // unimplemented 
+ + @Override + public void testScriptableTypes() {} // unimplemented + + @Override + protected Number missingValue() { + return 123L; + } + + @Override + protected Number randomNumber() { + if (randomBoolean()) { + return randomLong(); + } + if (randomBoolean()) { + return randomDouble(); + } + assumeFalse("https://github.com/elastic/elasticsearch/issues/70585", true); + return randomDoubleBetween(0L, Long.MAX_VALUE, true); + } + protected Function loadBlockExpected() { return v -> { // Numbers are in the block as a long but the test needs to compare them to their BigInteger value parsed from xcontent. From 81be06ea48a81dd612a2c8ddc2a3cc119486b98f Mon Sep 17 00:00:00 2001 From: Henning Andersen <33268011+henningandersen@users.noreply.github.com> Date: Thu, 1 Feb 2024 11:35:37 +0100 Subject: [PATCH 02/86] Fix blob cache testGetMultiThreaded (#105002) The verification that we always find a free region could fail due to racing against incRef'ers (which is not locked). Closes #104997 --- .../shared/SharedBlobCacheServiceTests.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 66a6cf4dbd949..049197edd97df 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -423,10 +423,15 @@ public void testMassiveDecay() throws IOException { * @throws IOException */ public void testGetMultiThreaded() throws IOException { - int threads = between(2, 10); - int regionCount = between(1, 20); + final int threads = between(2, 10); + final int regionCount = between(1, 20); + final boolean incRef = randomBoolean(); // if we have enough regions, a get should always have a result (except for explicit evict interference) - final boolean allowAlreadyClosed = regionCount < threads; + // if we incRef, we risk the eviction racing against that, leading to no available region, so allow + // the already closed exception in that case. + final boolean allowAlreadyClosed = regionCount < threads || incRef; + + logger.info("{} {} {}", threads, regionCount, allowAlreadyClosed); Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regionCount * 100L)).getStringRep()) @@ -466,7 +471,7 @@ public void testGetMultiThreaded() throws IOException { assert allowAlreadyClosed || e.getMessage().equals("evicted during free region allocation") : e; throw e; } - if (cacheFileRegion.tryIncRef()) { + if (incRef && cacheFileRegion.tryIncRef()) { if (yield[i] == 0) { Thread.yield(); } From cc097edd4355991cc801ce3bd2ef09849c8bbd95 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 1 Feb 2024 22:19:21 +1100 Subject: [PATCH 03/86] Remove S3RequestRetryStats in favor of APM metrics (#104973) APM metrics based s3 request stats have been available since #102505. It has collected sufficient data that we can now remove the log based stats introduced in #100272. 
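
Condensed to a sketch, the wiring being removed is the per-request side channel
that fed S3RequestRetryStats next to the metric collector (the snippet below is
abridged from the S3BlobStore diff that follows, not new code):

    var collector = statsCollectors.getMetricCollector(operation, purpose);
    return new RequestMetricCollector() {
        @Override
        public void collectMetrics(Request request, Response response) {
            s3RequestRetryStats.addRequest(request);      // log-based stats, removed
            collector.collectMetrics(request, response);  // metrics collection, kept
        }
    };

After this change getMetricCollector(...) simply returns the remaining collector
directly.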
Resolves: ES-7500 --- .../repositories/s3/S3BlobStore.java | 19 +--- .../repositories/s3/S3RequestRetryStats.java | 89 ------------------- 2 files changed, 1 insertion(+), 107 deletions(-) delete mode 100644 modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index c045e05a6f8e0..6b58026aba4f4 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -85,10 +85,6 @@ class S3BlobStore implements BlobStore { private final StatsCollectors statsCollectors = new StatsCollectors(); - private static final TimeValue RETRY_STATS_WINDOW = TimeValue.timeValueMinutes(5); - - private volatile S3RequestRetryStats s3RequestRetryStats; - S3BlobStore( S3Service service, String bucket, @@ -112,23 +108,10 @@ class S3BlobStore implements BlobStore { this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); this.repositoriesMetrics = repositoriesMetrics; - s3RequestRetryStats = new S3RequestRetryStats(getMaxRetries()); - threadPool.scheduleWithFixedDelay(() -> { - var priorRetryStats = s3RequestRetryStats; - s3RequestRetryStats = new S3RequestRetryStats(getMaxRetries()); - priorRetryStats.emitMetrics(); - }, RETRY_STATS_WINDOW, threadPool.generic()); } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { - var collector = statsCollectors.getMetricCollector(operation, purpose); - return new RequestMetricCollector() { - @Override - public void collectMetrics(Request request, Response response) { - s3RequestRetryStats.addRequest(request); - collector.collectMetrics(request, response); - } - }; + return statsCollectors.getMetricCollector(operation, purpose); } public Executor getSnapshotExecutor() { diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java deleted file mode 100644 index b7c37c6d95fde..0000000000000 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RequestRetryStats.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. 
- */ - -package org.elasticsearch.repositories.s3; - -import com.amazonaws.Request; -import com.amazonaws.util.AWSRequestMetrics; -import com.amazonaws.util.TimingInfo; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.ESLogMessage; -import org.elasticsearch.common.util.Maps; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongArray; - -/** - * This class emit aws s3 metrics as logs until we have a proper apm integration - */ -public class S3RequestRetryStats { - public static final String MESSAGE_FIELD = "message"; - - private static final Logger logger = LogManager.getLogger(S3RequestRetryStats.class); - - private final AtomicLong requests = new AtomicLong(); - private final AtomicLong exceptions = new AtomicLong(); - private final AtomicLong throttles = new AtomicLong(); - private final AtomicLongArray exceptionsHistogram; - private final AtomicLongArray throttlesHistogram; - - public S3RequestRetryStats(int maxRetries) { - this.exceptionsHistogram = new AtomicLongArray(maxRetries + 1); - this.throttlesHistogram = new AtomicLongArray(maxRetries + 1); - } - - public void addRequest(Request request) { - if (request == null) { - return; - } - var info = request.getAWSRequestMetrics().getTimingInfo(); - long requests = getCounter(info, AWSRequestMetrics.Field.RequestCount); - long exceptions = getCounter(info, AWSRequestMetrics.Field.Exception); - long throttles = getCounter(info, AWSRequestMetrics.Field.ThrottleException); - - this.requests.addAndGet(requests); - this.exceptions.addAndGet(exceptions); - this.throttles.addAndGet(throttles); - if (exceptions >= 0 && exceptions < this.exceptionsHistogram.length()) { - this.exceptionsHistogram.incrementAndGet((int) exceptions); - } - if (throttles >= 0 && throttles < this.throttlesHistogram.length()) { - this.throttlesHistogram.incrementAndGet((int) throttles); - } - } - - private static long getCounter(TimingInfo info, AWSRequestMetrics.Field field) { - var counter = info.getCounter(field.name()); - return counter != null ? counter.longValue() : 0L; - } - - public void emitMetrics() { - if (logger.isDebugEnabled()) { - var metrics = Maps.newMapWithExpectedSize(4); - metrics.put(MESSAGE_FIELD, "S3 Request Retry Stats"); - metrics.put("elasticsearch.metrics.s3.requests", requests.get()); - metrics.put("elasticsearch.metrics.s3.exceptions", exceptions.get()); - metrics.put("elasticsearch.metrics.s3.throttles", throttles.get()); - for (int i = 0; i < exceptionsHistogram.length(); i++) { - long exceptions = exceptionsHistogram.get(i); - if (exceptions != 0) { - metrics.put("elasticsearch.metrics.s3.exceptions_histogram_" + i, exceptions); - } - } - for (int i = 0; i < throttlesHistogram.length(); i++) { - long throttles = throttlesHistogram.get(i); - if (throttles != 0) { - metrics.put("elasticsearch.metrics.s3.throttles_histogram_" + i, throttles); - } - } - logger.debug(new ESLogMessage().withFields(metrics)); - } - } -} From 5b22d4245e83b21eec434825778f19a639982349 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Thu, 1 Feb 2024 12:23:10 +0100 Subject: [PATCH 04/86] Retry TransportShardMultiGetFomTranslogAction during relocations (#104901) Same as #104579 but for MGET. 
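
Reduced to a sketch, the retry idiom this applies is: if the translog read fails
because the shard moved, wait for the next cluster state and try again against
the newly resolved primary. Names like withRetry, tryOnce and Response below are
illustrative stand-ins; the real code is shardMultiGetFromTranslog /
tryShardMultiGetFromTranslog in the diff that follows.

    private void withRetry(ClusterStateObserver observer, ActionListener<Response> listener) {
        tryOnce(listener.delegateResponse((l, e) -> {
            final var cause = ExceptionsHelper.unwrapCause(e);
            if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
                // The primary has likely relocated; re-resolve it from a fresh cluster state.
                observer.waitForNextChange(new ClusterStateObserver.Listener() {
                    @Override
                    public void onNewClusterState(ClusterState state) {
                        withRetry(observer, l);
                    }

                    @Override
                    public void onClusterServiceClose() {
                        l.onFailure(new NodeClosedException(clusterService.localNode()));
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        l.onFailure(new ElasticsearchException("timed out retrying", cause));
                    }
                });
            } else {
                l.onFailure(e); // anything else is not retryable here
            }
        }));
    }
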
Relates ES-5727 --- .../action/get/TransportGetAction.java | 2 +- .../get/TransportShardMultiGetAction.java | 165 +++++++++++++----- ...ansportShardMultiGetFomTranslogAction.java | 1 - 3 files changed, 119 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 5eab04663e959..9df6853eeeef0 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -296,7 +296,7 @@ private void tryGetFromTranslog(GetRequest request, IndexShard indexShard, Clust static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) { var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { - throw new NoShardAvailableActionException(shardId, "primary shard is not active"); + return null; } DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId()); assert node != null; diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 6dfd706b3268f..2e558b42d7e2b 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -8,10 +8,15 @@ package org.elasticsearch.action.get; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; @@ -19,6 +24,7 @@ import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.OperationRouting; @@ -27,15 +33,17 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; +import org.elasticsearch.node.NodeClosedException; 
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -163,8 +171,12 @@ private void handleMultiGetOnUnpromotableShard( ActionListener listener ) throws IOException { ShardId shardId = indexShard.shardId(); - var node = getCurrentNodeOfPrimary(clusterService.state(), shardId); if (request.refresh()) { + var node = getCurrentNodeOfPrimary(clusterService.state(), shardId); + if (node == null) { + listener.onFailure(new NoShardAvailableActionException(shardId, "primary shard is not active")); + return; + } logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); var refreshRequest = new BasicReplicationRequest(shardId); refreshRequest.setParentTask(request.getParentTask()); @@ -173,57 +185,116 @@ private void handleMultiGetOnUnpromotableShard( refreshRequest, listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l)) ); - } else if (request.realtime()) { - TransportShardMultiGetFomTranslogAction.Request mgetFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request( - request, - shardId - ); - mgetFromTranslogRequest.setParentTask(request.getParentTask()); - transportService.sendRequest( - node, - TransportShardMultiGetFomTranslogAction.NAME, - mgetFromTranslogRequest, - new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { - var responseHasMissingLocations = false; - for (int i = 0; i < r.multiGetShardResponse().locations.size(); i++) { - if (r.multiGetShardResponse().responses.get(i) == null && r.multiGetShardResponse().failures.get(i) == null) { - responseHasMissingLocations = true; - break; - } - } - if (responseHasMissingLocations == false) { - logger.debug("received result of all ids in real-time mget[shard] from the promotable shard."); - l.onResponse(r.multiGetShardResponse()); - } else { - logger.debug( - "no result for some ids from the promotable shard (segment generation to wait for: {})", - r.segmentGeneration() - ); - if (r.segmentGeneration() == -1) { - // Nothing to wait for (no previous unsafe generation), just handle the rest locally. - ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run(); - } else { - assert r.segmentGeneration() > -1L; - assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM; - indexShard.waitForPrimaryTermAndGeneration( - r.primaryTerm(), - r.segmentGeneration(), - listener.delegateFailureAndWrap( - (ll, aLong) -> getExecutor(request, shardId).execute( - ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)) - ) - ) - ); - } - } - }), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId)) + return; + } + if (request.realtime()) { + final var state = clusterService.state(); + final var observer = new ClusterStateObserver( + state, + clusterService, + TimeValue.timeValueSeconds(60), + logger, + threadPool.getThreadContext() ); + shardMultiGetFromTranslog(request, indexShard, state, observer, listener); } else { // A non-real-time mget with no explicit refresh requested. 
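        // To recap the three paths of this method after the refactor: refresh=true
        // refreshes the promotable shard and then reads locally; realtime=true goes
        // through shardMultiGetFromTranslog, which retries via the ClusterStateObserver
        // created above when the shard relocates; anything else falls through to the
        // plain local read right below.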
super.asyncShardOperation(request, shardId, listener); } } + private void shardMultiGetFromTranslog( + MultiGetShardRequest request, + IndexShard indexShard, + ClusterState state, + ClusterStateObserver observer, + ActionListener listener + ) { + tryShardMultiGetFromTranslog(request, indexShard, state, listener.delegateResponse((l, e) -> { + final var cause = ExceptionsHelper.unwrapCause(e); + logger.debug("mget_from_translog[shard] failed", cause); + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { + logger.debug("retrying mget_from_translog[shard]"); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + shardMultiGetFromTranslog(request, indexShard, state, observer, l); + } + + @Override + public void onClusterServiceClose() { + l.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); + } + }); + } else { + l.onFailure(e); + } + })); + } + + private void tryShardMultiGetFromTranslog( + MultiGetShardRequest request, + IndexShard indexShard, + ClusterState state, + ActionListener listener + ) { + final var shardId = indexShard.shardId(); + var node = getCurrentNodeOfPrimary(state, shardId); + if (node == null) { + listener.onFailure(new NoShardAvailableActionException(shardId, "primary shard is not active")); + return; + } + TransportShardMultiGetFomTranslogAction.Request mgetFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request( + request, + shardId + ); + mgetFromTranslogRequest.setParentTask(request.getParentTask()); + transportService.sendRequest( + node, + TransportShardMultiGetFomTranslogAction.NAME, + mgetFromTranslogRequest, + new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { + var responseHasMissingLocations = false; + for (int i = 0; i < r.multiGetShardResponse().locations.size(); i++) { + if (r.multiGetShardResponse().responses.get(i) == null && r.multiGetShardResponse().failures.get(i) == null) { + responseHasMissingLocations = true; + break; + } + } + if (responseHasMissingLocations == false) { + logger.debug("received result of all ids in real-time mget[shard] from the promotable shard."); + l.onResponse(r.multiGetShardResponse()); + } else { + logger.debug( + "no result for some ids from the promotable shard (segment generation to wait for: {})", + r.segmentGeneration() + ); + if (r.segmentGeneration() == -1) { + // Nothing to wait for (no previous unsafe generation), just handle the rest locally. 
+ ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run(); + } else { + assert r.segmentGeneration() > -1L; + assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM; + indexShard.waitForPrimaryTermAndGeneration( + r.primaryTerm(), + r.segmentGeneration(), + listener.delegateFailureAndWrap( + (ll, aLong) -> getExecutor(request, shardId).execute( + ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)) + ) + ) + ); + } + } + }), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId)) + ); + } + private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) { logger.trace("handling local gets for missing locations"); for (int i = 0; i < response.locations.size(); i++) { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index 5058990efd966..52504176eb7e1 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.util.Objects; -// TODO(ES-5727): add a retry mechanism to TransportShardMultiGetFromTranslogAction public class TransportShardMultiGetFomTranslogAction extends HandledTransportAction< TransportShardMultiGetFomTranslogAction.Request, TransportShardMultiGetFomTranslogAction.Response> { From 81a49f156758bfa5fc88d65cfc8d9bbf54575ab5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 1 Feb 2024 13:09:17 +0100 Subject: [PATCH 05/86] Restrict usage of certain aggregations when in sort order execution is required (#104665) A number of aggregations that rely on deferred collection don't work with time series index searcher and will produce incorrect result. These aggregation usages should fail. The documentation has been updated to describe these limitations. In case of multi terms aggregation, the depth first collection is forcefully used when time series aggregation is used. This behaviour is inline with the terms aggregation. --- docs/changelog/104665.yaml | 5 ++ .../bucket/time-series-aggregation.asciidoc | 11 ++++ .../test/aggregations/time_series.yml | 61 +++++++++++++++++++ .../bucket/DeferableBucketAggregator.java | 11 ++++ .../multiterms/MultiTermsAggregator.java | 5 +- .../rest-api-spec/test/analytics/100_tsdb.yml | 28 ++++++++- 6 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/104665.yaml diff --git a/docs/changelog/104665.yaml b/docs/changelog/104665.yaml new file mode 100644 index 0000000000000..a7043cbdc9dda --- /dev/null +++ b/docs/changelog/104665.yaml @@ -0,0 +1,5 @@ +pr: 104665 +summary: Restrict usage of certain aggregations when in sort order execution is required +area: TSDB +type: enhancement +issues: [] diff --git a/docs/reference/aggregations/bucket/time-series-aggregation.asciidoc b/docs/reference/aggregations/bucket/time-series-aggregation.asciidoc index d93df55118a8b..1fb527cd645f0 100644 --- a/docs/reference/aggregations/bucket/time-series-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/time-series-aggregation.asciidoc @@ -97,3 +97,14 @@ further. 
Alternatively, using sub aggregations can limit the amount of values re
 
 The `keyed` parameter determines if buckets are returned as a map with unique keys per bucket. By default with `keyed`
 set to false, buckets are returned as an array.
+
+[[times-series-aggregations-limitations]]
+==== Limitations
+
+The `time_series` aggregation has many limitations. Many aggregation performance optimizations are disabled when using
+the `time_series` aggregation. For example, the filter by filter optimization and the breadth first collect mode are
+disabled (the `terms` and `multi_terms` aggregations forcefully use the depth first collect mode).
+
+The following aggregations also fail to work if used in combination with the `time_series` aggregation:
+`auto_date_histogram`, `variable_width_histogram`, `rare_terms`, `global`, `composite`, `sampler`, `random_sampler` and
+`diversified_sampler`.
diff --git a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml
index 33ca0dc4779b6..9831f87fb5067 100644
--- a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml
+++ b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml
@@ -343,3 +343,64 @@ setup:
   - match: { aggregations.ts.buckets.0.key: { "key": "10" } }
   - match: { aggregations.ts.buckets.0.doc_count: 1 }
+
+---
+"auto_date_histogram aggregation with time_series aggregation":
+  - skip:
+      version: " - 8.12.99"
+      reason: "Handling for time series aggregation failures introduced in 8.13.0"
+
+  - do:
+      catch: '/\[by_time\] aggregation is incompatible with time series execution mode/'
+      search:
+        index: tsdb
+        body:
+          aggs:
+            by_time:
+              auto_date_histogram:
+                field: "@timestamp"
+              aggs:
+                ts:
+                  time_series: {}
+
+---
+"variable_width_histogram aggregation with time_series aggregation":
+  - skip:
+      version: " - 8.12.99"
+      reason: "Handling for time series aggregation failures introduced in 8.13.0"
+
+  - do:
+      catch: '/\[variable_width_histogram\] aggregation is incompatible with time series execution mode/'
+      search:
+        index: tsdb
+        body:
+          aggs:
+            variable_width_histogram:
+              variable_width_histogram:
+                field: val
+              aggs:
+                ts:
+                  time_series: {}
+
+---
+"rare_terms aggregation with time_series aggregation":
+  - skip:
+      version: " - 8.12.99"
+      reason: "Handling for time series aggregation failures introduced in 8.13.0"
+
+  - do:
+      catch: '/\[rare_terms\] aggregation is incompatible with time series execution mode/'
+      search:
+        index: tsdb
+        body:
+          aggs:
+            ts:
+              time_series: {}
+              aggs:
+                rare_terms:
+                  rare_terms:
+                    field: key
+                  aggs:
+                    max:
+                      max:
+                        field: val
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java
index 192d3b1f84858..79dcdd6e0b220 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java
@@ -28,6 +28,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
      */
     private DeferringBucketCollector deferringCollector;
     private List<String> deferredAggregationNames;
+    private final boolean inSortOrderExecutionRequired;
 
     protected DeferableBucketAggregator(
         String name,
         AggregatorFactories factories,
         AggregationContext context,
         Aggregator parent,
         Map<String, Object> metadata
     ) throws
IOException { // Assumes that we're collecting MANY buckets. super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata); + this.inSortOrderExecutionRequired = context.isInSortOrderExecutionRequired(); } @Override @@ -46,6 +48,15 @@ protected void doPreCollection() throws IOException { List deferredAggregations = null; for (int i = 0; i < subAggregators.length; ++i) { if (shouldDefer(subAggregators[i])) { + // Deferred collection isn't possible with TimeSeriesIndexSearcher, + // this will always result in incorrect results. The is caused by + // the fact that tsid will not be correctly recorded, because when + // deferred collection occurs the TimeSeriesIndexSearcher already + // completed execution. + if (inSortOrderExecutionRequired) { + throw new IllegalArgumentException("[" + name + "] aggregation is incompatible with time series execution mode"); + } + if (deferringCollector == null) { deferringCollector = buildDeferringCollector(); deferredAggregations = new ArrayList<>(subAggregators.length); diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index 7633d38d7ebba..2a1d9f8c44c53 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -95,11 +95,14 @@ protected MultiTermsAggregator( partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); this.formats = formats; this.showTermDocCountError = showTermDocCountError; - if (subAggsNeedScore() && descendsFromNestedAggregator(parent)) { + if (subAggsNeedScore() && descendsFromNestedAggregator(parent) || context.isInSortOrderExecutionRequired()) { /** * Force the execution to depth_first because we need to access the score of * nested documents in a sub-aggregation and we are not able to generate this score * while replaying deferred documents. + * + * We also force depth_first for time-series aggs executions since they need to be visited in a particular order (index + * sort order) which might be changed by the breadth_first execution. 
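+     * Breadth_first would defer sub-aggregation collection until after the
+     * TimeSeriesIndexSearcher has completed its pass, at which point the
+     * tsid-ordered visiting it relies on is no longer available.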
*/ this.collectMode = SubAggCollectionMode.DEPTH_FIRST; } else { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/100_tsdb.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/100_tsdb.yml index ef34e64ad41d7..c014a4768dcef 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/100_tsdb.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/100_tsdb.yml @@ -61,7 +61,7 @@ setup: - '{"@timestamp": "2021-04-28T18:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}' --- -aggretate multi_terms: +aggregate multi_terms: - skip: version: " - 8.0.99" reason: introduced in 8.1.0 @@ -85,6 +85,32 @@ aggretate multi_terms: - match: { aggregations.m_terms.buckets.2.key_as_string: "{k8s.pod.uid=947e4ced-1786-4e53-9e0c-5c447e959507, metricset=pod}|10.10.55.2" } - match: { aggregations.m_terms.buckets.2.doc_count: 1 } +--- +"multi_terms aggregation with time_series aggregation": + - skip: + version: " - 8.12.99" + reason: "multi_terms for time series aggregation fixed in 8.13.0" + + - do: + search: + index: test + body: + aggs: + ts: + time_series: {} + m_terms: + multi_terms: + collect_mode: breadth_first + terms: + - field: k8s.pod.name + - field: k8s.pod.ip + aggs: + max_value: + max: + field: val + - length: { aggregations.ts.buckets: 2 } + - length: { aggregations.m_terms.buckets: 3 } + --- "Auto date histogram on counter": - do: From 155f2cdf3ff600511ea291148b37d9d80033a9f7 Mon Sep 17 00:00:00 2001 From: Carlos Delgado <6339205+carlosdelest@users.noreply.github.com> Date: Thu, 1 Feb 2024 13:11:29 +0100 Subject: [PATCH 06/86] Extract inner classes from TransportBulkAction (#104986) --- .../action/bulk/BulkOperation.java | 392 ++++++++++++++ .../action/bulk/BulkRequestModifier.java | 167 ++++++ .../action/bulk/TransportBulkAction.java | 492 +----------------- .../action/bulk/BulkRequestModifierTests.java | 6 +- 4 files changed, 576 insertions(+), 481 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java new file mode 100644 index 0000000000000..1d95f430d5c7e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -0,0 +1,392 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.action.bulk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.RoutingMissingException; +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.IndexRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; + +/** + * retries on retryable cluster blocks, resolves item requests, + * constructs shard bulk requests and delegates execution to shard bulk action + */ +final class BulkOperation extends ActionRunnable { + + private static final Logger logger = LogManager.getLogger(BulkOperation.class); + + private final Task task; + private final ThreadPool threadPool; + private final ClusterService clusterService; + private BulkRequest bulkRequest; // set to null once all requests are sent out + private final ActionListener listener; + private final AtomicArray responses; + private final long startTimeNanos; + private final ClusterStateObserver observer; + private final Map indicesThatCannotBeCreated; + private final String executorName; + private final LongSupplier relativeTimeProvider; + private IndexNameExpressionResolver indexNameExpressionResolver; + private NodeClient client; + + BulkOperation( + Task task, + ThreadPool threadPool, + String executorName, + ClusterService clusterService, + BulkRequest bulkRequest, + NodeClient client, + AtomicArray responses, + Map indicesThatCannotBeCreated, + IndexNameExpressionResolver indexNameExpressionResolver, + LongSupplier relativeTimeProvider, + long startTimeNanos, + ActionListener listener + ) { + super(listener); + this.task = task; + this.threadPool = threadPool; + this.clusterService = clusterService; + this.responses = responses; + this.bulkRequest = bulkRequest; + this.listener = listener; + this.startTimeNanos = startTimeNanos; + this.indicesThatCannotBeCreated = indicesThatCannotBeCreated; + this.executorName = executorName; + 
this.relativeTimeProvider = relativeTimeProvider;
+        this.indexNameExpressionResolver = indexNameExpressionResolver;
+        this.client = client;
+        this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
+    }
+
+    @Override
+    protected void doRun() {
+        assert bulkRequest != null;
+        final ClusterState clusterState = observer.setAndGetObservedState();
+        if (handleBlockExceptions(clusterState)) {
+            return;
+        }
+        Map> requestsByShard = groupRequestsByShards(clusterState);
+        executeBulkRequestsByShard(requestsByShard, clusterState);
+    }
+
+    private long buildTookInMillis(long startTimeNanos) {
+        return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeNanos);
+    }
+
+    private Map> groupRequestsByShards(ClusterState clusterState) {
+        final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
+        Metadata metadata = clusterState.metadata();
+        // Group the requests by ShardId -> Operations mapping
+        Map> requestsByShard = new HashMap<>();
+
+        for (int i = 0; i < bulkRequest.requests.size(); i++) {
+            DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
+            // the request can only be null because we set it to null in the previous step, so it gets ignored
+            if (docWriteRequest == null) {
+                continue;
+            }
+            if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
+                continue;
+            }
+            if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) {
+                continue;
+            }
+            if (addFailureIfRequiresDataStreamAndNoParentDataStream(docWriteRequest, i, metadata)) {
+                continue;
+            }
+            IndexAbstraction ia = null;
+            boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
+            try {
+                ia = concreteIndices.resolveIfAbsent(docWriteRequest);
+                if (ia.isDataStreamRelated() && includeDataStreams == false) {
+                    throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
+                }
+                // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
+                // an operation is allowed to index into a data stream, but this isn't done when the resolve call is cached, so
+                // the validation needs to be performed here too.
+                if (ia.getParentDataStream() != null &&
+                // avoid valid cases when directly indexing into a backing index
+                // (for example when directly indexing into .ds-logs-foobar-000001)
+                    ia.getName().equals(docWriteRequest.index()) == false && docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
+                    throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
+                }
+
+                TransportBulkAction.prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
+                TransportBulkAction.prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
+                docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
+
+                final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
+                if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) {
+                    continue;
+                }
+                IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
+                docWriteRequest.process(indexRouting);
+                int shardId = docWriteRequest.route(indexRouting);
+                List shardRequests = requestsByShard.computeIfAbsent(
+                    new ShardId(concreteIndex, shardId),
+                    shard -> new ArrayList<>()
+                );
+                shardRequests.add(new BulkItemRequest(i, docWriteRequest));
+            } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
+                String name = ia != null ? ia.getName() : docWriteRequest.index();
+                BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
+                BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
+                responses.set(i, bulkItemResponse);
+                // make sure the request never gets processed again
+                bulkRequest.requests.set(i, null);
+            }
+        }
+        return requestsByShard;
+    }
+
+    private void executeBulkRequestsByShard(Map> requestsByShard, ClusterState clusterState) {
+        if (requestsByShard.isEmpty()) {
+            listener.onResponse(
+                new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
+            );
+            return;
+        }
+
+        String nodeId = clusterService.localNode().getId();
+        Runnable onBulkItemsComplete = () -> {
+            listener.onResponse(
+                new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
+            );
+            // Allow memory for bulk shard request items to be reclaimed before all items have been completed
+            bulkRequest = null;
+        };
+
+        try (RefCountingRunnable bulkItemRequestCompleteRefCount = new RefCountingRunnable(onBulkItemsComplete)) {
+            for (Map.Entry> entry : requestsByShard.entrySet()) {
+                final ShardId shardId = entry.getKey();
+                final List requests = entry.getValue();
+
+                BulkShardRequest bulkShardRequest = new BulkShardRequest(
+                    shardId,
+                    bulkRequest.getRefreshPolicy(),
+                    requests.toArray(new BulkItemRequest[0])
+                );
+                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
+                bulkShardRequest.timeout(bulkRequest.timeout());
+                bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
+                if (task != null) {
+                    bulkShardRequest.setParentTask(nodeId, task.getId());
+                }
+                executeBulkShardRequest(bulkShardRequest, bulkItemRequestCompleteRefCount.acquire());
+            }
+        }
+    }
+
+    private void executeBulkShardRequest(BulkShardRequest bulkShardRequest, Releasable releaseOnFinish) {
+        client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(BulkShardResponse bulkShardResponse) {
+                for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
+                    // we may have no response if the item failed
+                    if (bulkItemResponse.getResponse() != null) {
+                        bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
+                    }
+                    responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
+                }
+                releaseOnFinish.close();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                // create failures for all relevant requests
+                for (BulkItemRequest request : bulkShardRequest.items()) {
+                    final String indexName = request.index();
+                    DocWriteRequest docWriteRequest = request.request();
+                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e);
+                    responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
+                }
+                releaseOnFinish.close();
+            }
+        });
+    }
+
+    private boolean handleBlockExceptions(ClusterState state) {
+        ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
+        if (blockException != null) {
+            if (blockException.retryable()) {
+                logger.trace("cluster is blocked, scheduling a retry", blockException);
+                retry(blockException);
+            } else {
+                onFailure(blockException);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    void retry(Exception failure) {
+        assert failure != null;
+        if (observer.isTimedOut()) {
+            // we are running as a last attempt after a timeout has happened; don't retry
+            onFailure(failure);
+            return;
+        }
+        observer.waitForNextChange(new ClusterStateObserver.Listener() {
+            @Override
+            public void onNewClusterState(ClusterState state) {
+                /*
+                 * This is called on the cluster state update thread pool
+                 * but we'd prefer to coordinate the bulk request on the
+                 * write thread pool just to make sure the cluster state
+                 * update thread doesn't get clogged up.
+                 */
+                dispatchRetry();
+            }
+
+            @Override
+            public void onClusterServiceClose() {
+                onFailure(new NodeClosedException(clusterService.localNode()));
+            }
+
+            @Override
+            public void onTimeout(TimeValue timeout) {
+                /*
+                 * Try one more time.... This is called on the generic
+                 * thread pool but out of an abundance of caution we
+                 * switch over to the write thread pool that we expect
+                 * to coordinate the bulk request.
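+                 * (dispatchRetry() below performs that switch by resubmitting this
+                 * BulkOperation to threadPool.executor(executorName).)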
+                 */
+                dispatchRetry();
+            }
+
+            private void dispatchRetry() {
+                threadPool.executor(executorName).submit(BulkOperation.this);
+            }
+        });
+    }
+
+    private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest request, int idx, final Metadata metadata) {
+        if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) {
+            Exception exception = new IndexNotFoundException(
+                "[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true] and [" + request.index() + "] is not an alias",
+                request.index()
+            );
+            addFailure(request, idx, exception);
+            return true;
+        }
+        return false;
+    }
+
+    private boolean addFailureIfRequiresDataStreamAndNoParentDataStream(DocWriteRequest request, int idx, final Metadata metadata) {
+        if (request.isRequireDataStream() && (metadata.indexIsADataStream(request.index()) == false)) {
+            Exception exception = new ResourceNotFoundException(
+                "[" + DocWriteRequest.REQUIRE_DATA_STREAM + "] request flag is [true] and [" + request.index() + "] is not a data stream",
+                request.index()
+            );
+            addFailure(request, idx, exception);
+            return true;
+        }
+        return false;
+    }
+
+    private boolean addFailureIfIndexIsClosed(DocWriteRequest request, Index concreteIndex, int idx, final Metadata metadata) {
+        IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex);
+        if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
+            addFailure(request, idx, new IndexClosedException(concreteIndex));
+            return true;
+        }
+        return false;
+    }
+
+    private boolean addFailureIfIndexCannotBeCreated(DocWriteRequest request, int idx) {
+        IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
+        if (cannotCreate != null) {
+            addFailure(request, idx, cannotCreate);
+            return true;
+        }
+        return false;
+    }
+
+    private void addFailure(DocWriteRequest request, int idx, Exception unavailableException) {
+        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), unavailableException);
+        BulkItemResponse bulkItemResponse = BulkItemResponse.failure(idx, request.opType(), failure);
+        responses.set(idx, bulkItemResponse);
+        // make sure the request never gets processed again
+        bulkRequest.requests.set(idx, null);
+    }
+
+    private static class ConcreteIndices {
+        private final ClusterState state;
+        private final IndexNameExpressionResolver indexNameExpressionResolver;
+        private final Map indexAbstractions = new HashMap<>();
+        private final Map routings = new HashMap<>();
+
+        ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) {
+            this.state = state;
+            this.indexNameExpressionResolver = indexNameExpressionResolver;
+        }
+
+        IndexAbstraction resolveIfAbsent(DocWriteRequest request) {
+            try {
+                IndexAbstraction indexAbstraction = indexAbstractions.get(request.index());
+                if (indexAbstraction == null) {
+                    indexAbstraction = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request);
+                    indexAbstractions.put(request.index(), indexAbstraction);
+                }
+                return indexAbstraction;
+            } catch (IndexNotFoundException e) {
+                if (e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
+                    throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams", e);
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        IndexRouting routing(Index index) {
+            IndexRouting routing = routings.get(index);
+            if (routing == null) {
+                routing = IndexRouting.fromIndexMetadata(state.metadata().getIndexSafe(index));
+                routings.put(index, routing);
+            }
+            return routing;
+        }
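+
+        // A minimal usage sketch (illustrative only), mirroring the calls made from
+        // groupRequestsByShards(...) above; both lookups are memoized, so repeated
+        // requests against the same index skip the expression resolver:
+        //
+        //   ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
+        //   IndexAbstraction ia = concreteIndices.resolveIfAbsent(docWriteRequest);   // cached per index name
+        //   Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
+        //   IndexRouting indexRouting = concreteIndices.routing(concreteIndex);       // cached per concrete index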
+ } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java new file mode 100644 index 0000000000000..e42ddd41b0b0a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.apache.lucene.util.SparseFixedBitSet; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Assertions; +import org.elasticsearch.index.shard.ShardId; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +final class BulkRequestModifier implements Iterator> { + + private static final String DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated"; + + final BulkRequest bulkRequest; + final SparseFixedBitSet failedSlots; + final List itemResponses; + final AtomicIntegerArray originalSlots; + + volatile int currentSlot = -1; + + BulkRequestModifier(BulkRequest bulkRequest) { + this.bulkRequest = bulkRequest; + this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size()); + this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); + this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok + } + + @Override + public DocWriteRequest next() { + return bulkRequest.requests().get(++currentSlot); + } + + @Override + public boolean hasNext() { + return (currentSlot + 1) < bulkRequest.requests().size(); + } + + BulkRequest getBulkRequest() { + if (itemResponses.isEmpty()) { + return bulkRequest; + } else { + BulkRequest modifiedBulkRequest = new BulkRequest(); + modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy()); + modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); + modifiedBulkRequest.timeout(bulkRequest.timeout()); + + int slot = 0; + List> requests = bulkRequest.requests(); + for (int i = 0; i < requests.size(); i++) { + DocWriteRequest request = requests.get(i); + if (failedSlots.get(i) == false) { + modifiedBulkRequest.add(request); + originalSlots.set(slot++, i); + } + } + return modifiedBulkRequest; + } + } + + ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { + if (itemResponses.isEmpty()) { + return actionListener.map( + response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis) + ); + } else { + return actionListener.map(response -> { + // these items are the responses from the 
subsequent bulk request, their 'slots'
+                // are not correct for this response we're building
+                final BulkItemResponse[] bulkResponses = response.getItems();
+
+                final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
+
+                // the item responses are from the original request, so their slots are correct.
+                // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
+                for (BulkItemResponse item : itemResponses) {
+                    allResponses[item.getItemId()] = item;
+                }
+
+                // use the original slots for the responses from the bulk
+                for (int i = 0; i < bulkResponses.length; i++) {
+                    allResponses[originalSlots.get(i)] = bulkResponses[i];
+                }
+
+                if (Assertions.ENABLED) {
+                    assertResponsesAreCorrect(bulkResponses, allResponses);
+                }
+
+                return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis);
+            });
+        }
+    }
+
+    private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) {
+        // check for an empty intersection between the ids
+        final Set failedIds = itemResponses.stream().map(BulkItemResponse::getItemId).collect(Collectors.toSet());
+        final Set responseIds = IntStream.range(0, bulkResponses.length)
+            .map(originalSlots::get) // resolve subsequent bulk ids back to the original slots
+            .boxed()
+            .collect(Collectors.toSet());
+        assert Sets.haveEmptyIntersection(failedIds, responseIds)
+            : "bulk item response slots cannot have failed and been processed in the subsequent bulk request, failed ids: "
+                + failedIds
+                + ", response ids: "
+                + responseIds;
+
+        // check for the correct number of responses
+        final int expectedResponseCount = bulkRequest.requests.size();
+        final int actualResponseCount = failedIds.size() + responseIds.size();
+        assert expectedResponseCount == actualResponseCount
+            : "Expected [" + expectedResponseCount + "] responses, but found [" + actualResponseCount + "]";
+
+        // check that every response is present
+        for (int i = 0; i < allResponses.length; i++) {
+            assert allResponses[i] != null : "BulkItemResponse at index [" + i + "] was null";
+        }
+    }
+
+    synchronized void markItemAsFailed(int slot, Exception e) {
+        final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot);
+        final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
+        // We hit an error while preprocessing a request, so we:
+        // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
+        // 2) Add a bulk item failure for this request
+        // 3) Continue with the next request in the bulk.
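+        // The recorded slot lets wrapActionListenerIfNeeded(...) merge these early failures
+        // back into the combined BulkResponse at their original positions.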
+ failedSlots.set(slot); + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), id, e); + itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure)); + } + + synchronized void markItemAsDropped(int slot) { + final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); + final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); + failedSlots.set(slot); + UpdateResponse dropped = new UpdateResponse( + new ShardId(docWriteRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0), + id, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + docWriteRequest.version(), + DocWriteResponse.Result.NOOP + ); + itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index de11a57a237df..e33f3c71e0076 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -10,18 +10,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.SparseFixedBitSet; -import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest.OpType; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -36,7 +31,6 @@ import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -44,25 +38,19 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import 
org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; @@ -71,21 +59,15 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.function.LongSupplier; import java.util.stream.Collectors; -import java.util.stream.IntStream; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -105,7 +87,6 @@ public class TransportBulkAction extends HandledTransportAction { - private final Task task; - private BulkRequest bulkRequest; // set to null once all requests are sent out - private final ActionListener listener; - private final AtomicArray responses; - private final long startTimeNanos; - private final ClusterStateObserver observer; - private final Map indicesThatCannotBeCreated; - private final String executorName; - - BulkOperation( - Task task, - BulkRequest bulkRequest, - ActionListener listener, - String executorName, - AtomicArray responses, - long startTimeNanos, - Map indicesThatCannotBeCreated - ) { - super(listener); - this.task = task; - this.bulkRequest = bulkRequest; - this.listener = listener; - this.responses = responses; - this.startTimeNanos = startTimeNanos; - this.indicesThatCannotBeCreated = indicesThatCannotBeCreated; - this.executorName = executorName; - this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()); - } - - @Override - protected void doRun() { - assert bulkRequest != null; - final ClusterState clusterState = observer.setAndGetObservedState(); - if (handleBlockExceptions(clusterState)) { - return; - } - Map> requestsByShard = groupRequestsByShards(clusterState); - executeBulkRequestsByShard(requestsByShard, clusterState); - } - - private Map> groupRequestsByShards(ClusterState clusterState) { - final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); - Metadata metadata = clusterState.metadata(); - // Group the requests by ShardId -> Operations mapping - Map> requestsByShard = new HashMap<>(); - - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); - // the request can only be null because we set it to null in the previous step, so it gets ignored - if (docWriteRequest == null) { - continue; - } - if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) { - continue; - } - if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) { - continue; - } - if (addFailureIfRequiresDataStreamAndNoParentDataStream(docWriteRequest, i, metadata)) { - continue; - } - IndexAbstraction ia = null; - boolean includeDataStreams = docWriteRequest.opType() == OpType.CREATE; - try { - ia = concreteIndices.resolveIfAbsent(docWriteRequest); - 
if (ia.isDataStreamRelated() && includeDataStreams == false) { - throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); - } - // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether - // an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so - // the validation needs to be performed here too. - if (ia.getParentDataStream() != null && - // avoid valid cases when directly indexing into a backing index - // (for example when directly indexing into .ds-logs-foobar-000001) - ia.getName().equals(docWriteRequest.index()) == false && docWriteRequest.opType() != OpType.CREATE) { - throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); - } - - prohibitCustomRoutingOnDataStream(docWriteRequest, metadata); - prohibitAppendWritesInBackingIndices(docWriteRequest, metadata); - docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index())); - - final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata); - if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) { - continue; - } - IndexRouting indexRouting = concreteIndices.routing(concreteIndex); - docWriteRequest.process(indexRouting); - int shardId = docWriteRequest.route(indexRouting); - List shardRequests = requestsByShard.computeIfAbsent( - new ShardId(concreteIndex, shardId), - shard -> new ArrayList<>() - ); - shardRequests.add(new BulkItemRequest(i, docWriteRequest)); - } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) { - String name = ia != null ? ia.getName() : docWriteRequest.index(); - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e); - BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); - } - } - return requestsByShard; - } - - private void executeBulkRequestsByShard(Map> requestsByShard, ClusterState clusterState) { - if (requestsByShard.isEmpty()) { - listener.onResponse( - new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) - ); - return; - } - - String nodeId = clusterService.localNode().getId(); - Runnable onBulkItemsComplete = () -> { - listener.onResponse( - new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) - ); - // Allow memory for bulk shard request items to be reclaimed before all items have been completed - bulkRequest = null; - }; - - try (RefCountingRunnable bulkItemRequestCompleteRefCount = new RefCountingRunnable(onBulkItemsComplete)) { - for (Map.Entry> entry : requestsByShard.entrySet()) { - final ShardId shardId = entry.getKey(); - final List requests = entry.getValue(); - - BulkShardRequest bulkShardRequest = new BulkShardRequest( - shardId, - bulkRequest.getRefreshPolicy(), - requests.toArray(new BulkItemRequest[0]) - ); - bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); - bulkShardRequest.timeout(bulkRequest.timeout()); - bulkShardRequest.routedBasedOnClusterVersion(clusterState.version()); - if (task != null) { - bulkShardRequest.setParentTask(nodeId, task.getId()); - } - 
executeBulkShardRequest(bulkShardRequest, bulkItemRequestCompleteRefCount.acquire()); - } - } - } - - private void executeBulkShardRequest(BulkShardRequest bulkShardRequest, Releasable releaseOnFinish) { - client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() { - @Override - public void onResponse(BulkShardResponse bulkShardResponse) { - for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { - // we may have no response if item failed - if (bulkItemResponse.getResponse() != null) { - bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); - } - responses.set(bulkItemResponse.getItemId(), bulkItemResponse); - } - releaseOnFinish.close(); - } - - @Override - public void onFailure(Exception e) { - // create failures for all relevant requests - for (BulkItemRequest request : bulkShardRequest.items()) { - final String indexName = request.index(); - DocWriteRequest docWriteRequest = request.request(); - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e); - responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure)); - } - releaseOnFinish.close(); - } - }); - } - - private boolean handleBlockExceptions(ClusterState state) { - ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); - if (blockException != null) { - if (blockException.retryable()) { - logger.trace("cluster is blocked, scheduling a retry", blockException); - retry(blockException); - } else { - onFailure(blockException); - } - return true; - } - return false; - } - - void retry(Exception failure) { - assert failure != null; - if (observer.isTimedOut()) { - // we running as a last attempt after a timeout has happened. don't retry - onFailure(failure); - return; - } - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - /* - * This is called on the cluster state update thread pool - * but we'd prefer to coordinate the bulk request on the - * write thread pool just to make sure the cluster state - * update thread doesn't get clogged up. - */ - dispatchRetry(); - } - - @Override - public void onClusterServiceClose() { - onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - /* - * Try one more time.... This is called on the generic - * thread pool but out of an abundance of caution we - * switch over to the write thread pool that we expect - * to coordinate the bulk request. 
- */ - dispatchRetry(); - } - - private void dispatchRetry() { - threadPool.executor(executorName).submit(BulkOperation.this); - } - }); - } - - private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest request, int idx, final Metadata metadata) { - if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) { - Exception exception = new IndexNotFoundException( - "[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true] and [" + request.index() + "] is not an alias", - request.index() - ); - addFailure(request, idx, exception); - return true; - } - return false; - } - - private boolean addFailureIfRequiresDataStreamAndNoParentDataStream(DocWriteRequest request, int idx, final Metadata metadata) { - if (request.isRequireDataStream() && (metadata.indexIsADataStream(request.index()) == false)) { - Exception exception = new ResourceNotFoundException( - "[" - + DocWriteRequest.REQUIRE_DATA_STREAM - + "] request flag is [true] and [" - + request.index() - + "] is not a data stream", - request.index() - ); - addFailure(request, idx, exception); - return true; - } - return false; - } - - private boolean addFailureIfIndexIsClosed(DocWriteRequest request, Index concreteIndex, int idx, final Metadata metadata) { - IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex); - if (indexMetadata.getState() == IndexMetadata.State.CLOSE) { - addFailure(request, idx, new IndexClosedException(concreteIndex)); - return true; - } - return false; - } - - private boolean addFailureIfIndexCannotBeCreated(DocWriteRequest request, int idx) { - IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index()); - if (cannotCreate != null) { - addFailure(request, idx, cannotCreate); - return true; - } - return false; - } - - private void addFailure(DocWriteRequest request, int idx, Exception unavailableException) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), unavailableException); - BulkItemResponse bulkItemResponse = BulkItemResponse.failure(idx, request.opType(), failure); - responses.set(idx, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(idx, null); - } - } - void executeBulk( Task task, BulkRequest bulkRequest, @@ -906,45 +600,20 @@ void executeBulk( AtomicArray responses, Map indicesThatCannotBeCreated ) { - new BulkOperation(task, bulkRequest, listener, executorName, responses, startTimeNanos, indicesThatCannotBeCreated).run(); - } - - private static class ConcreteIndices { - private final ClusterState state; - private final IndexNameExpressionResolver indexNameExpressionResolver; - private final Map indexAbstractions = new HashMap<>(); - private final Map routings = new HashMap<>(); - - ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) { - this.state = state; - this.indexNameExpressionResolver = indexNameExpressionResolver; - } - - IndexAbstraction resolveIfAbsent(DocWriteRequest request) { - try { - IndexAbstraction indexAbstraction = indexAbstractions.get(request.index()); - if (indexAbstraction == null) { - indexAbstraction = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request); - indexAbstractions.put(request.index(), indexAbstraction); - } - return indexAbstraction; - } catch (IndexNotFoundException e) { - if (e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) { - throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data 
streams", e); - } else { - throw e; - } - } - } - - IndexRouting routing(Index index) { - IndexRouting routing = routings.get(index); - if (routing == null) { - routing = IndexRouting.fromIndexMetadata(state.metadata().getIndexSafe(index)); - routings.put(index, routing); - } - return routing; - } + new BulkOperation( + task, + threadPool, + executorName, + clusterService, + bulkRequest, + client, + responses, + indicesThatCannotBeCreated, + indexNameExpressionResolver, + relativeTimeProvider, + startTimeNanos, + listener + ).run(); } private long relativeTime() { @@ -1010,137 +679,4 @@ public boolean isForceExecution() { ); } - static final class BulkRequestModifier implements Iterator> { - - final BulkRequest bulkRequest; - final SparseFixedBitSet failedSlots; - final List itemResponses; - final AtomicIntegerArray originalSlots; - - volatile int currentSlot = -1; - - BulkRequestModifier(BulkRequest bulkRequest) { - this.bulkRequest = bulkRequest; - this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size()); - this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); - this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok - } - - @Override - public DocWriteRequest next() { - return bulkRequest.requests().get(++currentSlot); - } - - @Override - public boolean hasNext() { - return (currentSlot + 1) < bulkRequest.requests().size(); - } - - BulkRequest getBulkRequest() { - if (itemResponses.isEmpty()) { - return bulkRequest; - } else { - BulkRequest modifiedBulkRequest = new BulkRequest(); - modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy()); - modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); - modifiedBulkRequest.timeout(bulkRequest.timeout()); - - int slot = 0; - List> requests = bulkRequest.requests(); - for (int i = 0; i < requests.size(); i++) { - DocWriteRequest request = requests.get(i); - if (failedSlots.get(i) == false) { - modifiedBulkRequest.add(request); - originalSlots.set(slot++, i); - } - } - return modifiedBulkRequest; - } - } - - ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { - if (itemResponses.isEmpty()) { - return actionListener.map( - response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis) - ); - } else { - return actionListener.map(response -> { - // these items are the responses from the subsequent bulk request, their 'slots' - // are not correct for this response we're building - final BulkItemResponse[] bulkResponses = response.getItems(); - - final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()]; - - // the item responses are from the original request, so their slots are correct. - // these are the responses for requests that failed early and were not passed on to the subsequent bulk. 
- for (BulkItemResponse item : itemResponses) { - allResponses[item.getItemId()] = item; - } - - // use the original slots for the responses from the bulk - for (int i = 0; i < bulkResponses.length; i++) { - allResponses[originalSlots.get(i)] = bulkResponses[i]; - } - - if (Assertions.ENABLED) { - assertResponsesAreCorrect(bulkResponses, allResponses); - } - - return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis); - }); - } - } - - private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) { - // check for an empty intersection between the ids - final Set failedIds = itemResponses.stream().map(BulkItemResponse::getItemId).collect(Collectors.toSet()); - final Set responseIds = IntStream.range(0, bulkResponses.length) - .map(originalSlots::get) // resolve subsequent bulk ids back to the original slots - .boxed() - .collect(Collectors.toSet()); - assert Sets.haveEmptyIntersection(failedIds, responseIds) - : "bulk item response slots cannot have failed and been processed in the subsequent bulk request, failed ids: " - + failedIds - + ", response ids: " - + responseIds; - - // check for the correct number of responses - final int expectedResponseCount = bulkRequest.requests.size(); - final int actualResponseCount = failedIds.size() + responseIds.size(); - assert expectedResponseCount == actualResponseCount - : "Expected [" + expectedResponseCount + "] responses, but found [" + actualResponseCount + "]"; - - // check that every response is present - for (int i = 0; i < allResponses.length; i++) { - assert allResponses[i] != null : "BulkItemResponse at index [" + i + "] was null"; - } - } - - synchronized void markItemAsFailed(int slot, Exception e) { - final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); - final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); - // We hit a error during preprocessing a request, so we: - // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed - // 2) Add a bulk item failure for this request - // 3) Continue with the next request in the bulk. 
- failedSlots.set(slot); - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), id, e); - itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure)); - } - - synchronized void markItemAsDropped(int slot) { - final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); - final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); - failedSlots.set(slot); - UpdateResponse dropped = new UpdateResponse( - new ShardId(docWriteRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0), - id, - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - docWriteRequest.version(), - DocWriteResponse.Result.NOOP - ); - itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped)); - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java index 5cd1fde9edd9b..763dd87f76db3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java @@ -38,7 +38,7 @@ public void testBulkRequestModifier() { } // wrap the bulk request and fail some of the item requests at random - TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(bulkRequest); + BulkRequestModifier modifier = new BulkRequestModifier(bulkRequest); Set failedSlots = new HashSet<>(); for (int i = 0; modifier.hasNext(); i++) { modifier.next(); @@ -91,7 +91,7 @@ public void testPipelineFailures() { originalBulkRequest.add(new IndexRequest("index").id(String.valueOf(i))); } - TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(originalBulkRequest); + BulkRequestModifier modifier = new BulkRequestModifier(originalBulkRequest); final List failures = new ArrayList<>(); // iterate the requests in order, recording that half of them should be failures @@ -147,7 +147,7 @@ public void testNoFailures() { originalBulkRequest.add(new IndexRequest("index").id(String.valueOf(i))); } - TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(originalBulkRequest); + BulkRequestModifier modifier = new BulkRequestModifier(originalBulkRequest); while (modifier.hasNext()) { modifier.next(); } From d8ff2892cee6f8589668f0b969ef603e3360ecbb Mon Sep 17 00:00:00 2001 From: Navarone Feekery <13634519+navarone-feekery@users.noreply.github.com> Date: Thu, 1 Feb 2024 13:50:58 +0100 Subject: [PATCH 07/86] [Connectors API] Add new field `api_key_secret_id` to Connector (#104982) - Add api_key_secret_id field - Add update endpoint for api_key_id and api_key_secret_id --- docs/changelog/104982.yaml | 5 + .../api/connector.update_api_key_id.json | 38 +++++ .../341_connector_update_api_key_id.yml | 110 ++++++++++++ .../xpack/application/EnterpriseSearch.java | 5 + .../application/connector/Connector.java | 27 ++- .../connector/ConnectorIndexService.java | 28 ++++ .../RestUpdateConnectorApiKeyIdAction.java | 48 ++++++ ...ransportUpdateConnectorApiKeyIdAction.java | 55 ++++++ .../action/UpdateConnectorApiKeyIdAction.java | 158 ++++++++++++++++++ .../connector/ConnectorIndexServiceTests.java | 48 ++++++ .../connector/ConnectorTestUtils.java | 1 + .../application/connector/ConnectorTests.java | 69 +++++++- ...KeyIdActionRequestBWCSerializingTests.java | 50 ++++++ 
.../xpack/security/operator/Constants.java | 1 + 14 files changed, 641 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/104982.yaml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_api_key_id.json create mode 100644 x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/341_connector_update_api_key_id.yml create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestUpdateConnectorApiKeyIdAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportUpdateConnectorApiKeyIdAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorApiKeyIdAction.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/action/UpdateConnectorApiKeyIdActionRequestBWCSerializingTests.java diff --git a/docs/changelog/104982.yaml b/docs/changelog/104982.yaml new file mode 100644 index 0000000000000..62194aa68b80c --- /dev/null +++ b/docs/changelog/104982.yaml @@ -0,0 +1,5 @@ +pr: 104982 +summary: "[Connectors API] Add new field `api_key_secret_id` to Connector" +area: Application +type: enhancement +issues: [] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_api_key_id.json b/rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_api_key_id.json new file mode 100644 index 0000000000000..5b58a7b5b59a5 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/connector.update_api_key_id.json @@ -0,0 +1,38 @@ +{ + "connector.update_api_key_id": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/connector-apis.html", + "description": "Updates the API key id and/or API key secret id fields in the connector document." + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ], + "content_type": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_connector/{connector_id}/_api_key_id", + "methods": [ + "PUT" + ], + "parts": { + "connector_id": { + "type": "string", + "description": "The unique identifier of the connector to be updated." 
+ } + } + } + ] + }, + "body": { + "description": "An object containing the connector's API key id and/or Connector Secret document id for that API key.", + "required": true + } + } +} diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/341_connector_update_api_key_id.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/341_connector_update_api_key_id.yml new file mode 100644 index 0000000000000..3d82c53acae50 --- /dev/null +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/341_connector_update_api_key_id.yml @@ -0,0 +1,110 @@ +setup: + - skip: + version: " - 8.12.99" + reason: Introduced in 8.13.0 + + - do: + connector.put: + connector_id: test-connector + body: + index_name: search-1-test + name: my-connector + language: pl + is_native: false + service_type: super-connector + +--- +"Update Connector Api Key Id": + - do: + connector.update_api_key_id: + connector_id: test-connector + body: + api_key_id: test-api-key-id + + + - match: { result: updated } + + - do: + connector.get: + connector_id: test-connector + + - match: { api_key_id: test-api-key-id } + +--- +"Update Connector Api Key Secret Id": + - do: + connector.update_api_key_id: + connector_id: test-connector + body: + api_key_secret_id: test-api-key-secret-id + + + - match: { result: updated } + + - do: + connector.get: + connector_id: test-connector + + - match: { api_key_secret_id: test-api-key-secret-id } + +--- +"Update Connector Api Key Id and Api Key Secret Id": + - do: + connector.update_api_key_id: + connector_id: test-connector + body: + api_key_id: test-api-key-id + api_key_secret_id: test-api-key-secret-id + + - match: { result: updated } + + - do: + connector.get: + connector_id: test-connector + + - match: { api_key_id: test-api-key-id } + - match: { api_key_secret_id: test-api-key-secret-id } + +--- +"Update Connector Api Key Id - 404 when connector doesn't exist": + - do: + catch: "missing" + connector.update_api_key_id: + connector_id: test-non-existent-connector + body: + api_key_id: test-api-key-id + api_key_secret_id: test-api-key-secret-id + +--- +"Update Connector Api Key Id - 400 status code when connector_id is empty": + - do: + catch: "bad_request" + connector.update_api_key_id: + connector_id: "" + body: + api_key_id: test-api-key-id + api_key_secret_id: test-api-key-secret-id + +--- +"Update Connector Api Key Id - 400 status code when both values are null": + - do: + catch: "bad_request" + connector.update_api_key_id: + connector_id: test-connector + body: + api_key_id: null + api_key_secret_id: null + + - match: { error.reason: "Validation Failed: 1: [api_key_id] and [api_key_secret_id] cannot both be [null]. 
Please provide a value for at least one of them.;" } + +--- +"Update Connector Api Key Id - 400 status code when payload is not string": + - do: + catch: "bad_request" + connector.update_api_key_id: + connector_id: test-connector + body: + api_key_id: + field_1: test + field_2: something + api_key_secret_id: test-api-key-secret-id diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java index 3933e7923d6b9..e8a7e9f564420 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.application.connector.action.RestListConnectorAction; import org.elasticsearch.xpack.application.connector.action.RestPostConnectorAction; import org.elasticsearch.xpack.application.connector.action.RestPutConnectorAction; +import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorApiKeyIdAction; import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorConfigurationAction; import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorFilteringAction; @@ -68,6 +69,7 @@ import org.elasticsearch.xpack.application.connector.action.TransportListConnectorAction; import org.elasticsearch.xpack.application.connector.action.TransportPostConnectorAction; import org.elasticsearch.xpack.application.connector.action.TransportPutConnectorAction; +import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorApiKeyIdAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorConfigurationAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorFilteringAction; @@ -78,6 +80,7 @@ import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorPipelineAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorSchedulingAction; import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorServiceTypeAction; +import org.elasticsearch.xpack.application.connector.action.UpdateConnectorApiKeyIdAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorConfigurationAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction; @@ -244,6 +247,7 @@ protected XPackLicenseState getLicenseState() { new ActionHandler<>(ListConnectorAction.INSTANCE, TransportListConnectorAction.class), new ActionHandler<>(PostConnectorAction.INSTANCE, TransportPostConnectorAction.class), new ActionHandler<>(PutConnectorAction.INSTANCE, TransportPutConnectorAction.class), + new ActionHandler<>(UpdateConnectorApiKeyIdAction.INSTANCE, TransportUpdateConnectorApiKeyIdAction.class), new ActionHandler<>(UpdateConnectorConfigurationAction.INSTANCE, TransportUpdateConnectorConfigurationAction.class), new ActionHandler<>(UpdateConnectorErrorAction.INSTANCE, TransportUpdateConnectorErrorAction.class), new ActionHandler<>(UpdateConnectorFilteringAction.INSTANCE, 
TransportUpdateConnectorFilteringAction.class), @@ -334,6 +338,7 @@ public List getRestHandlers( new RestListConnectorAction(), new RestPostConnectorAction(), new RestPutConnectorAction(), + new RestUpdateConnectorApiKeyIdAction(), new RestUpdateConnectorConfigurationAction(), new RestUpdateConnectorErrorAction(), new RestUpdateConnectorFilteringAction(), diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java index b7ddf560247ed..8bf6802a895be 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/Connector.java @@ -42,6 +42,7 @@ *