Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Add the ability to fetch the latest successful shard snapshot #75627

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.elasticsearch.action.admin.cluster.snapshots.features.TransportSnapshottableFeaturesAction;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsAction;
import org.elasticsearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction;
import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.get.shard.TransportGetShardSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction;
Expand Down Expand Up @@ -543,6 +545,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
actions.register(SnapshottableFeaturesAction.INSTANCE, TransportSnapshottableFeaturesAction.class);
actions.register(ResetFeatureStateAction.INSTANCE, TransportResetFeatureStateAction.class);
actions.register(GetShardSnapshotAction.INSTANCE, TransportGetShardSnapshotAction.class);

actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.snapshots.get.shard;

import org.elasticsearch.action.ActionType;

public class GetShardSnapshotAction extends ActionType<GetShardSnapshotResponse> {

public static final GetShardSnapshotAction INSTANCE = new GetShardSnapshotAction();
public static final String NAME = "internal:admin/snapshot/get_shard";

public GetShardSnapshotAction() {
super(NAME, GetShardSnapshotResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.snapshots.get.shard;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class GetShardSnapshotRequest extends MasterNodeRequest<GetShardSnapshotRequest> {
private static final String ALL_REPOSITORIES = "_all";

private final List<String> repositories;
private final ShardId shardId;

GetShardSnapshotRequest(List<String> repositories, ShardId shardId) {
assert repositories.isEmpty() == false;
assert repositories.stream().noneMatch(Objects::isNull);
assert repositories.size() == 1 || repositories.stream().noneMatch(repo -> repo.equals(ALL_REPOSITORIES));
this.repositories = Objects.requireNonNull(repositories);
this.shardId = Objects.requireNonNull(shardId);
}

public GetShardSnapshotRequest(StreamInput in) throws IOException {
super(in);
this.repositories = in.readStringList();
this.shardId = new ShardId(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(repositories);
shardId.writeTo(out);
}

public static GetShardSnapshotRequest latestSnapshotInAllRepositories(ShardId shardId) {
return new GetShardSnapshotRequest(Collections.singletonList(ALL_REPOSITORIES), shardId);
}

public static GetShardSnapshotRequest latestSnapshotInRepositories(ShardId shardId, List<String> repositories) {
if (repositories.isEmpty()) {
throw new IllegalArgumentException("Expected at least 1 repository but got none");
}

if (repositories.stream().anyMatch(Objects::isNull)) {
throw new NullPointerException("null values are not allowed in the repository list");
}
return new GetShardSnapshotRequest(repositories, shardId);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

if (repositories.size() == 0) {
validationException = addValidationError("repositories are missing", validationException);
}

return validationException;
}

public boolean getFromAllRepositories() {
return repositories.size() == 1 && ALL_REPOSITORIES.equalsIgnoreCase(repositories.get(0));
}

public boolean isSingleRepositoryRequest() {
return repositories.size() == 1 && ALL_REPOSITORIES.equalsIgnoreCase(repositories.get(0)) == false;
}

public ShardId getShardId() {
return shardId;
}

public List<String> getRepositories() {
return repositories;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetShardSnapshotRequest request = (GetShardSnapshotRequest) o;
return Objects.equals(repositories, request.repositories) && Objects.equals(shardId, request.shardId);
}

@Override
public int hashCode() {
return Objects.hash(repositories, shardId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.snapshots.get.shard;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardSnapshotInfo;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class GetShardSnapshotResponse extends ActionResponse {
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap());

private final Map<String, ShardSnapshotInfo> repositoryShardSnapshots;
private final Map<String, RepositoryException> repositoryFailures;

GetShardSnapshotResponse(Map<String, ShardSnapshotInfo> repositoryShardSnapshots, Map<String, RepositoryException> repositoryFailures) {
this.repositoryShardSnapshots = repositoryShardSnapshots;
this.repositoryFailures = repositoryFailures;
}

GetShardSnapshotResponse(StreamInput in) throws IOException {
super(in);
this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new);
this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o));
out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o));
}

public Optional<ShardSnapshotInfo> getIndexShardSnapshotInfoForRepository(String repositoryName) {
return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName));
}

public Optional<RepositoryException> getFailureForRepository(String repository) {
return Optional.ofNullable(repositoryFailures.get(repository));
}

