Skip to content

Commit

Permalink
Combine node name and task id into single string task id
Browse files Browse the repository at this point in the history
This commit changes the URL for task operations from `/_tasks/{nodeId}/{taskId}` to `/_tasks/{taskId}`, where `{taskId}` has a form of nodeid:id
  • Loading branch information
imotov committed Feb 24, 2016
1 parent 7140d18 commit d6af669
Show file tree
Hide file tree
Showing 27 changed files with 354 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {

private String reason = DEFAULT_REASON;

/**
* Cancel tasks on the specified nodes. If none are passed, all cancellable tasks on
* all nodes will be cancelled.
*/
public CancelTasksRequest(String... nodesIds) {
super(nodesIds);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -54,7 +46,6 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(reason);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
Expand All @@ -36,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -84,17 +84,17 @@ protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
}

protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask> operation) {
if (request.taskId() != BaseTasksRequest.ALL_TASKS) {
if (request.taskId().isSet() == false) {
// we are only checking one task, we can optimize it
CancellableTask task = taskManager.getCancellableTask(request.taskId());
CancellableTask task = taskManager.getCancellableTask(request.taskId().getId());
if (task != null) {
if (request.match(task)) {
operation.accept(task);
} else {
throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support this operation");
}
} else {
if (taskManager.getTask(request.taskId()) != null) {
if (taskManager.getTask(request.taskId().getId()) != null) {
// The task exists, but doesn't support cancellation
throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support cancellation");
} else {
Expand Down Expand Up @@ -135,11 +135,14 @@ protected boolean accumulateExceptions() {
}

private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, BanLock banLock) {
sendSetBanRequest(nodes, new BanParentTaskRequest(clusterService.localNode().getId(), task.getId(), reason), banLock);
sendSetBanRequest(nodes,
BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason),
banLock);
}

private void removeBanOnNodes(CancellableTask task, Set<String> nodes) {
sendRemoveBanRequest(nodes, new BanParentTaskRequest(clusterService.localNode().getId(), task.getId()));
sendRemoveBanRequest(nodes,
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId())));
}

private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, BanLock banLock) {
Expand All @@ -148,8 +151,8 @@ private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request,
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
if (discoveryNode != null) {
// Check if node still in the cluster
logger.debug("Sending ban for tasks with the parent [{}:{}] to the node [{}], ban [{}]", request.parentNodeId, request
.parentTaskId, node, request.ban);
logger.debug("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
request.ban);
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
Expand All @@ -164,8 +167,8 @@ public void handleException(TransportException exp) {
});
} else {
banLock.onBanSet();
logger.debug("Cannot send ban for tasks with the parent [{}:{}] to the node [{}] - the node no longer in the cluster",
request.parentNodeId, request.parentTaskId, node);
logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster",
request.parentTaskId, node);
}
}
}
Expand All @@ -176,13 +179,12 @@ private void sendRemoveBanRequest(Set<String> nodes, BanParentTaskRequest reques
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
if (discoveryNode != null) {
// Check if node still in the cluster
logger.debug("Sending remove ban for tasks with the parent [{}:{}] to the node [{}]", request.parentNodeId,
request.parentTaskId, node);
logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler
.INSTANCE_SAME);
} else {
logger.debug("Cannot send remove ban request for tasks with the parent [{}:{}] to the node [{}] - the node no longer in " +
"the cluster", request.parentNodeId, request.parentTaskId, node);
logger.debug("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " +
"the cluster", request.parentTaskId, node);
}
}
}
Expand Down Expand Up @@ -218,23 +220,27 @@ public void finish() {

private static class BanParentTaskRequest extends TransportRequest {

private String parentNodeId;

private long parentTaskId;
private TaskId parentTaskId;

private boolean ban;

private String reason;

BanParentTaskRequest(String parentNodeId, long parentTaskId, String reason) {
this.parentNodeId = parentNodeId;
static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason) {
return new BanParentTaskRequest(parentTaskId, reason);
}

static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) {
return new BanParentTaskRequest(parentTaskId);
}

private BanParentTaskRequest(TaskId parentTaskId, String reason) {
this.parentTaskId = parentTaskId;
this.ban = true;
this.reason = reason;
}

BanParentTaskRequest(String parentNodeId, long parentTaskId) {
this.parentNodeId = parentNodeId;
private BanParentTaskRequest(TaskId parentTaskId) {
this.parentTaskId = parentTaskId;
this.ban = false;
}
Expand All @@ -245,8 +251,7 @@ public BanParentTaskRequest() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
parentNodeId = in.readString();
parentTaskId = in.readLong();
parentTaskId = new TaskId(in);
ban = in.readBoolean();
if (ban) {
reason = in.readString();
Expand All @@ -256,8 +261,7 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(parentNodeId);
out.writeLong(parentTaskId);
parentTaskId.writeTo(out);
out.writeBoolean(ban);
if (ban) {
out.writeString(reason);
Expand All @@ -269,13 +273,13 @@ class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRe
@Override
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel) throws Exception {
if (request.ban) {
logger.debug("Received ban for the parent [{}:{}] on the node [{}], reason: [{}]", request.parentNodeId, request
.parentTaskId, clusterService.localNode().getId(), request.reason);
taskManager.setBan(request.parentNodeId, request.parentTaskId, request.reason);
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
clusterService.localNode().getId(), request.reason);
taskManager.setBan(request.parentTaskId, request.reason);
} else {
logger.debug("Removing ban for the parent [{}:{}] on the node [{}]", request.parentNodeId, request.parentTaskId,
logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId,
clusterService.localNode().getId());
taskManager.removeBan(request.parentNodeId, request.parentTaskId);
taskManager.removeBan(request.parentTaskId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {

private boolean detailed = false;

/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
*/
public ListTasksRequest(String... nodesIds) {
super(nodesIds);
}

/**
* Should the detailed task information be returned.
*/
Expand All @@ -48,7 +40,7 @@ public boolean detailed() {
}

/**
* Should the node settings be returned.
* Should the detailed task information be returned.
*/
public ListTasksRequest detailed(boolean detailed) {
this.detailed = detailed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
builder.startArray("tasks");
builder.startObject("tasks");
for(TaskInfo task : entry.getValue()) {
builder.startObject(task.getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE);
task.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
builder.endObject();
builder.endObject();
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

Expand All @@ -41,7 +42,7 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {

private final DiscoveryNode node;

private final long id;
private final TaskId taskId;

private final String type;

Expand All @@ -51,28 +52,21 @@ public class TaskInfo implements Writeable<TaskInfo>, ToXContent {

private final Task.Status status;

private final String parentNode;
private final TaskId parentTaskId;

private final long parentId;

public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status) {
this(node, id, type, action, description, status, null, -1L);
}

public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status, String parentNode, long parentId) {
public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status, TaskId parentTaskId) {
this.node = node;
this.id = id;
this.taskId = new TaskId(node.getId(), id);
this.type = type;
this.action = action;
this.description = description;
this.status = status;
this.parentNode = parentNode;
this.parentId = parentId;
this.parentTaskId = parentTaskId;
}

public TaskInfo(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
id = in.readLong();
taskId = new TaskId(node.getId(), in.readLong());
type = in.readString();
action = in.readString();
description = in.readOptionalString();
Expand All @@ -81,16 +75,19 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
status = null;
}
parentNode = in.readOptionalString();
parentId = in.readLong();
parentTaskId = new TaskId(in);
}

public TaskId getTaskId() {
return taskId;
}

public DiscoveryNode getNode() {
return node;
}

public long getId() {
return id;
return taskId.getId();
}

public String getType() {
Expand All @@ -113,12 +110,8 @@ public Task.Status getStatus() {
return status;
}

public String getParentNode() {
return parentNode;
}

public long getParentId() {
return parentId;
public TaskId getParentTaskId() {
return parentTaskId;
}

@Override
Expand All @@ -129,7 +122,7 @@ public TaskInfo readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeLong(id);
out.writeLong(taskId.getId());
out.writeString(type);
out.writeString(action);
out.writeOptionalString(description);
Expand All @@ -139,15 +132,13 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
out.writeOptionalString(parentNode);
out.writeLong(parentId);
parentTaskId.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("node", node.getId());
builder.field("id", id);
builder.field("id", taskId.getId());
builder.field("type", type);
builder.field("action", action);
if (status != null) {
Expand All @@ -156,11 +147,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (description != null) {
builder.field("description", description);
}
if (parentNode != null) {
builder.field("parent_node", parentNode);
builder.field("parent_id", parentId);
if (parentTaskId.isSet() == false) {
builder.field("parent_task_id", parentTaskId.toString());
}
builder.endObject();
return builder;
}
}
Loading

0 comments on commit d6af669

Please sign in to comment.