This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Fix handling of "match all"/"match either"
The pipeline interpreter had a bug regarding the handling of the "match all" and "match either"
statements which caused pipelines containing stages with "match all" to continue processing even
if not all rules in the stage were executed.

Fixes Graylog2/graylog2-server#3924
Jochen Schalanda committed Jun 29, 2017
1 parent 5b6e676 commit 3215eda
Showing 2 changed files with 199 additions and 62 deletions.
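For orientation before the diff: the old check in evaluateStage combined both modes into one condition whose "match either" branch was not gated on the stage mode, so a "match all" stage could continue as soon as a single rule matched. The following is a minimal, self-contained sketch of the old versus the fixed gating logic; plain booleans stand in for the interpreter's Stage and Rule types, and the class and method names (StageGateSketch, oldGate, newGate) are illustrative, not Graylog API.

import java.util.List;

class StageGateSketch {
    // Old (buggy) condition, reduced to booleans: the second branch is not
    // gated on matchAll, so a single matching rule lets a "match all" stage pass.
    static boolean oldGate(boolean matchAll, List<Boolean> ruleConditions) {
        final long matched = ruleConditions.stream().filter(c -> c).count();
        return (matchAll && matched == ruleConditions.size()) || matched > 0;
    }

    // Fixed condition: "match either" only applies when the stage is not
    // "match all", and a stage without any rules trivially passes.
    static boolean newGate(boolean matchAll, List<Boolean> ruleConditions) {
        boolean anyMatched = ruleConditions.isEmpty();
        boolean allMatched = true;
        for (boolean condition : ruleConditions) {
            anyMatched |= condition;
            allMatched &= condition;
        }
        return (matchAll && allMatched) || (!matchAll && anyMatched);
    }

    public static void main(String[] args) {
        // A "match all" stage with two rules, only one of which matched:
        System.out.println(oldGate(true, List.of(true, false))); // true  -> bug: pipeline continued
        System.out.println(newGate(true, List.of(true, false))); // false -> fixed: pipeline stops here
    }
}

Running the sketch prints true for the old gate and false for the fixed gate in that case, which mirrors the behaviour described in the commit message.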
@@ -16,6 +16,9 @@
  */
 package org.graylog.plugins.pipelineprocessor.processors;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -27,11 +30,6 @@
 import com.google.common.collect.Sets;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-
 import org.graylog.plugins.pipelineprocessor.EvaluationContext;
 import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
 import org.graylog.plugins.pipelineprocessor.ast.Rule;
@@ -54,17 +52,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Named;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-import javax.inject.Named;
-
 import static com.codahale.metrics.MetricRegistry.name;
 import static org.jooq.lambda.tuple.Tuple.tuple;
 
@@ -120,8 +119,8 @@ public Messages process(Messages messages, InterpreterListener interpreterListen
         // message id + stream id
         final Set<Tuple2<String, String>> processingBlacklist = Sets.newHashSet();
 
-        final List<Message> fullyProcessed = Lists.newArrayList();
-        List<Message> toProcess = Lists.newArrayList(messages);
+        final List<Message> toProcess = Lists.newArrayList(messages);
+        final List<Message> fullyProcessed = Lists.newArrayListWithExpectedSize(toProcess.size());
 
         while (!toProcess.isEmpty()) {
             final MessageCollection currentSet = new MessageCollection(toProcess);
@@ -208,9 +207,9 @@ private ImmutableSet<Pipeline> selectPipelines(InterpreterListener interpreterLi
                 .filter(streamId -> !processingBlacklist.contains(tuple(msgId, streamId)))
                 .filter(streamConnection::containsKey)
                 .collect(Collectors.toSet());
-        final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(streamsIds.stream()
+        final ImmutableSet<Pipeline> pipelinesToRun = streamsIds.stream()
                 .flatMap(streamId -> streamConnection.get(streamId).stream())
-                .collect(Collectors.toSet()));
+                .collect(ImmutableSet.toImmutableSet());
         interpreterListener.processStreams(message, pipelinesToRun, streamsIds);
         log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
         return pipelinesToRun;
@@ -235,11 +234,11 @@ public List<Message> processForPipelines(Message message,
                                              Set<String> pipelineIds,
                                              InterpreterListener interpreterListener,
                                              State state) {
-        final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(pipelineIds
-                .stream()
-                .map(pipelineId -> state.getCurrentPipelines().get(pipelineId))
-                .filter(pipeline -> pipeline != null)
-                .collect(Collectors.toSet()));
+        final Map<String, Pipeline> currentPipelines = state.getCurrentPipelines();
+        final ImmutableSet<Pipeline> pipelinesToRun = pipelineIds.stream()
+                .map(currentPipelines::get)
+                .filter(Objects::nonNull)
+                .collect(ImmutableSet.toImmutableSet());
 
         return processForResolvedPipelines(message, message.getId(), pipelinesToRun, interpreterListener, state);
     }
@@ -292,24 +291,30 @@ private void evaluateStage(Stage stage,
         final EvaluationContext context = new EvaluationContext(message);
 
         // 3. iterate over all the stages in these pipelines and execute them in order
-        final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
-        boolean anyRulesMatched = false;
-        for (Rule rule : stage.getRules()) {
-            anyRulesMatched |= evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener);
+        final List<Rule> stageRules = stage.getRules();
+        final List<Rule> rulesToRun = new ArrayList<>(stageRules.size());
+        boolean anyRulesMatched = stageRules.isEmpty(); // If there are no rules, we can simply continue to the next stage
+        boolean allRulesMatched = true;
+        for (Rule rule : stageRules) {
+            final boolean ruleCondition = evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener);
+            anyRulesMatched |= ruleCondition;
+            allRulesMatched &= ruleCondition;
         }
 
-        for (Rule rule : rulesToRun)
+        for (Rule rule : rulesToRun) {
             if (!executeRuleActions(rule, message, msgId, pipeline, context, interpreterListener)) {
                 // if any of the rules raise an error, skip the rest of the rules
                 break;
             }
+        }
         // stage needed to match all rule conditions to enable the next stage,
         // record that it is ok to proceed with this pipeline
         // OR
         // any rule could match, but at least one had to,
         // record that it is ok to proceed with the pipeline
-        if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
-                || (rulesToRun.size() > 0 && anyRulesMatched)) {
+        final boolean matchAllSuccess = stage.matchAll() && allRulesMatched && rulesToRun.size() == stageRules.size();
+        final boolean matchEitherSuccess = !stage.matchAll() && anyRulesMatched;
+        if (matchAllSuccess || matchEitherSuccess) {
             interpreterListener.continuePipelineExecution(pipeline, stage);
             log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
                     msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
@@ -388,7 +393,7 @@ private boolean evaluateRuleCondition(Rule rule,
                                          String msgId,
                                          Pipeline pipeline,
                                          EvaluationContext context,
-                                          ArrayList<Rule> rulesToRun, InterpreterListener interpreterListener) {
+                                          List<Rule> rulesToRun, InterpreterListener interpreterListener) {
        interpreterListener.evaluateRule(rule, pipeline);
        final GeneratedRule generatedRule = rule.generatedRule();
        boolean matched = generatedRule != null ? generatedRule.when(context) : rule.when().evaluateBool(context);
@@ -442,19 +447,16 @@ public static class State {
        private final ImmutableMap<String, Pipeline> currentPipelines;
        private final ImmutableSetMultimap<String, Pipeline> streamPipelineConnections;
        private final LoadingCache<Set<Pipeline>, StageIterator.Configuration> cache;
-        private final ClassLoader commonClassLoader;
        private final boolean cachedIterators;
 
        @AssistedInject
        public State(@Assisted ImmutableMap<String, Pipeline> currentPipelines,
                     @Assisted ImmutableSetMultimap<String, Pipeline> streamPipelineConnections,
-                     @Nullable @Assisted ClassLoader commonClassLoader,
                     MetricRegistry metricRegistry,
                     @Named("processbuffer_processors") int processorCount,
                     @Named("cached_stageiterators") boolean cachedIterators) {
            this.currentPipelines = currentPipelines;
            this.streamPipelineConnections = streamPipelineConnections;
-            this.commonClassLoader = commonClassLoader;
            this.cachedIterators = cachedIterators;
 
            cache = CacheBuilder.newBuilder()
@@ -468,7 +470,7 @@ public StageIterator.Configuration load(@Nonnull Set<Pipeline> pipelines) throws
                });
 
            // we have to remove the metrics, because otherwise we leak references to the cache (and the register call with throw)
-            metricRegistry.removeMatching((name, metric) -> name.startsWith(name(PipelineInterpreter.class,"stage-cache")));
+            metricRegistry.removeMatching((name, metric) -> name.startsWith(name(PipelineInterpreter.class, "stage-cache")));
            MetricUtils.safelyRegisterAll(metricRegistry, new CacheStatsSet(name(PipelineInterpreter.class, "stage-cache"), cache));
        }
 