public Map<String, ShardSnapshotInfo> getRepositoryShardSnapshots() {
return repositoryShardSnapshots;
}

public Map<String, RepositoryException> getRepositoryFailures() {
return repositoryFailures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.snapshots.get.shard;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexSnapshotsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardSnapshotInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TransportGetShardSnapshotAction extends TransportMasterNodeAction<GetShardSnapshotRequest, GetShardSnapshotResponse> {

private final IndexSnapshotsService indexSnapshotsService;

@Inject
public TransportGetShardSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
GetShardSnapshotAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetShardSnapshotRequest::new,
indexNameExpressionResolver,
GetShardSnapshotResponse::new,
ThreadPool.Names.SAME
);
this.indexSnapshotsService = new IndexSnapshotsService(repositoriesService);
}

@Override
protected void masterOperation(GetShardSnapshotRequest request, ClusterState state, ActionListener<GetShardSnapshotResponse> listener)
throws Exception {
final Set<String> repositories = getRequestedRepositories(request, state);
final ShardId shardId = request.getShardId();

if (repositories.isEmpty()) {
listener.onResponse(GetShardSnapshotResponse.EMPTY);
return;
}

GroupedActionListener<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> groupedActionListener = new GroupedActionListener<>(
listener.map(this::transformToResponse),
repositories.size()
);

BlockingQueue<String> repositoriesQueue = new LinkedBlockingQueue<>(repositories);
getShardSnapshots(repositoriesQueue, shardId, new ActionListener<Optional<ShardSnapshotInfo>>() {
@Override
public void onResponse(Optional<ShardSnapshotInfo> shardSnapshotInfo) {
groupedActionListener.onResponse(Tuple.tuple(shardSnapshotInfo, null));
}

@Override
public void onFailure(Exception err) {
if (request.isSingleRepositoryRequest() == false && err instanceof RepositoryException) {
groupedActionListener.onResponse(Tuple.tuple(Optional.empty(), (RepositoryException) err));
} else {
groupedActionListener.onFailure(err);
}
}
});
}

private void getShardSnapshots(
BlockingQueue<String> repositories,
ShardId shardId,
ActionListener<Optional<ShardSnapshotInfo>> listener
) {
final String repository = repositories.poll();
if (repository == null) {
return;
}

indexSnapshotsService.getLatestSuccessfulSnapshotForShard(
repository,
shardId,
ActionListener.runAfter(listener, () -> getShardSnapshots(repositories, shardId, listener))
);
}

private GetShardSnapshotResponse transformToResponse(
Collection<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> shardSnapshots
) {
final Map<String, ShardSnapshotInfo> repositoryShardSnapshot = shardSnapshots.stream()
.map(Tuple::v1)
.filter(Objects::nonNull)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity()));

final Map<String, RepositoryException> failures = shardSnapshots.stream()
.map(Tuple::v2)
.filter(Objects::nonNull)
.collect(Collectors.toMap(RepositoryException::repository, Function.identity()));

return new GetShardSnapshotResponse(repositoryShardSnapshot, failures);
}

private Set<String> getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) {
RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
if (request.getFromAllRepositories()) {
return repositories.repositories().stream().map(RepositoryMetadata::name).collect(Collectors.toSet());
}

return request.getRepositories().stream().filter(Objects::nonNull).collect(Collectors.toSet());
}

@Override
protected ClusterBlockException checkBlock(GetShardSnapshotRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,23 @@ public String getIndexMetaBlobId(String metaIdentifier) {
* @return blob id for the given index metadata
*/
public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) {
final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId);
final String identifier = snapshotIndexMetadataIdentifier(snapshotId, indexId);
if (identifier == null) {
return snapshotId.getUUID();
} else {
return identifiers.get(identifier);
}
}

/**
* Gets the {@link org.elasticsearch.cluster.metadata.IndexMetadata} identifier for the given snapshot
* if the snapshot contains the referenced index, otherwise it returns {@code null}.
*/
@Nullable
public String snapshotIndexMetadataIdentifier(SnapshotId snapshotId, IndexId indexId) {
return lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId);
}

/**
* Create a new instance with the given snapshot and index metadata uuids and identifiers added.
*
Expand Down
Loading