Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Implement basic cluster formation #33668

Merged
merged 55 commits into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
6cf5ca2
Introduce test suite
DaveCTurner Sep 11, 2018
72cc9a5
Basic structure
DaveCTurner Sep 11, 2018
70b7857
Initialise prevoting components
DaveCTurner Sep 11, 2018
3ef0cd7
Add assertions and extract class
DaveCTurner Sep 11, 2018
9df9464
Stuff
DaveCTurner Sep 11, 2018
349095e
Send start join requests and respond with join requests
DaveCTurner Sep 11, 2018
655578e
Initialises a bunch of stuff
DaveCTurner Sep 11, 2018
e07875a
MOre plumbing
DaveCTurner Sep 11, 2018
3b0d634
Bump term and also try and join self
DaveCTurner Sep 11, 2018
8af79fa
Simplify
DaveCTurner Sep 11, 2018
25dc957
Deal with multiple nodes
DaveCTurner Sep 11, 2018
caf999b
Plumbing to allow publication/commit to take place
DaveCTurner Sep 13, 2018
93bbdd4
Add toString() methods to tasks for simpler debugging
DaveCTurner Sep 13, 2018
eb2ee5e
Actually do the publication
DaveCTurner Sep 13, 2018
800ed5b
Strengthen tests
DaveCTurner Sep 13, 2018
c2f7ecd
Test now submits a value to all nodes
DaveCTurner Sep 13, 2018
e0ffce0
Fix up ElectionSchedulerFactoryTests
DaveCTurner Sep 13, 2018
6888884
Overly assertive
DaveCTurner Sep 13, 2018
7d633ca
Stuff
DaveCTurner Sep 13, 2018
2fd0d76
Only track joins
DaveCTurner Sep 13, 2018
3bc2307
Rename test
DaveCTurner Sep 13, 2018
49ba396
Fix time value bounds test
DaveCTurner Sep 13, 2018
3972c52
Precommit
DaveCTurner Sep 13, 2018
8b17a88
This method was already there
DaveCTurner Sep 14, 2018
7e7b60c
Whitespace
DaveCTurner Sep 14, 2018
3f07062
Ok to get the mutex twice
DaveCTurner Sep 14, 2018
11cdff8
Reword
DaveCTurner Sep 14, 2018
0e6fa0b
Not so public
DaveCTurner Sep 14, 2018
deaef21
Synchronisation
DaveCTurner Sep 14, 2018
8327b87
No special-case logging
DaveCTurner Sep 14, 2018
703febb
Add comment about why this is public
DaveCTurner Sep 14, 2018
ff90397
Streamy
DaveCTurner Sep 14, 2018
27e7252
Use nodeIdFromIndex
DaveCTurner Sep 14, 2018
90f0059
Handling a StartJoin hits the disk so don't run it on the network thread
DaveCTurner Sep 14, 2018
c8e2221
Whitespace
DaveCTurner Sep 14, 2018
b90e8ee
Send start-join request to self so it happens in the background
DaveCTurner Sep 14, 2018
1d2913a
Add '/coordination/' to actions
DaveCTurner Sep 14, 2018
d78a864
Move start-join things to JoinHelper
DaveCTurner Sep 14, 2018
98e7ad6
Do not set state until the first update() call
DaveCTurner Sep 14, 2018
f331b9a
Improve toString()s
DaveCTurner Sep 14, 2018
16c725f
Imports
DaveCTurner Sep 14, 2018
153dbf3
Solve handshake deadlock by responding inline rather than by skipping…
DaveCTurner Sep 14, 2018
5693494
Reformat
DaveCTurner Sep 14, 2018
2abe6ae
Imports
DaveCTurner Sep 14, 2018
d0f5fa9
Use genuine HandshakingTransportAddressConnector
DaveCTurner Sep 14, 2018
38b0956
Ignore CSREs thrown by tasks
DaveCTurner Sep 14, 2018
f219511
Oops, following wrong leader
DaveCTurner Sep 14, 2018
a58798d
updateMaxTermSeen not used
DaveCTurner Sep 14, 2018
b5d0d79
Update max term seen when handling prevote responses
DaveCTurner Sep 14, 2018
aba2e97
Expose leader rather than just isActive for a tighter invariant
DaveCTurner Sep 14, 2018
a036702
Imports
DaveCTurner Sep 14, 2018
af8a335
Merge branch 'zen2' into 2018-09-11-coordinator-tests
DaveCTurner Sep 14, 2018
576ca62
Merge remote-tracking branch 'elastic/zen2' into pr/33668
ywelsch Sep 17, 2018
e0bef12
randomSubsetOf now possibly returns original list
ywelsch Sep 17, 2018
ff0e312
fix monitoring test
ywelsch Sep 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public String toString() {
return "ApplyCommitRequest{" +
"term=" + term +
", version=" + version +
", sourceNode=" + sourceNode +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ public void invariant() {
assert publishVotes.isEmpty() || electionWon();
}

public boolean containsJoinVote(DiscoveryNode node) {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
return joinVotes.containsVoteFor(node);
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
Expand All @@ -27,9 +28,14 @@
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -102,6 +108,35 @@ public String toString() {
}));
}

