diff --git a/x-pack/docs/en/watcher/actions.asciidoc b/x-pack/docs/en/watcher/actions.asciidoc index 226bddfdcaa8f..52bd5d732b640 100644 --- a/x-pack/docs/en/watcher/actions.asciidoc +++ b/x-pack/docs/en/watcher/actions.asciidoc @@ -198,8 +198,10 @@ image::images/action-throttling.jpg[align="center"] You can use the `foreach` field in an action to trigger the configured action for every element within that array. -In order to protect from long running watches, after one hundred runs with an -foreach loop the execution is gracefully stopped. +In order to protect from long running watches, you can use the `max_iterations` +field to limit the maximum amount of runs that each watch executes. If this limit +is reached, the execution is gracefully stopped. If not set, this field defaults +to one hundred. [source,js] -------------------------------------------------- @@ -224,6 +226,7 @@ PUT _watcher/watch/log_event_watch "actions" : { "log_hits" : { "foreach" : "ctx.payload.hits.hits", <1> + "max_iterations" : 500, "logging" : { "text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}" } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java index 951f305edf5bc..7337e1e417763 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java @@ -47,8 +47,6 @@ public class ActionWrapper implements ToXContentObject { - private final int MAXIMUM_FOREACH_RUNS = 100; - private String id; @Nullable private final ExecutableCondition condition; @@ -58,18 +56,21 @@ public class ActionWrapper implements ToXContentObject { private final ExecutableAction action; @Nullable private String path; + private final Integer maxIterations; public ActionWrapper(String id, ActionThrottler throttler, @Nullable ExecutableCondition condition, @Nullable ExecutableTransform transform, ExecutableAction action, - @Nullable String path) { + @Nullable String path, + @Nullable Integer maxIterations) { this.id = id; this.condition = condition; this.throttler = throttler; this.transform = transform; this.action = action; this.path = path; + this.maxIterations = (maxIterations != null) ? maxIterations : 100; } public String id() { @@ -177,7 +178,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path); } else { for (Object o : collection) { - if (runs >= MAXIMUM_FOREACH_RUNS) { + if (runs >= maxIterations) { break; } if (o instanceof Map) { @@ -216,6 +217,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); } builder.endArray(); + builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations); return builder; } }); @@ -279,7 +281,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } if (Strings.isEmpty(path) == false) { builder.field(WatchField.FOREACH.getPreferredName(), path); + builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations); } + builder.field(action.type(), action, params); return builder.endObject(); } @@ -294,6 +298,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse TimeValue throttlePeriod = null; String path = null; ExecutableAction action = null; + Integer maxIterations = null; String currentFieldName = null; XContentParser.Token token; @@ -316,6 +321,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse throw new ElasticsearchParseException("could not parse action [{}/{}]. failed to parse field [{}] as time value", pe, watchId, actionId, currentFieldName); } + } else if (WatchField.MAX_ITERATIONS.match(currentFieldName, parser.getDeprecationHandler())) { + maxIterations = parser.intValue(); } else { // it's the type of the action ActionFactory actionFactory = actionRegistry.factory(currentFieldName); @@ -332,7 +339,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse } ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState); - return new ActionWrapper(actionId, throttler, condition, transform, action, path); + return new ActionWrapper(actionId, throttler, condition, transform, action, path, maxIterations); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java index 1bcb62447bf76..069794816c91e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java @@ -14,6 +14,7 @@ public final class WatchField { public static final ParseField ACTIONS = new ParseField("actions"); public static final ParseField TRANSFORM = new ParseField("transform"); public static final ParseField FOREACH = new ParseField("foreach"); + public static final ParseField MAX_ITERATIONS = new ParseField("max_iterations"); public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis"); public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period"); public static final ParseField METADATA = new ParseField("metadata"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index 8931319501b6d..e6825a0cac93e 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -54,7 +54,7 @@ public class ActionWrapperTests extends ESTestCase { private Watch watch = mock(Watch.class); @SuppressWarnings("unchecked") private ExecutableAction executableAction = mock(ExecutableAction.class); - private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null); + private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null, null); public void testThatUnmetActionConditionResetsAckStatus() throws Exception { WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED))); @@ -84,7 +84,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { final ExecutableAction executableAction = new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()); ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, - "ctx.payload.my_path"); + "ctx.payload.my_path", null); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", @@ -111,7 +111,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { public void testThatSpecifiedPathIsNotCollection() { ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, - "ctx.payload.my_path"); + "ctx.payload.my_path", null); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", "not a map")); when(ctx.payload()).thenReturn(payload); @@ -127,7 +127,7 @@ public void testThatSpecifiedPathIsNotCollection() { public void testEmptyCollection() { ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, - "ctx.payload.my_path"); + "ctx.payload.my_path", null); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", Collections.emptyList())); when(ctx.payload()).thenReturn(payload); @@ -143,7 +143,7 @@ public void testEmptyCollection() { public void testPartialFailure() throws Exception { ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, - "ctx.payload.my_path"); + "ctx.payload.my_path", null); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of( @@ -165,9 +165,9 @@ public void testPartialFailure() throws Exception { assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE)); } - public void testLimitOfNumberOfActionsExecuted() throws Exception { + public void testDefaultLimitOfNumberOfActionsExecuted() throws Exception { ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, - "ctx.payload.my_path"); + "ctx.payload.my_path", null); WatchExecutionContext ctx = mockExecutionContent(watch); List> itemsPayload = new ArrayList<>(); for (int i = 0; i < 101; i++) { @@ -193,11 +193,49 @@ public void testLimitOfNumberOfActionsExecuted() throws Exception { assertThat(map.get("foreach"), instanceOf(List.class)); List> actions = (List) map.get("foreach"); assertThat(actions, hasSize(100)); + assertThat(map, hasKey("max_iterations")); + assertThat(map.get("max_iterations"), is(100)); assertThat(map, hasKey("number_of_actions_executed")); assertThat(map.get("number_of_actions_executed"), is(100)); } } + public void testConfiguredLimitOfNumberOfActionsExecuted() throws Exception { + int randomMaxIterations = randomIntBetween(1, 1000); + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path", randomMaxIterations); + WatchExecutionContext ctx = mockExecutionContent(watch); + List> itemsPayload = new ArrayList<>(); + for (int i = 0; i < randomMaxIterations + 1; i++) { + final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);; + final Payload singleItemPayload = new Payload.Simple(Map.of("key", String.valueOf(i))); + itemsPayload.add(Map.of("key", String.valueOf(i))); + when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult); + } + + Payload.Simple payload = new Payload.Simple(Map.of("my_path", itemsPayload)); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); + + // check that action toXContent contains all the results + try (XContentBuilder builder = jsonBuilder()) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + final String json = Strings.toString(builder); + final Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true); + assertThat(map, hasKey("foreach")); + assertThat(map.get("foreach"), instanceOf(List.class)); + List> actions = (List) map.get("foreach"); + assertThat(actions, hasSize(randomMaxIterations)); + assertThat(map, hasKey("max_iterations")); + assertThat(map.get("max_iterations"), is(randomMaxIterations)); + assertThat(map, hasKey("number_of_actions_executed")); + assertThat(map.get("number_of_actions_executed"), is(randomMaxIterations)); + } + } + private WatchExecutionContext mockExecutionContent(Watch watch) { WatchExecutionContext ctx = mock(WatchExecutionContext.class); when(watch.id()).thenReturn("watchId"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 53f060b22c3e1..d42ddfeb2fe2c 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -227,7 +227,7 @@ public void testExecute() throws Exception { when(action.type()).thenReturn("MY_AWESOME_TYPE"); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); @@ -313,7 +313,7 @@ public void testExecuteFailedInput() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -378,7 +378,7 @@ public void testExecuteFailedCondition() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -442,7 +442,7 @@ public void testExecuteFailedWatchTransform() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -520,7 +520,7 @@ public void testExecuteFailedActionTransform() throws Exception { when(action.logger()).thenReturn(logger); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); @@ -600,7 +600,7 @@ public void testExecuteInner() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -649,7 +649,7 @@ public void testExecuteInnerThrottled() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -712,7 +712,7 @@ public void testExecuteInnerConditionNotMet() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -769,7 +769,7 @@ public void testExecuteInnerConditionNotMetDueToException() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); when(action.logger()).thenReturn(logger); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -817,7 +817,7 @@ public void testExecuteConditionNotMet() throws Exception { ExecutableCondition actionCondition = mock(ExecutableCondition.class); ExecutableTransform actionTransform = mock(ExecutableTransform.class); ExecutableAction action = mock(ExecutableAction.class); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -946,7 +946,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce when(action.type()).thenReturn("MY_AWESOME_TYPE"); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index aa1231baa17a4..c10f010f98ef3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -158,7 +158,7 @@ public static Watch createTestWatch(String watchName, Client client, HttpClient httpRequest.path(new TextTemplate("/foobarbaz/{{ctx.watch_id}}")); httpRequest.body(new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits")); actions.add(new ActionWrapper("_webhook", null, null, null, new ExecutableWebhookAction(new WebhookAction(httpRequest.build()), - logger, httpClient, engine), null)); + logger, httpClient, engine), null, null)); EmailTemplate email = EmailTemplate.builder().from("from@test.com").to("to@test.com").build(); @@ -166,7 +166,7 @@ public static Watch createTestWatch(String watchName, Client client, HttpClient EmailAction action = new EmailAction(email, "testaccount", auth, Profile.STANDARD, null, null); ExecutableEmailAction executale = new ExecutableEmailAction(action, logger, emailService, engine, new HtmlSanitizer(Settings.EMPTY), Collections.emptyMap()); - actions.add(new ActionWrapper("_email", null, null, null, executale, null)); + actions.add(new ActionWrapper("_email", null, null, null, executale, null, null)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); Map statuses = new HashMap<>(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index feadeba084d3e..e810c14615b73 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -441,7 +441,7 @@ public void testParseWatchWithoutTriggerDoesNotWork() throws Exception { private WatchParser createWatchparser() throws Exception { LoggingAction loggingAction = new LoggingAction(new TextTemplate("foo"), null, null); List actions = Collections.singletonList(new ActionWrapper("_logging_", randomThrottler(), null, null, - new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()), null)); + new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()), null, null)); ScheduleRegistry scheduleRegistry = registry(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.SECONDS))); @@ -585,7 +585,8 @@ private List randomActions() { randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS); list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()), null)); + new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, + Collections.emptyMap()), null, null)); } if (randomBoolean()) { ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null; @@ -596,7 +597,7 @@ private List randomActions() { list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)), null)); + TimeValue.timeValueSeconds(30)), null, null)); } if (randomBoolean()) { HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000)) @@ -606,7 +607,7 @@ private List randomActions() { WebhookAction action = new WebhookAction(httpRequest); list.add(new ActionWrapper("_webhook_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableWebhookAction(action, logger, httpClient, templateEngine), null)); + new ExecutableWebhookAction(action, logger, httpClient, templateEngine), null, null)); } return list; }