Skip to content

Commit

Permalink
Revert "Move DeleteByQuery and Reindex requests into core (#24578)"
Browse files Browse the repository at this point in the history
This reverts commit 6ea2ae3.
  • Loading branch information
s1monw committed May 11, 2017
1 parent 57fddce commit 952feb5
Show file tree
Hide file tree
Showing 77 changed files with 182 additions and 90 deletions.
1 change: 1 addition & 0 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]Retry.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportBulkAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportShardBulkAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]byscroll[/\\]AbstractAsyncBulkByScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]delete[/\\]DeleteRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]explain[/\\]TransportExplainAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]get[/\\]GetRequest.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
Expand All @@ -31,7 +31,7 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
Expand Down Expand Up @@ -77,7 +77,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
Expand Down Expand Up @@ -116,8 +116,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;

public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService,
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
this(task, logger, client, threadPool, mainRequest, scriptService, clusterState, listener, client.settings());
}

Expand Down Expand Up @@ -741,7 +741,7 @@ public DeleteRequest self() {
/**
* Wraps a {@link DeleteRequest} in a {@link RequestWrapper}
*/
public static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
return new DeleteRequestWrapper(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -355,7 +355,7 @@ public int getSlices() {
/**
* Build a new request for a slice of the parent request.
*/
public abstract Self forSlice(TaskId slicingTask, SearchRequest slice);
protected abstract Self forSlice(TaskId slicingTask, SearchRequest slice);

/**
* Setup a clone of this request with the information needed to process a slice of it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
Expand All @@ -32,8 +32,8 @@
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
public AsyncDeleteByQueryAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
Expand All @@ -32,13 +32,13 @@
/**
* Helps parallelize reindex requests using sliced scrolls.
*/
class BulkByScrollParallelizationHelper {
public class BulkByScrollParallelizationHelper {
private BulkByScrollParallelizationHelper() {}

public static <Request extends AbstractBulkByScrollRequest<Request>> void startSlices(Client client, TaskManager taskManager,
Action<Request, BulkByScrollResponse, ?> action,
String localNodeId, ParentBulkByScrollTask task, Request request,
ActionListener<BulkByScrollResponse> listener) {
public static <
Request extends AbstractBulkByScrollRequest<Request>
> void startSlices(Client client, TaskManager taskManager, Action<Request, BulkByScrollResponse, ?> action,
String localNodeId, ParentBulkByScrollTask task, Request request, ActionListener<BulkByScrollResponse> listener) {
TaskId parentTaskId = new TaskId(localNodeId, task.getId());
for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), UidFieldMapper.NAME, request.getSlices())) {
// TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -44,14 +45,14 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContent {
private TimeValue took;
private BulkByScrollTask.Status status;
private List<Failure> bulkFailures;
private List<ScrollableHitSource.SearchFailure> searchFailures;
private List<SearchFailure> searchFailures;
private boolean timedOut;

public BulkByScrollResponse() {
}

public BulkByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> bulkFailures,
List<ScrollableHitSource.SearchFailure> searchFailures, boolean timedOut) {
List<SearchFailure> searchFailures, boolean timedOut) {
this.took = took;
this.status = requireNonNull(status, "Null status not supported");
this.bulkFailures = bulkFailures;
Expand Down Expand Up @@ -138,7 +139,7 @@ public List<Failure> getBulkFailures() {
/**
* All search failures.
*/
public List<ScrollableHitSource.SearchFailure> getSearchFailures() {
public List<SearchFailure> getSearchFailures() {
return searchFailures;
}

Expand All @@ -165,7 +166,7 @@ public void readFrom(StreamInput in) throws IOException {
took = new TimeValue(in);
status = new BulkByScrollTask.Status(in);
bulkFailures = in.readList(Failure::new);
searchFailures = in.readList(ScrollableHitSource.SearchFailure::new);
searchFailures = in.readList(SearchFailure::new);
timedOut = in.readBoolean();
}

Expand All @@ -180,7 +181,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
failure.toXContent(builder, params);
builder.endObject();
}
for (ScrollableHitSource.SearchFailure failure: searchFailures) {
for (SearchFailure failure: searchFailures) {
failure.toXContent(builder, params);
}
builder.endArray();
Expand All @@ -198,4 +199,4 @@ public String toString() {
builder.append(",search_failures=").append(getSearchFailures().subList(0, min(3, getSearchFailures().size())));
return builder.append(']').toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
Expand Down Expand Up @@ -81,7 +81,7 @@ public ActionRequestValidationException validate() {
}

@Override
public DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
protected DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
return doForSlice(new DeleteByQueryRequest(slice, false), slicingTask);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -104,7 +104,7 @@ public void onSliceResponse(ActionListener<BulkByScrollResponse> listener, int s
/**
* Record a failure from a slice and respond to the listener if the request is finished.
*/
public void onSliceFailure(ActionListener<BulkByScrollResponse> listener, int sliceId, Exception e) {
void onSliceFailure(ActionListener<BulkByScrollResponse> listener, int sliceId, Exception e) {
results.setOnce(sliceId, new Result(sliceId, e));
recordSliceCompletionAndRespondIfAllDone(listener);
// TODO cancel when a slice fails?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -111,7 +111,7 @@ public final void close(Runnable onCompletion) {
/**
* Set the id of the last scroll. Used for debugging.
*/
public final void setScroll(String scrollId) {
final void setScroll(String scrollId) {
this.scrollId.set(scrollId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

/**
* Implemented by {@link BulkByScrollTask} and {@link BulkByScrollTask.Status} to consistently implement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.ESLoggerFactory;
Expand Down Expand Up @@ -115,15 +115,15 @@ TimeValue throttledUntil() {
return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS)));
}

public void setTotal(long totalHits) {
void setTotal(long totalHits) {
total.set(totalHits);
}

public void countBatch() {
void countBatch() {
batch.incrementAndGet();
}

public void countNoop() {
void countNoop() {
noops.incrementAndGet();
}

Expand All @@ -132,7 +132,7 @@ public long getCreated() {
return created.get();
}

public void countCreated() {
void countCreated() {
created.incrementAndGet();
}

Expand All @@ -141,7 +141,7 @@ public long getUpdated() {
return updated.get();
}

public void countUpdated() {
void countUpdated() {
updated.incrementAndGet();
}

Expand All @@ -150,15 +150,15 @@ public long getDeleted() {
return deleted.get();
}

public void countDeleted() {
void countDeleted() {
deleted.incrementAndGet();
}

public void countVersionConflict() {
void countVersionConflict() {
versionConflicts.incrementAndGet();
}

public void countBulkRetry() {
void countBulkRetry() {
bulkRetries.incrementAndGet();
}

Expand All @@ -174,8 +174,8 @@ float getRequestsPerSecond() {
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
* rescheduled over and over again.
*/
public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable) {
void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable) {
// Synchronize so we are less likely to schedule the same request twice.
synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
Expand All @@ -184,7 +184,7 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
}
}

public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
return timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/

/**
* Infrastructure for actions that modify documents based on the results of a scrolling query
* like reindex, update by query or delete by query.
* Infrastructure for actions that modify documents based on the results of a scrolling query.
*/
package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.index.reindex;
package org.elasticsearch.action.bulk.byscroll;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -36,8 +36,8 @@
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -696,7 +696,7 @@ private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScro
}

@Override
public DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice) {
protected DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice) {
throw new UnsupportedOperationException();
}

Expand Down
Loading

0 comments on commit 952feb5

Please sign in to comment.