Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zen2: Add node id to log output of CoordinatorTests #33929

Merged
merged 4 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,8 @@
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;

Expand All @@ -56,7 +51,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -185,7 +179,7 @@ class ClusterNode extends AbstractComponent {
private final PersistedState persistedState;
private MasterService masterService;
private TransportService transportService;
private MockTransport mockTransport;
private DisruptableMockTransport mockTransport;

ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
Expand All @@ -207,106 +201,36 @@ private DiscoveryNode createDiscoveryNode() {
}

private void setUp() {
mockTransport = new MockTransport() {
mockTransport = new DisruptableMockTransport(logger) {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself";
super.onSendRequest(requestId, action, request, destination);
protected DiscoveryNode getLocalNode() {
return localNode;
}

@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
}

// connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later
final Consumer<Runnable> scheduler;
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action) {
final Predicate<ClusterNode> matchesDestination;
if (action.equals(HANDSHAKE_ACTION_NAME)) {
scheduler = Runnable::run;
matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress());
matchesDestination = n -> n.getLocalNode().getAddress().equals(node.getAddress());
} else {
scheduler = deterministicTaskQueue::scheduleNow;
matchesDestination = n -> n.getLocalNode().equals(destination);
matchesDestination = n -> n.getLocalNode().equals(node);
}
return clusterNodes.stream().filter(matchesDestination).findAny().map(cn -> cn.mockTransport);
}

scheduler.accept(onNode(new Runnable() {
@Override
public String toString() {
return "delivery of [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
clusterNodes.stream().filter(matchesDestination).findAny().ifPresent(
destinationNode -> {

final RequestHandlerRegistry requestHandler
= destinationNode.mockTransport.getRequestHandler(action);

final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}

@Override
public String getChannelType() {
return "coordinator-test-channel";
}

@Override
public void sendResponse(final TransportResponse response) {
scheduler.accept(onNode(new Runnable() {
@Override
public String toString() {
return "delivery of response " + response
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleResponse(requestId, response);
}
}, localNode));
}

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) {
sendResponse(response);
}

@Override
public void sendResponse(Exception exception) {
scheduler.accept(onNode(new Runnable() {
@Override
public String toString() {
return "delivery of error response " + exception.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, exception);
}
}, localNode));
}
};

try {
processMessageReceived(request, requestHandler, transportChannel);
} catch (Exception e) {
scheduler.accept(onNode(new Runnable() {
@Override
public String toString() {
return "delivery of processing error response " + e.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, e);
}
}, localNode));
}
}
);
}
}, destination));
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
// handshake needs to run inline as the caller blockingly waits on the result
if (action.equals(HANDSHAKE_ACTION_NAME)) {
onNode(doDelivery, destination).run();
} else {
deterministicTaskQueue.scheduleNow(onNode(doDelivery, destination));
}
}
};

Expand Down Expand Up @@ -362,12 +286,6 @@ private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
}
}

@SuppressWarnings("unchecked")
private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler,
TransportChannel transportChannel) throws Exception {
requestHandler.processMessageReceived(request, transportChannel);
}

private static Runnable onNode(Runnable runnable, DiscoveryNode node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to put the DiscoveryNode thing first, otherwise it ends up a long way from the function call if the Runnable is an anonymous class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. With the recent changes, there were no longer any anonymous classes. I've still changed the order

return new Runnable() {
@Override
Expand All @@ -379,7 +297,7 @@ public void run() {

@Override
public String toString() {
return runnable.toString() + " (wrapped for " + node + ")";
return node.getId() + ": " + runnable.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that having the UUID of the disco node will be useful here when we start rebooting nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the ephemeral id

}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public DisruptableMockTransport(Logger logger) {

protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination);

protected abstract Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node);
protected abstract Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action);

protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery);
protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery);

private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) {
handle(sender, destination, new Runnable() {
private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
handle(sender, destination, action, new Runnable() {
@Override
public void run() {
if (getDisruptedCapturingTransport(destination).isPresent()) {
if (getDisruptedCapturingTransport(destination, action).isPresent()) {
doDelivery.run();
} else {
logger.trace("unknown destination in {}", this);
Expand All @@ -59,7 +59,7 @@ public void run() {

@Override
public String toString() {
return doDelivery.toString() + " from " + sender + " to " + destination;
return doDelivery.toString();
}
});
}
Expand All @@ -70,7 +70,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
super.onSendRequest(requestId, action, request, destination);

final String requestDescription = new ParameterizedMessage("{}[{}] from {} to {}",
final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}",
action, requestId, getLocalNode(), destination).getFormattedMessage();

final Runnable returnConnectException = new Runnable() {
Expand All @@ -85,7 +85,7 @@ public String toString() {
}
};

sendFromTo(getLocalNode(), destination, new Runnable() {
sendFromTo(getLocalNode(), destination, action, new Runnable() {
@Override
public void run() {
switch (getConnectionStatus(getLocalNode(), destination)) {
Expand All @@ -94,11 +94,11 @@ public void run() {
break;

case DISCONNECTED:
sendFromTo(destination, getLocalNode(), returnConnectException);
sendFromTo(destination, getLocalNode(), action, returnConnectException);
break;

case CONNECTED:
Optional<DisruptableMockTransport> destinationTransport = getDisruptedCapturingTransport(destination);
Optional<DisruptableMockTransport> destinationTransport = getDisruptedCapturingTransport(destination, action);
assert destinationTransport.isPresent();

final RequestHandlerRegistry<TransportRequest> requestHandler =
Expand All @@ -117,7 +117,7 @@ public String getChannelType() {

@Override
public void sendResponse(final TransportResponse response) {
sendFromTo(destination, getLocalNode(), new Runnable() {
sendFromTo(destination, getLocalNode(), action, new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
Expand All @@ -143,7 +143,7 @@ public void sendResponse(TransportResponse response,

@Override
public void sendResponse(Exception exception) {
sendFromTo(destination, getLocalNode(), new Runnable() {
sendFromTo(destination, getLocalNode(), action, new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNo
}

@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode destination) {
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode destination, String action) {
int index = discoNodes.indexOf(destination);
if (index == -1) {
return Optional.empty();
Expand All @@ -117,7 +117,7 @@ protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(Disc
}

@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) {
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
deterministicTaskQueue.scheduleNow(doDelivery);
}
};
Expand All @@ -134,7 +134,7 @@ protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNo
}

@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode destination) {
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode destination, String action) {
int index = discoNodes.indexOf(destination);
if (index == -1) {
return Optional.empty();
Expand All @@ -144,7 +144,7 @@ protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(Disc
}

@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) {
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
deterministicTaskQueue.scheduleNow(doDelivery);
}
};
Expand Down