diff --git a/plugins/delete-by-query/pom.xml b/plugins/delete-by-query/pom.xml new file mode 100644 index 0000000000000..2f22427f51d24 --- /dev/null +++ b/plugins/delete-by-query/pom.xml @@ -0,0 +1,46 @@ + + + + + 4.0.0 + + org.elasticsearch.plugin + elasticsearch-delete-by-query + + jar + Elasticsearch Delete By Query plugin + The Delete By Query plugin allows to delete documents in Elasticsearch with a single query. + + + org.elasticsearch + elasticsearch-plugin + 2.0.0-SNAPSHOT + + + + + 1 + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + + diff --git a/plugins/delete-by-query/src/main/assemblies/plugin.xml b/plugins/delete-by-query/src/main/assemblies/plugin.xml new file mode 100644 index 0000000000000..d5a4e719ce8cc --- /dev/null +++ b/plugins/delete-by-query/src/main/assemblies/plugin.xml @@ -0,0 +1,34 @@ + + + + + plugin + + zip + + false + + + / + true + true + + org.elasticsearch:elasticsearch + + + + / + true + true + + + \ No newline at end of file diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/DeleteByQueryModule.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/DeleteByQueryModule.java new file mode 100644 index 0000000000000..255336e695a88 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/DeleteByQueryModule.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery; + +import org.elasticsearch.action.ActionModule; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.inject.PreProcessModule; +import org.elasticsearch.deletebyquery.action.DeleteByQueryAction; +import org.elasticsearch.deletebyquery.action.TransportDeleteByQueryAction; +import org.elasticsearch.deletebyquery.rest.RestDeleteByQueryAction; +import org.elasticsearch.rest.RestModule; + +public class DeleteByQueryModule extends AbstractModule implements PreProcessModule { + + @Override + public void processModule(Module module) { + if (module instanceof RestModule) { + RestModule restModule = (RestModule) module; + restModule.addRestAction(RestDeleteByQueryAction.class); + } + if (module instanceof ActionModule) { + ActionModule actionModule = (ActionModule) module; + actionModule.registerAction(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class); + } + } + + @Override + protected void configure() { + } +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/DeleteByQueryPlugin.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/DeleteByQueryPlugin.java new file mode 100644 index 0000000000000..052c810e10832 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/DeleteByQueryPlugin.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery; + +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.AbstractPlugin; + +import java.util.ArrayList; +import java.util.Collection; + +public class DeleteByQueryPlugin extends AbstractPlugin { + + public static final String NAME = "delete-by-query"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "Elasticsearch Delete-By-Query Plugin"; + } + + @Override + public Collection modules(Settings settings) { + Collection modules = new ArrayList<>(); + modules.add(new DeleteByQueryModule()); + return modules; + } +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryAction.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryAction.java new file mode 100644 index 0000000000000..20c19630924bb --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class DeleteByQueryAction extends Action { + + public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction(); + public static final String NAME = "indices:data/write/delete/by_query"; + + private DeleteByQueryAction() { + super(NAME); + } + + @Override + public DeleteByQueryResponse newResponse() { + return new DeleteByQueryResponse(); + } + + @Override + public DeleteByQueryRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new DeleteByQueryRequestBuilder(client, this); + } +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryRequest.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryRequest.java new file mode 100644 index 0000000000000..762059703b3d2 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryRequest.java @@ -0,0 +1,264 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.action; + +import org.elasticsearch.ElasticsearchGenerationException; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.QuerySourceBuilder; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.search.Scroll; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.search.Scroll.readScroll; + +public class DeleteByQueryRequest extends ActionRequest implements IndicesRequest.Replaceable { + + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false); + + private String[] types = Strings.EMPTY_ARRAY; + + private BytesReference source; + + private String routing; + + private Integer size = 1_000; + + private Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10)); + + private Long timeout; + + public DeleteByQueryRequest() { + } + + /** + * Constructs a new delete by query request to run against the provided indices. No indices means + * it will run against all indices. + */ + public DeleteByQueryRequest(String... indices) { + this.indices = indices; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (source == null) { + validationException = addValidationError("source is missing", validationException); + } + return validationException; + } + + @Override + public String[] indices() { + return this.indices; + } + + @Override + public DeleteByQueryRequest indices(String... indices) { + this.indices = indices; + return this; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public DeleteByQueryRequest indicesOptions(IndicesOptions indicesOptions) { + if (indicesOptions == null) { + throw new IllegalArgumentException("IndicesOptions must not be null"); + } + this.indicesOptions = indicesOptions; + return this; + } + + public String[] types() { + return this.types; + } + + public DeleteByQueryRequest types(String... types) { + this.types = types; + return this; + } + + public BytesReference source() { + return source; + } + + public DeleteByQueryRequest source(QuerySourceBuilder sourceBuilder) { + this.source = sourceBuilder.buildAsBytes(Requests.CONTENT_TYPE); + return this; + } + + public DeleteByQueryRequest source(Map querySource) { + try { + XContentBuilder builder = XContentFactory.contentBuilder(Requests.CONTENT_TYPE); + builder.map(querySource); + return source(builder); + } catch (IOException e) { + throw new ElasticsearchGenerationException("Failed to generate [" + querySource + "]", e); + } + } + + public DeleteByQueryRequest source(XContentBuilder builder) { + this.source = builder.bytes(); + return this; + } + + public DeleteByQueryRequest source(String querySource) { + this.source = new BytesArray(querySource); + return this; + } + + public DeleteByQueryRequest source(byte[] querySource) { + return source(querySource, 0, querySource.length); + } + + public DeleteByQueryRequest source(byte[] querySource, int offset, int length) { + return source(new BytesArray(querySource, offset, length)); + } + + public DeleteByQueryRequest source(BytesReference querySource) { + this.source = querySource; + return this; + } + + public String routing() { + return this.routing; + } + + public DeleteByQueryRequest routing(String routing) { + this.routing = routing; + return this; + } + + public DeleteByQueryRequest routing(String... routings) { + this.routing = Strings.arrayToCommaDelimitedString(routings); + return this; + } + + public DeleteByQueryRequest size(int size) { + this.size = size; + return this; + } + + public Integer size() { + return size; + } + + + public Scroll scroll() { + return scroll; + } + + public DeleteByQueryRequest scroll(Scroll scroll) { + this.scroll = scroll; + return this; + } + + public DeleteByQueryRequest scroll(TimeValue keepAlive) { + return scroll(new Scroll(keepAlive)); + } + + public DeleteByQueryRequest scroll(String keepAlive) { + return scroll(new Scroll(TimeValue.parseTimeValue(keepAlive, null, getClass().getSimpleName() + ".keepAlive"))); + } + + public TimeValue timeout() { + return timeout != null ? new TimeValue(timeout) : null; + } + + public DeleteByQueryRequest timeout(TimeValue timeout) { + this.timeout = timeout.millis(); + return this; + } + + public DeleteByQueryRequest timeout(String timeout) { + this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout").millis(); + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + types = in.readStringArray(); + source = in.readBytesReference(); + routing = in.readOptionalString(); + size = in.readVInt(); + if (in.readBoolean()) { + scroll = readScroll(in); + } + if (in.readBoolean()) { + timeout = in.readVLong(); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeStringArray(types); + out.writeBytesReference(source); + out.writeOptionalString(routing); + out.writeVInt(size); + if (scroll == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + scroll.writeTo(out); + } + if (timeout == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeVLong(timeout); + } + } + + @Override + public String toString() { + String sSource = "_na_"; + try { + sSource = XContentHelper.convertToJson(source, false); + } catch (Exception e) { + // ignore + } + return "[" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], source[" + sSource + "]"; + } +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryRequestBuilder.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryRequestBuilder.java new file mode 100644 index 0000000000000..f0e567f547c06 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryRequestBuilder.java @@ -0,0 +1,186 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.action; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.QuerySourceBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; + +import java.util.Map; + +public class DeleteByQueryRequestBuilder extends ActionRequestBuilder { + + private QuerySourceBuilder sourceBuilder; + + public DeleteByQueryRequestBuilder(ElasticsearchClient client, DeleteByQueryAction action) { + super(client, action, new DeleteByQueryRequest()); + } + + public DeleteByQueryRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + /** + * Specifies what type of requested indices to ignore and wildcard indices expressions. + *

+ * For example indices that don't exist. + */ + public DeleteByQueryRequestBuilder setIndicesOptions(IndicesOptions options) { + request.indicesOptions(options); + return this; + } + + /** + * The query used to delete documents. + * + * @see org.elasticsearch.index.query.QueryBuilders + */ + public DeleteByQueryRequestBuilder setQuery(QueryBuilder queryBuilder) { + sourceBuilder().setQuery(queryBuilder); + return this; + } + + /** + * The query binary used to delete documents. + */ + public DeleteByQueryRequestBuilder setQuery(BytesReference queryBinary) { + sourceBuilder().setQuery(queryBinary); + return this; + } + + /** + * Constructs a new builder with a raw search query. + */ + public DeleteByQueryRequestBuilder setQuery(XContentBuilder query) { + return setQuery(query.bytes()); + } + + /** + * A comma separated list of routing values to control the shards the action will be executed on. + */ + public DeleteByQueryRequestBuilder setRouting(String routing) { + request.routing(routing); + return this; + } + + /** + * The routing values to control the shards that the action will be executed on. + */ + public DeleteByQueryRequestBuilder setRouting(String... routing) { + request.routing(routing); + return this; + } + + /** + * The number of documents to delete at once. + */ + public DeleteByQueryRequestBuilder setSize(int size) { + request.size(size); + return this; + } + + /** + * The source to execute. It is preferable to use either {@link #setSource(byte[])} + * or {@link #setQuery(QueryBuilder)}. + */ + public DeleteByQueryRequestBuilder setSource(String source) { + request().source(source); + return this; + } + + /** + * The source to execute in the form of a map. + */ + public DeleteByQueryRequestBuilder setSource(Map source) { + request().source(source); + return this; + } + + /** + * The source to execute in the form of a builder. + */ + public DeleteByQueryRequestBuilder setSource(XContentBuilder builder) { + request().source(builder); + return this; + } + + /** + * The source to execute. + */ + public DeleteByQueryRequestBuilder setSource(byte[] source) { + request().source(source); + return this; + } + + /** + * The source to execute. + */ + public DeleteByQueryRequestBuilder setSource(BytesReference source) { + request().source(source); + return this; + } + + /** + * An optional timeout to control how long the delete by query is allowed to take. + */ + public DeleteByQueryRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + + /** + * An optional timeout to control how long the delete by query is allowed to take. + */ + public DeleteByQueryRequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return this; + } + + /** + * The types of documents the query will run against. Defaults to all types. + */ + public DeleteByQueryRequestBuilder setTypes(String... types) { + request.types(types); + return this; + } + + @Override + public ListenableActionFuture execute() { + if (sourceBuilder != null) { + request.source(sourceBuilder); + } + return super.execute(); + } + + private QuerySourceBuilder sourceBuilder() { + if (sourceBuilder == null) { + sourceBuilder = new QuerySourceBuilder(); + } + return sourceBuilder; + } + +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryResponse.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryResponse.java new file mode 100644 index 0000000000000..4179c8d175b5c --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/DeleteByQueryResponse.java @@ -0,0 +1,202 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; + +/** + * Delete by query response + */ +public class DeleteByQueryResponse extends ActionResponse implements ToXContent { + + private long tookInMillis; + private boolean timedOut = false; + + private long found; + private long deleted; + private long missing; + private long failed; + + private IndexDeleteByQueryResponse[] indices = IndexDeleteByQueryResponse.EMPTY_ARRAY; + private ShardOperationFailedException[] shardFailures = ShardSearchFailure.EMPTY_ARRAY; + + DeleteByQueryResponse() { + } + + DeleteByQueryResponse(long tookInMillis, boolean timedOut, long found, long deleted, long missing, long failed, IndexDeleteByQueryResponse[] indices, ShardOperationFailedException[] shardFailures) { + this.tookInMillis = tookInMillis; + this.timedOut = timedOut; + this.found = found; + this.deleted = deleted; + this.missing = missing; + this.failed = failed; + this.indices = indices; + this.shardFailures = shardFailures; + } + + /** + * The responses from all the different indices. + */ + public IndexDeleteByQueryResponse[] getIndices() { + return indices; + } + + /** + * The response of a specific index. + */ + public IndexDeleteByQueryResponse getIndex(String index) { + if (index == null) { + return null; + } + for (IndexDeleteByQueryResponse i : indices) { + if (index.equals(i.getIndex())) { + return i; + } + } + return null; + } + + public TimeValue getTook() { + return new TimeValue(tookInMillis); + } + + public long getTookInMillis() { + return tookInMillis; + } + + public boolean isTimedOut() { + return this.timedOut; + } + + public long getTotalFound() { + return found; + } + + public long getTotalDeleted() { + return deleted; + } + + public long getTotalMissing() { + return missing; + } + + public long getTotalFailed() { + return failed; + } + + public ShardOperationFailedException[] getShardFailures() { + return shardFailures; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + tookInMillis = in.readVLong(); + timedOut = in.readBoolean(); + found = in.readVLong(); + deleted = in.readVLong(); + missing = in.readVLong(); + failed = in.readVLong(); + + int size = in.readVInt(); + indices = new IndexDeleteByQueryResponse[size]; + for (int i = 0; i < size; i++) { + IndexDeleteByQueryResponse index = new IndexDeleteByQueryResponse(); + index.readFrom(in); + indices[i] = index; + } + + size = in.readVInt(); + if (size == 0) { + shardFailures = ShardSearchFailure.EMPTY_ARRAY; + } else { + shardFailures = new ShardSearchFailure[size]; + for (int i = 0; i < shardFailures.length; i++) { + shardFailures[i] = readShardSearchFailure(in); + } + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(tookInMillis); + out.writeBoolean(timedOut); + out.writeVLong(found); + out.writeVLong(deleted); + out.writeVLong(missing); + out.writeVLong(failed); + + out.writeVInt(indices.length); + for (IndexDeleteByQueryResponse indexResponse : indices) { + indexResponse.writeTo(out); + } + + out.writeVInt(shardFailures.length); + for (ShardOperationFailedException shardSearchFailure : shardFailures) { + shardSearchFailure.writeTo(out); + } + } + + + static final class Fields { + static final XContentBuilderString TOOK = new XContentBuilderString("took"); + static final XContentBuilderString TIMED_OUT = new XContentBuilderString("timed_out"); + static final XContentBuilderString INDICES = new XContentBuilderString("_indices"); + static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.TOOK, tookInMillis); + builder.field(Fields.TIMED_OUT, timedOut); + + builder.startObject(Fields.INDICES); + IndexDeleteByQueryResponse all = new IndexDeleteByQueryResponse("_all", found, deleted, missing, failed); + all.toXContent(builder, params); + for (IndexDeleteByQueryResponse indexResponse : indices) { + indexResponse.toXContent(builder, params); + } + builder.endObject(); + + builder.startArray(Fields.FAILURES); + if (shardFailures != null) { + for (ShardOperationFailedException shardFailure : shardFailures) { + builder.startObject(); + shardFailure.toXContent(builder, params); + builder.endObject(); + } + } + builder.endArray(); + return builder; + } +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/IndexDeleteByQueryResponse.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/IndexDeleteByQueryResponse.java new file mode 100644 index 0000000000000..6d50814e2a052 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/IndexDeleteByQueryResponse.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +/** + * Delete by query response executed on a specific index. + */ +public class IndexDeleteByQueryResponse extends ActionResponse implements ToXContent { + + public static final IndexDeleteByQueryResponse[] EMPTY_ARRAY = new IndexDeleteByQueryResponse[0]; + + private String index; + + private long found; + private long deleted; + private long missing; + private long failed; + + IndexDeleteByQueryResponse() { + } + + IndexDeleteByQueryResponse(String index) { + super(); + this.index = index; + } + + public IndexDeleteByQueryResponse(String index, long found, long deleted, long missing, long failed) { + this.index = index; + this.found = found; + this.deleted = deleted; + this.missing = missing; + this.failed = failed; + } + + public String getIndex() { + return this.index; + } + + public long getFound() { + return found; + } + + public void incrementFound() { + incrementFound(1L); + } + + public void incrementFound(long delta) { + this.found = found + delta; + } + + public long getDeleted() { + return deleted; + } + + public void incrementDeleted() { + incrementDeleted(1L); + } + + public void incrementDeleted(long delta) { + this.deleted = deleted + delta; + } + + public long getMissing() { + return missing; + } + + public void incrementMissing() { + incrementMissing(1L); + } + + public void incrementMissing(long delta) { + this.missing = missing + delta; + } + + public long getFailed() { + return failed; + } + + public void incrementFailed() { + incrementFailed(1L); + } + + public void incrementFailed(long delta) { + this.failed = failed + delta; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + index = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + } + + static final class Fields { + static final XContentBuilderString FOUND = new XContentBuilderString("found"); + static final XContentBuilderString DELETED = new XContentBuilderString("deleted"); + static final XContentBuilderString MISSING = new XContentBuilderString("missing"); + static final XContentBuilderString FAILED = new XContentBuilderString("failed"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(index); + builder.field(Fields.FOUND, found); + builder.field(Fields.DELETED, deleted); + builder.field(Fields.MISSING, missing); + builder.field(Fields.FAILED, failed); + builder.endObject(); + return builder; + } +} \ No newline at end of file diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/TransportDeleteByQueryAction.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/TransportDeleteByQueryAction.java new file mode 100644 index 0000000000000..f1d42ace23e34 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/action/TransportDeleteByQueryAction.java @@ -0,0 +1,454 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.action; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.search.*; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class TransportDeleteByQueryAction extends HandledTransportAction { + + private final ThreadPool threadPool; + private final TransportSearchAction searchAction; + private final TransportSearchScrollAction scrollAction; + private final Client client; + + @Inject + protected TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, Client client, + TransportSearchAction transportSearchAction, + TransportSearchScrollAction transportSearchScrollAction, + TransportService transportService, ActionFilters actionFilters) { + super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, DeleteByQueryRequest.class); + this.threadPool = threadPool; + this.searchAction = transportSearchAction; + this.scrollAction = transportSearchScrollAction; + this.client = client; + } + + @Override + protected void doExecute(DeleteByQueryRequest request, ActionListener listener) { + new AsyncDeleteByQueryAction(request, listener).start(); + } + + class AsyncDeleteByQueryAction { + + private final DeleteByQueryRequest request; + private final ActionListener listener; + + private final long startTime; + private final AtomicBoolean timedOut; + private final AtomicLong total; + + private final AtomicInteger bulks; + private final Semaphore semaphore; + + private volatile ShardOperationFailedException[] shardFailures; + private final CopyOnWriteArrayList bulkResults; + + AsyncDeleteByQueryAction(DeleteByQueryRequest request, ActionListener listener) { + this.request = request; + this.listener = listener; + this.startTime = System.currentTimeMillis(); + this.timedOut = new AtomicBoolean(false); + this.total = new AtomicLong(0L); + this.semaphore = new Semaphore(0); + this.bulks = new AtomicInteger(0); + this.shardFailures = ShardSearchFailure.EMPTY_ARRAY; + this.bulkResults = new CopyOnWriteArrayList<>(); + } + + public void start() { + executeScan(); + } + + void executeScan() { + try { + final SearchRequest scanRequest = new SearchRequest(request.indices()).types(request.types()).indicesOptions(request.indicesOptions()); + scanRequest.searchType(SearchType.SCAN).scroll(request.scroll()); + if (request.routing() != null) { + scanRequest.routing(request.routing()); + } + + SearchSourceBuilder source = new SearchSourceBuilder().query(request.source()).fields("_routing", "_parent").fetchSource(false).version(true); + if (request.size() != null) { + source.size(request.size()); + } + if (request.timeout() != null) { + source.timeout(request.timeout()); + } + scanRequest.source(source); + + logger.debug("executing scan request"); + searchAction.execute(scanRequest, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + long hits = searchResponse.getHits().getTotalHits(); + logger.debug("scan request executed: found [{}] document(s) to delete", hits); + addShardFailures(searchResponse.getShardFailures()); + + if (hits == 0) { + listener.onResponse(buildResponse()); + return; + } + total.set(hits); + + logger.debug("start scrolling [{}] document(s)", hits); + executeScroll(searchResponse.getScrollId()); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } catch (Throwable t) { + logger.error("unable to execute the initial scan request of delete by query", t); + listener.onFailure(t); + } + } + + void executeScroll(final String scrollId) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("executing scroll request [{}]", scrollId); + } + + scrollAction.execute(new SearchScrollRequest(scrollId).scroll(request.scroll()), new ActionListener() { + @Override + public void onResponse(SearchResponse scrollResponse) { + onScrollResponse(scrollId, scrollResponse); + } + + @Override + public void onFailure(Throwable e) { + onScrollFailure(scrollId, e); + + } + }); + } + + @Override + public void onFailure(Throwable t) { + onScrollFailure(scrollId, t); + } + + @Override + public void onRejection(Throwable t) { + onScrollFailure(scrollId, t); + } + }); + } + + void onScrollResponse(String scrollId, SearchResponse scrollResponse) { + final SearchHit[] docs = scrollResponse.getHits().getHits(); + final String nextScrollId = scrollResponse.getScrollId(); + addShardFailures(scrollResponse.getShardFailures()); + + if (logger.isDebugEnabled()) { + logger.debug("scroll request [{}] executed: [{}] document(s) returned", scrollId, docs.length); + } + + if ((docs.length == 0) || (nextScrollId == null)) { + if (logger.isDebugEnabled()) { + logger.debug("scrolling documents terminated"); + } + finishScroll(scrollId, false, null); + return; + } + + if (isTimedOut()) { + if (logger.isDebugEnabled()) { + logger.debug("scrolling documents timed out"); + } + finishScroll(scrollId, true, null); + return; + } + + // Delete the scrolled documents using the Bulk API + threadPool.generic().execute(new AbstractRunnable() { + + final int bulkId = bulks.getAndIncrement(); + + @Override + protected void doRun() throws Exception { + executeBulk(bulkId, docs); + } + + @Override + public void onFailure(Throwable t) { + onBulkFailure(bulkId, docs, t); + } + + @Override + public void onRejection(Throwable t) { + bulks.decrementAndGet(); + onBulkFailure(bulkId, docs, t); + } + }); + + if (logger.isDebugEnabled()) { + logger.debug("scrolling next batch of document(s) with scroll id [{}]", nextScrollId); + } + executeScroll(nextScrollId); + } + + void onScrollFailure(final String scrollId, Throwable t) { + logger.error("scroll request [{}] failed, now scrolling document(s) is stopped", t, scrollId); + finishScroll(scrollId, isTimedOut(), t); + } + + void executeBulk(final int bulkId, final SearchHit[] docs) { + BulkRequest bulkRequest = new BulkRequest(); + for (SearchHit doc : docs) { + DeleteRequest delete = new DeleteRequest(doc.index(), doc.type(), doc.id()).version(doc.version()); + SearchHitField routing = doc.field("_routing"); + if (routing != null) { + delete.routing((String) routing.value()); + } + SearchHitField parent = doc.field("_parent"); + if (parent != null) { + delete.parent((String) parent.value()); + } + bulkRequest.add(delete); + } + + if (logger.isDebugEnabled()) { + logger.debug("executing bulk request [{}] with [{}] deletions", bulkId, bulkRequest.numberOfActions()); + } + + boolean execute = false; + try { + client.bulk(bulkRequest, new ActionListener() { + @Override + public void onResponse(BulkResponse bulkResponse) { + try { + onBulkResponse(bulkId, bulkResponse); + } finally { + semaphore.release(); + } + } + + @Override + public void onFailure(Throwable e) { + try { + onBulkFailure(bulkId, docs, e); + } finally { + semaphore.release(); + } + } + }); + execute = true; + } finally { + if (!execute) { + semaphore.release(); + } + } + } + + void onBulkResponse(final int bulkId, BulkResponse bulkResponse) { + if (logger.isDebugEnabled()) { + logger.debug("bulk request [{}] executed with {}", bulkId, bulkResponse.hasFailures() ? "failures" : "success"); + } + + // Pre compute the found/deleted/failed/missing counters for + // every index involved in the bulk request + Map results = new HashMap<>(); + for (BulkItemResponse item : bulkResponse.getItems()) { + IndexDeleteByQueryResponse indexCounter = results.get(item.getIndex()); + if (indexCounter == null) { + indexCounter = new IndexDeleteByQueryResponse(item.getIndex()); + } + indexCounter.incrementFound(); + if (item.isFailed()) { + indexCounter.incrementFailed(); + } else { + indexCounter.incrementDeleted(); + DeleteResponse delete = item.getResponse(); + if (!delete.isFound()) { + indexCounter.incrementMissing(); + } + } + results.put(item.getIndex(), indexCounter); + } + bulkResults.addAll(results.values()); + } + + void onBulkFailure(final int bulkId, SearchHit[] docs, Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("execution of scroll request [{}] failed", t, bulkId); + } + + // Pre compute the deleted/ counter for + // every index involved in the bulk request + Map results = new HashMap<>(); + for (SearchHit doc : docs) { + IndexDeleteByQueryResponse indexCounter = results.get(doc.index()); + if (indexCounter == null) { + indexCounter = new IndexDeleteByQueryResponse(doc.index()); + } + indexCounter.incrementFound(); + indexCounter.incrementFailed(); + results.put(doc.getIndex(), indexCounter); + } + bulkResults.addAll(results.values()); + } + + void finishScroll(final String scrollId, boolean scrollTimedOut, Throwable t) { + if (scrollTimedOut) { + logger.debug("delete-by-query response marked as timed out"); + timedOut.set(true); + } + + if (Strings.hasText(scrollId)) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + logger.warn("unable to clear scroll id [{}]: {}", scrollId, t.getMessage()); + } + + @Override + protected void doRun() throws Exception { + client.prepareClearScroll().addScrollId(scrollId).execute(new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + logger.debug("scroll id [{}] cleared", scrollId); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("unable to clear scroll id [{}]: {}", scrollId, e.getMessage()); + } + }); + } + }); + } + + if (t != null) { + try { + addShardFailure(new ShardSearchFailure(t)); + + logger.debug("scrolling document(s) end with failures, waiting for [{}] bulks requests to terminate", bulks.get()); + semaphore.tryAcquire(bulks.get(), 5, TimeUnit.SECONDS); + logger.debug("[{}] bulks requests terminated", bulks.get()); + + listener.onResponse(buildResponse()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.onFailure(t); + } + } else { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + finishHim(); + } + }); + } + } + + void finishHim() { + try { + logger.debug("scrolling document(s) end with success, waiting for [{}] bulks requests to terminate", bulks.get()); + semaphore.acquire(bulks.get()); + + logger.debug("[{}] bulks requests terminated", bulks.get()); + listener.onResponse(buildResponse()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.onFailure(e); + } + } + + private boolean isTimedOut() { + return request.timeout() != null && (System.currentTimeMillis() >= (startTime + request.timeout().millis())); + } + + private void addShardFailure(ShardOperationFailedException failure) { + addShardFailures(new ShardOperationFailedException[]{failure}); + } + + private void addShardFailures(ShardOperationFailedException[] failures) { + if (!CollectionUtils.isEmpty(failures)) { + ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length]; + System.arraycopy(shardFailures, 0, duplicates, 0, shardFailures.length); + System.arraycopy(failures, 0, duplicates, shardFailures.length, failures.length); + shardFailures = ExceptionsHelper.groupBy(duplicates); + } + } + + private DeleteByQueryResponse buildResponse() { + long took = System.currentTimeMillis() - startTime; + long deleted = 0; + long missing = 0; + long failed = 0; + + // Sum the number of documents found/deleted/failed/missing + // for every bulk result + Map counters = new HashMap<>(); + for(IndexDeleteByQueryResponse bulkResult : bulkResults) { + IndexDeleteByQueryResponse counter = counters.get(bulkResult.getIndex()); + if (counter == null) { + counter = new IndexDeleteByQueryResponse(bulkResult.getIndex()); + } + counter.incrementFound(bulkResult.getFound()); + counter.incrementDeleted(bulkResult.getDeleted()); + counter.incrementMissing(bulkResult.getMissing()); + counter.incrementFailed(bulkResult.getFailed()); + counters.put(bulkResult.getIndex(), counter); + + deleted = deleted + bulkResult.getDeleted(); + missing = missing + bulkResult.getMissing(); + failed = failed + bulkResult.getFailed(); + } + IndexDeleteByQueryResponse[] indices = counters.values().toArray(new IndexDeleteByQueryResponse[counters.size()]); + return new DeleteByQueryResponse(took, timedOut.get(), total.get(), deleted, missing, failed, indices, shardFailures); + } + } +} diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/rest/RestDeleteByQueryAction.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/rest/RestDeleteByQueryAction.java new file mode 100644 index 0000000000000..ad5f8dd4de008 --- /dev/null +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/deletebyquery/rest/RestDeleteByQueryAction.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deletebyquery.rest; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.QuerySourceBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.deletebyquery.action.DeleteByQueryRequest; +import org.elasticsearch.deletebyquery.action.DeleteByQueryResponse; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestToXContentListener; + +import static org.elasticsearch.deletebyquery.action.DeleteByQueryAction.INSTANCE; +import static org.elasticsearch.rest.RestRequest.Method.DELETE; + +public class RestDeleteByQueryAction extends BaseRestHandler { + + @Inject + public RestDeleteByQueryAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(DELETE, "/{index}/_query", this); + controller.registerHandler(DELETE, "/{index}/{type}/_query", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { + DeleteByQueryRequest delete = new DeleteByQueryRequest(Strings.splitStringByCommaToArray(request.param("index"))); + delete.indicesOptions(IndicesOptions.fromRequest(request, delete.indicesOptions())); + delete.routing(request.param("routing")); + int size = request.paramAsInt("size", -1); + if (size != -1) { + delete.size(size); + } + if (request.hasParam("timeout")) { + delete.timeout(request.paramAsTime("timeout", null)); + } + if (request.hasContent()) { + delete.source(request.content()); + } else { + String source = request.param("source"); + if (source != null) { + delete.source(source); + } else { + QuerySourceBuilder querySourceBuilder = RestActions.parseQuerySource(request); + if (querySourceBuilder != null) { + delete.source(querySourceBuilder); + } + } + } + delete.types(Strings.splitStringByCommaToArray(request.param("type"))); + client.execute(INSTANCE, delete, new RestToXContentListener(channel)); + } +} diff --git a/plugins/delete-by-query/src/main/resources/es-plugin.properties b/plugins/delete-by-query/src/main/resources/es-plugin.properties new file mode 100644 index 0000000000000..374b79c59a7ae --- /dev/null +++ b/plugins/delete-by-query/src/main/resources/es-plugin.properties @@ -0,0 +1,3 @@ +plugin=org.elasticsearch.deletebyquery.DeleteByQueryPlugin +version=${project.version} +lucene=${lucene.version} diff --git a/plugins/delete-by-query/src/test/java/org/elasticsearch/plugin/deletebyquery/DeleteByQueryTests.java b/plugins/delete-by-query/src/test/java/org/elasticsearch/plugin/deletebyquery/DeleteByQueryTests.java new file mode 100644 index 0000000000000..4e13371a42d86 --- /dev/null +++ b/plugins/delete-by-query/src/test/java/org/elasticsearch/plugin/deletebyquery/DeleteByQueryTests.java @@ -0,0 +1,394 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.deletebyquery; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.deletebyquery.action.DeleteByQueryAction; +import org.elasticsearch.deletebyquery.action.DeleteByQueryRequestBuilder; +import org.elasticsearch.deletebyquery.action.DeleteByQueryResponse; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + + +@ClusterScope(scope = SUITE, transportClientRatio = 0) +public class DeleteByQueryTests extends ElasticsearchIntegrationTest { + + @Test(expected = ActionRequestValidationException.class) + public void testDeleteByQueryWithNoSource() { + newDeleteByQuery().get(); + fail("should have thrown a validation exception because of the missing source"); + } + + @Test + public void testDeleteByQueryWithNoIndices() { + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setQuery(QueryBuilders.matchAllQuery()); + delete.setIndicesOptions(IndicesOptions.fromOptions(false, true, true, false)); + assertDBQResponse(delete.get(), 0L, 0l, 0l, 0l); + assertSearchContextsClosed(); + } + + @Test + public void testDeleteByQueryWithOneIndex() throws Exception { + final long docs = randomIntBetween(1, 50); + for (int i = 0; i < docs; i++) { + index("test", "test", String.valueOf(i), "fields1", 1); + } + refresh(); + assertHitCount(client().prepareCount("test").get(), docs); + + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("t*").setQuery(QueryBuilders.matchAllQuery()); + assertDBQResponse(delete.get(), docs, docs, 0l, 0l); + refresh(); + assertHitCount(client().prepareCount("test").get(), 0); + assertSearchContextsClosed(); + } + + @Test + public void testDeleteByQueryWithMissingIndex() throws Exception { + client().prepareIndex("test", "test") + .setSource(jsonBuilder().startObject().field("field1", 1).endObject()) + .setRefresh(true) + .get(); + assertHitCount(client().prepareCount().get(), 1); + + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("test", "missing").setQuery(QueryBuilders.matchAllQuery()); + try { + delete.get(); + fail("should have thrown an exception because of a missing index"); + } catch (IndexMissingException e) { + // Ok + } + + delete.setIndicesOptions(IndicesOptions.lenientExpandOpen()); + assertDBQResponse(delete.get(), 1L, 1L, 0l, 0l); + refresh(); + assertHitCount(client().prepareCount("test").get(), 0); + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testDeleteByQueryWithTypes() throws Exception { + final long docs = randomIntBetween(1, 50); + for (int i = 0; i < docs; i++) { + index(randomFrom("test1", "test2", "test3"), "type1", String.valueOf(i), "foo", "bar"); + index(randomFrom("test1", "test2", "test3"), "type2", String.valueOf(i), "foo", "bar"); + } + refresh(); + assertHitCount(client().prepareCount().get(), docs * 2); + assertHitCount(client().prepareCount().setTypes("type1").get(), docs); + assertHitCount(client().prepareCount().setTypes("type2").get(), docs); + + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setTypes("type1").setQuery(QueryBuilders.matchAllQuery()); + assertDBQResponse(delete.get(), docs, docs, 0l, 0l); + refresh(); + + assertHitCount(client().prepareCount().get(), docs); + assertHitCount(client().prepareCount().setTypes("type1").get(), 0); + assertHitCount(client().prepareCount().setTypes("type2").get(), docs); + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testDeleteByQueryWithRouting() throws Exception { + assertAcked(prepareCreate("test").setSettings("number_of_shards", 2)); + ensureGreen("test"); + + final int docs = randomIntBetween(2, 10); + logger.info("--> indexing [{}] documents with routing", docs); + for (int i = 0; i < docs; i++) { + client().prepareIndex("test", "test", String.valueOf(i)).setRouting(String.valueOf(i)).setSource("field1", 1).get(); + } + refresh(); + + logger.info("--> counting documents with no routing, should be equal to [{}]", docs); + assertHitCount(client().prepareCount().get(), docs); + + String routing = String.valueOf(randomIntBetween(2, docs)); + + logger.info("--> counting documents with routing [{}]", routing); + long expected = client().prepareCount().setRouting(routing).get().getCount(); + + logger.info("--> delete all documents with routing [{}] with a delete-by-query", routing); + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setRouting(routing).setQuery(QueryBuilders.matchAllQuery()); + assertDBQResponse(delete.get(), expected, expected, 0l, 0l); + refresh(); + + assertHitCount(client().prepareCount().get(), docs - expected); + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testDeleteByFieldQuery() throws Exception { + assertAcked(prepareCreate("test").addAlias(new Alias("alias"))); + + int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "test", Integer.toString(i)) + .setRouting(randomAsciiOfLengthBetween(1, 5)) + .setSource("foo", "bar").get(); + } + refresh(); + + int n = between(0, numDocs - 1); + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchQuery("_id", Integer.toString(n))).get(), 1); + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), numDocs); + + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("alias").setQuery(QueryBuilders.matchQuery("_id", Integer.toString(n))); + assertDBQResponse(delete.get(), 1L, 1L, 0l, 0l); + refresh(); + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), numDocs - 1); + assertSearchContextsClosed(); + } + + @Test + public void testDeleteByQueryWithDateMath() throws Exception { + index("test", "type", "1", "d", "2013-01-01"); + ensureGreen(); + refresh(); + assertHitCount(client().prepareCount("test").get(), 1); + + DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h")); + assertDBQResponse(delete.get(), 1L, 1L, 0l, 0l); + refresh(); + assertHitCount(client().prepareCount("test").get(), 0); + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testDeleteByTermQuery() throws Exception { + createIndex("test"); + ensureGreen(); + + int numDocs = scaledRandomIntBetween(10, 50); + IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs + 1]; + for (int i = 0; i < numDocs; i++) { + indexRequestBuilders[i] = client().prepareIndex("test", "test", Integer.toString(i)).setSource("field", "value"); + } + indexRequestBuilders[numDocs] = client().prepareIndex("test", "test", Integer.toString(numDocs)).setSource("field", "other_value"); + indexRandom(true, indexRequestBuilders); + + SearchResponse searchResponse = client().prepareSearch("test").get(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs + 1)); + + DeleteByQueryResponse delete = newDeleteByQuery().setIndices("test").setQuery(QueryBuilders.termQuery("field", "value")).get(); + assertDBQResponse(delete, numDocs, numDocs, 0l, 0l); + + refresh(); + searchResponse = client().prepareSearch("test").get(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testConcurrentDeleteByQueriesOnDifferentDocs() throws InterruptedException { + createIndex("test"); + ensureGreen(); + + final Thread[] threads = new Thread[scaledRandomIntBetween(2, 5)]; + final long docs = randomIntBetween(1, 50); + for (int i = 0; i < docs; i++) { + for (int j = 0; j < threads.length; j++) { + index("test", "test", String.valueOf(i * 10 + j), "field", j); + } + } + refresh(); + assertHitCount(client().prepareCount("test").get(), docs * threads.length); + + final CountDownLatch start = new CountDownLatch(1); + final AtomicReference exceptionHolder = new AtomicReference<>(); + + for (int i = 0; i < threads.length; i++) { + final int threadNum = i; + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.termQuery("field", threadNum)).get(), docs); + + Runnable r = new Runnable() { + @Override + public void run() { + try { + start.await(); + + DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(QueryBuilders.termQuery("field", threadNum)).get(); + assertDBQResponse(rsp, docs, docs, 0L, 0L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable e) { + exceptionHolder.set(e); + Thread.currentThread().interrupt(); + } + } + }; + threads[i] = new Thread(r); + threads[i].start(); + } + + start.countDown(); + for (Thread thread : threads) { + thread.join(); + } + + Throwable assertionError = exceptionHolder.get(); + if (assertionError != null) { + assertionError.printStackTrace(); + } + assertThat(assertionError + " should be null", assertionError, nullValue()); + refresh(); + + for (int i = 0; i < threads.length; i++) { + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.termQuery("field", i)).get(), 0); + } + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testConcurrentDeleteByQueriesOnSameDocs() throws InterruptedException { + assertAcked(prepareCreate("test").setSettings(Settings.settingsBuilder().put("index.refresh_interval", -1))); + ensureGreen(); + + final long docs = randomIntBetween(50, 100); + for (int i = 0; i < docs; i++) { + index("test", "test", String.valueOf(i), "foo", "bar"); + } + refresh(); + assertHitCount(client().prepareCount("test").get(), docs); + + final Thread[] threads = new Thread[scaledRandomIntBetween(2, 9)]; + + final CountDownLatch start = new CountDownLatch(1); + final AtomicReference exceptionHolder = new AtomicReference<>(); + + final MatchQueryBuilder query = QueryBuilders.matchQuery("foo", "bar"); + final AtomicLong deleted = new AtomicLong(0); + + for (int i = 0; i < threads.length; i++) { + assertHitCount(client().prepareCount("test").setQuery(query).get(), docs); + + Runnable r = new Runnable() { + @Override + public void run() { + try { + start.await(); + DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(query).get(); + deleted.addAndGet(rsp.getTotalDeleted()); + + assertThat(rsp.getTotalFound(), equalTo(docs)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable e) { + exceptionHolder.set(e); + Thread.currentThread().interrupt(); + } + } + }; + threads[i] = new Thread(r); + threads[i].start(); + } + + start.countDown(); + for (Thread thread : threads) { + thread.join(); + } + refresh(); + + Throwable assertionError = exceptionHolder.get(); + if (assertionError != null) { + assertionError.printStackTrace(); + } + assertThat(assertionError + " should be null", assertionError, nullValue()); + assertHitCount(client().prepareCount("test").get(), 0L); + assertThat(deleted.get(), equalTo(docs)); + assertSearchContextsClosed(); + } + + @Test + @Slow + public void testDeleteByQueryOnReadOnlyIndex() throws InterruptedException { + createIndex("test"); + ensureGreen(); + + final long docs = randomIntBetween(1, 50); + for (int i = 0; i < docs; i++) { + index("test", "test", String.valueOf(i), "field", 1); + } + refresh(); + assertHitCount(client().prepareCount("test").get(), docs); + + try { + enableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY); + DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(QueryBuilders.matchAllQuery()).get(); + assertDBQResponse(rsp, docs, 0L, docs, 0L); + } finally { + disableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY); + } + + assertHitCount(client().prepareCount("test").get(), docs); + assertSearchContextsClosed(); + } + + private DeleteByQueryRequestBuilder newDeleteByQuery() { + return new DeleteByQueryRequestBuilder(client(), DeleteByQueryAction.INSTANCE); + } + + private void assertDBQResponse(DeleteByQueryResponse response, long found, long deleted, long failed, long missing) { + assertNotNull(response); + assertThat(response.isTimedOut(), equalTo(false)); + assertThat(response.getShardFailures().length, equalTo(0)); + assertThat(response.getTotalFound(), equalTo(found)); + assertThat(response.getTotalDeleted(), equalTo(deleted)); + assertThat(response.getTotalFailed(), equalTo(failed)); + assertThat(response.getTotalMissing(), equalTo(missing)); + } + + private void assertSearchContextsClosed() { + NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for (NodeStats nodeStat : nodesStats.getNodes()){ + assertThat(nodeStat.getIndices().getSearch().getOpenContexts(), equalTo(0L)); + } + } +} \ No newline at end of file