Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watcher: Allow to execute actions for each element in array #41997

Merged
merged 24 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
54efef5
Watcher: Allow to execute actions for each element in array
spinscale May 9, 2019
6f26c7b
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale May 9, 2019
11a96e1
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale May 10, 2019
c3cb96d
fix test
spinscale May 10, 2019
d58818e
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale May 22, 2019
3d0ce81
fix path to use ctx by converting context to map
spinscale May 22, 2019
8a2af63
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale May 22, 2019
7bc27f0
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale May 29, 2019
46901c8
WIP
spinscale May 31, 2019
4e4ba21
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jun 19, 2019
db479d7
fix test
spinscale Jun 19, 2019
983e815
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jun 19, 2019
d4d0490
fix docs
spinscale Jun 19, 2019
1d9456c
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jun 19, 2019
66b284c
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jun 26, 2019
14dcc8d
review comments: limit runs to 100, doc updates
spinscale Jun 26, 2019
2073d46
review comment: lift restriction on passing a map
spinscale Jun 26, 2019
086fe24
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jul 1, 2019
95606e6
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jul 1, 2019
8029528
Fix tests
spinscale Jul 1, 2019
9c0a0ba
update template
spinscale Jul 1, 2019
edc19a8
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jul 2, 2019
b989cfb
remove tabs
spinscale Jul 2, 2019
c6d433e
Merge branch 'master' into 1809-add-foreach-loop-in-action
spinscale Jul 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions x-pack/docs/en/watcher/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,49 @@ of a watch during its execution:
image::images/action-throttling.jpg[align="center"]


[[action-foreach]]
=== Running an action for each element in an array

You can use the `foreach` field in an action to trigger the configured action
for every element within that array.

In order to protect from long running watches, after one hundred runs with an
foreach loop the execution is gracefully stopped.

[source,js]
--------------------------------------------------
PUT _watcher/watch/log_event_watch
{
"trigger" : {
"schedule" : { "interval" : "5m" }
},
"input" : {
"search" : {
"request" : {
"indices" : "log-events",
"body" : {
"query" : { "match" : { "status" : "error" } }
}
}
}
},
"condition" : {
"compare" : { "ctx.payload.hits.total" : { "gt" : 0 } }
},
"actions" : {
"log_hits" : {
"foreach" : "ctx.payload.hits.hits", <1>
"logging" : {
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
}
}
}
}
--------------------------------------------------
// CONSOLE

<1> The logging statement will be executed for each of the returned search hits.

[[action-conditions]]
=== Adding conditions to actions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.script.JodaCompatibleZonedDateTime;
import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler;
import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField;
Expand All @@ -30,29 +34,42 @@
import java.time.Clock;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

public class ActionWrapper implements ToXContentObject {

private final int MAXIMUM_FOREACH_RUNS = 100;

private String id;
@Nullable
private final ExecutableCondition condition;
@Nullable
private final ExecutableTransform<Transform, Transform.Result> transform;
private final ActionThrottler throttler;
private final ExecutableAction<? extends Action> action;
@Nullable
private String path;

public ActionWrapper(String id, ActionThrottler throttler,
@Nullable ExecutableCondition condition,
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
ExecutableAction<? extends Action> action) {
ExecutableAction<? extends Action> action,
@Nullable String path) {
this.id = id;
this.condition = condition;
this.throttler = throttler;
this.transform = transform;
this.action = action;
this.path = path;
}

public String id() {
Expand Down Expand Up @@ -140,16 +157,90 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e));
}
}
try {
Action.Result actionResult = action.execute(id, ctx, payload);
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
} catch (Exception e) {
action.logger().error(
if (Strings.isEmpty(path)) {
try {
Action.Result actionResult = action.execute(id, ctx, payload);
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
} catch (Exception e) {
action.logger().error(
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
}
} else {
try {
List<Action.Result> results = new ArrayList<>();
Object object = ObjectPath.eval(path, toMap(ctx));
int runs = 0;
if (object instanceof Collection) {
Collection collection = Collection.class.cast(object);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an upper limit here ? I am fine with a hard coded 100 size (or something like that) just so that we don't allow this to be a DOS vector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set it hundred, also included the number of executed actions in the JSON that gets written to watch history

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100 seems quite arbitrary from my perspective. Any chance to make this an option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please open a new issue to make this configurable plus the rationale. The main goal here was to not let the watch run extremely long and block other watch executions, as well as its own executions.

Thank you!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed as #45169

if (collection.isEmpty()) {
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
} else {
for (Object o : collection) {
if (runs >= MAXIMUM_FOREACH_RUNS) {
break;
}
if (o instanceof Map) {
results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o)));
} else {
results.add(action.execute(id, ctx, new Payload.Simple("_value", o)));
}
runs++;
}
}
} else if (object == null) {
throw new ElasticsearchException("specified foreach object was null: [{}]", path);
} else {
throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path);
}

// check if we have mixed results, then set to partial failure
final Set<Action.Result.Status> statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet());
Action.Result.Status status;
if (statuses.size() == 1) {
status = statuses.iterator().next();
} else {
status = Action.Result.Status.PARTIAL_FAILURE;
}

