Skip to content

Commit

Permalink
[Transform] Make _reset action stop transforms without force first (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Jan 30, 2024
1 parent e259bcd commit 44d5035
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/104870.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 104870
summary: Make `_reset` action stop transforms without force first
area: Transform
type: bug
issues:
- 100596
- 104825
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public void cleanup() throws Exception {
cleanUp();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/104825")
@SuppressWarnings("unchecked")
public void testTransformFeatureReset() throws Exception {
String indexName = "basic-crud-reviews";
Expand Down Expand Up @@ -90,6 +89,10 @@ public void testTransformFeatureReset() throws Exception {
.build();

putTransform(continuousTransformId, Strings.toString(config), RequestOptions.DEFAULT);

// Sleep for a few seconds so that we cover transform being stopped at various stages.
Thread.sleep(randomLongBetween(0, 5_000));

startTransform(continuousTransformId, RequestOptions.DEFAULT);

assertOK(client().performRequest(new Request(HttpPost.METHOD_NAME, "/_features/_reset")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public void cleanUpFeature(
SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener);
}, unsetResetModeListener::onFailure);

ActionListener<StopTransformAction.Response> afterStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> {
ActionListener<StopTransformAction.Response> afterForceStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> {
if (stopTransformsResponse.isAcknowledged()
&& stopTransformsResponse.getTaskFailures().isEmpty()
&& stopTransformsResponse.getNodeFailures().isEmpty()) {
Expand Down Expand Up @@ -439,12 +439,31 @@ public void cleanUpFeature(
}
}, unsetResetModeListener::onFailure);

ActionListener<StopTransformAction.Response> afterStoppingTransforms = ActionListener.wrap(
afterForceStoppingTransforms::onResponse,
e -> {
logger.info("Error while trying to stop the transforms, will try again with force=true", e);
StopTransformAction.Request forceStopTransformsRequest = new StopTransformAction.Request(
Metadata.ALL,
true,
// Set force=true to make sure all the transforms persistent tasks are stopped.
true,
null,
true,
false
);
client.execute(StopTransformAction.INSTANCE, forceStopTransformsRequest, afterForceStoppingTransforms);
}
);

ActionListener<AcknowledgedResponse> afterResetModeSet = ActionListener.wrap(response -> {
StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request(
Metadata.ALL,
true,
true,
null,
// Set force=false in order to let transforms finish gracefully.
false,
// Do not give it too much time. If there is a problem, there will be another try with force=true.
TimeValue.timeValueSeconds(10),
true,
false
);
Expand Down

0 comments on commit 44d5035

Please sign in to comment.