Skip to content

Commit

Permalink
Transport actions should make started tasks available to their callers
Browse files Browse the repository at this point in the history
Sometimes action callers might be interested in having an access to the task that they have just initiated. This changes allows callers to get access to the Task object of the actions that they just started if the action supports the task management.
  • Loading branch information
imotov committed Jan 19, 2016
1 parent 53cce3c commit d9a0122
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final ActionFuture<Response> execute(Request request) {
return future;
}

public final void execute(Request request, ActionListener<Response> listener) {
public final Task execute(Request request, ActionListener<Response> listener) {
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
Expand All @@ -85,6 +85,7 @@ public void onFailure(Throwable e) {
}
});
}
return task;
}

private final void execute(Task task, Request request, ActionListener<Response> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
Expand Down Expand Up @@ -67,8 +69,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -418,7 +422,17 @@ private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch
return startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request"));
}

private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request) throws InterruptedException {
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request) throws InterruptedException {
PlainActionFuture<NodesResponse> future = newFuture();
startBlockingTestNodesAction(checkLatch, request, future);
return future;
}

private Task startBlockingTestNodesAction(CountDownLatch checkLatch, ActionListener<NodesResponse> listener) throws InterruptedException {
return startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request"), listener);
}

private Task startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request, ActionListener<NodesResponse> listener) throws InterruptedException {
CountDownLatch actionLatch = new CountDownLatch(nodesCount);
TestNodesAction[] actions = new TestNodesAction[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
Expand All @@ -442,17 +456,31 @@ protected NodeResponse nodeOperation(NodeRequest request) {
for (TestNode node : testNodes) {
assertEquals(0, node.transportService.getTaskManager().getTasks().size());
}
ActionFuture<NodesResponse> future = actions[0].execute(request);
Task task = actions[0].execute(request, listener);
logger.info("Awaiting for all actions to start");
actionLatch.await();
logger.info("Done waiting for all actions to start");
return future;
return task;
}

public void testRunningTasksCount() throws Exception {
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
CountDownLatch responseLatch = new CountDownLatch(1);
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
Task mainTask = startBlockingTestNodesAction(checkLatch, new ActionListener<NodesResponse>() {
@Override
public void onResponse(NodesResponse listTasksResponse) {
responseReference.set(listTasksResponse);
responseLatch.countDown();
}

@Override
public void onFailure(Throwable e) {
logger.warn("Couldn't get list of tasks", e);
responseLatch.countDown();
}
});

// Check task counts using taskManager
Map<Long, Task> localTasks = testNodes[0].transportService.getTaskManager().getTasks();
Expand Down Expand Up @@ -505,9 +533,17 @@ public void testRunningTasksCount() throws Exception {
assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
}

// Make sure that the main task on coordinating node is the task that was returned to us by execute()
listTasksRequest.actions("testAction"); // only pick the main task
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(1, response.getTasks().size());
assertEquals(mainTask.getId(), response.getTasks().get(0).getId());

// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertTrue(responseLatch.await(10, TimeUnit.SECONDS));

NodesResponse responses = responseReference.get();
assertEquals(0, responses.failureCount());

// Make sure that we don't have any lingering tasks
Expand Down

0 comments on commit d9a0122

Please sign in to comment.