Skip to content

Commit

Permalink
Teach reindex to stop when cancelled
Browse files Browse the repository at this point in the history
All we do is check the cancelled flag and stop the request at a few key
points.

Adds the cancellation cause to the status so any request that is cancelled
but doesn't die can be seen in the task list.
  • Loading branch information
nik9000 committed Feb 11, 2016
1 parent a9fa0cb commit 30107b4
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.tasks;

import org.elasticsearch.common.Nullable;

import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -56,4 +58,11 @@ public boolean isCancelled() {
return reason.get() != null;
}

/**
* The reason the task was cancelled or null if it hasn't been cancelled.
*/
@Nullable
public String getReasonCancelled() {
return reason.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ public BulkByScrollTask getTask() {
return task;
}

private void initialSearch() {
void initialSearch() {
if (task.isCancelled()) {
finishHim(null);
return;
}
try {
// Default to sorting by _doc if it hasn't been changed.
if (firstSearchRequest.source().sorts() == null) {
Expand Down Expand Up @@ -144,8 +148,19 @@ public void onFailure(Throwable e) {
}
}

/**
* Set the last returned scrollId. Package private for testing.
*/
void setScroll(String scroll) {
this.scroll.set(scroll);
}

void onScrollResponse(SearchResponse searchResponse) {
scroll.set(searchResponse.getScrollId());
if (task.isCancelled()) {
finishHim(null);
return;
}
setScroll(searchResponse.getScrollId());
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())));
return;
Expand Down Expand Up @@ -178,7 +193,7 @@ protected void doRun() throws Exception {
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
startNextScrollRequest();
startNextScroll();
return;
}
request.timeout(mainRequest.getTimeout());
Expand All @@ -198,6 +213,10 @@ public void onFailure(Throwable t) {
}

void sendBulkRequest(BulkRequest request) {
if (task.isCancelled()) {
finishHim(null);
return;
}
retry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
Expand All @@ -212,6 +231,10 @@ public void onFailure(Throwable e) {
}

void onBulkResponse(BulkResponse response) {
if (task.isCancelled()) {
finishHim(null);
return;
}
try {
List<Failure> failures = new ArrayList<Failure>();
Set<String> destinationIndicesThisBatch = new HashSet<>();
Expand Down Expand Up @@ -252,13 +275,17 @@ void onBulkResponse(BulkResponse response) {
startNormalTermination(emptyList(), emptyList());
return;
}
startNextScrollRequest();
startNextScroll();
} catch (Throwable t) {
finishHim(t);
}
}

void startNextScrollRequest() {
void startNextScroll() {
if (task.isCancelled()) {
finishHim(null);
return;
}
SearchScrollRequest request = new SearchScrollRequest();
request.scrollId(scroll.get()).scroll(firstSearchRequest.scroll());
client.searchScroll(request, new ActionListener<SearchResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.elasticsearch.plugin.reindex;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;

import java.io.IOException;
Expand All @@ -31,7 +33,7 @@
/**
* Task storing information about a currently running BulkByScroll request.
*/
public class BulkByScrollTask extends Task {
public class BulkByScrollTask extends CancellableTask {
/**
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
Expand All @@ -52,7 +54,7 @@ public BulkByScrollTask(long id, String type, String action, String description)
@Override
public Status getStatus() {
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
retries.get());
retries.get(), getReasonCancelled());
}

/**
Expand All @@ -63,7 +65,7 @@ public long getSuccessfullyProcessed() {
}

public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0);
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0, null);

private final long total;
private final long updated;
Expand All @@ -73,8 +75,10 @@ public static class Status implements Task.Status {
private final long versionConflicts;
private final long noops;
private final long retries;
private final String reasonCancelled;

public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries) {
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries,
@Nullable String reasonCancelled) {
this.total = checkPositive(total, "total");
this.updated = checkPositive(updated, "updated");
this.created = checkPositive(created, "created");
Expand All @@ -83,6 +87,7 @@ public Status(long total, long updated, long created, long deleted, int batches,
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
this.noops = checkPositive(noops, "noops");
this.retries = checkPositive(retries, "retries");
this.reasonCancelled = reasonCancelled;
}

public Status(StreamInput in) throws IOException {
Expand All @@ -94,6 +99,7 @@ public Status(StreamInput in) throws IOException {
versionConflicts = in.readVLong();
noops = in.readVLong();
retries = in.readVLong();
reasonCancelled = in.readOptionalString();
}

@Override
Expand All @@ -106,6 +112,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(versionConflicts);
out.writeVLong(noops);
out.writeVLong(retries);
out.writeOptionalString(reasonCancelled);
}

@Override
Expand All @@ -129,6 +136,9 @@ public XContentBuilder innerXContent(XContentBuilder builder, Params params, boo
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
builder.field("retries", retries);
if (reasonCancelled != null) {
builder.field("canceled", reasonCancelled);
}
return builder;
}

Expand All @@ -152,6 +162,9 @@ public void innerToString(StringBuilder builder, boolean includeCreated, boolean
builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops);
builder.append(",retries=").append(retries);
if (reasonCancelled != null) {
builder.append(",canceled=").append(reasonCancelled);
}
}

@Override
Expand Down Expand Up @@ -221,6 +234,13 @@ public long getRetries() {
return retries;
}

/**
* The reason that the request was canceled or null if it hasn't been.
*/
public String getReasonCancelled() {
return reasonCancelled;
}

private int checkPositive(int value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public long getNoops() {
return status.getNoops();
}

/**
* The reason that the request was canceled or null if it hasn't been.
*/
public String getReasonCancelled() {
return status.getReasonCancelled();
}

/**
* All of the indexing failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the
* default).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hamcrest.TypeSafeMatcher;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public abstract class AbstractBulkIndexByScrollResponseMatcher<
Response extends BulkIndexByScrollResponse,
Expand All @@ -36,6 +37,7 @@ public abstract class AbstractBulkIndexByScrollResponseMatcher<
private Matcher<Integer> batchesMatcher;
private Matcher<Long> versionConflictsMatcher = equalTo(0L);
private Matcher<Integer> failuresMatcher = equalTo(0);
private Matcher<String> reasonCancelledMatcher = nullValue(String.class);

protected abstract Self self();

Expand Down Expand Up @@ -93,13 +95,18 @@ public Self failures(int failures) {
return failures(equalTo(failures));
}

public Self reasonCancelled(Matcher<String> reasonCancelledMatcher) {
this.reasonCancelledMatcher = reasonCancelledMatcher;
return self();
}

@Override
protected boolean matchesSafely(Response item) {
return updatedMatcher.matches(item.getUpdated()) &&
(batchesMatcher == null || batchesMatcher.matches(item.getBatches())) &&
versionConflictsMatcher.matches(item.getVersionConflicts()) &&
failuresMatcher.matches(item.getIndexingFailures().size());
failuresMatcher.matches(item.getIndexingFailures().size()) &&
reasonCancelledMatcher.matches(item.getReasonCancelled());
}

@Override
Expand All @@ -110,5 +117,6 @@ public void describeTo(Description description) {
}
description.appendText(" and versionConflicts matches ").appendDescriptionOf(versionConflictsMatcher);
description.appendText(" and failures size matches ").appendDescriptionOf(failuresMatcher);
description.appendText(" and reason cancelled matches ").appendDescriptionOf(reasonCancelledMatcher);
}
}
Loading

0 comments on commit 30107b4

Please sign in to comment.