From 0c9662cacc65455626bb4c964cc4ae3f370ac0b0 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 14 Dec 2020 17:19:39 -0500 Subject: [PATCH] [7.x] Add setting to decommission legacy monitoring cluster alerts (#62668) (#64443) Adds a setting that, when enabled, directs any currently running exporters in monitoring will treat any cluster alert definition as excluded from the list of allowed cluster alert watches. This is the first step to adding a migration path away from using cluster alerts configured by the monitoring plugin and toward those managed by the stack monitoring solutions on the new alerting feature. Co-authored-by: Przemko Robakowski --- .../xpack/monitoring/Monitoring.java | 4 + .../xpack/monitoring/exporter/Exporters.java | 5 +- .../exporter/http/HttpExporter.java | 3 +- .../exporter/local/LocalExporter.java | 10 +- .../http/HttpExporterResourceTests.java | 56 +++++- .../local/LocalExporterIntegTestCase.java | 18 +- .../LocalExporterResourceIntegTests.java | 159 +++++++++++++++++- .../test/MockClusterAlertScriptEngine.java | 101 +++++++++++ .../test/MonitoringIntegTestCase.java | 9 +- 9 files changed, 345 insertions(+), 20 deletions(-) create mode 100644 x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index c3589e4dc26dc..bdcef93e71fa9 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -82,6 +82,9 @@ public class Monitoring extends Plugin implements ActionPlugin, ReloadablePlugin public static final Setting CLEAN_WATCHER_HISTORY = boolSetting("xpack.watcher.history.cleaner_service.enabled", true, Setting.Property.Dynamic, Setting.Property.NodeScope, Setting.Property.Deprecated); + public static final Setting MIGRATION_DECOMMISSION_ALERTS = boolSetting("xpack.monitoring.migration.decommission_alerts", + false, Setting.Property.Dynamic, Setting.Property.NodeScope); + protected final Settings settings; private final boolean transportClientMode; @@ -164,6 +167,7 @@ public List> getSettings() { List> settings = new ArrayList<>(); settings.add(MonitoringField.HISTORY_DURATION); settings.add(CLEAN_WATCHER_HISTORY); + settings.add(MIGRATION_DECOMMISSION_ALERTS); settings.add(MonitoringService.ENABLED); settings.add(MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED); settings.add(MonitoringService.INTERVAL); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java index 7cff593def80a..7e1df36b4ba0b 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java @@ -23,6 +23,7 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter; @@ -62,7 +63,9 @@ public Exporters(Settings settings, Map factories, final List> dynamicSettings = getSettings().stream().filter(Setting::isDynamic).collect(Collectors.toList()); - clusterService.getClusterSettings().addSettingsUpdateConsumer(this::setExportersSetting, dynamicSettings); + final List> updateSettings = new ArrayList>(dynamicSettings); + updateSettings.add(Monitoring.MIGRATION_DECOMMISSION_ALERTS); + clusterService.getClusterSettings().addSettingsUpdateConsumer(this::setExportersSetting, updateSettings); HttpExporter.registerSettingValidators(clusterService, sslService); // this ensures that logging is happening by adding an empty consumer per affix setting for (Setting.AffixSetting affixSetting : dynamicSettings) { diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java index 8699aad3fa5e0..2fdf80e5d0484 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLConfigurationSettings; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.Exporter; @@ -908,7 +909,7 @@ private static void configureClusterAlertsResources(final Config config, final S // add a resource per watch for (final String watchId : ClusterAlertsUtil.WATCH_IDS) { - final boolean blacklisted = blacklist.contains(watchId); + final boolean blacklisted = blacklist.contains(watchId) || Monitoring.MIGRATION_DECOMMISSION_ALERTS.get(config.settings()); // lazily load the cluster state to fetch the cluster UUID once it's loaded final Supplier uniqueWatchId = () -> ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId); final Supplier watch = blacklisted ? null : () -> ClusterAlertsUtil.loadWatch(clusterService, watchId); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java index 846c6f81ba8db..7e159d8179f41 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; @@ -105,6 +106,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle private final boolean useIngest; private final DateFormatter dateTimeFormatter; private final List clusterAlertBlacklist; + private final boolean decommissionClusterAlerts; private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); private final AtomicBoolean installingSomething = new AtomicBoolean(false); @@ -120,6 +122,7 @@ public LocalExporter(Exporter.Config config, Client client, CleanerService clean this.licenseState = config.licenseState(); this.useIngest = USE_INGEST_PIPELINE_SETTING.getConcreteSettingForNamespace(config.name()).get(config.settings()); this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config); + this.decommissionClusterAlerts = Monitoring.MIGRATION_DECOMMISSION_ALERTS.get(config.settings()); this.cleanerService = cleanerService; this.dateTimeFormatter = dateTimeFormatter(config); // if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly @@ -158,8 +161,10 @@ public void licenseStateChanged() { boolean isExporterReady() { // forces the setup to occur if it hasn't already final boolean running = resolveBulk(clusterService.state(), false) != null; + // Report on watcher readiness + boolean alertsProcessed = canUseWatcher() == false || watcherSetup.get(); - return running && installingSomething.get() == false; + return running && installingSomething.get() == false && alertsProcessed; } @Override @@ -455,7 +460,8 @@ private void getClusterAlertsInstallationAsyncActions(final boolean indexExists, for (final String watchId : ClusterAlertsUtil.WATCH_IDS) { final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId); - final boolean addWatch = canAddWatches && clusterAlertBlacklist.contains(watchId) == false; + final boolean addWatch = canAddWatches && clusterAlertBlacklist.contains(watchId) == false && + decommissionClusterAlerts == false; // we aren't sure if no watches exist yet, so add them if (indexExists) { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java index 1b1947f76c613..c9b8fb197d8e6 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java @@ -102,7 +102,11 @@ public void setupResources() { } public void awaitCheckAndPublish(final Boolean expected) { - resources.checkAndPublish(client, listener); + awaitCheckAndPublish(resources, expected); + } + + public void awaitCheckAndPublish(HttpResource resource, final Boolean expected) { + resource.checkAndPublish(client, listener); verifyListener(expected); } @@ -484,6 +488,56 @@ public void testWatchPublishBlocksAfterSuccessfulWatcherCheck() { verifyNoMoreInteractions(client); } + public void testDeployClusterAlerts() { + final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES); + final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates; + final int successfulGetPipelines = randomIntBetween(0, EXPECTED_PIPELINES); + final int unsuccessfulGetPipelines = EXPECTED_PIPELINES - successfulGetPipelines; + final Exception exception = failurePutException(); + + whenValidVersionResponse(); + whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates); + whenSuccessfulPutTemplates(unsuccessfulGetTemplates); + whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines); + whenSuccessfulPutPipelines(unsuccessfulGetPipelines); + // license needs to be valid, otherwise we'll do DELETEs, which are tested earlier + whenWatcherCanBeUsed(true); + + // a number of watches are mocked as present + final int existingWatches = randomIntBetween(0, EXPECTED_WATCHES); + + // For completeness's sake. GET/PUT watches wont be called by the resources. + // Instead it tries to DELETE the watches ignoring them not existing. + whenGetWatches(existingWatches, EXPECTED_WATCHES - existingWatches); + whenPerformRequestAsyncWith(client, new RequestMatcher(is("PUT"), startsWith("/_watcher/watch/")), exception); + whenPerformRequestAsyncWith(client, new RequestMatcher(is("DELETE"), startsWith("/_watcher/watch/")), + successfulDeleteResponses(EXPECTED_WATCHES)); + + // Create resources that are configured to remove all watches + Settings removalExporterSettings = Settings.builder() + .put(exporterSettings) + .put("xpack.monitoring.migration.decommission_alerts", true) + .build(); + MultiHttpResource overrideResource = HttpExporter.createResources( + new Exporter.Config("_http", "http", removalExporterSettings, clusterService, licenseState)); + + assertTrue(overrideResource.isDirty()); + awaitCheckAndPublish(overrideResource, true); + // Should proceed + assertFalse(overrideResource.isDirty()); + + verifyVersionCheck(); + verifyGetTemplates(EXPECTED_TEMPLATES); + verifyPutTemplates(unsuccessfulGetTemplates); + verifyGetPipelines(EXPECTED_PIPELINES); + verifyPutPipelines(unsuccessfulGetPipelines); + verifyWatcherCheck(); + verifyGetWatches(0); + verifyPutWatches(0); + verifyDeleteWatches(EXPECTED_WATCHES); + verifyNoMoreInteractions(client); + } + public void testSuccessfulChecksOnElectedMasterNode() { final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES); final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java index acc730f0e3dd3..af5c86ab0b085 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java @@ -32,7 +32,7 @@ public static void setupThreadPool() { } @AfterClass - public static void cleanUpStatic() throws Exception { + public static void cleanUpStatic() { if (THREADPOOL != null) { terminate(THREADPOOL); } @@ -57,6 +57,15 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + /** + * Create a new {@link LocalExporter} with the default exporter settings and name. + * + * @return Never {@code null}. + */ + protected LocalExporter createLocalExporter() { + return createLocalExporter(exporterName, null); + } + /** * Create a new {@link LocalExporter}. Expected usage: *

@@ -68,12 +77,11 @@ protected Settings nodeSettings(int nodeOrdinal) {
      *
      * @return Never {@code null}.
      */
-    protected LocalExporter createLocalExporter() {
-        final Settings settings = localExporterSettings();
+    protected LocalExporter createLocalExporter(String exporterName, Settings exporterSettings) {
         final XPackLicenseState licenseState = TestUtils.newTestLicenseState();
-        final Exporter.Config config = new Exporter.Config(exporterName, "local", settings, clusterService(), licenseState);
+        final Exporter.Config config = new Exporter.Config(exporterName, "local", exporterSettings, clusterService(), licenseState);
         final CleanerService cleanerService =
-                new CleanerService(settings, clusterService().getClusterSettings(), THREADPOOL, licenseState);
+            new CleanerService(exporterSettings, clusterService().getClusterSettings(), THREADPOOL, licenseState);
 
         return new LocalExporter(config, client(), cleanerService);
     }
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java
index 8dab66e810843..44c74c7cbfcb2 100644
--- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java
@@ -5,18 +5,32 @@
  */
 package org.elasticsearch.xpack.monitoring.exporter.local;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
+import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction;
 import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
@@ -31,10 +45,18 @@
                               numDataNodes = 1, numClientNodes = 0, transportClientRatio = 0.0, supportsDedicatedMasters = false)
 public class LocalExporterResourceIntegTests extends LocalExporterIntegTestCase {
 
-    public LocalExporterResourceIntegTests() throws Exception {
+    public LocalExporterResourceIntegTests() {
         super();
     }
 
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put("xpack.license.self_generated.type", "trial")
+            .build();
+    }
+
     private final MonitoredSystem system = randomFrom(MonitoredSystem.values());
 
     public void testCreateWhenResourcesNeedToBeAddedOrUpdated() throws Exception {
@@ -56,12 +78,45 @@ public void testCreateWhenResourcesShouldNotBeReplaced() throws Exception {
         assertPipelinesNotUpdated();
     }
 
+    public void testRemoveWhenResourcesShouldBeRemoved() throws Exception {
+        putResources(newEnoughVersion());
+
+        assertResourcesExist();
+        waitNoPendingTasksOnAll();
+
+        Settings exporterSettings = Settings.builder().put(localExporterSettings())
+            .put("xpack.monitoring.migration.decommission_alerts", true).build();
+
+        createResources("decommission_local", exporterSettings);
+        waitNoPendingTasksOnAll();
+        assertBusy(() -> {
+            assertTemplatesExist();
+            assertPipelinesExist();
+            assertNoWatchesExist();
+        });
+
+    }
+
+    @Override
+    protected Settings localExporterSettings() {
+        // Override the settings for local exporters created in this test, make sure watcher is enabled so we can test
+        // cluster alert creation and decommissioning
+        return Settings.builder()
+            .put(super.localExporterSettings())
+            .put("xpack.monitoring.exporters." + exporterName +  ".cluster_alerts.management.enabled", true)
+            .build();
+    }
+
     private void createResources() throws Exception {
+        createResources(exporterName, localExporterSettings());
+    }
+
+    private void createResources(String exporterName, Settings exporterSettings) throws Exception {
         // wait until the cluster is ready (this is done at the "Exporters" level)
         // this is not a busy assertion because it's checked earlier
         assertThat(clusterService().state().version(), not(ClusterState.UNKNOWN_VERSION));
 
-        try (LocalExporter exporter = createLocalExporter()) {
+        try (LocalExporter exporter = createLocalExporter(exporterName, exporterSettings)) {
             assertBusy(() -> assertThat(exporter.isExporterReady(), is(true)));
         }
     }
@@ -102,6 +157,7 @@ private void putResources(final Integer version) throws Exception {
 
         putTemplate(version);
         putPipelines(version);
+        putWatches(version);
     }
 
     private void putTemplate(final Integer version) throws Exception {
@@ -111,13 +167,13 @@ private void putTemplate(final Integer version) throws Exception {
         assertAcked(client().admin().indices().preparePutTemplate(templateName).setSource(source, XContentType.JSON).get());
     }
 
-    private void putPipelines(final Integer version) throws Exception {
+    private void putPipelines(final Integer version) {
         for (final String pipelineId : MonitoringTemplateUtils.PIPELINE_IDS) {
             putPipeline(MonitoringTemplateUtils.pipelineName(pipelineId), version);
         }
     }
 
-    private void putPipeline(final String pipelineName, final Integer version) throws Exception {
+    private void putPipeline(final String pipelineName, final Integer version) {
         assertAcked(client().admin().cluster().preparePutPipeline(pipelineName, replaceablePipeline(version), XContentType.JSON).get());
     }
 
@@ -150,6 +206,55 @@ private BytesReference replaceablePipeline(final Integer version) {
         }
     }
 
+    /**
+     * Create a cluster alert that does nothing.
+     * @param version Version to add to the watch, if any
+     */
+    private void putWatches(final Integer version) throws Exception {
+        for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
+            final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService(), watchId);
+            final BytesReference watch = generateWatchSource(watchId, clusterService().state().metadata().clusterUUID(), version);
+            client().execute(PutWatchAction.INSTANCE, new PutWatchRequest(uniqueWatchId, watch, XContentType.JSON))
+                .actionGet();
+        }
+    }
+
+    /**
+     * Generates a basic watch that loosely represents a monitoring cluster alert but ultimately does nothing.
+     */
+    private static BytesReference generateWatchSource(final String id, final String clusterUUID, final Integer version) throws Exception {
+        final XContentBuilder builder = jsonBuilder().startObject();
+        builder
+            .startObject("metadata")
+                .startObject("xpack")
+                    .field("cluster_uuid", clusterUUID);
+                    if(version != null) {
+                        builder.field("version_created", Integer.toString(version));
+                    }
+        builder
+                    .field("watch", id)
+                .endObject()
+            .endObject()
+            .startObject("trigger")
+                .startObject("schedule")
+                    .field("interval", "30m")
+                .endObject()
+            .endObject()
+            .startObject("input")
+                .startObject("simple")
+                    .field("ignore", "ignore")
+                .endObject()
+            .endObject()
+            .startObject("condition")
+                .startObject("never")
+                .endObject()
+            .endObject()
+            .startObject("actions")
+            .endObject();
+
+        return BytesReference.bytes(builder.endObject());
+    }
+
     private Integer oldVersion() {
         final int minimumVersion = Math.min(ClusterAlertsUtil.LAST_UPDATED_VERSION, MonitoringTemplateUtils.LAST_UPDATED_VERSION);
 
@@ -179,6 +284,51 @@ private void assertPipelinesExist() {
         }
     }
 
+    private void assertWatchesExist() {
+        // Check if watches index exists
+        if (client().admin().indices().prepareGetIndex().addIndices(".watches").get().getIndices().length == 0) {
+            fail("Expected [.watches] index with cluster alerts present, but no [.watches] index was found");
+        }
+
+        String clusterUUID = clusterService().state().getMetadata().clusterUUID();
+        SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource()
+            .query(QueryBuilders.matchQuery("metadata.xpack.cluster_uuid", clusterUUID));
+        Set watchIds = new HashSet<>(Arrays.asList(ClusterAlertsUtil.WATCH_IDS));
+        for (SearchHit hit : client().prepareSearch(".watches").setSource(searchSource).get().getHits().getHits()) {
+            String watchId = ObjectPath.eval("metadata.xpack.watch", hit.getSourceAsMap());
+            assertNotNull("Missing watch ID", watchId);
+            assertTrue("found unexpected watch id", watchIds.contains(watchId));
+
+            String version = ObjectPath.eval("metadata.xpack.version_created", hit.getSourceAsMap());
+            assertNotNull("Missing version from returned watch [" + watchId + "]", version);
+            assertTrue(Version.fromId(Integer.parseInt(version)).onOrAfter(Version.fromId(ClusterAlertsUtil.LAST_UPDATED_VERSION)));
+
+            String uuid = ObjectPath.eval("metadata.xpack.cluster_uuid", hit.getSourceAsMap());
+            assertNotNull("Missing cluster uuid", uuid);
+            assertEquals(clusterUUID, uuid);
+        }
+    }
+
+    private void assertNoWatchesExist() {
+        // Check if watches index exists
+        if (client().admin().indices().prepareGetIndex().addIndices(".watches").get().getIndices().length == 0) {
+            fail("Expected [.watches] index with cluster alerts present, but no [.watches] index was found");
+        }
+
+        String clusterUUID = clusterService().state().getMetadata().clusterUUID();
+        SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource()
+            .query(QueryBuilders.matchQuery("metadata.xpack.cluster_uuid", clusterUUID));
+        SearchResponse searchResponse = client().prepareSearch(".watches").setSource(searchSource).get();
+        if (searchResponse.getHits().getTotalHits().value > 0) {
+            List invalidWatches = new ArrayList<>();
+            for (SearchHit hit : searchResponse.getHits().getHits()) {
+                invalidWatches.add(ObjectPath.eval("metadata.xpack.watch", hit.getSourceAsMap()));
+            }
+            fail("Found [" + searchResponse.getHits().getTotalHits().value + "] invalid watches when none were expected: "
+                + invalidWatches);
+        }
+    }
+
     private void assertResourcesExist() throws Exception {
         createResources();
 
@@ -187,6 +337,7 @@ private void assertResourcesExist() throws Exception {
         assertBusy(() -> {
             assertTemplatesExist();
             assertPipelinesExist();
+            assertWatchesExist();
         });
     }
 
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java
new file mode 100644
index 0000000000000..77860f20d5e78
--- /dev/null
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.test;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.script.MockDeterministicScript;
+import org.elasticsearch.script.ScriptContext;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.xpack.core.monitoring.test.MockPainlessScriptEngine;
+import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
+import org.elasticsearch.xpack.core.watcher.watch.Payload;
+import org.elasticsearch.xpack.watcher.condition.WatcherConditionScript;
+import org.elasticsearch.xpack.watcher.transform.script.WatcherTransformScript;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock scripting engine that allows for scripts used in the watcher cluster alert definitions to be parsed
+ * for the purposes of testing their creation and decommissioning. Their accuracy as watch definitions should
+ * be verified via some other avenue.
+ */
+public class MockClusterAlertScriptEngine extends MockPainlessScriptEngine {
+
+    /**
+     * The plugin that creates this mock script engine. Overrides the original mock engine to inject this
+     * implementation instead of the parent class.
+     */
+    public static class TestPlugin extends MockPainlessScriptEngine.TestPlugin {
+        @Override
+        public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) {
+            return new MockClusterAlertScriptEngine();
+        }
+    }
+
+    @Override
+    public  T compile(String name, String script, ScriptContext context, Map options) {
+        // Conditions - Default to false evaluation in testing
+        if (context.instanceClazz.equals(WatcherConditionScript.class)) {
+            return context.factoryClazz.cast(new MockWatcherConditionScript(MockDeterministicScript.asDeterministic(p -> false)));
+        }
+
+        // Transform - Return empty Map from the transform function
+        if (context.instanceClazz.equals(WatcherTransformScript.class)) {
+            return context.factoryClazz.cast(new MockWatcherTransformScript(MockDeterministicScript.asDeterministic(
+                p -> new HashMap())));
+        }
+
+        // We want to just add an allowance for watcher scripts, and to delegate everything else to the parent class.
+        return super.compile(name, script, context, options);
+    }
+
+    /**
+     * A mock watcher condition script that executes a given function instead of whatever the source provided was.
+     */
+    static class MockWatcherConditionScript implements WatcherConditionScript.Factory {
+
+        private final MockDeterministicScript script;
+
+        MockWatcherConditionScript(MockDeterministicScript script) {
+            this.script = script;
+        }
+
+        @Override
+        public WatcherConditionScript newInstance(Map params, WatchExecutionContext watcherContext) {
+            return new WatcherConditionScript(params, watcherContext) {
+                @Override
+                public boolean execute() {
+                    return (boolean) script.apply(getParams());
+                }
+            };
+        }
+    }
+
+    /**
+     * A mock watcher transformation script that performs a given function instead of whatever the source provided was.
+     */
+    static class MockWatcherTransformScript implements WatcherTransformScript.Factory {
+
+        private final MockDeterministicScript script;
+
+        MockWatcherTransformScript(MockDeterministicScript script) {
+            this.script = script;
+        }
+
+        @Override
+        public WatcherTransformScript newInstance(Map params, WatchExecutionContext watcherContext, Payload payload) {
+            return new WatcherTransformScript(params, watcherContext, payload) {
+                @Override
+                public Object execute() {
+                    return script.apply(getParams());
+                }
+            };
+        }
+    }
+}
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
index cf6c5fae9a501..ca932825ddc59 100644
--- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
@@ -24,7 +24,6 @@
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.monitoring.client.MonitoringClient;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
-import org.elasticsearch.xpack.core.monitoring.test.MockPainlessScriptEngine;
 import org.elasticsearch.xpack.monitoring.LocalStateMonitoring;
 import org.elasticsearch.xpack.monitoring.MonitoringService;
 import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
@@ -82,13 +81,13 @@ protected Collection> getMockPlugins() {
 
     @Override
     protected Collection> nodePlugins() {
-        return Arrays.asList(LocalStateMonitoring.class, MockPainlessScriptEngine.TestPlugin.class,
+        return Arrays.asList(LocalStateMonitoring.class, MockClusterAlertScriptEngine.TestPlugin.class,
                 MockIngestPlugin.class, CommonAnalysisPlugin.class);
     }
 
     @Override
     protected Collection> transportClientPlugins() {
-        return Arrays.asList(XPackClientPlugin.class, MockPainlessScriptEngine.TestPlugin.class,
+        return Arrays.asList(XPackClientPlugin.class, MockClusterAlertScriptEngine.TestPlugin.class,
                 MockIngestPlugin.class, CommonAnalysisPlugin.class);
     }
 
@@ -202,9 +201,7 @@ protected void waitForMonitoringIndices() throws Exception {
     }
 
     private void awaitIndexExists(final String index) throws Exception {
-        assertBusy(() -> {
-            assertIndicesExists(index);
-        }, 30, TimeUnit.SECONDS);
+        assertBusy(() -> assertIndicesExists(index), 30, TimeUnit.SECONDS);
     }
 
     private void assertIndicesExists(String... indices) {