final int numberOfActionsExecuted = runs;
return new ActionWrapperResult(id, conditionResult, transformResult,
new Action.Result(action.type(), status) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("number_of_actions_executed", numberOfActionsExecuted);
builder.startArray(WatchField.FOREACH.getPreferredName());
for (Action.Result result : results) {
builder.startObject();
result.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}
});
} catch (Exception e) {
action.logger().error(
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
}
}
}

private Map<String, Object> toMap(WatchExecutionContext ctx) {
Map<String, Object> model = new HashMap<>();
model.put("id", ctx.id().value());
model.put("watch_id", ctx.id().watchId());
model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC));
model.put("trigger", ctx.triggerEvent().data());
model.put("metadata", ctx.watch().metadata());
model.put("vars", ctx.vars());
if (ctx.payload().data() != null) {
model.put("payload", ctx.payload().data());
}
return Map.of("ctx", model);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -186,6 +277,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(transform.type(), transform, params)
.endObject();
}
if (Strings.isEmpty(path) == false) {
builder.field(WatchField.FOREACH.getPreferredName(), path);
}
builder.field(action.type(), action, params);
return builder.endObject();
}
Expand All @@ -198,6 +292,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
ExecutableCondition condition = null;
ExecutableTransform<Transform, Transform.Result> transform = null;
TimeValue throttlePeriod = null;
String path = null;
ExecutableAction<? extends Action> action = null;

String currentFieldName = null;
Expand All @@ -208,6 +303,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
} else {
if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) {
condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser);
} else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) {
path = parser.text();
} else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) {
transform = actionRegistry.getTransformRegistry().parse(watchId, parser);
} else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -235,7 +332,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
}

ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
return new ActionWrapper(actionId, throttler, condition, transform, action);
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transfo
}

public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform));
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null));
return this;
}

Expand All @@ -111,7 +111,13 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Conditi
}

public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform));
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null));
return this;
}

public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path,
Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path));
return this;
}

Expand Down Expand Up @@ -186,16 +192,18 @@ public final BytesReference buildAsBytes(XContentType contentType) {
static class TransformedAction implements ToXContentObject {

private final Action action;
@Nullable private String path;
@Nullable private final TimeValue throttlePeriod;
@Nullable private final Condition condition;
@Nullable private final Transform transform;

TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod,
@Nullable Condition condition, @Nullable Transform transform) {
@Nullable Condition condition, @Nullable Transform transform, @Nullable String path) {
this.throttlePeriod = throttlePeriod;
this.condition = condition;
this.transform = transform;
this.action = action;
this.path = path;
}

@Override
Expand All @@ -215,6 +223,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(transform.type(), transform, params)
.endObject();
}
if (path != null) {
builder.field("foreach", path);
}
builder.field(action.type(), action, params);
return builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField {
// version 7: add full exception stack traces for better debugging
// version 8: fix slack attachment property not to be dynamic, causing field type issues
// version 9: add a user field defining which user executed the watch
// version 10: add support for foreach path in actions
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "9";
public static final String INDEX_TEMPLATE_VERSION = "10";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public final class WatchField {
public static final ParseField CONDITION = new ParseField("condition");
public static final ParseField ACTIONS = new ParseField("actions");
public static final ParseField TRANSFORM = new ParseField("transform");
public static final ParseField FOREACH = new ParseField("foreach");
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
public static final ParseField METADATA = new ParseField("metadata");
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@
"reason" : {
"type" : "keyword"
},
"number_of_actions_executed": {
"type": "integer"
},
"foreach" : {
"type": "object",
"enabled" : false
},
"email": {
"type": "object",
"dynamic": true,
Expand Down
Loading