diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 578bf35cb2446..3c8d8e9abf2bf 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -271,28 +271,6 @@ POST test/_doc/1?pipeline=drop_guests_network // CONSOLE // TEST[continued] -//// -Hidden example assertion: -[source,js] --------------------------------------------------- -GET test/_doc/1 --------------------------------------------------- -// CONSOLE -// TEST[continued] -// TEST[catch:missing] - -[source,js] --------------------------------------------------- -{ - "_index": "test", - "_type": "_doc", - "_id": "1", - "found": false -} --------------------------------------------------- -// TESTRESPONSE -//// - Thanks to the `?.` operator the following document will not throw an error. If the pipeline used a `.` the following document would throw a NullPointerException since the `network` object is not part of the source document. @@ -392,28 +370,6 @@ POST test/_doc/3?pipeline=drop_guests_network // CONSOLE // TEST[continued] -//// -Hidden example assertion: -[source,js] --------------------------------------------------- -GET test/_doc/3 --------------------------------------------------- -// CONSOLE -// TEST[continued] -// TEST[catch:missing] - -[source,js] --------------------------------------------------- -{ - "_index": "test", - "_type": "_doc", - "_id": "3", - "found": false -} --------------------------------------------------- -// TESTRESPONSE -//// - The `?.` operators works well for use in the `if` conditional because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator] returns null if the object is null and `==` is null safe (as well as many other @@ -511,28 +467,6 @@ POST test/_doc/1?pipeline=not_prod_dropper The document is <> since `prod` (case insensitive) is not found in the tags. -//// -Hidden example assertion: -[source,js] --------------------------------------------------- -GET test/_doc/1 --------------------------------------------------- -// CONSOLE -// TEST[continued] -// TEST[catch:missing] - -[source,js] --------------------------------------------------- -{ - "_index": "test", - "_type": "_doc", - "_id": "1", - "found": false -} --------------------------------------------------- -// TESTRESPONSE -//// - The following document is indexed (i.e. not dropped) since `prod` (case insensitive) is found in the tags. diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml index d1bb3b063a7c4..77a1df81a296a 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml @@ -91,4 +91,4 @@ teardown: get: index: test id: 3 -- match: { found: false } + diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml new file mode 100644 index 0000000000000..bb2677f9b193f --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml @@ -0,0 +1,119 @@ +--- +teardown: +- do: + ingest.delete_pipeline: + id: "retarget" + ignore: 404 + +- do: + indices.delete: + index: foo + +--- +"Test Change Target Index with Explicit Pipeline": + +- do: + ingest.put_pipeline: + id: "retarget" + body: > + { + "processors": [ + { + "set" : { + "field" : "_index", + "value" : "foo" + } + } + ] + } +- match: { acknowledged: true } + +# no indices +- do: + cat.indices: {} + +- match: + $body: | + /^$/ + +- do: + index: + index: test + id: 1 + pipeline: "retarget" + body: { + a: true + } + +- do: + get: + index: foo + id: 1 +- match: { _source.a: true } + +# only the foo index +- do: + cat.indices: + h: i + +- match: + $body: | + /^foo\n$/ + +--- +"Test Change Target Index with Default Pipeline": + +- do: + indices.put_template: + name: index_template + body: + index_patterns: test + settings: + default_pipeline: "retarget" + +- do: + ingest.put_pipeline: + id: "retarget" + body: > + { + "processors": [ + { + "set" : { + "field" : "_index", + "value" : "foo" + } + } + ] + } +- match: { acknowledged: true } + +# no indices +- do: + cat.indices: {} + +- match: + $body: | + /^$/ + +- do: + index: + index: test + id: 1 + body: { + a: true + } + +- do: + get: + index: foo + id: 1 +- match: { _source.a: true } + +# only the foo index +- do: + cat.indices: + h: i + +- match: + $body: | + /^foo\n$/ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 21bb539676d91..9adc92e02bedb 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -47,11 +47,14 @@ import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -151,6 +154,72 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener responses = new AtomicArray<>(bulkRequest.requests.size()); + boolean hasIndexRequestsWithPipelines = false; + final MetaData metaData = clusterService.state().getMetaData(); + ImmutableOpenMap indicesMetaData = metaData.indices(); + for (DocWriteRequest actionRequest : bulkRequest.requests) { + IndexRequest indexRequest = getIndexWriteRequest(actionRequest); + if (indexRequest != null) { + // get pipeline from request + String pipeline = indexRequest.getPipeline(); + if (pipeline == null) { + // start to look for default pipeline via settings found in the index meta data + IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); + if (indexMetaData == null && indexRequest.index() != null) { + // if the write request if through an alias use the write index's meta data + AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); + if (indexOrAlias != null && indexOrAlias.isAlias()) { + AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; + indexMetaData = alias.getWriteIndex(); + } + } + if (indexMetaData != null) { + // Find the the default pipeline if one is defined from and existing index. + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); + indexRequest.setPipeline(defaultPipeline); + if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } else if (indexRequest.index() != null) { + // No index exists yet (and is valid request), so matching index templates to look for a default pipeline + List templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); + assert (templates != null); + String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; + // order of templates are highest order first, break if we find a default_pipeline + for (IndexTemplateMetaData template : templates) { + final Settings settings = template.settings(); + if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); + break; + } + } + indexRequest.setPipeline(defaultPipeline); + if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } + + if (hasIndexRequestsWithPipelines) { + // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but + // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method, + // this path is never taken. + try { + if (clusterService.localNode().isIngestNode()) { + processBulkIndexIngestRequest(task, bulkRequest, listener); + } else { + ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); + } + } catch (Exception e) { + listener.onFailure(e); + } + return; + } + if (needToCheck()) { // Attempt to create all the indices that we're going to need during the bulk before we start. // Step 1: collect all the indices in the request @@ -181,7 +250,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener { + executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); }), responses, indicesThatCannotBeCreated); @@ -215,56 +284,7 @@ public void onFailure(Exception e) { } } } else { - executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); - } - } - - private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, - final ActionListener listener, final AtomicArray responses, - Map indicesThatCannotBeCreated) { - boolean hasIndexRequestsWithPipelines = false; - final MetaData metaData = clusterService.state().getMetaData(); - ImmutableOpenMap indicesMetaData = metaData.indices(); - for (DocWriteRequest actionRequest : bulkRequest.requests) { - IndexRequest indexRequest = getIndexWriteRequest(actionRequest); - if(indexRequest != null){ - String pipeline = indexRequest.getPipeline(); - if (pipeline == null) { - IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); - if (indexMetaData == null && indexRequest.index() != null) { - //check the alias - AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); - if (indexOrAlias != null && indexOrAlias.isAlias()) { - AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; - indexMetaData = alias.getWriteIndex(); - } - } - if (indexMetaData == null) { - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - } else { - String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); - indexRequest.setPipeline(defaultPipeline); - if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { - hasIndexRequestsWithPipelines = true; - } - } - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { - hasIndexRequestsWithPipelines = true; - } - } - } - if (hasIndexRequestsWithPipelines) { - try { - if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, listener); - } else { - ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); - } - } catch (Exception e) { - listener.onFailure(e); - } - } else { - executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); + executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 219aee9ebe2ff..a13e8af919b2a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -57,6 +58,7 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -460,7 +462,7 @@ public void testUseDefaultPipelineWithBulkUpsert() throws Exception { verifyZeroInteractions(transportService); } - public void testCreateIndexBeforeRunPipeline() throws Exception { + public void testDoExecuteCalledTwiceCorrectly() throws Exception { Exception exception = new Exception("fake exception"); IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); indexRequest.setPipeline("testpipeline"); @@ -478,20 +480,76 @@ public void testCreateIndexBeforeRunPipeline() throws Exception { // check failure works, and passes through to the listener assertFalse(action.isExecuted); // haven't executed yet + assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); + assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing completionHandler.getValue().accept(null); assertTrue(action.isExecuted); + assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); } + public void testNotFindDefaultPipelineFromTemplateMatches(){ + Exception exception = new Exception("fake exception"); + IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> responseCalled.set(true), + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + assertEquals(IngestService.NOOP_PIPELINE_NAME, indexRequest.getPipeline()); + verifyZeroInteractions(ingestService); + + } + + public void testFindDefaultPipelineFromTemplateMatch(){ + Exception exception = new Exception("fake exception"); + ClusterState state = clusterService.state(); + + ImmutableOpenMap.Builder templateMetaDataBuilder = ImmutableOpenMap.builder(); + templateMetaDataBuilder.put("template1", IndexTemplateMetaData.builder("template1").patterns(Arrays.asList("missing_index")) + .order(1).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline1").build()).build()); + templateMetaDataBuilder.put("template2", IndexTemplateMetaData.builder("template2").patterns(Arrays.asList("missing_*")) + .order(2).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build()).build()); + templateMetaDataBuilder.put("template3", IndexTemplateMetaData.builder("template3").patterns(Arrays.asList("missing*")) + .order(3).build()); + templateMetaDataBuilder.put("template4", IndexTemplateMetaData.builder("template4").patterns(Arrays.asList("nope")) + .order(4).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline4").build()).build()); + + MetaData metaData = mock(MetaData.class); + when(state.metaData()).thenReturn(metaData); + when(state.getMetaData()).thenReturn(metaData); + when(metaData.templates()).thenReturn(templateMetaDataBuilder.build()); + when(metaData.getTemplates()).thenReturn(templateMetaDataBuilder.build()); + when(metaData.indices()).thenReturn(ImmutableOpenMap.of()); + + IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> responseCalled.set(true), + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + assertEquals("pipeline2", indexRequest.getPipeline()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + } + private void validateDefaultPipeline(IndexRequest indexRequest) { Exception exception = new Exception("fake exception"); indexRequest.source(Collections.emptyMap());