Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Implement basic cluster formation #33668

Merged
merged 55 commits into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
6cf5ca2
Introduce test suite
DaveCTurner Sep 11, 2018
72cc9a5
Basic structure
DaveCTurner Sep 11, 2018
70b7857
Initialise prevoting components
DaveCTurner Sep 11, 2018
3ef0cd7
Add assertions and extract class
DaveCTurner Sep 11, 2018
9df9464
Stuff
DaveCTurner Sep 11, 2018
349095e
Send start join requests and respond with join requests
DaveCTurner Sep 11, 2018
655578e
Initialises a bunch of stuff
DaveCTurner Sep 11, 2018
e07875a
MOre plumbing
DaveCTurner Sep 11, 2018
3b0d634
Bump term and also try and join self
DaveCTurner Sep 11, 2018
8af79fa
Simplify
DaveCTurner Sep 11, 2018
25dc957
Deal with multiple nodes
DaveCTurner Sep 11, 2018
caf999b
Plumbing to allow publication/commit to take place
DaveCTurner Sep 13, 2018
93bbdd4
Add toString() methods to tasks for simpler debugging
DaveCTurner Sep 13, 2018
eb2ee5e
Actually do the publication
DaveCTurner Sep 13, 2018
800ed5b
Strengthen tests
DaveCTurner Sep 13, 2018
c2f7ecd
Test now submits a value to all nodes
DaveCTurner Sep 13, 2018
e0ffce0
Fix up ElectionSchedulerFactoryTests
DaveCTurner Sep 13, 2018
6888884
Overly assertive
DaveCTurner Sep 13, 2018
7d633ca
Stuff
DaveCTurner Sep 13, 2018
2fd0d76
Only track joins
DaveCTurner Sep 13, 2018
3bc2307
Rename test
DaveCTurner Sep 13, 2018
49ba396
Fix time value bounds test
DaveCTurner Sep 13, 2018
3972c52
Precommit
DaveCTurner Sep 13, 2018
8b17a88
This method was already there
DaveCTurner Sep 14, 2018
7e7b60c
Whitespace
DaveCTurner Sep 14, 2018
3f07062
Ok to get the mutex twice
DaveCTurner Sep 14, 2018
11cdff8
Reword
DaveCTurner Sep 14, 2018
0e6fa0b
Not so public
DaveCTurner Sep 14, 2018
deaef21
Synchronisation
DaveCTurner Sep 14, 2018
8327b87
No special-case logging
DaveCTurner Sep 14, 2018
703febb
Add comment about why this is public
DaveCTurner Sep 14, 2018
ff90397
Streamy
DaveCTurner Sep 14, 2018
27e7252
Use nodeIdFromIndex
DaveCTurner Sep 14, 2018
90f0059
Handling a StartJoin hits the disk so don't run it on the network thread
DaveCTurner Sep 14, 2018
c8e2221
Whitespace
DaveCTurner Sep 14, 2018
b90e8ee
Send start-join request to self so it happens in the background
DaveCTurner Sep 14, 2018
1d2913a
Add '/coordination/' to actions
DaveCTurner Sep 14, 2018
d78a864
Move start-join things to JoinHelper
DaveCTurner Sep 14, 2018
98e7ad6
Do not set state until the first update() call
DaveCTurner Sep 14, 2018
f331b9a
Improve toString()s
DaveCTurner Sep 14, 2018
16c725f
Imports
DaveCTurner Sep 14, 2018
153dbf3
Solve handshake deadlock by responding inline rather than by skipping…
DaveCTurner Sep 14, 2018
5693494
Reformat
DaveCTurner Sep 14, 2018
2abe6ae
Imports
DaveCTurner Sep 14, 2018
d0f5fa9
Use genuine HandshakingTransportAddressConnector
DaveCTurner Sep 14, 2018
38b0956
Ignore CSREs thrown by tasks
DaveCTurner Sep 14, 2018
f219511
Oops, following wrong leader
DaveCTurner Sep 14, 2018
a58798d
updateMaxTermSeen not used
DaveCTurner Sep 14, 2018
b5d0d79
Update max term seen when handling prevote responses
DaveCTurner Sep 14, 2018
aba2e97
Expose leader rather than just isActive for a tighter invariant
DaveCTurner Sep 14, 2018
a036702
Imports
DaveCTurner Sep 14, 2018
af8a335
Merge branch 'zen2' into 2018-09-11-coordinator-tests
DaveCTurner Sep 14, 2018
576ca62
Merge remote-tracking branch 'elastic/zen2' into pr/33668
ywelsch Sep 17, 2018
e0bef12
randomSubsetOf now possibly returns original list
ywelsch Sep 17, 2018
ff0e312
fix monitoring test
ywelsch Sep 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public String toString() {
return "ApplyCommitRequest{" +
"term=" + term +
", version=" + version +
", sourceNode=" + sourceNode +
'}';
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
Expand All @@ -27,29 +28,37 @@
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

public class JoinHelper extends AbstractComponent {

public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";

private final MasterService masterService;
private final TransportService transportService;
private final JoinTaskExecutor joinTaskExecutor;

public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler) {
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm) {
super(settings);
this.masterService = masterService;
this.transportService = transportService;
Expand Down Expand Up @@ -100,6 +109,62 @@ public String toString() {
return "JoinCallback{request=" + request + "}";
}
}));

transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
StartJoinRequest::new,
(request, channel, task) -> {
final DiscoveryNode destination = request.getSourceNode();
final JoinRequest joinRequest
= new JoinRequest(transportService.getLocalNode(), Optional.of(joinLeaderInTerm.apply(request)));
logger.debug("attempting to join {} with {}", destination, joinRequest);
this.transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
logger.debug("successfully joined {} with {}", destination, joinRequest);
}

@Override
public void handleException(TransportException exp) {
logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
}

@Override
public String executor() {
return Names.SAME;
}
});
channel.sendResponse(Empty.INSTANCE);
});
}

public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
startJoinRequest, new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
logger.debug("successful response to {} from {}", startJoinRequest, destination);
}

@Override
public void handleException(TransportException exp) {
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}

public interface JoinCallback {
Expand Down Expand Up @@ -211,7 +276,8 @@ public void close(Mode newMode) {

@Override
public String toString() {
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}';
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() +
", closed=" + closed + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lease.Releasable;
Expand All @@ -45,13 +46,11 @@ public class PreVoteCollector extends AbstractComponent {
private final TransportService transportService;
private final Runnable startElection;

// Tuple for simple atomic updates
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader
// Tuple for simple atomic updates. null until the first call to `update()`.
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader.

PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse,
final TransportService transportService, final Runnable startElection) {
PreVoteCollector(final Settings settings, final TransportService transportService, final Runnable startElection) {
super(settings);
state = new Tuple<>(null, preVoteResponse);
this.transportService = transportService;
this.startElection = startElection;

Expand All @@ -74,7 +73,7 @@ public Releasable start(final ClusterState clusterState, final Iterable<Discover
return preVotingRound;
}

public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) {
public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
state = new Tuple<>(leader, preVoteResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void onPossibleCommitFailure() {

protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);

protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response);
protected abstract void onJoin(Join join);

protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
Expand Down Expand Up @@ -287,8 +287,10 @@ public void onResponse(PublishWithJoinResponse response) {
return;
}

// TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join.
onPossibleJoin(discoveryNode, response);
response.getJoin().ifPresent(join -> {
assert discoveryNode.equals(join.getSourceNode());
onJoin(join);
});

assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
state = PublicationTargetState.WAITING_FOR_QUORUM;
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
Expand All @@ -65,20 +66,22 @@ public abstract class PeerFinder extends AbstractComponent {
private final TransportService transportService;
private final TransportAddressConnector transportAddressConnector;
private final ConfiguredHostsResolver configuredHostsResolver;
private final LongConsumer updateMaxTermSeen;

private volatile long currentTerm;
private boolean active;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();

PeerFinder(Settings settings, TransportService transportService,
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver, LongConsumer updateMaxTermSeen) {
super(settings);
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
this.transportService = transportService;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
this.updateMaxTermSeen = updateMaxTermSeen;

transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
PeersRequest::new,
Expand All @@ -95,6 +98,8 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
leader = Optional.empty();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
}

onFoundPeersUpdated(); // trigger a check for a quorum already
}

public void deactivate(DiscoveryNode leader) {
Expand Down Expand Up @@ -160,6 +165,11 @@ private DiscoveryNode getLocalNode() {
*/
protected abstract void onFoundPeersUpdated();

// only for assertions, but accessed from separate package o.e.c.coordination so needs to be public
public boolean isActive() {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
return active;
}

public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
Expand Down Expand Up @@ -247,7 +257,7 @@ protected void doRun() {

@Override
public String toString() {
return "PeerFinder::handleWakeUp";
return "PeerFinder handling wakeup";
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,11 @@ public void onFailure(Exception e) {
"failed to notify channel of error message for action [{}]", action), inner);
}
}

@Override
public String toString() {
return "processing of [" + action + "][" + requestId + "]: " + request;
}
});
}

Expand Down Expand Up @@ -1049,6 +1054,11 @@ public void cancel() {
"cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
FutureUtils.cancel(future);
}

@Override
public String toString() {
return "TimeoutHandler for [" + action + "][" + requestId + "]";
}
}

static class TimeoutInfoHolder {
Expand Down Expand Up @@ -1176,7 +1186,17 @@ public void sendResponse(final TransportResponse response, TransportResponseOpti
if (ThreadPool.Names.SAME.equals(executor)) {
processResponse(handler, response);
} else {
threadPool.executor(executor).execute(() -> processResponse(handler, response));
threadPool.executor(executor).execute(new Runnable() {
@Override
public String toString() {
return "delivery of response to [" + action + "][" + requestId + "]: " + response;
}

@Override
public void run() {
DirectResponseChannel.this.processResponse(handler, response);
}
});
}
}
}
Expand Down
Loading