diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index c3589e4dc26dc..bdcef93e71fa9 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -82,6 +82,9 @@ public class Monitoring extends Plugin implements ActionPlugin, ReloadablePlugin public static final Setting CLEAN_WATCHER_HISTORY = boolSetting("xpack.watcher.history.cleaner_service.enabled", true, Setting.Property.Dynamic, Setting.Property.NodeScope, Setting.Property.Deprecated); + public static final Setting MIGRATION_DECOMMISSION_ALERTS = boolSetting("xpack.monitoring.migration.decommission_alerts", + false, Setting.Property.Dynamic, Setting.Property.NodeScope); + protected final Settings settings; private final boolean transportClientMode; @@ -164,6 +167,7 @@ public List> getSettings() { List> settings = new ArrayList<>(); settings.add(MonitoringField.HISTORY_DURATION); settings.add(CLEAN_WATCHER_HISTORY); + settings.add(MIGRATION_DECOMMISSION_ALERTS); settings.add(MonitoringService.ENABLED); settings.add(MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED); settings.add(MonitoringService.INTERVAL); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java index 7cff593def80a..7e1df36b4ba0b 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java @@ -23,6 +23,7 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter; @@ -62,7 +63,9 @@ public Exporters(Settings settings, Map factories, final List> dynamicSettings = getSettings().stream().filter(Setting::isDynamic).collect(Collectors.toList()); - clusterService.getClusterSettings().addSettingsUpdateConsumer(this::setExportersSetting, dynamicSettings); + final List> updateSettings = new ArrayList>(dynamicSettings); + updateSettings.add(Monitoring.MIGRATION_DECOMMISSION_ALERTS); + clusterService.getClusterSettings().addSettingsUpdateConsumer(this::setExportersSetting, updateSettings); HttpExporter.registerSettingValidators(clusterService, sslService); // this ensures that logging is happening by adding an empty consumer per affix setting for (Setting.AffixSetting affixSetting : dynamicSettings) { diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java index 8699aad3fa5e0..2fdf80e5d0484 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLConfigurationSettings; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.Exporter; @@ -908,7 +909,7 @@ private static void configureClusterAlertsResources(final Config config, final S // add a resource per watch for (final String watchId : ClusterAlertsUtil.WATCH_IDS) { - final boolean blacklisted = blacklist.contains(watchId); + final boolean blacklisted = blacklist.contains(watchId) || Monitoring.MIGRATION_DECOMMISSION_ALERTS.get(config.settings()); // lazily load the cluster state to fetch the cluster UUID once it's loaded final Supplier uniqueWatchId = () -> ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId); final Supplier watch = blacklisted ? null : () -> ClusterAlertsUtil.loadWatch(clusterService, watchId); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java index 846c6f81ba8db..7e159d8179f41 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; @@ -105,6 +106,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle private final boolean useIngest; private final DateFormatter dateTimeFormatter; private final List clusterAlertBlacklist; + private final boolean decommissionClusterAlerts; private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); private final AtomicBoolean installingSomething = new AtomicBoolean(false); @@ -120,6 +122,7 @@ public LocalExporter(Exporter.Config config, Client client, CleanerService clean this.licenseState = config.licenseState(); this.useIngest = USE_INGEST_PIPELINE_SETTING.getConcreteSettingForNamespace(config.name()).get(config.settings()); this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config); + this.decommissionClusterAlerts = Monitoring.MIGRATION_DECOMMISSION_ALERTS.get(config.settings()); this.cleanerService = cleanerService; this.dateTimeFormatter = dateTimeFormatter(config); // if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly @@ -158,8 +161,10 @@ public void licenseStateChanged() { boolean isExporterReady() { // forces the setup to occur if it hasn't already final boolean running = resolveBulk(clusterService.state(), false) != null; + // Report on watcher readiness + boolean alertsProcessed = canUseWatcher() == false || watcherSetup.get(); - return running && installingSomething.get() == false; + return running && installingSomething.get() == false && alertsProcessed; } @Override @@ -455,7 +460,8 @@ private void getClusterAlertsInstallationAsyncActions(final boolean indexExists, for (final String watchId : ClusterAlertsUtil.WATCH_IDS) { final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId); - final boolean addWatch = canAddWatches && clusterAlertBlacklist.contains(watchId) == false; + final boolean addWatch = canAddWatches && clusterAlertBlacklist.contains(watchId) == false && + decommissionClusterAlerts == false; // we aren't sure if no watches exist yet, so add them if (indexExists) { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java index 1b1947f76c613..c9b8fb197d8e6 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java @@ -102,7 +102,11 @@ public void setupResources() { } public void awaitCheckAndPublish(final Boolean expected) { - resources.checkAndPublish(client, listener); + awaitCheckAndPublish(resources, expected); + } + + public void awaitCheckAndPublish(HttpResource resource, final Boolean expected) { + resource.checkAndPublish(client, listener); verifyListener(expected); } @@ -484,6 +488,56 @@ public void testWatchPublishBlocksAfterSuccessfulWatcherCheck() { verifyNoMoreInteractions(client); } + public void testDeployClusterAlerts() { + final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES); + final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates; + final int successfulGetPipelines = randomIntBetween(0, EXPECTED_PIPELINES); + final int unsuccessfulGetPipelines = EXPECTED_PIPELINES - successfulGetPipelines; + final Exception exception = failurePutException(); + + whenValidVersionResponse(); + whenGetTemplates(successfulGetTemplates, unsuccessfulGetTemplates); + whenSuccessfulPutTemplates(unsuccessfulGetTemplates); + whenGetPipelines(successfulGetPipelines, unsuccessfulGetPipelines); + whenSuccessfulPutPipelines(unsuccessfulGetPipelines); + // license needs to be valid, otherwise we'll do DELETEs, which are tested earlier + whenWatcherCanBeUsed(true); + + // a number of watches are mocked as present + final int existingWatches = randomIntBetween(0, EXPECTED_WATCHES); + + // For completeness's sake. GET/PUT watches wont be called by the resources. + // Instead it tries to DELETE the watches ignoring them not existing. + whenGetWatches(existingWatches, EXPECTED_WATCHES - existingWatches); + whenPerformRequestAsyncWith(client, new RequestMatcher(is("PUT"), startsWith("/_watcher/watch/")), exception); + whenPerformRequestAsyncWith(client, new RequestMatcher(is("DELETE"), startsWith("/_watcher/watch/")), + successfulDeleteResponses(EXPECTED_WATCHES)); + + // Create resources that are configured to remove all watches + Settings removalExporterSettings = Settings.builder() + .put(exporterSettings) + .put("xpack.monitoring.migration.decommission_alerts", true) + .build(); + MultiHttpResource overrideResource = HttpExporter.createResources( + new Exporter.Config("_http", "http", removalExporterSettings, clusterService, licenseState)); + + assertTrue(overrideResource.isDirty()); + awaitCheckAndPublish(overrideResource, true); + // Should proceed + assertFalse(overrideResource.isDirty()); + + verifyVersionCheck(); + verifyGetTemplates(EXPECTED_TEMPLATES); + verifyPutTemplates(unsuccessfulGetTemplates); + verifyGetPipelines(EXPECTED_PIPELINES); + verifyPutPipelines(unsuccessfulGetPipelines); + verifyWatcherCheck(); + verifyGetWatches(0); + verifyPutWatches(0); + verifyDeleteWatches(EXPECTED_WATCHES); + verifyNoMoreInteractions(client); + } + public void testSuccessfulChecksOnElectedMasterNode() { final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES); final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java index acc730f0e3dd3..af5c86ab0b085 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java @@ -32,7 +32,7 @@ public static void setupThreadPool() { } @AfterClass - public static void cleanUpStatic() throws Exception { + public static void cleanUpStatic() { if (THREADPOOL != null) { terminate(THREADPOOL); } @@ -57,6 +57,15 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + /** + * Create a new {@link LocalExporter} with the default exporter settings and name. + * + * @return Never {@code null}. + */ + protected LocalExporter createLocalExporter() { + return createLocalExporter(exporterName, null); + } + /** * Create a new {@link LocalExporter}. Expected usage: *

@@ -68,12 +77,11 @@ protected Settings nodeSettings(int nodeOrdinal) {
      *
      * @return Never {@code null}.
      */
-    protected LocalExporter createLocalExporter() {
-        final Settings settings = localExporterSettings();
+    protected LocalExporter createLocalExporter(String exporterName, Settings exporterSettings) {
         final XPackLicenseState licenseState = TestUtils.newTestLicenseState();
-        final Exporter.Config config = new Exporter.Config(exporterName, "local", settings, clusterService(), licenseState);
+        final Exporter.Config config = new Exporter.Config(exporterName, "local", exporterSettings, clusterService(), licenseState);
         final CleanerService cleanerService =
-                new CleanerService(settings, clusterService().getClusterSettings(), THREADPOOL, licenseState);
+            new CleanerService(exporterSettings, clusterService().getClusterSettings(), THREADPOOL, licenseState);
 
         return new LocalExporter(config, client(), cleanerService);
     }
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java
index 8dab66e810843..44c74c7cbfcb2 100644
--- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java
@@ -5,18 +5,32 @@
  */
 package org.elasticsearch.xpack.monitoring.exporter.local;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
+import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction;
 import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
@@ -31,10 +45,18 @@
                               numDataNodes = 1, numClientNodes = 0, transportClientRatio = 0.0, supportsDedicatedMasters = false)
 public class LocalExporterResourceIntegTests extends LocalExporterIntegTestCase {
 
-    public LocalExporterResourceIntegTests() throws Exception {
+    public LocalExporterResourceIntegTests() {
         super();
     }
 
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put("xpack.license.self_generated.type", "trial")
+            .build();
+    }
+
     private final MonitoredSystem system = randomFrom(MonitoredSystem.values());
 
     public void testCreateWhenResourcesNeedToBeAddedOrUpdated() throws Exception {
@@ -56,12 +78,45 @@ public void testCreateWhenResourcesShouldNotBeReplaced() throws Exception {
         assertPipelinesNotUpdated();
     }
 
+    public void testRemoveWhenResourcesShouldBeRemoved() throws Exception {
+        putResources(newEnoughVersion());
+
+        assertResourcesExist();
+        waitNoPendingTasksOnAll();
+
+        Settings exporterSettings = Settings.builder().put(localExporterSettings())
+            .put("xpack.monitoring.migration.decommission_alerts", true).build();
+
+        createResources("decommission_local", exporterSettings);
+        waitNoPendingTasksOnAll();
+        assertBusy(() -> {
+            assertTemplatesExist();
+            assertPipelinesExist();
+            assertNoWatchesExist();
+        });
+
+    }
+
+    @Override
+    protected Settings localExporterSettings() {
+        // Override the settings for local exporters created in this test, make sure watcher is enabled so we can test
+        // cluster alert creation and decommissioning
+        return Settings.builder()
+            .put(super.localExporterSettings())
+            .put("xpack.monitoring.exporters." + exporterName +  ".cluster_alerts.management.enabled", true)
+            .build();
+    }
+
     private void createResources() throws Exception {
+        createResources(exporterName, localExporterSettings());
+    }
+
+    private void createResources(String exporterName, Settings exporterSettings) throws Exception {
         // wait until the cluster is ready (this is done at the "Exporters" level)
         // this is not a busy assertion because it's checked earlier
         assertThat(clusterService().state().version(), not(ClusterState.UNKNOWN_VERSION));
 
-        try (LocalExporter exporter = createLocalExporter()) {
+        try (LocalExporter exporter = createLocalExporter(exporterName, exporterSettings)) {
             assertBusy(() -> assertThat(exporter.isExporterReady(), is(true)));
         }
     }
@@ -102,6 +157,7 @@ private void putResources(final Integer version) throws Exception {
 
         putTemplate(version);
         putPipelines(version);
+        putWatches(version);
     }
 
     private void putTemplate(final Integer version) throws Exception {
@@ -111,13 +167,13 @@ private void putTemplate(final Integer version) throws Exception {
         assertAcked(client().admin().indices().preparePutTemplate(templateName).setSource(source, XContentType.JSON).get());
     }
 
-    private void putPipelines(final Integer version) throws Exception {
+    private void putPipelines(final Integer version) {
         for (final String pipelineId : MonitoringTemplateUtils.PIPELINE_IDS) {
             putPipeline(MonitoringTemplateUtils.pipelineName(pipelineId), version);
         }
     }
 
-    private void putPipeline(final String pipelineName, final Integer version) throws Exception {
+    private void putPipeline(final String pipelineName, final Integer version) {
         assertAcked(client().admin().cluster().preparePutPipeline(pipelineName, replaceablePipeline(version), XContentType.JSON).get());
     }
 
@@ -150,6 +206,55 @@ private BytesReference replaceablePipeline(final Integer version) {
         }
     }
 
+    /**
+     * Create a cluster alert that does nothing.
+     * @param version Version to add to the watch, if any
+     */
+    private void putWatches(final Integer version) throws Exception {
+        for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
+            final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService(), watchId);
+            final BytesReference watch = generateWatchSource(watchId, clusterService().state().metadata().clusterUUID(), version);
+            client().execute(PutWatchAction.INSTANCE, new PutWatchRequest(uniqueWatchId, watch, XContentType.JSON))
+                .actionGet();
+        }
+    }
+
+    /**
+     * Generates a basic watch that loosely represents a monitoring cluster alert but ultimately does nothing.
+     */
+    private static BytesReference generateWatchSource(final String id, final String clusterUUID, final Integer version) throws Exception {
+        final XContentBuilder builder = jsonBuilder().startObject();
+        builder
+            .startObject("metadata")
+                .startObject("xpack")
+                    .field("cluster_uuid", clusterUUID);
+                    if(version != null) {
+                        builder.field("version_created", Integer.toString(version));
+                    }
+        builder
+                    .field("watch", id)
+                .endObject()
+            .endObject()
+            .startObject("trigger")
+                .startObject("schedule")
+                    .field("interval", "30m")
+                .endObject()
+            .endObject()
+            .startObject("input")
+                .startObject("simple")
+                    .field("ignore", "ignore")
+                .endObject()
+            .endObject()
+            .startObject("condition")
+                .startObject("never")
+                .endObject()
+            .endObject()
+            .startObject("actions")
+            .endObject();
+
+        return BytesReference.bytes(builder.endObject());
+    }
+
     private Integer oldVersion() {
         final int minimumVersion = Math.min(ClusterAlertsUtil.LAST_UPDATED_VERSION, MonitoringTemplateUtils.LAST_UPDATED_VERSION);
 
@@ -179,6 +284,51 @@ private void assertPipelinesExist() {
         }
     }
 
+    private void assertWatchesExist() {
+        // Check if watches index exists
+        if (client().admin().indices().prepareGetIndex().addIndices(".watches").get().getIndices().length == 0) {
+            fail("Expected [.watches] index with cluster alerts present, but no [.watches] index was found");
+        }
+
+        String clusterUUID = clusterService().state().getMetadata().clusterUUID();
+        SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource()
+            .query(QueryBuilders.matchQuery("metadata.xpack.cluster_uuid", clusterUUID));
+        Set watchIds = new HashSet<>(Arrays.asList(ClusterAlertsUtil.WATCH_IDS));
+        for (SearchHit hit : client().prepareSearch(".watches").setSource(searchSource).get().getHits().getHits()) {
+            String watchId = ObjectPath.eval("metadata.xpack.watch", hit.getSourceAsMap());
+            assertNotNull("Missing watch ID", watchId);
+            assertTrue("found unexpected watch id", watchIds.contains(watchId));
+
+            String version = ObjectPath.eval("metadata.xpack.version_created", hit.getSourceAsMap());
+            assertNotNull("Missing version from returned watch [" + watchId + "]", version);
+            assertTrue(Version.fromId(Integer.parseInt(version)).onOrAfter(Version.fromId(ClusterAlertsUtil.LAST_UPDATED_VERSION)));
+
+            String uuid = ObjectPath.eval("metadata.xpack.cluster_uuid", hit.getSourceAsMap());
+            assertNotNull("Missing cluster uuid", uuid);
+            assertEquals(clusterUUID, uuid);
+        }
+    }
+
+    private void assertNoWatchesExist() {
+        // Check if watches index exists
+        if (client().admin().indices().prepareGetIndex().addIndices(".watches").get().getIndices().length == 0) {
+            fail("Expected [.watches] index with cluster alerts present, but no [.watches] index was found");
+        }
+
+        String clusterUUID = clusterService().state().getMetadata().clusterUUID();
+        SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource()
+            .query(QueryBuilders.matchQuery("metadata.xpack.cluster_uuid", clusterUUID));
+        SearchResponse searchResponse = client().prepareSearch(".watches").setSource(searchSource).get();
+        if (searchResponse.getHits().getTotalHits().value > 0) {
+            List invalidWatches = new ArrayList<>();
+            for (SearchHit hit : searchResponse.getHits().getHits()) {
+                invalidWatches.add(ObjectPath.eval("metadata.xpack.watch", hit.getSourceAsMap()));
+            }
+            fail("Found [" + searchResponse.getHits().getTotalHits().value + "] invalid watches when none were expected: "
+                + invalidWatches);
+        }
+    }
+
     private void assertResourcesExist() throws Exception {
         createResources();
 
@@ -187,6 +337,7 @@ private void assertResourcesExist() throws Exception {
         assertBusy(() -> {
             assertTemplatesExist();
             assertPipelinesExist();
+            assertWatchesExist();
         });
     }
 
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java
new file mode 100644
index 0000000000000..77860f20d5e78
--- /dev/null
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockClusterAlertScriptEngine.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.test;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.script.MockDeterministicScript;
+import org.elasticsearch.script.ScriptContext;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.xpack.core.monitoring.test.MockPainlessScriptEngine;
+import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
+import org.elasticsearch.xpack.core.watcher.watch.Payload;
+import org.elasticsearch.xpack.watcher.condition.WatcherConditionScript;
+import org.elasticsearch.xpack.watcher.transform.script.WatcherTransformScript;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock scripting engine that allows for scripts used in the watcher cluster alert definitions to be parsed
+ * for the purposes of testing their creation and decommissioning. Their accuracy as watch definitions should
+ * be verified via some other avenue.
+ */
+public class MockClusterAlertScriptEngine extends MockPainlessScriptEngine {
+
+    /**
+     * The plugin that creates this mock script engine. Overrides the original mock engine to inject this
+     * implementation instead of the parent class.
+     */
+    public static class TestPlugin extends MockPainlessScriptEngine.TestPlugin {
+        @Override
+        public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) {
+            return new MockClusterAlertScriptEngine();
+        }
+    }
+
+    @Override
+    public  T compile(String name, String script, ScriptContext context, Map options) {
+        // Conditions - Default to false evaluation in testing
+        if (context.instanceClazz.equals(WatcherConditionScript.class)) {
+            return context.factoryClazz.cast(new MockWatcherConditionScript(MockDeterministicScript.asDeterministic(p -> false)));
+        }
+
+        // Transform - Return empty Map from the transform function
+        if (context.instanceClazz.equals(WatcherTransformScript.class)) {
+            return context.factoryClazz.cast(new MockWatcherTransformScript(MockDeterministicScript.asDeterministic(
+                p -> new HashMap())));
+        }
+
+        // We want to just add an allowance for watcher scripts, and to delegate everything else to the parent class.
+        return super.compile(name, script, context, options);
+    }
+
+    /**
+     * A mock watcher condition script that executes a given function instead of whatever the source provided was.
+     */
+    static class MockWatcherConditionScript implements WatcherConditionScript.Factory {
+
+        private final MockDeterministicScript script;
+
+        MockWatcherConditionScript(MockDeterministicScript script) {
+            this.script = script;
+        }
+
+        @Override
+        public WatcherConditionScript newInstance(Map params, WatchExecutionContext watcherContext) {
+            return new WatcherConditionScript(params, watcherContext) {
+                @Override
+                public boolean execute() {
+                    return (boolean) script.apply(getParams());
+                }
+            };
+        }
+    }
+
+    /**
+     * A mock watcher transformation script that performs a given function instead of whatever the source provided was.
+     */
+    static class MockWatcherTransformScript implements WatcherTransformScript.Factory {
+
+        private final MockDeterministicScript script;
+
+        MockWatcherTransformScript(MockDeterministicScript script) {
+            this.script = script;
+        }
+
+        @Override
+        public WatcherTransformScript newInstance(Map params, WatchExecutionContext watcherContext, Payload payload) {
+            return new WatcherTransformScript(params, watcherContext, payload) {
+                @Override
+                public Object execute() {
+                    return script.apply(getParams());
+                }
+            };
+        }
+    }
+}
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
index cf6c5fae9a501..ca932825ddc59 100644
--- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java
@@ -24,7 +24,6 @@
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.monitoring.client.MonitoringClient;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
-import org.elasticsearch.xpack.core.monitoring.test.MockPainlessScriptEngine;
 import org.elasticsearch.xpack.monitoring.LocalStateMonitoring;
 import org.elasticsearch.xpack.monitoring.MonitoringService;
 import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
@@ -82,13 +81,13 @@ protected Collection> getMockPlugins() {
 
     @Override
     protected Collection> nodePlugins() {
-        return Arrays.asList(LocalStateMonitoring.class, MockPainlessScriptEngine.TestPlugin.class,
+        return Arrays.asList(LocalStateMonitoring.class, MockClusterAlertScriptEngine.TestPlugin.class,
                 MockIngestPlugin.class, CommonAnalysisPlugin.class);
     }
 
     @Override
     protected Collection> transportClientPlugins() {
-        return Arrays.asList(XPackClientPlugin.class, MockPainlessScriptEngine.TestPlugin.class,
+        return Arrays.asList(XPackClientPlugin.class, MockClusterAlertScriptEngine.TestPlugin.class,
                 MockIngestPlugin.class, CommonAnalysisPlugin.class);
     }
 
@@ -202,9 +201,7 @@ protected void waitForMonitoringIndices() throws Exception {
     }
 
     private void awaitIndexExists(final String index) throws Exception {
-        assertBusy(() -> {
-            assertIndicesExists(index);
-        }, 30, TimeUnit.SECONDS);
+        assertBusy(() -> assertIndicesExists(index), 30, TimeUnit.SECONDS);
     }
 
     private void assertIndicesExists(String... indices) {