Skip to content

Commit

Permalink
add support for write index resolution when creating/updating documents
Browse files Browse the repository at this point in the history
This commit introduces a new option to IndicesOptions that requires
a write index to exist and then uses this option for index/update requests.

This commit also fixes a subtle issue with how write-indices are resolved
in Aliases. Before, all aliases pointing to one-and-only-one index had a
write index even if is_write_index=false. This should not be the case.
  • Loading branch information
talevy committed Jun 22, 2018
1 parent 7313a98 commit d2da8ec
Show file tree
Hide file tree
Showing 21 changed files with 282 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ public void testAliasesContainTarget() {

public void testTargetIsAlias() {
Exception e = expectThrows(IllegalArgumentException.class, () -> succeeds("target_multi", "foo"));
assertThat(e.getMessage(), containsString("Alias [target_multi] has more than one indices associated with it [["));
// The index names can come in either order
assertThat(e.getMessage(), containsString("target"));
assertThat(e.getMessage(), containsString("target2"));
assertThat(e.getMessage(), containsString("Alias [target_multi] points to multiple indices"));
}

public void testRemoteInfoSkipsValidation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final Set<String> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -300,7 +300,7 @@ protected void doRun() throws Exception {
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
break;
case DELETE:
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.routing(), docWriteRequest.index(), true));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index(), false));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.request().type())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void doExecute(final MultiGetRequest request, final ActionListener<Mul
try {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, item).getName();

item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index()));
item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index(), false));
if ((item.routing() == null) && (clusterState.getMetaData().routingRequired(concreteSingleIndex, item.type()))) {
String message = "routing is required for [" + concreteSingleIndex + "]/[" + item.type() + "]/[" + item.id() + "]";
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(), new IllegalArgumentException(message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
Expand Down Expand Up @@ -188,6 +189,11 @@ public ActionRequestValidationException validate() {
return validationException;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictAliasToWriteIndexNoExpandForbidClosed();
}

/**
* The content type. This will be used when generating a document from user provided objects like Maps and when parsing the
* source at index time
Expand Down Expand Up @@ -496,7 +502,7 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappi

/* resolve the routing if needed */
public void resolveRouting(MetaData metaData) {
routing(metaData.resolveIndexRouting(routing, index));
routing(metaData.resolveIndexRouting(routing, index, true));
}

@Override
Expand Down Expand Up @@ -603,5 +609,4 @@ public long getAutoGeneratedTimestamp() {
public IndexRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on IndexRequest");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public enum Option {
IGNORE_ALIASES,
ALLOW_NO_INDICES,
FORBID_ALIASES_TO_MULTIPLE_INDICES,
FORBID_CLOSED_INDICES;
FORBID_CLOSED_INDICES,
REQUIRE_ALIASES_TO_WRITE_INDEX;

public static final EnumSet<Option> NONE = EnumSet.noneOf(Option.class);
}
Expand All @@ -93,6 +94,9 @@ public enum Option {
public static final IndicesOptions STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED =
new IndicesOptions(EnumSet.of(Option.FORBID_ALIASES_TO_MULTIPLE_INDICES, Option.FORBID_CLOSED_INDICES),
EnumSet.noneOf(WildcardStates.class));
public static final IndicesOptions STRICT_ALIAS_TO_WRITE_INDEX_NO_EXPAND_FORBID_CLOSED =
new IndicesOptions(EnumSet.of(Option.REQUIRE_ALIASES_TO_WRITE_INDEX, Option.FORBID_CLOSED_INDICES),
EnumSet.noneOf(WildcardStates.class));

private final EnumSet<Option> options;
private final EnumSet<WildcardStates> expandWildcards;
Expand Down Expand Up @@ -231,6 +235,13 @@ public boolean allowAliasesToMultipleIndices() {
return options.contains(Option.FORBID_ALIASES_TO_MULTIPLE_INDICES) == false;
}

/**
* @return whether aliases pointing to a write index should resolve to that index
*/
public boolean requireAliasesToWriteIndex() {
return options.contains(Option.REQUIRE_ALIASES_TO_WRITE_INDEX);
}

/**
* @return whether aliases should be ignored (when resolving a wildcard)
*/
Expand Down Expand Up @@ -375,6 +386,14 @@ public static IndicesOptions strictSingleIndexNoExpandForbidClosed() {
return STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED;
}

/**
* @return indices option that requires each specified index or alias to exist, doesn't expand wildcards and
* throws error if any of the aliases resolves to multiple indices with none specified as a write-index
*/
public static IndicesOptions strictAliasToWriteIndexNoExpandForbidClosed() {
return STRICT_ALIAS_TO_WRITE_INDEX_NO_EXPAND_FORBID_CLOSED;
}

/**
* @return indices options that ignores unavailable indices, expands wildcards only to open indices and
* allows that no indices are resolved from wildcard expressions (not returning an error).
Expand Down Expand Up @@ -413,6 +432,7 @@ public String toString() {
", allow_aliases_to_multiple_indices=" + allowAliasesToMultipleIndices() +
", forbid_closed_indices=" + forbidClosedIndices() +
", ignore_aliases=" + ignoreAliases() +
", require_aliases_to_write_index=" + requireAliasesToWriteIndex() +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -58,6 +59,11 @@ public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictAliasToWriteIndexNoExpandForbidClosed();
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected void doExecute(final MultiTermVectorsRequest request, final ActionList
Map<ShardId, MultiTermVectorsShardRequest> shardRequests = new HashMap<>();
for (int i = 0; i < request.requests.size(); i++) {
TermVectorsRequest termVectorsRequest = request.requests.get(i);
termVectorsRequest.routing(clusterState.metaData().resolveIndexRouting(termVectorsRequest.routing(), termVectorsRequest.index()));
termVectorsRequest.routing(clusterState.metaData().resolveIndexRouting(termVectorsRequest.routing(), termVectorsRequest.index(), false));
if (!clusterState.metaData().hasConcreteIndex(termVectorsRequest.index())) {
responses.set(i, new MultiTermVectorsItemResponse(null, new MultiTermVectorsResponse.Failure(termVectorsRequest.index(),
termVectorsRequest.type(), termVectorsRequest.id(), new IndexNotFoundException(termVectorsRequest.index()))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected boolean resolveIndex(TermVectorsRequest request) {
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias or a parent)
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index(), false));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.request().type())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void resolveRequest(ClusterState state, UpdateRequest request) {
}

public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
request.routing((metaData.resolveIndexRouting(request.routing(), request.index())));
request.routing((metaData.resolveIndexRouting(request.routing(), request.index(), true)));
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
Expand Down Expand Up @@ -169,6 +170,11 @@ public ActionRequestValidationException validate() {
return validationException;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictAliasToWriteIndexNoExpandForbidClosed();
}

/**
* The type of the indexed document.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,18 @@ void addIndex(IndexMetaData indexMetaData) {
}

public void computeAndValidateWriteIndex() {
List<IndexMetaData> writeIndices = referenceIndexMetaDatas.stream()
.filter(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).writeIndex()))
.collect(Collectors.toList());
if (referenceIndexMetaDatas.size() == 1) {
writeIndex.set(referenceIndexMetaDatas.get(0));
} else if (writeIndices.size() == 1) {
final List<IndexMetaData> writeIndices;
if (referenceIndexMetaDatas.size() > 1) {
writeIndices = referenceIndexMetaDatas.stream()
.filter(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).writeIndex()))
.collect(Collectors.toList());
} else if(Boolean.FALSE.equals(referenceIndexMetaDatas.get(0).getAliases().get(aliasName).writeIndex()) == false) {
writeIndices = Collections.singletonList(referenceIndexMetaDatas.get(0));
} else {
writeIndices = Collections.emptyList();
}

if (writeIndices.size() == 1) {
writeIndex.set(writeIndices.get(0));
} else if (writeIndices.size() > 1) {
List<String> writeIndicesStrings = writeIndices.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,29 +194,43 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}

Collection<IndexMetaData> resolvedIndices = aliasOrIndex.getIndices();
if (resolvedIndices.size() > 1 && !options.allowAliasesToMultipleIndices()) {

if (aliasOrIndex.isAlias() && options.requireAliasesToWriteIndex()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
IndexMetaData writeIndex = alias.getWriteIndex();
if (writeIndex == null) {
if (alias.getIndices().size() > 1) {
throw new IllegalArgumentException("Alias [" + alias.getAliasName() +
"] points to multiple indices with none set as a write-index [is_write_index=true]");
} else {
throw new IllegalArgumentException("Alias [" + alias.getAliasName() + "] points to an index ["
+ alias.getIndices().get(0).getIndex().getName() + "] with [is_write_index=false]");
}
}
concreteIndices.add(writeIndex.getIndex());
} else if (resolvedIndices.size() > 1 && options.allowAliasesToMultipleIndices() == false) {
String[] indexNames = new String[resolvedIndices.size()];
int i = 0;
for (IndexMetaData indexMetaData : resolvedIndices) {
indexNames[i++] = indexMetaData.getIndex().getName();
}
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" +
Arrays.toString(indexNames) + "], can't execute a single index op");
}

for (IndexMetaData index : resolvedIndices) {
if (index.getState() == IndexMetaData.State.CLOSE) {
if (failClosed) {
throw new IndexClosedException(index.getIndex());
} else {
if (options.forbidClosedIndices() == false) {
concreteIndices.add(index.getIndex());
Arrays.toString(indexNames) + "], can't execute a single index op");
} else {
for (IndexMetaData index : resolvedIndices) {
if (index.getState() == IndexMetaData.State.CLOSE) {
if (failClosed) {
throw new IndexClosedException(index.getIndex());
} else {
if (options.forbidClosedIndices() == false) {
concreteIndices.add(index.getIndex());
}
}
} else if (index.getState() == IndexMetaData.State.OPEN) {
concreteIndices.add(index.getIndex());
} else {
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
}
} else if (index.getState() == IndexMetaData.State.OPEN) {
concreteIndices.add(index.getIndex());
} else {
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ public String[] getConcreteAllClosedIndices() {
*/
// TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions
// in the index,bulk,update and delete apis.
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex, boolean isWriteOperation) {
if (aliasOrIndex == null) {
return routing;
}
Expand All @@ -487,10 +487,15 @@ public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex)
return routing;
}
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) result;
if (result.getIndices().size() > 1) {
if ((isWriteOperation && alias.getWriteIndex() == null) || (isWriteOperation == false && result.getIndices().size() > 1)) {
rejectSingleIndexOperation(aliasOrIndex, result);
}
AliasMetaData aliasMd = alias.getFirstAliasMetaData();
final AliasMetaData aliasMd;
if (isWriteOperation) {
aliasMd = alias.getWriteIndex().getAliases().get(alias.getAliasName());
} else {
aliasMd = alias.getFirstAliasMetaData();
}
if (aliasMd.indexRouting() != null) {
if (aliasMd.indexRouting().indexOf(',') != -1) {
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" + aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
Expand Down
Loading

0 comments on commit d2da8ec

Please sign in to comment.