Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identify cancelled tasks in list tasks API #72931

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class TaskInfo {
private long startTime;
private long runningTimeNanos;
private boolean cancellable;
private boolean cancelled;
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
Expand Down Expand Up @@ -93,6 +94,14 @@ void setCancellable(boolean cancellable) {
this.cancellable = cancellable;
}

public boolean isCancelled() {
return cancelled;
}

void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}

public TaskId getParentTaskId() {
return parentTaskId;
}
Expand Down Expand Up @@ -134,6 +143,7 @@ private void noOpParse(Object s) {}
parser.declareLong(TaskInfo::setStartTime, new ParseField("start_time_in_millis"));
parser.declareLong(TaskInfo::setRunningTimeNanos, new ParseField("running_time_in_nanos"));
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
Expand All @@ -147,6 +157,7 @@ public boolean equals(Object o) {
return getStartTime() == taskInfo.getStartTime() &&
getRunningTimeNanos() == taskInfo.getRunningTimeNanos() &&
isCancellable() == taskInfo.isCancellable() &&
isCancelled() == taskInfo.isCancelled() &&
Objects.equals(getTaskId(), taskInfo.getTaskId()) &&
Objects.equals(getType(), taskInfo.getType()) &&
Objects.equals(getAction(), taskInfo.getAction()) &&
Expand All @@ -159,8 +170,17 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
getTaskId(), getType(), getAction(), getDescription(), getStartTime(),
getRunningTimeNanos(), isCancellable(), getParentTaskId(), status, getHeaders()
getTaskId(),
getType(),
getAction(),
getDescription(),
getStartTime(),
getRunningTimeNanos(),
isCancellable(),
isCancelled(),
getParentTaskId(),
status,
getHeaders()
);
}

Expand All @@ -175,6 +195,7 @@ public String toString() {
", startTime=" + startTime +
", runningTimeNanos=" + runningTimeNanos +
", cancellable=" + cancellable +
", cancelled=" + cancelled +
", parentTaskId=" + parentTaskId +
", status=" + status +
", headers=" + headers +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,23 @@ static TaskInfo randomTaskInfo() {
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Map<String, String> headers = randomBoolean() ?
Collections.emptyMap() :
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers);
}

private static TaskId randomTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,22 @@ private static TaskInfo randomTaskInfo() {
long startTime = randomLong();
long runningTimeNanos = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean() ?
Collections.emptyMap() :
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
null,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
}

for (int i = 0; i < 4; i++) {
boolean isCancellable = randomBoolean();
tasks.add(new org.elasticsearch.tasks.TaskInfo(
new TaskId(NODE_ID, (long) i),
randomAlphaOfLength(4),
Expand All @@ -65,7 +66,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
new FakeTaskStatus(randomAlphaOfLength(4), randomInt()),
randomLongBetween(1, 3),
randomIntBetween(5, 10),
false,
isCancellable,
isCancellable && randomBoolean(),
new TaskId("node1", randomLong()),
Map.of("x-header-of", "some-value")));
}
Expand Down Expand Up @@ -99,6 +101,7 @@ protected void assertInstances(ByNodeCancelTasksResponse serverTestInstance,
assertEquals(ti.getStartTime(), taskInfo.getStartTime());
assertEquals(ti.getRunningTimeNanos(), taskInfo.getRunningTimeNanos());
assertEquals(ti.isCancellable(), taskInfo.isCancellable());
assertEquals(ti.isCancelled(), taskInfo.isCancelled());
assertEquals(ti.getParentTaskId().getNodeId(), taskInfo.getParentTaskId().getNodeId());
assertEquals(ti.getParentTaskId().getId(), taskInfo.getParentTaskId().getId());
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();
Expand Down
10 changes: 9 additions & 1 deletion docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ The API returns the following result:
"description" : "indices[test], types[test], search_type[QUERY_THEN_FETCH], source[{\"query\":...}]",
"start_time_in_millis" : 1483478610008,
"running_time_in_nanos" : 13991383,
"cancellable" : true
"cancellable" : true,
"cancelled" : false
}
}
}
Expand Down Expand Up @@ -243,6 +244,13 @@ nodes `nodeId1` and `nodeId2`.
POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex
--------------------------------------------------

A task may continue to run for some time after it has been cancelled because it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

may not be able to safely stop its current activity straight away. The list
tasks API will continue to list these cancelled tasks until they complete. The
`cancelled` flag in the response to the list tasks API indicates that the
cancellation command has been processed and the task will stop as soon as
possible.

===== Task Grouping

The task lists returned by task API commands can be grouped either by nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,18 @@ public void testRethrottleSuccessfulResponse() {
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
for (int i = 0; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
tasks.add(new TaskInfo(
new TaskId("test", 123),
"test",
"test",
"test",
status,
0,
0,
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices,
Expand All @@ -112,8 +122,18 @@ public void testRethrottleWithSomeSucceeded() {
List<TaskInfo> tasks = new ArrayList<>();
for (int i = succeeded; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
tasks.add(new TaskInfo(
new TaskId("test", 123),
"test",
"test",
"test",
status,
0,
0,
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices - succeeded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,19 @@ public void testNodeNotFoundButTaskFound() throws Exception {
CyclicBarrier b = new CyclicBarrier(2);
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(new TaskResult(
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
new RuntimeException("test")),
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()),
new RuntimeException("test")),
new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ public final TaskInfo taskInfo(String localNodeId, boolean detailed) {
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime,
System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers);
return new TaskInfo(
new TaskId(localNodeId, getId()),
getType(),
getAction(),
description,
status,
startTime,
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask)this).isCancelled(),
parentTask,
headers);
}

/**
Expand Down
Loading