Skip to content

Commit

Permalink
Watcher max_iterations with foreach action execution (#45715)
Browse files Browse the repository at this point in the history
Prior to this commit the foreach action execution had a hard coded 
limit to 100 iterations. This commit allows the max number of 
iterations to be a configuration ('max_iterations') on the foreach 
action. The default remains 100.
  • Loading branch information
jbonn360 authored and jakelandis committed Aug 27, 2019
1 parent e85dcb4 commit 859e387
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 31 deletions.
7 changes: 5 additions & 2 deletions x-pack/docs/en/watcher/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ image::images/action-throttling.jpg[align="center"]
You can use the `foreach` field in an action to trigger the configured action
for every element within that array.

In order to protect from long running watches, after one hundred runs with an
foreach loop the execution is gracefully stopped.
In order to protect from long running watches, you can use the `max_iterations`
field to limit the maximum amount of runs that each watch executes. If this limit
is reached, the execution is gracefully stopped. If not set, this field defaults
to one hundred.

[source,js]
--------------------------------------------------
Expand All @@ -224,6 +226,7 @@ PUT _watcher/watch/log_event_watch
"actions" : {
"log_hits" : {
"foreach" : "ctx.payload.hits.hits", <1>
"max_iterations" : 500,
"logging" : {
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@

public class ActionWrapper implements ToXContentObject {

private final int MAXIMUM_FOREACH_RUNS = 100;

private String id;
@Nullable
private final ExecutableCondition condition;
Expand All @@ -58,18 +56,21 @@ public class ActionWrapper implements ToXContentObject {
private final ExecutableAction<? extends Action> action;
@Nullable
private String path;
private final Integer maxIterations;

public ActionWrapper(String id, ActionThrottler throttler,
@Nullable ExecutableCondition condition,
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
ExecutableAction<? extends Action> action,
@Nullable String path) {
@Nullable String path,
@Nullable Integer maxIterations) {
this.id = id;
this.condition = condition;
this.throttler = throttler;
this.transform = transform;
this.action = action;
this.path = path;
this.maxIterations = (maxIterations != null) ? maxIterations : 100;
}

public String id() {
Expand Down Expand Up @@ -177,7 +178,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
} else {
for (Object o : collection) {
if (runs >= MAXIMUM_FOREACH_RUNS) {
if (runs >= maxIterations) {
break;
}
if (o instanceof Map) {
Expand Down Expand Up @@ -216,6 +217,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
}
builder.endArray();
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
return builder;
}
});
Expand Down Expand Up @@ -279,7 +281,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
if (Strings.isEmpty(path) == false) {
builder.field(WatchField.FOREACH.getPreferredName(), path);
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
}

builder.field(action.type(), action, params);
return builder.endObject();
}
Expand All @@ -294,6 +298,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
TimeValue throttlePeriod = null;
String path = null;
ExecutableAction<? extends Action> action = null;
Integer maxIterations = null;

String currentFieldName = null;
XContentParser.Token token;
Expand All @@ -316,6 +321,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
throw new ElasticsearchParseException("could not parse action [{}/{}]. failed to parse field [{}] as time value",
pe, watchId, actionId, currentFieldName);
}
} else if (WatchField.MAX_ITERATIONS.match(currentFieldName, parser.getDeprecationHandler())) {
maxIterations = parser.intValue();
} else {
// it's the type of the action
ActionFactory actionFactory = actionRegistry.factory(currentFieldName);
Expand All @@ -332,7 +339,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
}

ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
return new ActionWrapper(actionId, throttler, condition, transform, action, path, maxIterations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public final class WatchField {
public static final ParseField ACTIONS = new ParseField("actions");
public static final ParseField TRANSFORM = new ParseField("transform");
public static final ParseField FOREACH = new ParseField("foreach");
public static final ParseField MAX_ITERATIONS = new ParseField("max_iterations");
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
public static final ParseField METADATA = new ParseField("metadata");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ActionWrapperTests extends ESTestCase {
private Watch watch = mock(Watch.class);
@SuppressWarnings("unchecked")
private ExecutableAction<Action> executableAction = mock(ExecutableAction.class);
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null);
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null, null);

public void testThatUnmetActionConditionResetsAckStatus() throws Exception {
WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED)));
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {
final ExecutableAction<LoggingAction> executableAction =
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine());
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);

WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path",
Expand All @@ -111,7 +111,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {

public void testThatSpecifiedPathIsNotCollection() {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path", "not a map"));
when(ctx.payload()).thenReturn(payload);
Expand All @@ -127,7 +127,7 @@ public void testThatSpecifiedPathIsNotCollection() {

public void testEmptyCollection() {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path", Collections.emptyList()));
when(ctx.payload()).thenReturn(payload);
Expand All @@ -143,7 +143,7 @@ public void testEmptyCollection() {

public void testPartialFailure() throws Exception {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path",
List.of(
Expand All @@ -165,9 +165,9 @@ public void testPartialFailure() throws Exception {
assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE));
}

public void testLimitOfNumberOfActionsExecuted() throws Exception {
public void testDefaultLimitOfNumberOfActionsExecuted() throws Exception {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
List<Map<String, String>> itemsPayload = new ArrayList<>();
for (int i = 0; i < 101; i++) {
Expand All @@ -193,11 +193,49 @@ public void testLimitOfNumberOfActionsExecuted() throws Exception {
assertThat(map.get("foreach"), instanceOf(List.class));
List<Map<String, Object>> actions = (List) map.get("foreach");
assertThat(actions, hasSize(100));
assertThat(map, hasKey("max_iterations"));
assertThat(map.get("max_iterations"), is(100));
assertThat(map, hasKey("number_of_actions_executed"));
assertThat(map.get("number_of_actions_executed"), is(100));
}
}

public void testConfiguredLimitOfNumberOfActionsExecuted() throws Exception {
int randomMaxIterations = randomIntBetween(1, 1000);
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path", randomMaxIterations);
WatchExecutionContext ctx = mockExecutionContent(watch);
List<Map<String, String>> itemsPayload = new ArrayList<>();
for (int i = 0; i < randomMaxIterations + 1; i++) {
final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);;
final Payload singleItemPayload = new Payload.Simple(Map.of("key", String.valueOf(i)));
itemsPayload.add(Map.of("key", String.valueOf(i)));
when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult);
}

Payload.Simple payload = new Payload.Simple(Map.of("my_path", itemsPayload));
when(ctx.payload()).thenReturn(payload);
when(executableAction.logger()).thenReturn(logger);

ActionWrapperResult result = wrapper.execute(ctx);
assertThat(result.action().status(), is(Action.Result.Status.SUCCESS));

// check that action toXContent contains all the results
try (XContentBuilder builder = jsonBuilder()) {
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
final String json = Strings.toString(builder);
final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true);
assertThat(map, hasKey("foreach"));
assertThat(map.get("foreach"), instanceOf(List.class));
List<Map<String, Object>> actions = (List) map.get("foreach");
assertThat(actions, hasSize(randomMaxIterations));
assertThat(map, hasKey("max_iterations"));
assertThat(map.get("max_iterations"), is(randomMaxIterations));
assertThat(map, hasKey("number_of_actions_executed"));
assertThat(map.get("number_of_actions_executed"), is(randomMaxIterations));
}
}

private WatchExecutionContext mockExecutionContent(Watch watch) {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(watch.id()).thenReturn("watchId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testExecute() throws Exception {
when(action.type()).thenReturn("MY_AWESOME_TYPE");
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);

WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -313,7 +313,7 @@ public void testExecuteFailedInput() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

when(watch.input()).thenReturn(input);
Expand Down Expand Up @@ -378,7 +378,7 @@ public void testExecuteFailedCondition() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

when(watch.input()).thenReturn(input);
Expand Down Expand Up @@ -442,7 +442,7 @@ public void testExecuteFailedWatchTransform() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

when(watch.input()).thenReturn(input);
Expand Down Expand Up @@ -520,7 +520,7 @@ public void testExecuteFailedActionTransform() throws Exception {
when(action.logger()).thenReturn(logger);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);

WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -600,7 +600,7 @@ public void testExecuteInner() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -649,7 +649,7 @@ public void testExecuteInnerThrottled() throws Exception {

ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -712,7 +712,7 @@ public void testExecuteInnerConditionNotMet() throws Exception {

ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -769,7 +769,7 @@ public void testExecuteInnerConditionNotMetDueToException() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
when(action.logger()).thenReturn(logger);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -817,7 +817,7 @@ public void testExecuteConditionNotMet() throws Exception {
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
ExecutableAction action = mock(ExecutableAction.class);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);

ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
Expand Down Expand Up @@ -946,7 +946,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
when(action.type()).thenReturn("MY_AWESOME_TYPE");
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null, null);

WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

Expand Down
Loading

0 comments on commit 859e387

Please sign in to comment.