diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/UpgradeTransformsRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/UpgradeTransformsRequest.java new file mode 100644 index 0000000000000..83cf60eeb22bc --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/UpgradeTransformsRequest.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.client.transform; + +import org.elasticsearch.client.Validatable; + +import java.util.Objects; + +public class UpgradeTransformsRequest implements Validatable { + + private Boolean dryRun; + + public UpgradeTransformsRequest() {} + + public Boolean isDryRun() { + return dryRun; + } + + /** + * Whether to only check for an upgrade without taking action + * + * @param dryRun {@code true} will only check for upgrades + */ + public void setDryRun(boolean dryRun) { + this.dryRun = dryRun; + } + + @Override + public int hashCode() { + return Objects.hash(dryRun); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpgradeTransformsRequest other = (UpgradeTransformsRequest) obj; + return Objects.equals(dryRun, other.dryRun); + } + +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/UpgradeTransformsResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/UpgradeTransformsResponse.java new file mode 100644 index 0000000000000..f1c7095bd7599 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/UpgradeTransformsResponse.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.client.transform; + +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.Objects; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class UpgradeTransformsResponse { + + public static final ParseField NO_ACTION = new ParseField("no_action"); + public static final ParseField UPDATED = new ParseField("updated"); + public static final ParseField NEEDS_UPDATE = new ParseField("needs_update"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "upgrade_transform", + true, + args -> { + long updated = args[0] == null ? 0L : (Long) args[0]; + long noAction = args[1] == null ? 0L : (Long) args[1]; + long needsUpdate = args[2] == null ? 0L : (Long) args[2]; + + return new UpgradeTransformsResponse(updated, noAction, needsUpdate); + } + ); + + static { + PARSER.declareLong(optionalConstructorArg(), UPDATED); + PARSER.declareLong(optionalConstructorArg(), NO_ACTION); + PARSER.declareLong(optionalConstructorArg(), NEEDS_UPDATE); + } + + public static UpgradeTransformsResponse fromXContent(final XContentParser parser) { + return UpgradeTransformsResponse.PARSER.apply(parser, null); + } + + private final long updated; + private final long noAction; + private final long needsUpdate; + + public UpgradeTransformsResponse(long updated, long noAction, long needsUpdate) { + this.updated = updated; + this.noAction = noAction; + this.needsUpdate = needsUpdate; + } + + @Override + public int hashCode() { + return Objects.hash(updated, noAction, needsUpdate); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final UpgradeTransformsResponse that = (UpgradeTransformsResponse) other; + return this.updated == that.updated && this.noAction == that.noAction && this.needsUpdate == that.needsUpdate; + } + + public long getUpdated() { + return updated; + } + + public long getNoAction() { + return noAction; + } + + public long getNeedsUpdate() { + return needsUpdate; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/UpgradeTransformsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/UpgradeTransformsResponseTests.java new file mode 100644 index 0000000000000..11a4b2c77fa31 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/UpgradeTransformsResponseTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.client.transform; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class UpgradeTransformsResponseTests extends ESTestCase { + + public void testXContentParser() throws IOException { + xContentTester( + this::createParser, + UpgradeTransformsResponseTests::createTestInstance, + UpgradeTransformsResponseTests::toXContent, + UpgradeTransformsResponse::fromXContent + ).assertToXContentEquivalence(false).supportsUnknownFields(false).test(); + } + + private static UpgradeTransformsResponse createTestInstance() { + return new UpgradeTransformsResponse(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + + private static void toXContent(UpgradeTransformsResponse response, XContentBuilder builder) throws IOException { + builder.startObject(); + if (response.getUpdated() != 0) { + builder.field("updated", response.getUpdated()); + } + if (response.getNoAction() != 0) { + builder.field("no_action", response.getNoAction()); + } + if (response.getNeedsUpdate() != 0) { + builder.field("needs_update", response.getNeedsUpdate()); + } + builder.endObject(); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + List namedXContents = searchModule.getNamedXContents(); + namedXContents.addAll(new TransformNamedXContentProvider().getNamedXContentParsers()); + + return new NamedXContentRegistry(namedXContents); + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/hlrc/UpgradeTransformsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/hlrc/UpgradeTransformsResponseTests.java new file mode 100644 index 0000000000000..895a7e6304c09 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/hlrc/UpgradeTransformsResponseTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.client.transform.hlrc; + +import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.client.transform.UpgradeTransformsResponse; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Response; + +import java.io.IOException; + +public class UpgradeTransformsResponseTests extends AbstractResponseTestCase< + Response, + org.elasticsearch.client.transform.UpgradeTransformsResponse> { + + public static Response randomUpgradeResponse() { + return new Response(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + + @Override + protected Response createServerTestInstance(XContentType xContentType) { + return randomUpgradeResponse(); + } + + @Override + protected UpgradeTransformsResponse doParseToClientInstance(XContentParser parser) throws IOException { + return org.elasticsearch.client.transform.UpgradeTransformsResponse.fromXContent(parser); + } + + @Override + protected void assertInstances(Response serverTestInstance, UpgradeTransformsResponse clientInstance) { + assertEquals(serverTestInstance.getNeedsUpdate(), clientInstance.getNeedsUpdate()); + assertEquals(serverTestInstance.getNoAction(), clientInstance.getNoAction()); + assertEquals(serverTestInstance.getUpdated(), clientInstance.getUpdated()); + } + +} diff --git a/docs/reference/transform/apis/upgrade-transforms.asciidoc b/docs/reference/transform/apis/upgrade-transforms.asciidoc new file mode 100644 index 0000000000000..7c95a31e63243 --- /dev/null +++ b/docs/reference/transform/apis/upgrade-transforms.asciidoc @@ -0,0 +1,57 @@ +[role="xpack"] +[testenv="basic"] +[[upgrade-transforms]] += Upgrade {transform} API + +[subs="attributes"] +++++ +Upgrade {transform} +++++ + +Upgrades all {transform}s. + +[[upgrade-transforms-request]] +== {api-request-title} + +`POST _transform/_upgrade` + +[[upgrade-transforms-prereqs]] +== {api-prereq-title} + +Requires the following privileges: + +* cluster: `manage_transform` (the `transform_admin` built-in role grants this + privilege) +* source indices: `read`, `view_index_metadata` +* destination index: `read`, `index`. + + +[[upgrade-transforms-desc]] +== {api-description-title} + +This API upgrades all existing {transform}s. + +[[upgrade-transforms-query-parms]] +== {api-query-parms-title} + +`dry_run`:: + (Optional, Boolean) When `true`, only checks for updates but does not execute them. + +[[upgrade-transforms-example]] +== {api-examples-title} + +[source,console] +-------------------------------------------------- +POST _transform/_upgrade +-------------------------------------------------- +// TEST[setup:simple_kibana_continuous_pivot] + +When all {transform}s are upgraded, you receive a summary: + +[source,console-result] +---- +{ + "no_action": 1 +} +---- +// TESTRESPONSE[s/"no_action" : 1/"no_action" : $body.no_action/] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/transform.upgrade_transforms.json b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.upgrade_transforms.json new file mode 100644 index 0000000000000..116418abdf1b4 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.upgrade_transforms.json @@ -0,0 +1,31 @@ +{ + "transform.upgrade_transforms":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/upgrade-transforms.html", + "description":"Upgrades all transforms." + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept":["application/json"], + "content_type":["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_transform/_upgrade", + "methods":[ + "POST" + ] + } + ] + }, + "params":{ + "dry_run":{ + "type":"boolean", + "required":false, + "description":"Whether to only check for updates but don't execute" + } + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index 57580486f2b2e..c1ca4a534d4cf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -44,6 +44,7 @@ public final class TransformField { public static final ParseField DELAY = new ParseField("delay"); // TODO: Rename to "defer_data_validation" or similar to emphasize that not all validation is deferred public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation"); + public static final ParseField DRY_RUN = new ParseField("dry_run"); public static final ParseField RETENTION_POLICY = new ParseField("retention_policy"); public static final ParseField MAX_AGE = new ParseField("max_age"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index b14619bebf7dc..8273270ef457f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -55,6 +55,9 @@ public class TransformMessages { public static final String TRANSFORM_FAILED_TO_CREATE_COMPOSITE_AGGREGATION = "Failed to create composite aggregation from {0} function"; public static final String TRANSFORM_CONFIGURATION_INVALID = "Transform configuration [{0}] has invalid elements: [{1}]"; + public static final String TRANSFORM_CONFIGURATION_DEPRECATED = "Transform configuration is at version [{0}]. Use [{1}] or [" + + TransformField.REST_BASE_PATH_TRANSFORMS + + "_upgrade] to update."; public static final String UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]"; public static final String TRANSFORM_UPDATE_CANNOT_CHANGE_SYNC_METHOD = "Cannot change the current sync configuration of transform [{0}] from [{1}] to [{2}]"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsAction.java new file mode 100644 index 0000000000000..bff0d34dbdcae --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsAction.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class UpgradeTransformsAction extends ActionType { + + public static final UpgradeTransformsAction INSTANCE = new UpgradeTransformsAction(); + public static final String NAME = "cluster:admin/transform/upgrade"; + + private UpgradeTransformsAction() { + super(NAME, UpgradeTransformsAction.Response::new); + } + + public static class Request extends MasterNodeRequest { + + private final boolean dryRun; + + public Request(StreamInput in) throws IOException { + super(in); + this.dryRun = in.readBoolean(); + } + + public Request(boolean dryRun) { + super(); + this.dryRun = dryRun; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public boolean isDryRun() { + return dryRun; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(dryRun); + } + + @Override + public int hashCode() { + return Objects.hash(dryRun); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return this.dryRun == other.dryRun; + } + } + + public static class Response extends ActionResponse implements Writeable, ToXContentObject { + + private final long updated; + private final long noAction; + private final long needsUpdate; + + public Response(StreamInput in) throws IOException { + updated = in.readVLong(); + noAction = in.readVLong(); + needsUpdate = in.readVLong(); + } + + public Response(long updated, long noAction, long needsUpdate) { + if (updated < 0 || noAction < 0 || needsUpdate < 0) { + throw new IllegalArgumentException("response counters must be > 0"); + } + + this.updated = updated; + this.noAction = noAction; + this.needsUpdate = needsUpdate; + } + + public long getUpdated() { + return updated; + } + + public long getNoAction() { + return noAction; + } + + public long getNeedsUpdate() { + return needsUpdate; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(updated); + out.writeVLong(noAction); + out.writeVLong(needsUpdate); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (updated != 0L) { + builder.field("updated", updated); + } + if (noAction != 0L) { + builder.field("no_action", noAction); + } + if (needsUpdate != 0L) { + builder.field("needs_update", needsUpdate); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Response other = (Response) obj; + return this.updated == other.updated + && this.noAction == other.noAction + && this.needsUpdate == other.needsUpdate; + } + + @Override + public int hashCode() { + return Objects.hash(updated, noAction, needsUpdate); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index 26def45ff68ca..3ae3d768e4faa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -52,6 +52,7 @@ */ public class TransformConfig extends AbstractDiffable implements Writeable, ToXContentObject { + public static final Version CONFIG_VERSION_LAST_CHANGED = Version.V_7_15_0; public static final String NAME = "data_frame_transform_config"; public static final ParseField HEADERS = new ParseField("headers"); /** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */ @@ -556,7 +557,7 @@ public static TransformConfig rewriteForUpdate(final TransformConfig transformCo // quick check if a rewrite is required, if none found just return the original // a failing quick check, does not mean a rewrite is necessary if (transformConfig.getVersion() != null - && transformConfig.getVersion().onOrAfter(Version.V_7_15_0) + && transformConfig.getVersion().onOrAfter(CONFIG_VERSION_LAST_CHANGED) && (transformConfig.getPivotConfig() == null || transformConfig.getPivotConfig().getMaxPageSearchSize() == null)) { return transformConfig; } @@ -605,7 +606,7 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) { } // 3. set align_checkpoints to false for transforms < 7.15 to keep BWC - if (builder.getVersion() != null && builder.getVersion().before(Version.V_7_15_0)) { + if (builder.getVersion() != null && builder.getVersion().before(CONFIG_VERSION_LAST_CHANGED)) { builder.setSettings( new SettingsConfig( builder.getSettings().getMaxPageSearchSize(), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java index b163116ae0016..55e1c38c8ee0a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java @@ -9,13 +9,13 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; @@ -34,6 +34,8 @@ public class TransformConfigUpdate implements Writeable { public static final String NAME = "data_frame_transform_config_update"; + public static TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null); + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, false, @@ -207,7 +209,11 @@ public static TransformConfigUpdate fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - public boolean isNoop(TransformConfig config) { + public boolean isEmpty() { + return this.equals(EMPTY); + } + + boolean isNoop(TransformConfig config) { return isNullOrEqual(source, config.getSource()) && isNullOrEqual(dest, config.getDestination()) && isNullOrEqual(frequency, config.getFrequency()) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index 862f10e997e59..64086fc47e78d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -34,10 +34,11 @@ public final class TransformInternalIndexConstants { public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_13_0; public static final String INDEX_VERSION = "007"; public static final String INDEX_PATTERN = TRANSFORM_PREFIX + "internal-"; + public static final String INDEX_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "internal-"; public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*"; - public static final String INDEX_NAME_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "internal-*"; + public static final String INDEX_NAME_PATTERN_DEPRECATED = INDEX_PATTERN_DEPRECATED + "*"; // audit index // gh #49730: upped version of audit index to 000002 diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsActionRequestTests.java new file mode 100644 index 0000000000000..a4cd9b3212890 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsActionRequestTests.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Request; + +public class UpgradeTransformsActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomBoolean()); + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsActionResponseTests.java new file mode 100644 index 0000000000000..a92ce693ec11c --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpgradeTransformsActionResponseTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Response; + +public class UpgradeTransformsActionResponseTests extends AbstractWireSerializingTransformTestCase { + + public static Response randomUpgradeResponse() { + return new Response( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + } + + @Override + protected Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + return randomUpgradeResponse(); + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 568d36c7d9d7e..c7f30a9b966f5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -11,8 +11,8 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable.Reader; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.action.AbstractWireSerializingTransformTestCase; @@ -56,7 +56,7 @@ protected Reader instanceReader() { public void testIsNoop() { for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { TransformConfig config = randomTransformConfig(); - TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null); + TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertTrue("null update is not noop", update.isNoop(config)); update = new TransformConfigUpdate( config.getSource(), diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 6f7754a65a9a2..159dd9808f106 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -94,6 +94,7 @@ public class Constants { "cluster:admin/transform/start", "cluster:admin/transform/stop", "cluster:admin/transform/update", + "cluster:admin/transform/upgrade", "cluster:admin/transform/validate", // "cluster:admin/voting_config/add_exclusions", // "cluster:admin/voting_config/clear_exclusions", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_upgrade.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_upgrade.yml new file mode 100644 index 0000000000000..3797b42e09a43 --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_upgrade.yml @@ -0,0 +1,50 @@ +setup: + - do: + indices.create: + index: airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + + - do: + transform.put_transform: + transform_id: "upgrading-airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "description": "yaml test transform on airline-data", + "frequency": "60s", + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } +--- +"Test upgrade transform": + - do: + transform.upgrade_transforms: + dry_run: false + # upgrade does not do anything on a fresh install, so we can only test that nothing breaks + - match: { no_action: 1 } +--- +"Test upgrade transform dry run": + - do: + transform.upgrade_transforms: + dry_run: true + # upgrade does not do anything on a fresh install, so we can only test that nothing breaks + - match: { no_action: 1 } diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java index 0581139437def..a0f87ffbaa317 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -33,8 +34,11 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -234,7 +238,9 @@ public void testExpandIds() throws Exception { 3L, tuple( Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"), - Arrays.asList(transformConfig1, transformConfig2, transformConfig3))), + Arrays.asList(transformConfig1, transformConfig2, transformConfig3) + ) + ), null, null ); @@ -246,7 +252,9 @@ public void testExpandIds() throws Exception { 3L, tuple( Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"), - Arrays.asList(transformConfig1, transformConfig2, transformConfig3))), + Arrays.asList(transformConfig1, transformConfig2, transformConfig3) + ) + ), null, null ); @@ -292,6 +300,126 @@ public void testExpandIds() throws Exception { } ); + // add a duplicate in an old index + String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001"; + String docId = TransformConfig.documentId(transformConfig2.getId()); + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformConfig2.getId()); + client().admin() + .indices() + .create(new CreateIndexRequest(oldIndex).mapping(mappings()).origin(ClientHelper.TRANSFORM_ORIGIN)) + .actionGet(); + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(request).actionGet(); + } + + // check that transformConfig2 gets returned, not the one from the old index or both + assertAsync( + listener -> transformConfigManager.expandTransformIds( + "transform1_expand,transform2_expand", + PageParams.defaultParams(), + true, + listener + ), + tuple(2L, tuple(Arrays.asList("transform1_expand", "transform2_expand"), Arrays.asList(transformConfig1, transformConfig2))), + null, + null + ); + + } + + public void testGetAllTransformIdsAndGetAllOutdatedTransformIds() throws Exception { + long numberOfTransformsToGenerate = 100L; + Set transformIds = new HashSet<>(); + + for (long i = 0; i < numberOfTransformsToGenerate; ++i) { + String id = "transform_" + i; + transformIds.add(id); + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(id); + assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null); + } + assertAsync(listener -> transformConfigManager.getAllTransformIds(listener), transformIds, null, null); + + // test recursive retrieval + assertAsync( + listener -> transformConfigManager.expandAllTransformIds(false, 10, listener), + tuple(Long.valueOf(numberOfTransformsToGenerate), transformIds), + null, + null + ); + + assertAsync( + listener -> transformConfigManager.getAllOutdatedTransformIds(listener), + tuple(Long.valueOf(numberOfTransformsToGenerate), Collections.emptySet()), + null, + null + ); + + assertAsync( + listener -> transformConfigManager.expandAllTransformIds(true, 10, listener), + tuple(Long.valueOf(numberOfTransformsToGenerate), Collections.emptySet()), + null, + null + ); + + // add a duplicate in an old index + String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001"; + String transformId = "transform_42"; + String docId = TransformConfig.documentId(transformId); + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + client().admin() + .indices() + .create(new CreateIndexRequest(oldIndex).mapping(mappings()).origin(ClientHelper.TRANSFORM_ORIGIN)) + .actionGet(); + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(request).actionGet(); + } + + assertAsync(listener -> transformConfigManager.getAllTransformIds(listener), transformIds, null, null); + assertAsync( + listener -> transformConfigManager.getAllOutdatedTransformIds(listener), + tuple(Long.valueOf(numberOfTransformsToGenerate), Collections.emptySet()), + null, + null + ); + + // add another old one, but not with an existing id + transformId = "transform_oldindex"; + docId = TransformConfig.documentId(transformId); + transformConfig = TransformConfigTests.randomTransformConfig(transformId); + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(request).actionGet(); + } + + transformIds.add(transformId); + assertAsync(listener -> transformConfigManager.getAllTransformIds(listener), transformIds, null, null); + assertAsync( + listener -> transformConfigManager.getAllOutdatedTransformIds(listener), + tuple(Long.valueOf(numberOfTransformsToGenerate + 1), Collections.singleton(transformId)), + null, + null + ); + + assertAsync( + listener -> transformConfigManager.expandAllTransformIds(true, 10, listener), + tuple(Long.valueOf(numberOfTransformsToGenerate + 1), Collections.singleton(transformId)), + null, + null + ); } public void testStoredDoc() throws InterruptedException { @@ -302,7 +430,7 @@ public void testStoredDoc() throws InterruptedException { assertAsync(listener -> transformConfigManager.putOrUpdateTransformStoredDoc(storedDocs, null, listener), firstIndex, null, null); assertAsync( - listener -> transformConfigManager.getTransformStoredDoc(transformId, listener), + listener -> transformConfigManager.getTransformStoredDoc(transformId, false, listener), tuple(storedDocs, firstIndex), null, null @@ -317,7 +445,7 @@ public void testStoredDoc() throws InterruptedException { null ); assertAsync( - listener -> transformConfigManager.getTransformStoredDoc(transformId, listener), + listener -> transformConfigManager.getTransformStoredDoc(transformId, false, listener), tuple(updated, secondIndex), null, null @@ -356,7 +484,7 @@ public void testGetStoredDocMultiple() throws InterruptedException { } public void testDeleteOldTransformConfigurations() throws Exception { - String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "1"; + String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001"; String transformId = "transform_test_delete_old_configurations"; String docId = TransformConfig.documentId(transformId); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig("transform_test_delete_old_configurations"); @@ -392,7 +520,7 @@ public void testDeleteOldTransformConfigurations() throws Exception { } public void testDeleteOldTransformStoredDocuments() throws Exception { - String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "1"; + String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001"; String transformId = "transform_test_delete_old_stored_documents"; String docId = TransformStoredDoc.documentId(transformId); TransformStoredDoc transformStoredDoc = TransformStoredDocTests.randomTransformStoredDoc(transformId); @@ -512,7 +640,7 @@ public void testDeleteOldCheckpoints() throws InterruptedException { // test that the other docs are still there assertAsync( - listener -> transformConfigManager.getTransformStoredDoc(transformId, listener), + listener -> transformConfigManager.getTransformStoredDoc(transformId, false, listener), tuple(storedDocs, firstIndex), null, null @@ -524,6 +652,60 @@ public void testDeleteOldCheckpoints() throws InterruptedException { null, null ); + } + + public void testDeleteOldIndices() throws Exception { + String oldIndex = (randomBoolean() + ? TransformInternalIndexConstants.INDEX_PATTERN + : TransformInternalIndexConstants.INDEX_PATTERN_DEPRECATED) + "001"; + String transformId = "transform_test_delete_old_indices"; + String docId = TransformConfig.documentId(transformId); + TransformConfig transformConfigOld = TransformConfigTests.randomTransformConfig(transformId); + TransformConfig transformConfigNew = TransformConfigTests.randomTransformConfig(transformId); + + // create config in old index + client().admin() + .indices() + .create(new CreateIndexRequest(oldIndex).mapping(mappings()).origin(ClientHelper.TRANSFORM_ORIGIN)) + .actionGet(); + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = transformConfigOld.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + IndexRequest request = new IndexRequest(oldIndex).source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(request).actionGet(); + } + + // create config in new index + assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfigNew, listener), true, null, null); + + assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(true)); + assertThat( + client().get(new GetRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), + is(true) + ); + + // the new/latest one should be returned + assertAsync(listener -> transformConfigManager.getTransformConfiguration(transformId, listener), transformConfigNew, null, null); + + // delete old indices + assertAsync(listener -> transformConfigManager.deleteOldIndices(listener), true, null, null); + + // the config should still be there + assertAsync(listener -> transformConfigManager.getTransformConfiguration(transformId, listener), transformConfigNew, null, null); + + // the old index should not exist anymore + expectThrows( + IndexNotFoundException.class, + () -> assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false)) + ); + // but the latest one should + assertThat( + client().get(new GetRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), + is(true) + ); } + } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 55cb7a1a6cdda..933894225cb9f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -32,8 +32,6 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.NamedXContentRegistry.Entry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.AssociatedIndexDescriptor; @@ -52,6 +50,8 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.NamedXContentRegistry.Entry; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; @@ -69,6 +69,7 @@ import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; import org.elasticsearch.xpack.core.transform.action.compat.DeleteTransformActionDeprecated; import org.elasticsearch.xpack.core.transform.action.compat.GetTransformActionDeprecated; @@ -88,6 +89,7 @@ import org.elasticsearch.xpack.transform.action.TransportStartTransformAction; import org.elasticsearch.xpack.transform.action.TransportStopTransformAction; import org.elasticsearch.xpack.transform.action.TransportUpdateTransformAction; +import org.elasticsearch.xpack.transform.action.TransportUpgradeTransformsAction; import org.elasticsearch.xpack.transform.action.TransportValidateTransformAction; import org.elasticsearch.xpack.transform.action.compat.TransportDeleteTransformActionDeprecated; import org.elasticsearch.xpack.transform.action.compat.TransportGetTransformActionDeprecated; @@ -111,6 +113,7 @@ import org.elasticsearch.xpack.transform.rest.action.RestStartTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestStopTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction; +import org.elasticsearch.xpack.transform.rest.action.RestUpgradeTransformsAction; import org.elasticsearch.xpack.transform.rest.action.compat.RestDeleteTransformActionDeprecated; import org.elasticsearch.xpack.transform.rest.action.compat.RestGetTransformActionDeprecated; import org.elasticsearch.xpack.transform.rest.action.compat.RestGetTransformStatsActionDeprecated; @@ -190,6 +193,7 @@ public List getRestHandlers( new RestPreviewTransformAction(), new RestUpdateTransformAction(), new RestCatTransformAction(), + new RestUpgradeTransformsAction(), // deprecated endpoints, to be removed for 8.0.0 new RestPutTransformActionDeprecated(), @@ -217,6 +221,7 @@ public List getRestHandlers( new ActionHandler<>(UpdateTransformAction.INSTANCE, TransportUpdateTransformAction.class), new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class), new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class), + new ActionHandler<>(UpgradeTransformsAction.INSTANCE, TransportUpgradeTransformsAction.class), // deprecated actions, to be removed for 8.0.0 new ActionHandler<>(PutTransformActionDeprecated.INSTANCE, TransportPutTransformActionDeprecated.class), @@ -264,8 +269,13 @@ public Collection createComponents( ) { TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry); TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService); - TransformCheckpointService checkpointService = - new TransformCheckpointService(Clock.systemUTC(), settings, clusterService, configManager, auditor); + TransformCheckpointService checkpointService = new TransformCheckpointService( + Clock.systemUTC(), + settings, + clusterService, + configManager, + auditor + ); SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC()); transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler)); @@ -334,7 +344,8 @@ public Collection getSystemIndexDescriptors(Settings sett } } - @Override public Collection getAssociatedIndexDescriptors() { + @Override + public Collection getAssociatedIndexDescriptors() { return List.of(new AssociatedIndexDescriptor(AUDIT_INDEX_PATTERN, "Audit index")); } @@ -346,9 +357,10 @@ public void cleanUpFeature( ) { ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( - resetSuccess -> finalListener.onResponse(success), - resetFailure -> { + success -> client.execute( + SetResetModeAction.INSTANCE, + SetResetModeActionRequest.disabled(true), + ActionListener.wrap(resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after otherwise successful transform reset", resetFailure); finalListener.onFailure( new ElasticsearchStatusException( @@ -359,13 +371,11 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( - resetSuccess -> finalListener.onFailure(failure), - resetFailure -> { - logger.error( - TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset"), - resetFailure - ); + failure -> client.execute( + SetResetModeAction.INSTANCE, + SetResetModeActionRequest.disabled(false), + ActionListener.wrap(resetSuccess -> finalListener.onFailure(failure), resetFailure -> { + logger.error(TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset"), resetFailure); Exception ex = new ElasticsearchException( TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset") ); @@ -376,13 +386,10 @@ public void cleanUpFeature( ) ); - ActionListener afterWaitingForTasks = ActionListener.wrap( - listTasksResponse -> { - listTasksResponse.rethrowFailures("Waiting for transform indexing tasks"); - SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); - }, - unsetResetModeListener::onFailure - ); + ActionListener afterWaitingForTasks = ActionListener.wrap(listTasksResponse -> { + listTasksResponse.rethrowFailures("Waiting for transform indexing tasks"); + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); + }, unsetResetModeListener::onFailure); ActionListener afterStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> { if (stopTransformsResponse.isAcknowledged() @@ -393,20 +400,17 @@ public void cleanUpFeature( .prepareListTasks() .setActions(TransformField.TASK_NAME) .setWaitForCompletion(true) - .execute(ActionListener.wrap( - listTransformTasks -> { - listTransformTasks.rethrowFailures("Waiting for transform tasks"); - client.admin() - .cluster() - .prepareListTasks() - .setActions("indices:data/write/bulk") - .setDetailed(true) - .setWaitForCompletion(true) - .setDescriptions("*" + TRANSFORM_PREFIX + "*", "*" + TRANSFORM_PREFIX_DEPRECATED + "*") - .execute(afterWaitingForTasks); - }, - unsetResetModeListener::onFailure - )); + .execute(ActionListener.wrap(listTransformTasks -> { + listTransformTasks.rethrowFailures("Waiting for transform tasks"); + client.admin() + .cluster() + .prepareListTasks() + .setActions("indices:data/write/bulk") + .setDetailed(true) + .setWaitForCompletion(true) + .setDescriptions("*" + TRANSFORM_PREFIX + "*", "*" + TRANSFORM_PREFIX_DEPRECATED + "*") + .execute(afterWaitingForTasks); + }, unsetResetModeListener::onFailure)); } else { String errMsg = "Failed to reset Transform: " + (stopTransformsResponse.isAcknowledged() ? "" : "not acknowledged ") @@ -416,25 +420,23 @@ public void cleanUpFeature( + (stopTransformsResponse.getTaskFailures().isEmpty() ? "" : "task failures: " + stopTransformsResponse.getTaskFailures()); - unsetResetModeListener.onResponse(ResetFeatureStateResponse.ResetFeatureStateStatus.failure(this.getFeatureName(), - new ElasticsearchException(errMsg))); + unsetResetModeListener.onResponse( + ResetFeatureStateResponse.ResetFeatureStateStatus.failure(this.getFeatureName(), new ElasticsearchException(errMsg)) + ); } }, unsetResetModeListener::onFailure); - ActionListener afterResetModeSet = ActionListener.wrap( - response -> { - StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request( - Metadata.ALL, - true, - true, - null, - true, - false - ); - client.execute(StopTransformAction.INSTANCE, stopTransformsRequest, afterStoppingTransforms); - }, - finalListener::onFailure - ); + ActionListener afterResetModeSet = ActionListener.wrap(response -> { + StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request( + Metadata.ALL, + true, + true, + null, + true, + false + ); + client.execute(StopTransformAction.INSTANCE, stopTransformsRequest, afterStoppingTransforms); + }, finalListener::onFailure); client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java new file mode 100644 index 0000000000000..6252a0a5ac778 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java @@ -0,0 +1,336 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; +import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; +import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; +import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.persistence.TransformIndex; + +import java.time.Clock; +import java.util.Map; + +/** + * With {@link TransformUpdater} transforms can be updated or upgraded to the latest version + * + * This implementation is shared between _update and _upgrade + */ +public class TransformUpdater { + + private static final Logger logger = LogManager.getLogger(TransformUpdater.class); + + public static final class UpdateResult { + + // the status of the update + public enum Status { + NONE, // all checks passed, no action taken + UPDATED, // updated + NEEDS_UPDATE // special dry run status + } + + // the new config after the update + private final TransformConfig config; + + // the action taken for the upgrade + private final Status status; + + UpdateResult(final TransformConfig config, final Status status) { + this.config = config; + this.status = status; + } + + public Status getStatus() { + return status; + } + + public TransformConfig getConfig() { + return config; + } + } + + /** + * Update a single transform given a config and update + * + * In addition to applying update to the config, old versions of {@link TransformConfig}, {@link TransformStoredDoc} and + * {@link TransformCheckpoint} are rewritten into the latest format and written back using {@link TransformConfigManager} + * + * @param securityContext the security context + * @param indexNameExpressionResolver index name expression resolver + * @param clusterState the current cluster state + * @param settings settings + * @param client a client + * @param transformConfigManager the transform configuration manager + * @param config the old configuration to update + * @param update the update to apply to the configuration + * @param seqNoPrimaryTermAndIndex sequence id and primary term of the configuration + * @param deferValidation whether to defer some validation checks + * @param dryRun whether to actually write the configuration back or whether to just check for updates + * @param checkAccess whether to run access checks + * @param listener the listener called containing the result of the update + */ + + public static void updateTransform( + SecurityContext securityContext, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterState clusterState, + Settings settings, + Client client, + TransformConfigManager transformConfigManager, + final TransformConfig config, + final TransformConfigUpdate update, + final SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, + final boolean deferValidation, + final boolean dryRun, + final boolean checkAccess, + ActionListener listener + ) { + // rewrite config into a new format if necessary + TransformConfig rewrittenConfig = TransformConfig.rewriteForUpdate(config); + TransformConfig updatedConfig = update != null ? update.apply(rewrittenConfig) : rewrittenConfig; + + // <5> Update checkpoints + ActionListener updateStateListener = ActionListener.wrap(lastCheckpoint -> { + // config was updated, but the transform has no state or checkpoint + if (lastCheckpoint == null || lastCheckpoint == -1) { + listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)); + return; + } + + updateTransformCheckpoint( + config.getId(), + lastCheckpoint, + transformConfigManager, + ActionListener.wrap( + r -> listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)), + listener::onFailure + ) + ); + }, listener::onFailure); + + // <4> Update State document + ActionListener updateTransformListener = ActionListener.wrap( + r -> updateTransformStateAndGetLastCheckpoint(config.getId(), transformConfigManager, updateStateListener), + listener::onFailure + ); + + // <3> Update the transform + ActionListener> validateTransformListener = ActionListener.wrap(destIndexMappings -> { + // If it is a noop or dry run don't write the doc + // skip when: + // - config is in the latest index + // - rewrite did not change the config + // - update is not making any changes + if (config.getVersion() != null + && config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED) + && updatedConfig.equals(config)) { + listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NONE)); + return; + } + + if (dryRun) { + listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NEEDS_UPDATE)); + return; + } + + updateTransformConfiguration( + client, + transformConfigManager, + indexNameExpressionResolver, + updatedConfig, + destIndexMappings, + seqNoPrimaryTermAndIndex, + clusterState, + ActionListener.wrap(r -> updateTransformListener.onResponse(null), listener::onFailure) + ); + }, listener::onFailure); + + // <2> Validate source and destination indices + ActionListener checkPrivilegesListener = ActionListener.wrap( + aVoid -> { validateTransform(updatedConfig, client, deferValidation, validateTransformListener); }, + listener::onFailure + ); + + // <1> Early check to verify that the user can create the destination index and can read from the source + if (checkAccess && XPackSettings.SECURITY_ENABLED.get(settings) && deferValidation == false) { + TransformPrivilegeChecker.checkPrivileges( + "update", + securityContext, + indexNameExpressionResolver, + clusterState, + client, + updatedConfig, + true, + checkPrivilegesListener + ); + } else { // No security enabled, just move on + checkPrivilegesListener.onResponse(null); + } + } + + private static void validateTransform( + TransformConfig config, + Client client, + boolean deferValidation, + ActionListener> listener + ) { + client.execute( + ValidateTransformAction.INSTANCE, + new ValidateTransformAction.Request(config, deferValidation), + ActionListener.wrap(response -> listener.onResponse(response.getDestIndexMappings()), listener::onFailure) + ); + } + + private static void updateTransformStateAndGetLastCheckpoint( + String transformId, + TransformConfigManager transformConfigManager, + ActionListener listener + ) { + transformConfigManager.getTransformStoredDoc(transformId, true, ActionListener.wrap(currentState -> { + if (currentState == null) { + // no state found + listener.onResponse(-1L); + return; + } + + long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint(); + + if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) { + listener.onResponse(lastCheckpoint); + return; + } + + transformConfigManager.putOrUpdateTransformStoredDoc( + currentState.v1(), + currentState.v2(), + ActionListener.wrap(r -> { listener.onResponse(lastCheckpoint); }, e -> { + if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) { + // if a version conflict occurs a new state has been written between us reading and writing. + // this is a benign case, as it means the transform is running and the latest state has been written by it + logger.trace("[{}] could not update transform state during update due to running transform", transformId); + listener.onResponse(lastCheckpoint); + } else { + logger.warn("[{}] failed to persist transform state during update.", transformId); + listener.onFailure(e); + } + }) + ); + }, listener::onFailure)); + } + + private static void updateTransformCheckpoint( + String transformId, + long lastCheckpoint, + TransformConfigManager transformConfigManager, + ActionListener listener + ) { + transformConfigManager.getTransformCheckpointForUpdate(transformId, lastCheckpoint, ActionListener.wrap(checkpointAndVersion -> { + if (checkpointAndVersion == null + || checkpointAndVersion.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) { + listener.onResponse(true); + return; + } + + transformConfigManager.putTransformCheckpoint(checkpointAndVersion.v1(), listener); + }, listener::onFailure)); + } + + private static void updateTransformConfiguration( + Client client, + TransformConfigManager transformConfigManager, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformConfig config, + Map mappings, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, + ClusterState clusterState, + ActionListener listener + ) { + // <3> Return to the listener + ActionListener putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> { + transformConfigManager.deleteOldTransformConfigurations(config.getId(), ActionListener.wrap(r -> { + logger.trace("[{}] successfully deleted old transform configurations", config.getId()); + listener.onResponse(null); + }, e -> { + logger.warn(LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", config.getId()), e); + listener.onResponse(null); + })); + }, + // If we failed to INDEX AND we created the destination index, the destination index will still be around + // This is a similar behavior to _start + listener::onFailure + ); + + // <2> Update our transform + ActionListener createDestinationListener = ActionListener.wrap( + createDestResponse -> transformConfigManager.updateTransformConfiguration( + config, + seqNoPrimaryTermAndIndex, + putTransformConfigurationListener + ), + listener::onFailure + ); + + // <1> Create destination index if necessary + String[] dest = indexNameExpressionResolver.concreteIndexNames( + clusterState, + IndicesOptions.lenientExpandOpen(), + config.getDestination().getIndex() + ); + String[] src = indexNameExpressionResolver.concreteIndexNames( + clusterState, + IndicesOptions.lenientExpandOpen(), + true, + config.getSource().getIndex() + ); + // If we are running, we should verify that the destination index exists and create it if it does not + if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, config.getId()) != null && dest.length == 0 + // Verify we have source indices. The user could defer_validations and if the task is already running + // we allow source indices to disappear. If the source and destination indices do not exist, don't do anything + // the transform will just have to dynamically create the destination index without special mapping. + && src.length > 0) { + createDestinationIndex(client, config, mappings, createDestinationListener); + } else { + createDestinationListener.onResponse(null); + } + } + + private static void createDestinationIndex( + Client client, + TransformConfig config, + Map mappings, + ActionListener listener + ) { + TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( + mappings, + config.getId(), + Clock.systemUTC() + ); + TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener); + } + + private TransformUpdater() {} + +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index c6e1e53e27948..f08733045129e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -24,7 +23,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.ingest.IngestService; @@ -39,23 +37,17 @@ import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Response; -import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; -import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; -import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; -import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -import org.elasticsearch.xpack.transform.persistence.TransformIndex; import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.FunctionFactory; import org.elasticsearch.xpack.transform.transforms.TransformTask; -import java.time.Clock; import java.util.List; import java.util.Map; @@ -161,89 +153,62 @@ protected void doExecute(Task task, Request request, ActionListener li // GET transform and attempt to update // We don't want the update to complete if the config changed between GET and INDEX transformConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap(configAndVersion -> { - final TransformConfig oldConfig = configAndVersion.v1(); - final TransformConfig config = TransformConfig.rewriteForUpdate(oldConfig); - - // If it is a noop don't bother even writing the doc, save the cycles, just return here. - // skip when: - // - config is in the latest index - // - rewrite did not change the config - // - update is not making any changes - if (config.getVersion() != null - && config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED) - && config.equals(oldConfig) - && update.isNoop(config)) { - listener.onResponse(new Response(config)); - return; - } - TransformConfig updatedConfig = update.apply(config); - - final ActionListener updateListener; - if (update.changesSettings(config)) { - PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState); - PersistentTasksCustomMetadata.PersistentTask transformTask = tasksMetadata.getTask(request.getId()); - - // to send a request to apply new settings at runtime, several requirements must be met: - // - transform must be running, meaning a task exists - // - transform is not failed (stopped transforms do not have a task) - // - the node where transform is executed on is at least 7.8.0 in order to understand the request - if (transformTask != null - && transformTask.isAssigned() - && transformTask.getState() instanceof TransformState - && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED - && clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) { - request.setNodes(transformTask.getExecutorNode()); - updateListener = ActionListener.wrap(updateResponse -> { - request.setConfig(updateResponse.getConfig()); - super.doExecute(task, request, listener); - }, listener::onFailure); - } else { - updateListener = listener; - } - } else { - updateListener = listener; - } - - // <3> Update the transform - ActionListener validateTransformListener = ActionListener.wrap( - validationResponse -> { - updateTransform( - request, - updatedConfig, - validationResponse.getDestIndexMappings(), - configAndVersion.v2(), - clusterState, - updateListener); - }, - listener::onFailure + TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + clusterState, + settings, + client, + transformConfigManager, + configAndVersion.v1(), + update, + configAndVersion.v2(), + request.isDeferValidation(), + false, // dryRun + true, // checkAccess + ActionListener.wrap(updateResponse -> { + TransformConfig updatedConfig = updateResponse.getConfig(); + auditor.info(updatedConfig.getId(), "Updated transform."); + logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); + + checkTransformConfigAndLogWarnings(updatedConfig); + + if (update.changesSettings(configAndVersion.v1())) { + PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata( + clusterState + ); + PersistentTasksCustomMetadata.PersistentTask transformTask = tasksMetadata.getTask(request.getId()); + + // to send a request to apply new settings at runtime, several requirements must be met: + // - transform must be running, meaning a task exists + // - transform is not failed (stopped transforms do not have a task) + // - the node where transform is executed on is at least 7.8.0 in order to understand the request + if (transformTask != null + && transformTask.isAssigned() + && transformTask.getState() instanceof TransformState + && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED + && clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) { + + request.setNodes(transformTask.getExecutorNode()); + request.setConfig(updatedConfig); + super.doExecute(task, request, listener); + return; + } + } + listener.onResponse(new Response(updatedConfig)); + }, listener::onFailure) ); + }, listener::onFailure)); + } - // <2> Validate source and destination indices - ActionListener checkPrivilegesListener = ActionListener.wrap( - aVoid -> { - client.execute( - ValidateTransformAction.INSTANCE, - new ValidateTransformAction.Request(updatedConfig, request.isDeferValidation()), - validateTransformListener - ); - }, - listener::onFailure); + private void checkTransformConfigAndLogWarnings(TransformConfig config) { + final Function function = FunctionFactory.create(config); + List warnings = TransformConfigLinter.getWarnings(function, config.getSource(), config.getSyncConfig()); - // <1> Early check to verify that the user can create the destination index and can read from the source - if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { - TransformPrivilegeChecker.checkPrivileges( - "update", - securityContext, - indexNameExpressionResolver, - clusterState, - client, - updatedConfig, - true, - checkPrivilegesListener); - } else { // No security enabled, just move on - checkPrivilegesListener.onResponse(null); - } - }, listener::onFailure)); + for (String warning : warnings) { + logger.warn(new ParameterizedMessage("[{}] {}", config.getId(), warning)); + auditor.warning(config.getId(), warning); + } } @Override @@ -264,77 +229,4 @@ protected Response newResponse( return tasks.get(0); } - private void updateTransform( - Request request, - TransformConfig config, - Map mappings, - SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, - ClusterState clusterState, - ActionListener listener - ) { - final Function function = FunctionFactory.create(config); - - // <3> Return to the listener - ActionListener putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> { - auditor.info(config.getId(), "Updated transform."); - List warnings = TransformConfigLinter.getWarnings(function, config.getSource(), config.getSyncConfig()); - for (String warning : warnings) { - logger.warn(new ParameterizedMessage("[{}] {}", config.getId(), warning)); - auditor.warning(config.getId(), warning); - } - transformConfigManager.deleteOldTransformConfigurations(request.getId(), ActionListener.wrap(r -> { - logger.trace("[{}] successfully deleted old transform configurations", request.getId()); - listener.onResponse(new Response(config)); - }, e -> { - logger.warn(LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", request.getId()), e); - listener.onResponse(new Response(config)); - })); - }, - // If we failed to INDEX AND we created the destination index, the destination index will still be around - // This is a similar behavior to _start - listener::onFailure - ); - - // <2> Update our transform - ActionListener createDestinationListener = ActionListener.wrap( - createDestResponse -> transformConfigManager.updateTransformConfiguration( - config, - seqNoPrimaryTermAndIndex, - putTransformConfigurationListener - ), - listener::onFailure - ); - - // <1> Create destination index if necessary - String[] dest = indexNameExpressionResolver.concreteIndexNames( - clusterState, - IndicesOptions.lenientExpandOpen(), - config.getDestination().getIndex() - ); - String[] src = indexNameExpressionResolver.concreteIndexNames( - clusterState, - IndicesOptions.lenientExpandOpen(), - true, - config.getSource().getIndex() - ); - // If we are running, we should verify that the destination index exists and create it if it does not - if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, request.getId()) != null && dest.length == 0 - // Verify we have source indices. The user could defer_validations and if the task is already running - // we allow source indices to disappear. If the source and destination indices do not exist, don't do anything - // the transform will just have to dynamically create the destination index without special mapping. - && src.length > 0) { - createDestinationIndex(config, mappings, createDestinationListener); - } else { - createDestinationListener.onResponse(null); - } - } - - private void createDestinationIndex(TransformConfig config, Map mappings, ActionListener listener) { - TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( - mappings, - config.getId(), - Clock.systemUTC() - ); - TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener); - } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java new file mode 100644 index 0000000000000..53b1a78570d36 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java @@ -0,0 +1,244 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Request; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Response; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; +import org.elasticsearch.xpack.transform.TransformServices; +import org.elasticsearch.xpack.transform.action.TransformUpdater.UpdateResult; +import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.transforms.TransformNodes; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +public class TransportUpgradeTransformsAction extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportUpgradeTransformsAction.class); + private final TransformConfigManager transformConfigManager; + private final SecurityContext securityContext; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final Settings settings; + private final Client client; + private final TransformAuditor auditor; + + @Inject + public TransportUpgradeTransformsAction( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + ThreadPool threadPool, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformServices transformServices, + Client client, + Settings settings + ) { + this( + UpgradeTransformsAction.NAME, + transportService, + actionFilters, + clusterService, + threadPool, + indexNameExpressionResolver, + transformServices, + client, + settings + ); + } + + protected TransportUpgradeTransformsAction( + String name, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + ThreadPool threadPool, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformServices transformServices, + Client client, + Settings settings + ) { + super( + name, + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + Response::new, + ThreadPool.Names.SAME + ); + this.transformConfigManager = transformServices.getConfigManager(); + this.settings = settings; + + this.client = client; + this.auditor = transformServices.getAuditor(); + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) + ? new SecurityContext(settings, threadPool.getThreadContext()) + : null; + } + + @Override + protected void masterOperation(Task ignoredTask, Request request, ClusterState state, ActionListener listener) + throws Exception { + TransformNodes.warnIfNoTransformNodes(state); + + // do not allow in mixed clusters + if (state.nodes().getMaxNodeVersion().after(state.nodes().getMinNodeVersion())) { + listener.onFailure( + new ElasticsearchStatusException( + "Cannot upgrade transforms. All nodes must be the same version [{}]", + RestStatus.CONFLICT, + state.nodes().getMaxNodeVersion().toString() + ) + ); + return; + } + + recursiveExpandTransformIdsAndUpgrade(request.isDryRun(), ActionListener.wrap(updatesByStatus -> { + final long updated = updatesByStatus.getOrDefault(UpdateResult.Status.UPDATED, 0L); + final long noAction = updatesByStatus.getOrDefault(UpdateResult.Status.NONE, 0L); + final long needsUpdate = updatesByStatus.getOrDefault(UpdateResult.Status.NEEDS_UPDATE, 0L); + + if (request.isDryRun() == false) { + transformConfigManager.deleteOldIndices(ActionListener.wrap(aBool -> { + logger.info("Successfully upgraded all transforms, (updated: [{}], no action [{}])", updated, noAction); + + listener.onResponse(new UpgradeTransformsAction.Response(updated, noAction, needsUpdate)); + }, listener::onFailure)); + } else { + // else: dry run + listener.onResponse(new UpgradeTransformsAction.Response(updated, noAction, needsUpdate)); + } + }, listener::onFailure)); + + } + + @Override + protected ClusterBlockException checkBlock(UpgradeTransformsAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private void updateOneTransform(String id, boolean dryRun, ActionListener listener) { + final ClusterState clusterState = clusterService.state(); + + transformConfigManager.getTransformConfigurationForUpdate(id, ActionListener.wrap(configAndVersion -> { + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null); + TransformConfig config = configAndVersion.v1(); + + /* + * keep headers from the original document + * + * TODO: Handle deprecated data_frame_transform roles + * + * The headers store user roles and in case the transform has been created in 7.2-7.4 + * contain the old data_frame_transform_* roles + * + * For 9.x we need to take action as data_frame_transform_* will be removed + * + * Hint: {@link AuthenticationContextSerializer} for decoding the header + */ + update.setHeaders(config.getHeaders()); + + TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + clusterState, + settings, + client, + transformConfigManager, + config, + update, + configAndVersion.v2(), + false, // defer validation + dryRun, + false, // check access, + listener + ); + }, listener::onFailure)); + } + + private void recursiveUpdate( + Deque transformsToUpgrade, + Map updatesByStatus, + boolean dryRun, + ActionListener listener + ) { + String next = transformsToUpgrade.pollFirst(); + + // extra paranoia: return if next is null + if (next == null) { + listener.onResponse(null); + return; + } + + updateOneTransform(next, dryRun, ActionListener.wrap(updateResponse -> { + TransformConfig updatedConfig = updateResponse.getConfig(); + auditor.info(updatedConfig.getId(), "Updated transform."); + logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); + updatesByStatus.compute(updateResponse.getStatus(), (k, v) -> (v == null) ? 1 : v + 1L); + + if (transformsToUpgrade.isEmpty() == false) { + recursiveUpdate(transformsToUpgrade, updatesByStatus, dryRun, listener); + } else { + listener.onResponse(null); + } + }, listener::onFailure)); + } + + private void recursiveExpandTransformIdsAndUpgrade(boolean dryRun, ActionListener> listener) { + transformConfigManager.getAllOutdatedTransformIds(ActionListener.wrap(totalAndIds -> { + + // exit quickly if there is nothing to do + if (totalAndIds.v2().isEmpty()) { + listener.onResponse(Collections.singletonMap(UpdateResult.Status.NONE, totalAndIds.v1())); + return; + } + + Map updatesByStatus = new HashMap<>(); + updatesByStatus.put(UpdateResult.Status.NONE, totalAndIds.v1() - totalAndIds.v2().size()); + + Deque ids = new ArrayDeque<>(totalAndIds.v2()); + + recursiveUpdate( + ids, + updatesByStatus, + dryRun, + ActionListener.wrap(r -> listener.onResponse(updatesByStatus), listener::onFailure) + ); + }, listener::onFailure)); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java index c7ee357051015..1fbe2bc1db618 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java @@ -15,6 +15,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -28,15 +30,9 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.ToXContent; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; -import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -50,6 +46,12 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.transform.TransformField; @@ -63,6 +65,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -94,6 +97,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager { private static final Logger logger = LogManager.getLogger(IndexBasedTransformConfigManager.class); + private static final int MAX_RESULTS_WINDOW = 10_000; private final Client client; private final NamedXContentRegistry xContentRegistry; @@ -249,6 +253,23 @@ public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow ); } + @Override + public void deleteOldIndices(ActionListener listener) { + DeleteIndexRequest deleteRequest = new DeleteIndexRequest( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED, + "-" + TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + + executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteIndexAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { + if (response.isAcknowledged() == false) { + listener.onFailure(new ElasticsearchStatusException("Failed to delete internal indices", RestStatus.INTERNAL_SERVER_ERROR)); + return; + } + listener.onResponse(true); + }, listener::onFailure)); + } + private void putTransformConfiguration( TransformConfig transformConfig, DocWriteRequest.OpType optType, @@ -326,6 +347,55 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi ); } + @Override + public void getTransformCheckpointForUpdate( + String transformId, + long checkpoint, + ActionListener> checkpointAndVersionListener + ) { + QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformCheckpoint.documentId(transformId, checkpoint)); + SearchRequest searchRequest = client.prepareSearch( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ) + .setQuery(queryBuilder) + // use sort to get the last + .addSort("_index", SortOrder.DESC) + .setSize(1) + .seqNoAndPrimaryTerm(true) + .setAllowPartialSearchResults(false) + .request(); + + executeAsyncWithOrigin( + client, + TRANSFORM_ORIGIN, + SearchAction.INSTANCE, + searchRequest, + ActionListener.wrap(searchResponse -> { + if (searchResponse.getHits().getHits().length == 0) { + // do not fail, this _must_ be handled by the caller + checkpointAndVersionListener.onResponse(null); + return; + } + SearchHit hit = searchResponse.getHits().getHits()[0]; + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + parseCheckpointsLenientlyFromSource( + source, + transformId, + ActionListener.wrap( + parsedCheckpoint -> checkpointAndVersionListener.onResponse( + Tuple.tuple( + parsedCheckpoint, + new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex()) + ) + ), + checkpointAndVersionListener::onFailure + ) + ); + }, checkpointAndVersionListener::onFailure) + ); + } + @Override public void getTransformConfiguration(String transformId, ActionListener resultListener) { QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)); @@ -413,6 +483,7 @@ public void expandTransformIds( TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED ) .addSort(TransformField.ID.getPreferredName(), SortOrder.ASC) + .addSort("_index", SortOrder.DESC) .setFrom(pageParams.getFrom()) .setTrackTotalHits(true) .setSize(pageParams.getSize()) @@ -438,8 +509,9 @@ public void expandTransformIds( .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) ) { TransformConfig config = TransformConfig.fromXContent(parser, null, true); - ids.add(config.getId()); - configs.add(config); + if (ids.add(config.getId())) { + configs.add(config); + } } catch (IOException e) { foundConfigsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e)); return; @@ -459,7 +531,8 @@ public void expandTransformIds( // in versioned indexes (like transform) if (requiredMatches.isOnlyExact()) { foundConfigsListener.onResponse( - new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))); + new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))) + ); } else { foundConfigsListener.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))); } @@ -468,6 +541,16 @@ public void expandTransformIds( ); } + @Override + public void getAllTransformIds(ActionListener> listener) { + expandAllTransformIds(false, MAX_RESULTS_WINDOW, ActionListener.wrap(r -> listener.onResponse(r.v2()), listener::onFailure)); + } + + @Override + public void getAllOutdatedTransformIds(ActionListener>> listener) { + expandAllTransformIds(true, MAX_RESULTS_WINDOW, listener); + } + @Override public void deleteTransform(String transformId, ActionListener listener) { DeleteByQueryRequest request = createDeleteByQueryRequest(); @@ -547,6 +630,7 @@ public void putOrUpdateTransformStoredDoc( @Override public void getTransformStoredDoc( String transformId, + boolean allowNoMatch, ActionListener> resultListener ) { QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)); @@ -569,9 +653,15 @@ public void getTransformStoredDoc( searchRequest, ActionListener.wrap(searchResponse -> { if (searchResponse.getHits().getHits().length == 0) { - resultListener.onFailure( - new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId)) - ); + if (allowNoMatch) { + resultListener.onResponse(null); + } else { + resultListener.onFailure( + new ResourceNotFoundException( + TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId) + ) + ); + } return; } SearchHit searchHit = searchResponse.getHits().getHits()[0]; @@ -714,6 +804,87 @@ private QueryBuilder buildQueryFromTokenizedIds(String[] idTokens, String resour return QueryBuilders.constantScoreQuery(queryBuilder); } + /** + * Expand all transform ids + * + * @param filterForOutdated if true, only returns outdated ids (after de-duplication) + * @param maxResultWindow the max result window size (exposed for testing) + * @param listener listener to call containing transform ids + */ + void expandAllTransformIds(boolean filterForOutdated, int maxResultWindow, ActionListener>> listener) { + PageParams startPage = new PageParams(0, maxResultWindow); + + Set collectedIds = new HashSet<>(); + recursiveExpandAllTransformIds(collectedIds, 0, filterForOutdated, maxResultWindow, null, startPage, listener); + } + + private void recursiveExpandAllTransformIds( + Set collectedIds, + long total, + boolean filterForOutdated, + int maxResultWindow, + String lastId, + PageParams page, + ActionListener>> listener + ) { + SearchRequest request = client.prepareSearch( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ) + .addSort(TransformField.ID.getPreferredName(), SortOrder.ASC) + .addSort("_index", SortOrder.DESC) + .setFrom(page.getFrom()) + .setSize(page.getSize()) + .setFetchSource(false) + .addDocValueField(TransformField.ID.getPreferredName()) + .request(); + + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + TRANSFORM_ORIGIN, + request, + ActionListener.wrap(searchResponse -> { + long totalHits = total; + String idOfLastHit = lastId; + + for (SearchHit hit : searchResponse.getHits().getHits()) { + String id = hit.field(TransformField.ID.getPreferredName()).getValue(); + + // paranoia + if (Strings.isNullOrEmpty(id)) { + continue; + } + + // only count hits if looking for outdated transforms + if (filterForOutdated && hit.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) { + ++totalHits; + } else if (id.equals(idOfLastHit) == false && collectedIds.add(id)) { + ++totalHits; + } + idOfLastHit = id; + } + + if (searchResponse.getHits().getHits().length == page.getSize()) { + PageParams nextPage = new PageParams(page.getFrom() + page.getSize(), maxResultWindow); + + recursiveExpandAllTransformIds( + collectedIds, + totalHits, + filterForOutdated, + maxResultWindow, + idOfLastHit, + nextPage, + listener + ); + return; + } + + listener.onResponse(new Tuple<>(totalHits, collectedIds)); + }, listener::onFailure), + client::search + ); + } + private static Tuple getStatusAndReason(final BulkByScrollResponse response) { RestStatus status = RestStatus.OK; Throwable reason = new Exception("Unknown error"); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java index a122d16edc883..35b952014dfcd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; public interface TransformConfigManager { @@ -85,14 +86,36 @@ void updateTransformConfiguration( */ void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener); + /** + * This deletes all _old_ internal storages(indices) except the most recent one. + * + * CAUTION: Deletes data without checks! Special method for upgrades. + * + * @param listener listener to call on completion + */ + void deleteOldIndices(ActionListener listener); + /** * Get a stored checkpoint, requires the transform id as well as the checkpoint id * * @param transformId the transform id * @param checkpoint the checkpoint - * @param resultListener listener to call after request has been made + * @param checkpointListener listener to call after request has been made + */ + void getTransformCheckpoint(String transformId, long checkpoint, ActionListener checkpointListener); + + /** + * Get a stored checkpoint, requires the transform id as well as the checkpoint id. This function is only for internal use. + * + * @param transformId the transform id + * @param checkpoint the checkpoint + * @param checkpointAndVersionListener listener to call after inner request has returned */ - void getTransformCheckpoint(String transformId, long checkpoint, ActionListener resultListener); + void getTransformCheckpointForUpdate( + String transformId, + long checkpoint, + ActionListener> checkpointAndVersionListener + ); /** * Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET @@ -132,6 +155,20 @@ void expandTransformIds( ActionListener, List>>> foundConfigsListener ); + /** + * Get all transform ids + * + * @param listener The listener to call with the collected ids + */ + void getAllTransformIds(ActionListener> listener); + + /** + * Get all transform ids that aren't using the latest index. + * + * @param listener The listener to call with total number of transforms and the list of transform ids. + */ + void getAllOutdatedTransformIds(ActionListener>> listener); + /** * This deletes the configuration and all other documents corresponding to the transform id (e.g. checkpoints). * @@ -146,7 +183,11 @@ void putOrUpdateTransformStoredDoc( ActionListener listener ); - void getTransformStoredDoc(String transformId, ActionListener> resultListener); + void getTransformStoredDoc( + String transformId, + boolean allowNoMatch, + ActionListener> resultListener + ); void getTransformStoredDocs(Collection transformIds, ActionListener> listener); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestUpgradeTransformsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestUpgradeTransformsAction.java new file mode 100644 index 0000000000000..821c39c9c967b --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestUpgradeTransformsAction.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.rest.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestUpgradeTransformsAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of(new Route(POST, TransformField.REST_BASE_PATH_TRANSFORMS + "_upgrade")); + } + + @Override + public String getName() { + return "transform_upgrade_transforms_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + if (restRequest.hasContent()) { + throw new IllegalArgumentException("upgrade does not allow a request body"); + } + + boolean dryRun = restRequest.paramAsBoolean(TransformField.DRY_RUN.getPreferredName(), false); + + return channel -> client.execute( + UpgradeTransformsAction.INSTANCE, + new UpgradeTransformsAction.Request(dryRun), + new RestToXContentListener<>(channel) + ); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index f86a5412890a6..1ac4c6cacff1d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -273,7 +273,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ValidationException validationException = config.validate(null); if (validationException == null) { indexerBuilder.setTransformConfig(config); - transformServices.getConfigManager().getTransformStoredDoc(transformId, transformStatsActionListener); + transformServices.getConfigManager().getTransformStoredDoc(transformId, false, transformStatsActionListener); } else { auditor.error(transformId, validationException.getMessage()); markAsFailed( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java new file mode 100644 index 0000000000000..b9754960bd22e --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformUpdaterTests.java @@ -0,0 +1,343 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.indices.TestIndexNameExpressionResolver; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; +import org.elasticsearch.xpack.core.security.user.User; +import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.action.TransformUpdater.UpdateResult; +import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; +import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; +import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +public class TransformUpdaterTests extends ESTestCase { + + private static final String USER_NAME = "bob"; + private final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, null) { + @Override + public User getUser() { + return new User(USER_NAME); + } + }; + private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); + private Client client; + private final Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), true).build(); + + private static class MyMockClient extends NoOpClient { + + MyMockClient(String testName) { + super(testName); + } + + @SuppressWarnings("unchecked") + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof HasPrivilegesRequest) { + listener.onResponse((Response) new HasPrivilegesResponse()); + } else if (request instanceof ValidateTransformAction.Request) { + listener.onResponse((Response) new ValidateTransformAction.Response(Collections.emptyMap())); + } else { + super.doExecute(action, request, listener); + } + } + } + + @Before + public void setupClient() { + if (client != null) { + client.close(); + } + client = new MyMockClient(getTestName()); + } + + @After + public void tearDownClient() { + client.close(); + } + + public void testTransformUpdateNoAction() throws InterruptedException { + TransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig maxCompatibleConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + Version.CURRENT + ); + transformConfigManager.putTransformConfiguration(maxCompatibleConfig, ActionListener.wrap(r -> {}, e -> {})); + assertConfiguration( + listener -> transformConfigManager.getTransformConfiguration(maxCompatibleConfig.getId(), listener), + config -> {} + ); + + TransformConfigUpdate update = TransformConfigUpdate.EMPTY; + assertUpdate( + listener -> TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + maxCompatibleConfig, + update, + null, // seqNoPrimaryTermAndIndex + true, + false, + false, + listener + ), + updateResult -> { + assertEquals(UpdateResult.Status.NONE, updateResult.getStatus()); + assertEquals(maxCompatibleConfig, updateResult.getConfig()); + } + ); + assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(maxCompatibleConfig.getId(), listener), config -> { + assertNotNull(config); + assertEquals(Version.CURRENT, config.getVersion()); + }); + + TransformConfig minCompatibleConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + TransformConfig.CONFIG_VERSION_LAST_CHANGED + ); + transformConfigManager.putTransformConfiguration(minCompatibleConfig, ActionListener.wrap(r -> {}, e -> {})); + + assertUpdate( + listener -> TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + minCompatibleConfig, + update, + null, // seqNoPrimaryTermAndIndex + true, + false, + false, + listener + ), + updateResult -> { + assertEquals(UpdateResult.Status.NONE, updateResult.getStatus()); + assertEquals(minCompatibleConfig, updateResult.getConfig()); + } + ); + assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(minCompatibleConfig.getId(), listener), config -> { + assertNotNull(config); + assertEquals(TransformConfig.CONFIG_VERSION_LAST_CHANGED, config.getVersion()); + }); + } + + public void testTransformUpdateRewrite() throws InterruptedException { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED) + ) + ); + + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.wrap(r -> {}, e -> {})); + TransformCheckpoint checkpoint = new TransformCheckpoint( + oldConfig.getId(), + 0L, // timestamp + 42L, // checkpoint + Collections.singletonMap("index_1", new long[] { 1, 2, 3, 4 }), // index checkpoints + 0L + ); + transformConfigManager.putOldTransformCheckpoint(checkpoint, ActionListener.wrap(r -> {}, e -> {})); + + TransformStoredDoc stateDoc = new TransformStoredDoc( + oldConfig.getId(), + new TransformState( + TransformTaskState.STARTED, + IndexerState.INDEXING, + null, // position + 42L, // checkpoint + null, // reason + null, // progress + null, // node attributes + false // shouldStopAtNextCheckpoint + ), + TransformIndexerStatsTests.randomStats() + ); + transformConfigManager.putOrUpdateOldTransformStoredDoc(stateDoc, null, ActionListener.wrap(r -> {}, e -> {})); + + assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(oldConfig.getId(), listener), config -> {}); + + TransformConfigUpdate update = TransformConfigUpdate.EMPTY; + assertUpdate( + listener -> TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + update, + null, // seqNoPrimaryTermAndIndex + true, + false, + false, + listener + ), + updateResult -> { + assertEquals(UpdateResult.Status.UPDATED, updateResult.getStatus()); + assertNotEquals(oldConfig, updateResult.getConfig()); + } + ); + assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(oldConfig.getId(), listener), config -> { + assertNotNull(config); + assertEquals(Version.CURRENT, config.getVersion()); + }); + + assertCheckpoint( + listener -> transformConfigManager.getTransformCheckpointForUpdate(oldConfig.getId(), 42L, listener), + checkpointAndVersion -> { + assertEquals(InMemoryTransformConfigManager.CURRENT_INDEX, checkpointAndVersion.v2().getIndex()); + assertEquals(42L, checkpointAndVersion.v1().getCheckpoint()); + assertEquals(checkpoint.getIndicesCheckpoints(), checkpointAndVersion.v1().getIndicesCheckpoints()); + } + ); + + assertStoredState( + listener -> transformConfigManager.getTransformStoredDoc(oldConfig.getId(), false, listener), + storedDocAndVersion -> { + assertEquals(InMemoryTransformConfigManager.CURRENT_INDEX, storedDocAndVersion.v2().getIndex()); + assertEquals(stateDoc.getTransformState(), storedDocAndVersion.v1().getTransformState()); + assertEquals(stateDoc.getTransformStats(), storedDocAndVersion.v1().getTransformStats()); + } + ); + + // same as dry run + TransformConfig oldConfigForDryRunUpdate = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED) + ) + ); + + transformConfigManager.putOldTransformConfiguration(oldConfigForDryRunUpdate, ActionListener.wrap(r -> {}, e -> {})); + assertConfiguration( + listener -> transformConfigManager.getTransformConfiguration(oldConfigForDryRunUpdate.getId(), listener), + config -> {} + ); + + assertUpdate( + listener -> TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfigForDryRunUpdate, + update, + null, // seqNoPrimaryTermAndIndex + true, + true, + false, + listener + ), + updateResult -> { + assertEquals(UpdateResult.Status.NEEDS_UPDATE, updateResult.getStatus()); + assertNotEquals(oldConfigForDryRunUpdate, updateResult.getConfig()); + assertEquals(Version.CURRENT, updateResult.getConfig().getVersion()); + } + ); + assertConfiguration( + listener -> transformConfigManager.getTransformConfiguration(oldConfigForDryRunUpdate.getId(), listener), + config -> { + assertNotNull(config); + assertEquals(oldConfigForDryRunUpdate, config); + } + ); + } + + private void assertUpdate(Consumer> function, Consumer furtherTests) + throws InterruptedException { + assertAsync(function, furtherTests); + } + + private void assertConfiguration(Consumer> function, Consumer furtherTests) + throws InterruptedException { + assertAsync(function, furtherTests); + } + + private void assertCheckpoint( + Consumer>> function, + Consumer> furtherTests + ) throws InterruptedException { + assertAsync(function, furtherTests); + } + + private void assertStoredState( + Consumer>> function, + Consumer> furtherTests + ) throws InterruptedException { + assertAsync(function, furtherTests); + } + + private void assertAsync(Consumer> function, Consumer furtherTests) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + function.accept(listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java index c09280e0a428f..d9fb1dad2aebb 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java @@ -9,8 +9,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.core.Tuple; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedHashSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,12 +35,32 @@ */ public class InMemoryTransformConfigManager implements TransformConfigManager { + public static String CURRENT_INDEX = "index-1"; + public static String OLD_INDEX = "index-0"; + private final Map> checkpoints = new HashMap<>(); private final Map configs = new HashMap<>(); private final Map transformStoredDocs = new HashMap<>(); + // for mocking updates + private final Map> oldCheckpoints = new HashMap<>(); + private final Map oldConfigs = new HashMap<>(); + private final Map oldTransformStoredDocs = new HashMap<>(); + public InMemoryTransformConfigManager() {} + public void putOldTransformCheckpoint(TransformCheckpoint checkpoint, ActionListener listener) { + oldCheckpoints.compute(checkpoint.getTransformId(), (id, listOfCheckpoints) -> { + if (listOfCheckpoints == null) { + listOfCheckpoints = new ArrayList(); + } + listOfCheckpoints.add(checkpoint); + return listOfCheckpoints; + }); + + listener.onResponse(true); + } + @Override public void putTransformCheckpoint(TransformCheckpoint checkpoint, ActionListener listener) { checkpoints.compute(checkpoint.getTransformId(), (id, listOfCheckpoints) -> { @@ -54,6 +74,11 @@ public void putTransformCheckpoint(TransformCheckpoint checkpoint, ActionListene listener.onResponse(true); } + public void putOldTransformConfiguration(TransformConfig transformConfig, ActionListener listener) { + oldConfigs.put(transformConfig.getId(), transformConfig); + listener.onResponse(true); + } + @Override public void putTransformConfiguration(TransformConfig transformConfig, ActionListener listener) { configs.put(transformConfig.getId(), transformConfig); @@ -66,7 +91,6 @@ public void updateTransformConfiguration( SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ActionListener listener ) { - // for now we ignore seqNoPrimaryTermAndIndex configs.put(transformConfig.getId(), transformConfig); listener.onResponse(true); @@ -74,23 +98,51 @@ public void updateTransformConfiguration( @Override public void deleteOldTransformConfigurations(String transformId, ActionListener listener) { - configs.remove(transformId); + oldConfigs.remove(transformId); listener.onResponse(true); } @Override public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener) { - listener.onResponse(transformStoredDocs.remove(transformId) == null ? 0L : 1L); + long deletedDocs = oldTransformStoredDocs.remove(transformId) == null ? 0L : 1L; + listener.onResponse(deletedDocs); } @Override public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener) { - List checkpointsById = checkpoints.get(transformId); - int sizeBeforeDelete = checkpointsById.size(); + long deletedDocs = 0; + + final List checkpointsById = checkpoints.get(transformId); if (checkpointsById != null) { + final int sizeBeforeDelete = checkpointsById.size(); checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan; }); + deletedDocs += sizeBeforeDelete - checkpointsById.size(); + } + + // old checkpoints + final List checkpointsByIdOld = oldCheckpoints.get(transformId); + if (checkpointsByIdOld != null) { + final int sizeBeforeDeleteOldCheckpoints = checkpointsByIdOld.size(); + checkpointsByIdOld.removeIf(cp -> cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan); + deletedDocs += sizeBeforeDeleteOldCheckpoints - checkpointsByIdOld.size(); + } + + listener.onResponse(deletedDocs); + } + + @Override + public void deleteOldIndices(ActionListener listener) { + if (oldCheckpoints.isEmpty() && oldConfigs.isEmpty() && oldTransformStoredDocs.isEmpty()) { + listener.onResponse(true); + return; } - listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size())); + + // found old documents, emulate index deletion + oldCheckpoints.clear(); + oldConfigs.clear(); + oldTransformStoredDocs.clear(); + + listener.onResponse(true); } @Override @@ -106,12 +158,56 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi } } + checkpointsById = oldCheckpoints.get(transformId); + + if (checkpointsById != null) { + for (TransformCheckpoint t : checkpointsById) { + if (t.getCheckpoint() == checkpoint) { + resultListener.onResponse(t); + return; + } + } + } + resultListener.onResponse(TransformCheckpoint.EMPTY); } + @Override + public void getTransformCheckpointForUpdate( + String transformId, + long checkpoint, + ActionListener> checkpointAndVersionListener + ) { + List checkpointsById = checkpoints.get(transformId); + if (checkpointsById != null) { + for (TransformCheckpoint t : checkpointsById) { + if (t.getCheckpoint() == checkpoint) { + checkpointAndVersionListener.onResponse(Tuple.tuple(t, new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX))); + return; + } + } + } + + checkpointsById = oldCheckpoints.get(transformId); + if (checkpointsById != null) { + for (TransformCheckpoint t : checkpointsById) { + if (t.getCheckpoint() == checkpoint) { + checkpointAndVersionListener.onResponse(Tuple.tuple(t, new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX))); + return; + } + } + } + + checkpointAndVersionListener.onResponse(null); + } + @Override public void getTransformConfiguration(String transformId, ActionListener resultListener) { TransformConfig config = configs.get(transformId); + if (config == null) { + config = oldConfigs.get(transformId); + } + if (config == null) { resultListener.onFailure( new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) @@ -127,14 +223,20 @@ public void getTransformConfigurationForUpdate( ActionListener> configAndVersionListener ) { TransformConfig config = configs.get(transformId); - if (config == null) { + TransformConfig oldConfig = oldConfigs.get(transformId); + + if (config == null && oldConfig == null) { configAndVersionListener.onFailure( new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId)) ); return; } - configAndVersionListener.onResponse(Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(1L, 1L, "index-1"))); + if (config != null) { + configAndVersionListener.onResponse(Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX))); + } else { + configAndVersionListener.onResponse(Tuple.tuple(oldConfig, new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX))); + } } @Override @@ -144,39 +246,72 @@ public void expandTransformIds( boolean allowNoMatch, ActionListener, List>>> foundConfigsListener ) { - if (Regex.isMatchAllPattern(transformIdsExpression)) { - List ids = new ArrayList<>(configs.keySet()); - foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, new ArrayList<>(configs.values())))); + List ids = new ArrayList<>(); + List configsAsList = new ArrayList<>(); + configs.entrySet().forEach(entry -> { + ids.add(entry.getKey()); + configsAsList.add(entry.getValue()); + }); + + oldConfigs.entrySet().forEach(entry -> { + if (configs.containsKey(entry.getKey()) == false) { + ids.add(entry.getKey()); + configsAsList.add(entry.getValue()); + } + }); + + foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, configsAsList))); return; } if (Regex.isSimpleMatchPattern(transformIdsExpression) == false) { if (configs.containsKey(transformIdsExpression)) { foundConfigsListener.onResponse( - new Tuple<>( - 1L, Tuple.tuple(singletonList(transformIdsExpression), singletonList(configs.get(transformIdsExpression))))); + new Tuple<>(1L, Tuple.tuple(singletonList(transformIdsExpression), singletonList(configs.get(transformIdsExpression)))) + ); } else { foundConfigsListener.onResponse(new Tuple<>(0L, Tuple.tuple(emptyList(), emptyList()))); } return; } - Set ids = new LinkedHashSet<>(); - Set matchedConfigs = new LinkedHashSet<>(); - configs.keySet().forEach(id -> { - if (Regex.simpleMatch(transformIdsExpression, id)) { - ids.add(id); - matchedConfigs.add(configs.get(id)); + + List ids = new ArrayList<>(); + List configsAsList = new ArrayList<>(); + configs.entrySet().forEach(entry -> { + if (Regex.simpleMatch(transformIdsExpression, entry.getKey())) { + ids.add(entry.getKey()); + configsAsList.add(entry.getValue()); + } + }); + + oldConfigs.entrySet().forEach(entry -> { + if (configs.containsKey(entry.getKey()) == false && Regex.simpleMatch(transformIdsExpression, entry.getKey())) { + ids.add(entry.getKey()); + configsAsList.add(entry.getValue()); } }); - foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(matchedConfigs)))); + foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, configsAsList))); } @Override public void deleteTransform(String transformId, ActionListener listener) { configs.remove(transformId); + oldConfigs.remove(transformId); transformStoredDocs.remove(transformId); + oldTransformStoredDocs.remove(transformId); checkpoints.remove(transformId); + oldCheckpoints.remove(transformId); + } + + public void putOrUpdateOldTransformStoredDoc( + TransformStoredDoc storedDoc, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, + ActionListener listener + ) { + // for now we ignore seqNoPrimaryTermAndIndex + oldTransformStoredDocs.put(storedDoc.getId(), storedDoc); + listener.onResponse(new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX)); } @Override @@ -185,27 +320,37 @@ public void putOrUpdateTransformStoredDoc( SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ActionListener listener ) { - // for now we ignore seqNoPrimaryTermAndIndex transformStoredDocs.put(storedDoc.getId(), storedDoc); - listener.onResponse(new SeqNoPrimaryTermAndIndex(1L, 1L, "index-1")); + listener.onResponse(new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX)); } @Override public void getTransformStoredDoc( String transformId, + boolean allowNoMatch, ActionListener> resultListener ) { - TransformStoredDoc storedDoc = transformStoredDocs.get(transformId); - if (storedDoc == null) { - resultListener.onFailure( - new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId)) - ); + if (storedDoc != null) { + resultListener.onResponse(Tuple.tuple(storedDoc, new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX))); return; } - resultListener.onResponse(Tuple.tuple(storedDoc, new SeqNoPrimaryTermAndIndex(1L, 1L, "index-1"))); + storedDoc = oldTransformStoredDocs.get(transformId); + if (storedDoc != null) { + resultListener.onResponse(Tuple.tuple(storedDoc, new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX))); + return; + } + + if (allowNoMatch) { + resultListener.onResponse(null); + return; + } + + resultListener.onFailure( + new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId)) + ); } @Override @@ -215,6 +360,11 @@ public void getTransformStoredDocs(Collection transformIds, ActionListen TransformStoredDoc storedDoc = transformStoredDocs.get(transformId); if (storedDoc != null) { docs.add(storedDoc); + } else { + storedDoc = oldTransformStoredDocs.get(transformId); + if (storedDoc != null) { + docs.add(storedDoc); + } } } listener.onResponse(docs); @@ -225,4 +375,18 @@ public void refresh(ActionListener listener) { listener.onResponse(true); } + @Override + public void getAllTransformIds(ActionListener> listener) { + Set allIds = new HashSet<>(configs.keySet()); + allIds.addAll(oldConfigs.keySet()); + listener.onResponse(allIds); + } + + @Override + public void getAllOutdatedTransformIds(ActionListener>> listener) { + Set outdatedIds = new HashSet<>(oldConfigs.keySet()); + outdatedIds.removeAll(configs.keySet()); + listener.onResponse(new Tuple<>(Long.valueOf(configs.size() + outdatedIds.size()), outdatedIds)); + } + }