Skip to content

Commit

Permalink
Fix listener
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Aug 6, 2024
1 parent f72702f commit 8e85231
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public void testInvalidRestoreRequestScenarios() throws Exception {
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

public void testCentralizedCreateAndRestoreShallowCopy() throws Exception {
public void testCreateShallowCopyV2() throws Exception {

Settings snapshotSettings = Settings.builder().put("snapshot.shallow_snapshot_v2", true).build();
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(snapshotSettings).build());
Expand Down
31 changes: 13 additions & 18 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,10 @@ public boolean isShallowSnapV2() {
* @param listener snapshot completion listener
*/
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
startCreateSnapshot(
request,
ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure)
);
startCreateSnapshot(request, listener);
}

public void startCreateSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
public void startCreateSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
Repository repository = repositoriesService.repository(request.repository());
boolean remoteStoreIndexShallowCopy = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings());
logger.debug("remote_store_index_shallow_copy setting is set as [{}]", remoteStoreIndexShallowCopy);
Expand All @@ -286,7 +283,10 @@ public void startCreateSnapshot(final CreateSnapshotRequest request, final Actio
if (remoteStoreIndexShallowCopy && isShallowSnapV2 && request.indices().length == 0) {
createShallowSnapshotV2(request, listener);
} else {
createSnapshot(request, listener);
createSnapshot(
request,
ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure)
);
}
}

Expand Down Expand Up @@ -440,7 +440,7 @@ public TimeValue timeout() {
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}

public void createShallowSnapshotV2(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
public void createShallowSnapshotV2(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
long pinnedTimestamp = System.currentTimeMillis();
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
Expand Down Expand Up @@ -530,15 +530,10 @@ public void createShallowSnapshotV2(final CreateSnapshotRequest request, final A
throw new SnapshotException(repositoryName, snapshotName, "Aborting Snapshot, no longer cluster manager");
}
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(

repoData -> {
completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(repoData, snapshotInfo));
listener.onResponse(snapshot);
},

e -> failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, e.toString()))
);
pinnedTimestampListener.whenComplete(repoData -> {
// completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(repoData, snapshotInfo));
listener.onResponse(snapshotInfo);
}, listener::onFailure);

repository.finalizeSnapshot(
shardGenerations,
Expand Down Expand Up @@ -566,7 +561,7 @@ public void onResponse(RepositoryData repositoryData) {
@Override
public void onFailure(Exception e) {
logger.error("Failed to upload files to snapshot repo {} for snapshot {} ", repositoryName, snapshotName);
failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, e.toString()));
// failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, e.toString()));
listener.onFailure(e);
}
}
Expand All @@ -576,7 +571,7 @@ public void onFailure(Exception e) {
} catch (Exception e) {
assert false : new AssertionError(e);
logger.error("Snapshot {} creation failed with exception {}", snapshot.getSnapshotId().getName(), e);
failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, e.toString()));
// failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, e.toString()));
listener.onFailure(e);
}
}
Expand Down

0 comments on commit 8e85231

Please sign in to comment.