Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INGEST: Enable default pipelines #32286

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test index with default pipeline":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While having rest tests is fine to ensure the param is passed through the rest layer correctly, we should first have unit tests.

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
index:
index: test
type: test
id: 1
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }

- do:
index:
index: test
type: test
id: 2
pipeline: ""
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- is_false: _source.bytes_target_field

Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
Expand Down Expand Up @@ -125,7 +127,27 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
if (bulkRequest.hasIndexRequestsWithPipelines()) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData != null) {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
if (!defaultPipeline.isEmpty()) {
indexRequest.setPipeline(defaultPipeline);
hasIndexRequestsWithPipelines = true;
}
}
} else if (!pipeline.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer == false instead of ! as the latter is susceptible to being missed when reading through code.

hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ public final class IndexSettings {
public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length",
1000, 1, Property.Dynamic, Property.IndexScope);

public static final Setting<String> DEFAULT_PIPELINE =
Setting.simpleString("index.default_pipeline", "", Property.Dynamic, Property.IndexScope);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add validation to this setting so it is not empty. We can make the default the _none constant.


private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -293,6 +296,7 @@ public final class IndexSettings {
private volatile TimeValue searchIdleAfter;
private volatile int maxAnalyzedOffset;
private volatile int maxTermsCount;
private volatile String defaultPipeline;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -408,6 +412,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
Expand Down Expand Up @@ -447,6 +452,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
Expand Down Expand Up @@ -822,4 +828,12 @@ public boolean isExplicitRefresh() {
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }

public String getDefaultPipeline() {
return defaultPipeline;
}

public void setDefaultPipeline(String defaultPipeline) {
this.defaultPipeline = defaultPipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected void doRun() throws Exception {
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
//this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(null);
indexRequest.setPipeline("");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to change this since null now triggers an empty loop because it causes the default pipeline to be set over and over.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on what triggers this? I'm unsure about the original comment, but it seems like that should be fixed first, so that we can have null as meaning "no pipeline" in the index request. Empty string should be an error. We need some type of literal string (which we disallow from pipeline names) to mean "no pipeline". In the original default pipelines issue, I suggested _none or something like that. Using empty string is too easy to happen with a malformed request (like accidentally not serializing http params correctly into the request).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjernst the problem is that I need to somehow differentiate between "no pipeline so set default" (when the request first hits org.elasticsearch.action.bulk.TransportBulkAction#doExecute) and "no pipeline, period" (when it hits org.elasticsearch.action.bulk.TransportBulkAction#doExecute and I want to prevent the default to be applied again). Previously double execution here could be prevented by setting null, but now I used null to trigger setting the default and "" to mean that no default should be set (since that is also what I'd see if someone used a url ending in ?pipeline=).

So better move to _none for when we want to prevent setting a pipeline and also use the default on "" (i.e. URLs ending in ?pipeline=)? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So better move to _none for when we want to prevent setting a pipeline and also use the default on "" (i.e. URLs ending in ?pipeline=)? :)

Sorry I don't understand what you are suggesting here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjernst

simply:

?pipeline= means no pipeline is set i.e is equivalent to not ?pipeline parameter in the uri => apply default pipeline
?pipeline=_none => don't run any pipeline

sorry, seems it's too late for my writing skills :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel strongly ?pipeline= should be an error. It is too easy too have someone mean to override a pipeline, but then get the default (which is the opposite of the problem when using ?pipeline= to mean don't run any pipeline).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjernst I like it :) Error for ?pipeline= it is :) => on it

} catch (Exception e) {
itemFailureHandler.accept(indexRequest, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -45,6 +46,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
public void testNonExceptional() {
Expand Down Expand Up @@ -97,7 +99,11 @@ public void testSomeFail() {

private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
ClusterService clusterService = mock(ClusterService.class);
ClusterState state = mock(ClusterState.class);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
null, null, null, mock(ActionFilters.class), null, null) {
@Override
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -153,6 +154,7 @@ public void setupAction() {
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
Expand Down