Skip to content

Commit

Permalink
Uses TransportMasterNodeAction to update shard snapshot status (#27165)
Browse files Browse the repository at this point in the history
Currently, we are using a plain TransportRequestHandler to post snapshot
status messages to the master. However, it doesn't have a robust retry
mechanism as TransportMasterNodeAction. This change migrates from
TransportRequestHandler to TransportMasterNodeAction for the new
versions and keeps the current implementation for the old versions.

Closes #27151
  • Loading branch information
dnhatn committed Nov 17, 2017
1 parent 5a0a371 commit 1159446
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,24 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -85,9 +95,11 @@
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
* starting and stopping shard level snapshots
*/
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener {
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot";
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";

public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot";

private final ClusterService clusterService;

Expand All @@ -106,10 +118,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap();

private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private UpdateSnapshotStatusAction updateSnapshotStatusHandler;

@Inject
public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool,
TransportService transportService, IndicesService indicesService) {
TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings);
this.indicesService = indicesService;
this.snapshotsService = snapshotsService;
Expand All @@ -118,20 +132,27 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S
this.threadPool = threadPool;
if (DiscoveryNode.isDataNode(settings)) {
// this is only useful on the nodes that can hold data
// addLowPriorityApplier to make sure that Repository will be created before snapshot
clusterService.addLowPriorityApplier(this);
clusterService.addListener(this);
}

// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);

if (DiscoveryNode.isMasterNode(settings)) {
// This needs to run only on nodes that can become masters
transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler());
transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6());
}

}

@Override
protected void doStart() {

assert this.updateSnapshotStatusHandler != null;
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
if (DiscoveryNode.isMasterNode(settings)) {
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null;
}
}

@Override
Expand All @@ -151,11 +172,11 @@ protected void doStop() {

@Override
protected void doClose() {
clusterService.removeApplier(this);
clusterService.removeListener(this);
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
public void clusterChanged(ClusterChangedEvent event) {
try {
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
Expand Down Expand Up @@ -449,7 +470,7 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
/**
* Internal request that is used to send changes in snapshot status to master
*/
public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest {
public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> {
private Snapshot snapshot;
private ShardId shardId;
private ShardSnapshotStatus status;
Expand All @@ -462,6 +483,13 @@ public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId,
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
// By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck.
this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
Expand Down Expand Up @@ -502,11 +530,16 @@ public String toString() {
* Updates the shard status
*/
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
try {
transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
if (master.getVersion().onOrAfter(Version.V_6_1_0)) {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
} else {
UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status);
transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME);
}
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
}
}

Expand All @@ -515,15 +548,24 @@ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, S
*
* @param request update shard status request
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) {
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
(source, e) -> logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]",
request.snapshot(), request.shardId(), request.status()), e));
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
}
});
}

class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
Expand Down Expand Up @@ -578,13 +620,107 @@ public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(Cluster
}
}

static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {

}

class UpdateSnapshotStatusAction extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected UpdateIndexShardSnapshotStatusResponse newResponse() {
return new UpdateIndexShardSnapshotStatusResponse();
}

@Override
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
innerUpdateSnapshotState(request, listener);
}

@Override
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
return null;
}
}

/**
* A BWC version of {@link UpdateIndexShardSnapshotStatusRequest}
*/
static class UpdateSnapshotStatusRequestV6 extends TransportRequest {
private Snapshot snapshot;
private ShardId shardId;
private ShardSnapshotStatus status;

UpdateSnapshotStatusRequestV6() {

}

UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
snapshot = new Snapshot(in);
shardId = ShardId.readShardId(in);
status = new ShardSnapshotStatus(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
snapshot.writeTo(out);
shardId.writeTo(out);
status.writeTo(out);
}

Snapshot snapshot() {
return snapshot;
}

ShardId shardId() {
return shardId;
}

ShardSnapshotStatus status() {
return status;
}

@Override
public String toString() {
return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
}
}

/**
* Transport request handler that is used to send changes in snapshot status to master
* A BWC version of {@link UpdateSnapshotStatusAction}
*/
class UpdateSnapshotStateRequestHandler implements TransportRequestHandler<UpdateIndexShardSnapshotStatusRequest> {
class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler<UpdateSnapshotStatusRequestV6> {
@Override
public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception {
innerUpdateSnapshotState(request);
public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception {
final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status());
innerUpdateSnapshotState(request, new ActionListener<UpdateIndexShardSnapshotStatusResponse>() {
@Override
public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) {

}

@Override
public void onFailure(Exception e) {
logger.warn("Failed to update snapshot status", e);
}
});
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.snapshots;

import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockRepository.Plugin.class, MockTransportService.TestPlugin.class);
}

public void testRetryPostingSnapshotStatusMessages() throws Exception {
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();

logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

final int shards = between(1, 10);
assertAcked(prepareCreate("test-index", 0, Settings.builder().put("number_of_shards", shards).put("number_of_replicas", 0)));
ensureGreen();
final int numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}

logger.info("--> blocking repository");
String blockedNode = blockNodeWithIndex("test-repo", "test-index");
dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(false)
.setIndices("test-index")
.get();
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));

final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")
.get().getSnapshots().get(0).snapshotId();

logger.info("--> start disrupting cluster");
final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode),
NetworkDisruption.NetworkDelay.random(random()));
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();

logger.info("--> unblocking repository");
unblockNode("test-repo", blockedNode);

// Retrieve snapshot status from the data node.
SnapshotShardsService snapshotShardsService = internalCluster().getInstance(SnapshotShardsService.class, blockedNode);
assertBusy(() -> {
final Snapshot snapshot = new Snapshot("test-repo", snapshotId);
List<IndexShardSnapshotStatus.Stage> stages = snapshotShardsService.currentSnapshotShards(snapshot)
.values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList());
assertThat(stages, hasSize(shards));
assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE)));
});

logger.info("--> stop disrupting cluster");
networkDisruption.stopDisrupting();
internalCluster().clearDisruptionScheme(true);

assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
.prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), equalTo(shards));
}, 10, TimeUnit.SECONDS);
}
}
Loading

0 comments on commit 1159446

Please sign in to comment.