Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new require_alias option to indexing requests #58917

Merged
merged 18 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/reference/docs/index_.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ and <<update-delete-docs-in-a-backing-index>>.

`POST /<target>/_create/<_id>`

IMPORTANT: You cannot add new documents to a data stream using the
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead. See
IMPORTANT: You cannot add new documents to a data stream using the
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead. See
<<add-documents-to-a-data-stream>>.

[[docs-index-api-path-params]]
Expand Down Expand Up @@ -94,6 +94,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=version_type]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=wait_for_active_shards]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=require-alias]

[[docs-index-api-request-body]]
==== {api-request-body-title}

Expand Down
2 changes: 2 additions & 0 deletions docs/reference/docs/update.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=if_primary_term]
`lang`::
(Optional, string) The script language. Default: `painless`.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=require-alias]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]

`retry_on_conflict`::
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,12 @@ such as `1264`.
A value of `-1` indicates {es} was unable to compute this number.
end::memory[]

tag::require-alias[]
`require_alias`::
(Optional, boolean) When true, this requires the destination to be an alias.
Defaults to false.
end::require-alias[]

tag::node-filter[]
`<node_filter>`::
(Optional, string)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.util.automaton.MinimizationOperations;
import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.AutoCreateIndex;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.List;
Expand Down Expand Up @@ -111,6 +113,9 @@ static void validateAgainstAliases(SearchRequest source, IndexRequest destinatio
return;
}
String target = destination.index();
if (destination.isRequireAlias() && (false == clusterState.getMetadata().hasAlias(target))) {
throw new IndexNotFoundException("[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true]", target);
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
}
if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
/*
* If we're going to autocreate the index we don't need to resolve
Expand Down
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/index.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@
"pipeline":{
"type":"string",
"description":"The pipeline id to preprocess incoming documents with"
},
"require_alias": {
"type": "boolean",
"description": "When true, requires destination to be an alias. Default is false"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
"if_primary_term":{
"type":"number",
"description":"only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"require_alias": {
"type": "boolean",
"description": "When true, requires destination is an alias. Default is false"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,43 @@
{"index": {"_index": "test_index", "_id": "test_id"}}
{"f1": "v1", "f2": 42}
{}

---
"When setting require_alias flag":
- skip:
# TODO adjust after backport
version: " - 7.99.99"
reason: "require_alias flag was added in version 7.9"

- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
test_require_alias: {}
- do:
bulk:
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
refresh: true
body:
- index:
_index: new_index_not_created
require_alias: true
- f: 1
- index:
_index: new_index_created
- f: 2
- index:
_index: test_require_alias
require_alias: true
- f: 3
- create:
_index: test_require_alias
- f: 4
- match: { errors: true }
- match: { items.0.index.status: 404 }
- match: { items.0.index.error.type: index_not_found_exception }
- match: { items.0.index.error.reason: "no such index [new_index_not_created] and [require_alias] request flag is [true]" }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }
- match: { items.3.create.result: created }
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
"Set require_alias flag":
- skip:
# TODO adjust after backport
version: " - 7.99.99"
reason: "require_alias flag added in 7.9+"
- do:
catch: missing
index:
index: test_require_alias
require_alias: true
body: { foo: bar }

- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
test_require_alias: {}

- do:
index:
index: test_require_alias
require_alias: true
body: { foo: bar }
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
"Set require_alias flag":
- skip:
# TODO adjust after backport
version: " - 7.99.99"
reason: "require_alias flag added in 7.9+"
- do:
catch: missing
update:
index: test_1_autocreate
id: 1
require_alias: true
body:
doc: { foo: bar, count: 1 }
doc_as_upsert: true
- do:
update:
index: test_1_autocreate
id: 1
require_alias: false
body:
doc: { foo: bar, count: 1 }
doc_as_upsert: true
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
*/
public interface DocWriteRequest<T> extends IndicesRequest, Accountable {

// Flag set for disallowing index auto creation for an individual write request.
String REQUIRE_ALIAS = "require_alias";

/**
* Set the index for this request
* @return the Request
Expand Down Expand Up @@ -142,6 +145,11 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
*/
OpType opType();

/**
* Should this request override specifically require the destination to be an alias?
* @return boolean flag, when true specifically requires an alias
*/
boolean isRequireAlias();
/**
* Requested operation type to perform on the document
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -62,6 +63,7 @@ public final class BulkRequestParser {
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;
Expand Down Expand Up @@ -166,6 +168,7 @@ public void parse(
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean requireAlias = false;

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
Expand Down Expand Up @@ -208,6 +211,8 @@ public void parse(
pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
requireAlias = parser.booleanValue();
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
Expand Down Expand Up @@ -245,19 +250,22 @@ public void parse(
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
.version(version).versionType(versionType)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), type);
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType)
.setRequireAlias(requireAlias), type);
} else {
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
.version(version).versionType(versionType)
.create("create".equals(opType)).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), type);
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias), type);
}
} else if ("create".equals(action)) {
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
.version(version).versionType(versionType)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), type);
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias), type);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException("Update requests do not support versioning. " +
Expand All @@ -266,6 +274,7 @@ public void parse(
UpdateRequest updateRequest = new UpdateRequest().index(index).id(id).routing(routing)
.retryOnConflict(retryOnConflict)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.setRequireAlias(requireAlias)
.routing(routing);
// EMPTY is safe here because we never call namedObject
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,28 +222,29 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
if (needToCheck()) {
// Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request
final Set<String> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
final Map<String, Boolean> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 && v2));
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
/* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
* that we'll use when we try to run the requests. */
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Set<String> autoCreateIndices = new HashSet<>();
ClusterState state = clusterService.state();
for (String index : indices) {
for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
boolean shouldAutoCreate;
final String index = indexAndFlag.getKey();
try {
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
// We should only auto create if we are not requiring it to be an alias
if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
autoCreateIndices.add(index);
}
}
Expand Down Expand Up @@ -513,6 +514,9 @@ protected void doRun() {
if (docWriteRequest == null) {
continue;
}
if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
continue;
}
Expand Down Expand Up @@ -664,6 +668,16 @@ public void onTimeout(TimeValue timeout) {
});
}

private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> request, int idx, final Metadata metadata) {
if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) {
Exception exception = new IndexNotFoundException("[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true]",
request.index());
addFailure(request, idx, exception);
return true;
}
return false;
}

private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int idx, final ConcreteIndices concreteIndices,
final Metadata metadata) {
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ public OpType opType() {
return OpType.DELETE;
}

@Override
public boolean isRequireAlias() {
return false;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement

private boolean isPipelineResolved;

private boolean requireAlias;

/**
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
* provided ID.
Expand Down Expand Up @@ -150,6 +152,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
requireAlias = in.readBoolean();
} else {
requireAlias = false;
}
}

public IndexRequest() {
Expand Down Expand Up @@ -656,6 +663,9 @@ private void writeBody(StreamOutput out) throws IOException {
}
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(requireAlias);
}
}

@Override
Expand Down Expand Up @@ -703,4 +713,14 @@ public long getAutoGeneratedTimestamp() {
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed());
}

@Override
public boolean isRequireAlias() {
return requireAlias;
}

public IndexRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
}
}
Loading