Skip to content

Commit

Permalink
Zen2: Add node id to log output of CoordinatorTests (#33929)
Browse files Browse the repository at this point in the history
With recent changes to the logging framework, the node name can no longer be injected into the logging output using the node.name setting. The CoordinatorTests simulate a cluster in a fully deterministic fashion using a single thread, so all the different nodes run under the same test thread and we are not able to distinguish which log lines are coming from which node. This commit re-adds logging of node ids in the CoordinatorTests, making two very small changes to DeterministicTaskQueue and TestThreadInfoPatternConverter.
  • Loading branch information
ywelsch authored Sep 21, 2018
1 parent 187f787 commit a612dd1
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,16 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
: "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']';
currentPublication = Optional.of(publication);

transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, () -> {
synchronized (mutex) {
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@Override
public void run() {
publication.onTimeout();
}

@Override
public String toString() {
return "scheduled timeout for " + publication;
}
});
publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here
}
Expand Down Expand Up @@ -625,15 +631,23 @@ protected void onFoundPeersUpdated() {
if (foundQuorum) {
if (electionScheduler == null) {
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, () -> {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}

@Override
public String toString() {
return "scheduling of new prevoting round";
}
});
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,22 @@ protected synchronized void done() {

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.submit(() -> {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public String toString() {
return "ListenableFuture notification";
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class TransportResponse extends TransportMessage {

/**
 * Nested response type that is exposed through the shared {@link #INSTANCE} constant.
 */
public static class Empty extends TransportResponse {
public static final Empty INSTANCE = new Empty();

// Fixed textual form; presumably so the instance reads cleanly when it is concatenated into log or task descriptions.
@Override
public String toString() {
return "Empty{}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -38,13 +39,8 @@
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;

Expand All @@ -55,7 +51,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -184,15 +179,15 @@ class ClusterNode extends AbstractComponent {
private final PersistedState persistedState;
private MasterService masterService;
private TransportService transportService;
private MockTransport mockTransport;
private DisruptableMockTransport mockTransport;

ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
setUp();
onNode(localNode, this::setUp).run();
}

private DiscoveryNode createDiscoveryNode() {
Expand All @@ -206,112 +201,44 @@ private DiscoveryNode createDiscoveryNode() {
}

private void setUp() {
mockTransport = new MockTransport() {
mockTransport = new DisruptableMockTransport(logger) {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself";
super.onSendRequest(requestId, action, request, destination);
protected DiscoveryNode getLocalNode() {
return localNode;
}

@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
}

// connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later
final Consumer<Runnable> scheduler;
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action) {
final Predicate<ClusterNode> matchesDestination;
if (action.equals(HANDSHAKE_ACTION_NAME)) {
scheduler = Runnable::run;
matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress());
matchesDestination = n -> n.getLocalNode().getAddress().equals(node.getAddress());
} else {
scheduler = deterministicTaskQueue::scheduleNow;
matchesDestination = n -> n.getLocalNode().equals(destination);
matchesDestination = n -> n.getLocalNode().equals(node);
}
return clusterNodes.stream().filter(matchesDestination).findAny().map(cn -> cn.mockTransport);
}

scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
clusterNodes.stream().filter(matchesDestination).findAny().ifPresent(
destinationNode -> {

final RequestHandlerRegistry requestHandler
= destinationNode.mockTransport.getRequestHandler(action);

final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}

@Override
public String getChannelType() {
return "coordinator-test-channel";
}

@Override
public void sendResponse(final TransportResponse response) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of response " + response
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleResponse(requestId, response);
}
});
}

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) {
sendResponse(response);
}

@Override
public void sendResponse(Exception exception) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of error response " + exception.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, exception);
}
});
}
};

try {
processMessageReceived(request, requestHandler, transportChannel);
} catch (Exception e) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of processing error response " + e.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, e);
}
});
}
}
);
}
});
// Decides when a delivery runs: handshakes run immediately, everything else is enqueued on the deterministic task queue.
// In both cases the delivery is wrapped with onNode(destination, ...) so it executes tagged with the destination node.
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
// the handshake must be delivered inline because the caller blocks while waiting for its result
if (action.equals(HANDSHAKE_ACTION_NAME)) {
onNode(destination, doDelivery).run();
} else {
deterministicTaskQueue.scheduleNow(onNode(destination, doDelivery));
}
}
};

masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
masterService = new FakeThreadPoolMasterService("test",
runnable -> deterministicTaskQueue.scheduleNow(onNode(localNode, runnable)));
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet());
settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
Expand Down Expand Up @@ -359,9 +286,20 @@ private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
}
}

@SuppressWarnings("unchecked")
private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler,
TransportChannel transportChannel) throws Exception {
requestHandler.processMessageReceived(request, transportChannel);
/**
 * Wraps a task so that it runs with the given node's identity recorded in the logging thread context.
 *
 * @param node     the node on whose behalf the task runs; its id and ephemeral id form the recorded identifier
 * @param runnable the task to wrap
 * @return a <code>Runnable</code> that puts the identifier under the key "nodeId" while the task runs, and whose
 *         <code>toString</code> prefixes the wrapped task's description with that identifier
 */
private static Runnable onNode(DiscoveryNode node, Runnable runnable) {
// computed once, up front, so that run() and toString() report the same identifier
final String nodeId = "{" + node.getId() + "}{" + node.getEphemeralId() + "}";
return new Runnable() {
@Override
public void run() {
// try-with-resources closes the context entry once the wrapped task has finished, including when it throws
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", nodeId)) {
runnable.run();
}
}

@Override
public String toString() {
return nodeId + ": " + runnable.toString();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DeterministicTaskQueue extends AbstractComponent {

Expand Down Expand Up @@ -182,6 +183,13 @@ public void advanceTime() {
* @return A <code>ExecutorService</code> that uses this task queue.
*/
public ExecutorService getExecutorService() {
// no wrapping requested: delegate to the wrapping overload with the identity function
return getExecutorService(Function.identity());
}

/**
* @return A <code>ExecutorService</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ExecutorService getExecutorService(Function<Runnable, Runnable> runnableWrapper) {
return new ExecutorService() {

@Override
Expand Down Expand Up @@ -246,7 +254,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti

@Override
public void execute(Runnable command) {
scheduleNow(command);
scheduleNow(runnableWrapper.apply(command));
}
};
}
Expand All @@ -255,6 +263,13 @@ public void execute(Runnable command) {
* @return A <code>ThreadPool</code> that uses this task queue.
*/
public ThreadPool getThreadPool() {
// no wrapping requested: delegate to the wrapping overload with the identity function
return getThreadPool(Function.identity());
}

/**
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
return new ThreadPool(settings) {

{
Expand Down Expand Up @@ -303,12 +318,12 @@ public ThreadPoolStats stats() {

@Override
public ExecutorService generic() {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
public ExecutorService executor(String name) {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
Expand All @@ -318,7 +333,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
final int CANCELLED = 2;
final AtomicInteger taskState = new AtomicInteger(NOT_STARTED);

scheduleAt(currentTimeMillis + delay.millis(), new Runnable() {
scheduleAt(currentTimeMillis + delay.millis(), runnableWrapper.apply(new Runnable() {
@Override
public void run() {
if (taskState.compareAndSet(NOT_STARTED, STARTED)) {
Expand All @@ -330,7 +345,7 @@ public void run() {
public String toString() {
return command.toString();
}
});
}));

return new ScheduledFuture<Object>() {
@Override
Expand Down
Loading

0 comments on commit a612dd1

Please sign in to comment.