public void sendJoin(final DiscoveryNode destination, final JoinRequest joinRequest) {
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
logger.debug("successfully joined {} with {}", destination, joinRequest);
}

@Override
public void handleException(TransportException exp) {
final Throwable rootCause = exp.getRootCause();
if (rootCause instanceof CoordinationStateRejectedException) {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("failed to join {} with {}: {}", destination, joinRequest, rootCause.getMessage());
} else {
logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
}
}

@Override
public String executor() {
return Names.SAME;
}
});
}

public interface JoinCallback {
void onSuccess();

Expand Down Expand Up @@ -211,7 +246,8 @@ public void close(Mode newMode) {

@Override
public String toString() {
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}';
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() +
", closed=" + closed + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ public class PreVoteCollector extends AbstractComponent {
// Tuple for simple atomic updates
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader

PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse,
final TransportService transportService, final Runnable startElection) {
PreVoteCollector(final Settings settings, final TransportService transportService, final Runnable startElection) {
super(settings);
state = new Tuple<>(null, preVoteResponse);
state = new Tuple<>(null, new PreVoteResponse(0, 0, 0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what was the reason for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are required to keep the PreVoteCollector abreast of changes in the current pre-vote response via the update() method, but we create it before we know what the initial response should be so have to pass in this dummy value to the constructor and call update() after loading the persistent state. On reflection we don't need to set state at all - see 98e7ad6.

this.transportService = transportService;
this.startElection = startElection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void onPossibleCommitFailure() {

protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);

protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response);
protected abstract void onJoin(Join join);

protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
Expand Down Expand Up @@ -287,8 +287,10 @@ public void onResponse(PublishWithJoinResponse response) {
return;
}

// TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join.
onPossibleJoin(discoveryNode, response);
response.getJoin().ifPresent(join -> {
assert discoveryNode.equals(join.getSourceNode());
onJoin(join);
});

assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
state = PublicationTargetState.WAITING_FOR_QUORUM;
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
Expand All @@ -65,20 +66,22 @@ public abstract class PeerFinder extends AbstractComponent {
private final TransportService transportService;
private final TransportAddressConnector transportAddressConnector;
private final ConfiguredHostsResolver configuredHostsResolver;
private final LongConsumer updateMaxTermSeen;

private volatile long currentTerm;
private boolean active;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();

PeerFinder(Settings settings, TransportService transportService,
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver, LongConsumer updateMaxTermSeen) {
super(settings);
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
this.transportService = transportService;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
this.updateMaxTermSeen = updateMaxTermSeen;

transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
PeersRequest::new,
Expand All @@ -95,6 +98,8 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
leader = Optional.empty();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
}

onFoundPeersUpdated(); // trigger a check for a quorum already
}

public void deactivate(DiscoveryNode leader) {
Expand Down Expand Up @@ -160,6 +165,11 @@ private DiscoveryNode getLocalNode() {
*/
protected abstract void onFoundPeersUpdated();

// only for assertions
public boolean isActive() {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
return active;
}

public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
Expand Down Expand Up @@ -247,7 +257,7 @@ protected void doRun() {

@Override
public String toString() {
return "PeerFinder::handleWakeUp";
return "PeerFinder handling wakeup";
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,11 @@ public void onFailure(Exception e) {
"failed to notify channel of error message for action [{}]", action), inner);
}
}

@Override
public String toString() {
return "processing of [" + action + "][" + requestId + "]: " + request;
}
});
}

Expand Down Expand Up @@ -1176,7 +1181,17 @@ public void sendResponse(final TransportResponse response, TransportResponseOpti
if (ThreadPool.Names.SAME.equals(executor)) {
processResponse(handler, response);
} else {
threadPool.executor(executor).execute(() -> processResponse(handler, response));
threadPool.executor(executor).execute(new Runnable() {
@Override
public String toString() {
return "delivery of response to [" + action + "][" + requestId + "]: " + response;
}

@Override
public void run() {
DirectResponseChannel.this.processResponse(handler, response);
}
});
}
}
}
Expand Down
Loading