Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Fix CoordinatorTests #34002

Merged
merged 8 commits into from
Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilderTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]TransportInstanceSingleOperationActionTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]AbstractTermVectorsTestCase.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,19 @@ public long globalCheckpoint() {
return globalCheckpoint;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReplicaResponse that = (ReplicaResponse) o;
return localCheckpoint == that.localCheckpoint &&
globalCheckpoint == that.globalCheckpoint;
}

@Override
public int hashCode() {
return Objects.hash(localCheckpoint, globalCheckpoint);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
}

PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode()) :
publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();

synchronized (mutex) {
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
Expand Down Expand Up @@ -560,7 +563,9 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@Override
public void run() {
publication.onTimeout();
synchronized (mutex) {
publication.onTimeout();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ private void handleWakeUp() {
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
if (running() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -251,7 +252,7 @@ public void close(Mode newMode) {
assert closed == false : "CandidateJoinAccumulator closed";
closed = true;
if (newMode == Mode.LEADER) {
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new HashMap<>();
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new LinkedHashMap<>();
joinRequestAccumulator.forEach((key, value) -> {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(key, "elect leader");
pendingAsTasks.put(task, new JoinTaskListener(task, value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ void handleWakeUp() {
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),

new TransportResponseHandler<TransportResponse.Empty>() {

@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
if (isClosed.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -127,6 +129,11 @@ void start(final Iterable<DiscoveryNode> broadcastNodes) {
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
new TransportResponseHandler<PreVoteResponse>() {
@Override
public PreVoteResponse read(StreamInput in) throws IOException {
return new PreVoteResponse(in);
}

@Override
public void handleResponse(PreVoteResponse response) {
handlePreVoteResponse(response, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public void start(Set<DiscoveryNode> faultyNodes) {
}

public void onTimeout() {
if (isCompleted) {
return;
}

assert timedOut == false;
timedOut = true;
if (applyCommitRequest.isPresent() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
Expand All @@ -55,8 +57,10 @@
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -127,10 +131,38 @@ public ActionRequestValidationException validate() {
}
}

class Response extends ActionResponse {}
class Response extends ActionResponse {
private long identity = randomLong();

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return identity == response.identity;
}

@Override
public int hashCode() {
return Objects.hash(identity);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(identity);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
identity = in.readLong();
}
}

class Action extends TransportMasterNodeAction<Request, Response> {
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, threadPool,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new);
}
Expand Down Expand Up @@ -176,7 +208,7 @@ public void testLocalOperationWithoutBlocks() throws ExecutionException, Interru

new Action(Settings.EMPTY, "internal:testAction", transportService, clusterService, threadPool) {
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
if (masterOperationFailure) {
listener.onFailure(exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
Expand All @@ -29,8 +31,11 @@
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;

import java.io.IOException;
import java.util.Optional;

import static org.elasticsearch.test.ESTestCase.copyWriteable;

public abstract class DisruptableMockTransport extends MockTransport {
private final Logger logger;

Expand Down Expand Up @@ -68,7 +73,6 @@ public String toString() {
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {

assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
super.onSendRequest(requestId, action, request, destination);

final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}",
action, requestId, getLocalNode(), destination).getFormattedMessage();
Expand Down Expand Up @@ -162,8 +166,15 @@ public String toString() {
}
};

final TransportRequest copiedRequest;
try {
copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest);
} catch (IOException e) {
throw new AssertionError("exception de/serializing request", e);
}

try {
requestHandler.processMessageReceived(request, transportChannel);
requestHandler.processMessageReceived(copiedRequest, transportChannel);
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
Expand All @@ -181,6 +192,10 @@ public String toString() {
});
}

private NamedWriteableRegistry writeableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}

public enum ConnectionStatus {
CONNECTED,
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public void clear() {

@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
super.onSendRequest(requestId, action, request, node);
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.test.transport;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -91,7 +90,14 @@ public TransportService createTransportService(Settings settings, ThreadPool thr
public void handleResponse(final long requestId, final TransportResponse response) {
final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
if (transportResponseHandler != null) {
transportResponseHandler.handleResponse(response);
final TransportResponse deliveredResponse;
try (BytesStreamOutput output = new BytesStreamOutput()) {
response.writeTo(output);
deliveredResponse = transportResponseHandler.read(output.bytes().streamInput());
} catch (IOException | UnsupportedOperationException e) {
throw new AssertionError("failed to serialize/deserialize response " + response, e);
}
transportResponseHandler.handleResponse(deliveredResponse);
}
}

Expand Down Expand Up @@ -126,7 +132,7 @@ public void handleRemoteError(final long requestId, final Throwable t) {
output.writeException(t);
remoteException = new RemoteTransportException("remote failure", output.bytes().streamInput().readException());
} catch (IOException ioException) {
throw new ElasticsearchException("failed to serialize/deserialize supplied exception " + t, ioException);
throw new AssertionError("failed to serialize/deserialize supplied exception " + t, ioException);
}
}
this.handleError(requestId, remoteException);
Expand Down Expand Up @@ -181,7 +187,6 @@ public void close() {
}

protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {

}

protected boolean nodeConnected(DiscoveryNode discoveryNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
Expand Down Expand Up @@ -191,6 +193,11 @@ private TransportRequestHandler<TransportRequest.Empty> requestHandlerCaptures(C

private TransportResponseHandler<TransportResponse> responseHandlerShouldNotBeCalled() {
return new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse read(StreamInput in) {
throw new AssertionError("should not be called");
}

@Override
public void handleResponse(TransportResponse response) {
throw new AssertionError("should not be called");
Expand All @@ -210,6 +217,11 @@ public String executor() {

private TransportResponseHandler<TransportResponse> responseHandlerShouldBeCalledNormally(Runnable onCalled) {
return new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(TransportResponse response) {
onCalled.run();
Expand All @@ -229,6 +241,11 @@ public String executor() {

private TransportResponseHandler<TransportResponse> responseHandlerShouldBeCalledExceptionally(Consumer<TransportException> onCalled) {
return new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse read(StreamInput in) {
throw new AssertionError("should not be called");
}

@Override
public void handleResponse(TransportResponse response) {
throw new AssertionError("should not be called");
Expand Down