-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zen2: Add node id to log output of CoordinatorTests #33929
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
*/ | ||
package org.elasticsearch.cluster.coordination; | ||
|
||
import org.apache.logging.log4j.CloseableThreadContext; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.ClusterState; | ||
|
@@ -56,6 +57,7 @@ | |
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Collectors; | ||
|
||
|
@@ -192,7 +194,7 @@ class ClusterNode extends AbstractComponent { | |
localNode = createDiscoveryNode(); | ||
persistedState = new InMemoryPersistedState(1L, | ||
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); | ||
setUp(); | ||
wrap(this::setUp, localNode).run(); | ||
} | ||
|
||
private DiscoveryNode createDiscoveryNode() { | ||
|
@@ -223,7 +225,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req | |
matchesDestination = n -> n.getLocalNode().equals(destination); | ||
} | ||
|
||
scheduler.accept(new Runnable() { | ||
scheduler.accept(wrap(new Runnable() { | ||
@Override | ||
public String toString() { | ||
return "delivery of [" + action + "][" + requestId + "]: " + request; | ||
|
@@ -250,7 +252,7 @@ public String getChannelType() { | |
|
||
@Override | ||
public void sendResponse(final TransportResponse response) { | ||
scheduler.accept(new Runnable() { | ||
scheduler.accept(wrap(new Runnable() { | ||
@Override | ||
public String toString() { | ||
return "delivery of response " + response | ||
|
@@ -261,7 +263,7 @@ public String toString() { | |
public void run() { | ||
handleResponse(requestId, response); | ||
} | ||
}); | ||
}, localNode)); | ||
} | ||
|
||
@Override | ||
|
@@ -271,7 +273,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op | |
|
||
@Override | ||
public void sendResponse(Exception exception) { | ||
scheduler.accept(new Runnable() { | ||
scheduler.accept(wrap(new Runnable() { | ||
@Override | ||
public String toString() { | ||
return "delivery of error response " + exception.getMessage() | ||
|
@@ -282,14 +284,14 @@ public String toString() { | |
public void run() { | ||
handleRemoteError(requestId, exception); | ||
} | ||
}); | ||
}, localNode)); | ||
} | ||
}; | ||
|
||
try { | ||
processMessageReceived(request, requestHandler, transportChannel); | ||
} catch (Exception e) { | ||
scheduler.accept(new Runnable() { | ||
scheduler.accept(wrap(new Runnable() { | ||
@Override | ||
public String toString() { | ||
return "delivery of processing error response " + e.getMessage() | ||
|
@@ -300,18 +302,19 @@ public String toString() { | |
public void run() { | ||
handleRemoteError(requestId, e); | ||
} | ||
}); | ||
}, localNode)); | ||
} | ||
} | ||
); | ||
} | ||
}); | ||
}, destination)); | ||
} | ||
}; | ||
|
||
masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow); | ||
masterService = new FakeThreadPoolMasterService("test", wrap(deterministicTaskQueue::scheduleNow, localNode)); | ||
transportService = mockTransport.createTransportService( | ||
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); | ||
settings, deterministicTaskQueue.getThreadPool(wrapper(localNode)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, | ||
null, emptySet()); | ||
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), | ||
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get()); | ||
masterService.setClusterStatePublisher(coordinator); | ||
|
@@ -364,4 +367,28 @@ private static void processMessageReceived(TransportRequest request, RequestHand | |
TransportChannel transportChannel) throws Exception { | ||
requestHandler.processMessageReceived(request, transportChannel); | ||
} | ||
|
||
private static Consumer<Runnable> wrap(Consumer<Runnable> runnableConsumer, DiscoveryNode node) { | ||
return runnable -> runnableConsumer.accept(wrap(runnable, node)); | ||
} | ||
|
||
private static Function<Runnable, Runnable> wrapper(DiscoveryNode node) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'd inline this, it's only used in one place. |
||
return runnable -> wrap(runnable, node); | ||
} | ||
|
||
private static Runnable wrap(Runnable runnable, DiscoveryNode node) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be |
||
return new Runnable() { | ||
@Override | ||
public void run() { | ||
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", node.getId())) { | ||
runnable.run(); | ||
} | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return runnable.toString() + " (wrapped for " + node + ")"; | ||
} | ||
}; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'd inline this, it's only used in one place.