Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zen2: Add node id to log output of CoordinatorTests #33929

Merged
merged 4 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -192,7 +194,7 @@ class ClusterNode extends AbstractComponent {
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
setUp();
wrap(this::setUp, localNode).run();
}

private DiscoveryNode createDiscoveryNode() {
Expand Down Expand Up @@ -223,7 +225,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
matchesDestination = n -> n.getLocalNode().equals(destination);
}

scheduler.accept(new Runnable() {
scheduler.accept(wrap(new Runnable() {
@Override
public String toString() {
return "delivery of [" + action + "][" + requestId + "]: " + request;
Expand All @@ -250,7 +252,7 @@ public String getChannelType() {

@Override
public void sendResponse(final TransportResponse response) {
scheduler.accept(new Runnable() {
scheduler.accept(wrap(new Runnable() {
@Override
public String toString() {
return "delivery of response " + response
Expand All @@ -261,7 +263,7 @@ public String toString() {
public void run() {
handleResponse(requestId, response);
}
});
}, localNode));
}

@Override
Expand All @@ -271,7 +273,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op

@Override
public void sendResponse(Exception exception) {
scheduler.accept(new Runnable() {
scheduler.accept(wrap(new Runnable() {
@Override
public String toString() {
return "delivery of error response " + exception.getMessage()
Expand All @@ -282,14 +284,14 @@ public String toString() {
public void run() {
handleRemoteError(requestId, exception);
}
});
}, localNode));
}
};

try {
processMessageReceived(request, requestHandler, transportChannel);
} catch (Exception e) {
scheduler.accept(new Runnable() {
scheduler.accept(wrap(new Runnable() {
@Override
public String toString() {
return "delivery of processing error response " + e.getMessage()
Expand All @@ -300,18 +302,19 @@ public String toString() {
public void run() {
handleRemoteError(requestId, e);
}
});
}, localNode));
}
}
);
}
});
}, destination));
}
};

masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
masterService = new FakeThreadPoolMasterService("test", wrap(deterministicTaskQueue::scheduleNow, localNode));
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet());
settings, deterministicTaskQueue.getThreadPool(wrapper(localNode)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode,
null, emptySet());
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
Expand Down Expand Up @@ -364,4 +367,28 @@ private static void processMessageReceived(TransportRequest request, RequestHand
TransportChannel transportChannel) throws Exception {
requestHandler.processMessageReceived(request, transportChannel);
}

private static Consumer<Runnable> wrap(Consumer<Runnable> runnableConsumer, DiscoveryNode node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd inline this, it's only used in one place.

return runnable -> runnableConsumer.accept(wrap(runnable, node));
}

private static Function<Runnable, Runnable> wrapper(DiscoveryNode node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd inline this, it's only used in one place.

return runnable -> wrap(runnable, node);
}

private static Runnable wrap(Runnable runnable, DiscoveryNode node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be onNode(DiscoveryNode node, Runnable runnable) - makes it a bit more readable at its call sites?

return new Runnable() {
@Override
public void run() {
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", node.getId())) {
runnable.run();
}
}

@Override
public String toString() {
return runnable.toString() + " (wrapped for " + node + ")";
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DeterministicTaskQueue extends AbstractComponent {

Expand Down Expand Up @@ -182,6 +183,13 @@ public void advanceTime() {
* @return A <code>ExecutorService</code> that uses this task queue.
*/
public ExecutorService getExecutorService() {
return getExecutorService(Function.identity());
}

/**
* @return A <code>ExecutorService</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ExecutorService getExecutorService(Function<Runnable, Runnable> runnableWrapper) {
return new ExecutorService() {

@Override
Expand Down Expand Up @@ -246,7 +254,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti

@Override
public void execute(Runnable command) {
scheduleNow(command);
scheduleNow(runnableWrapper.apply(command));
}
};
}
Expand All @@ -255,6 +263,13 @@ public void execute(Runnable command) {
* @return A <code>ThreadPool</code> that uses this task queue.
*/
public ThreadPool getThreadPool() {
return getThreadPool(Function.identity());
}

/**
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
return new ThreadPool(settings) {

{
Expand Down Expand Up @@ -303,12 +318,12 @@ public ThreadPoolStats stats() {

@Override
public ExecutorService generic() {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
public ExecutorService executor(String name) {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
Expand All @@ -318,7 +333,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
final int CANCELLED = 2;
final AtomicInteger taskState = new AtomicInteger(NOT_STARTED);

scheduleAt(currentTimeMillis + delay.millis(), new Runnable() {
scheduleAt(currentTimeMillis + delay.millis(), runnableWrapper.apply(new Runnable() {
@Override
public void run() {
if (taskState.compareAndSet(NOT_STARTED, STARTED)) {
Expand All @@ -330,7 +345,7 @@ public void run() {
public String toString() {
return command.toString();
}
});
}));

return new ScheduledFuture<Object>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ private TestThreadInfoPatternConverter() {
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
toAppendTo.append(threadInfo(event.getThreadName()));
if (event.getContextData().isEmpty() == false) {
toAppendTo.append(event.getContextData());
}
}

private static final Pattern ELASTICSEARCH_THREAD_NAME_PATTERN =
Expand All @@ -66,6 +69,7 @@ public void format(LogEvent event, StringBuilder toAppendTo) {
Pattern.compile("SUITE-.+-worker");
private static final Pattern NOT_YET_NAMED_NODE_THREAD_NAME_PATTERN =
Pattern.compile("test_SUITE-CHILD_VM.+cluster\\[T#(.+)\\]");

static String threadInfo(String threadName) {
Matcher m = ELASTICSEARCH_THREAD_NAME_PATTERN.matcher(threadName);
if (m.matches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
Expand Down Expand Up @@ -264,6 +265,34 @@ public void testThreadPoolEnqueuesTasks() {
assertThat(strings, containsInAnyOrder("foo", "bar"));
}

public void testThreadPoolWrapsRunnable() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final AtomicBoolean called = new AtomicBoolean();
final ThreadPool threadPool = taskQueue.getThreadPool(runnable -> () -> {
assertFalse(called.get());
called.set(true);
runnable.run();
});
threadPool.generic().execute(() -> logger.info("runnable executed"));
assertFalse(called.get());
taskQueue.runAllRunnableTasks();
assertTrue(called.get());
}

public void testExecutorServiceWrapsRunnable() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final AtomicBoolean called = new AtomicBoolean();
final ExecutorService executorService = taskQueue.getExecutorService(runnable -> () -> {
assertFalse(called.get());
called.set(true);
runnable.run();
});
executorService.execute(() -> logger.info("runnable executed"));
assertFalse(called.get());
taskQueue.runAllRunnableTasks();
assertTrue(called.get());
}

public void testThreadPoolSchedulesFutureTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
advanceToRandomTime(taskQueue);
Expand Down