diff --git a/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java b/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java index 3d48074..04bebd6 100644 --- a/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java +++ b/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/ConfigurationStateUpdater.java @@ -147,7 +147,7 @@ private synchronized PipelineInterpreter.State reloadAndSave() { } ImmutableSetMultimap streamPipelineConnections = ImmutableSetMultimap.copyOf(connections); - final PipelineInterpreter.State newState = stateFactory.newState(currentPipelines, streamPipelineConnections, commonClassLoader); + final PipelineInterpreter.State newState = stateFactory.newState(currentPipelines, streamPipelineConnections); latestState.set(newState); return newState; } diff --git a/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java b/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java index 1e749ef..7b415d6 100644 --- a/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java +++ b/plugin/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java @@ -16,6 +16,9 @@ */ package org.graylog.plugins.pipelineprocessor.processors; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -27,11 +30,6 @@ import com.google.common.collect.Sets; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; - import org.graylog.plugins.pipelineprocessor.EvaluationContext; import org.graylog.plugins.pipelineprocessor.ast.Pipeline; import org.graylog.plugins.pipelineprocessor.ast.Rule; @@ -54,17 +52,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.inject.Named; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.inject.Inject; -import javax.inject.Named; - import static com.codahale.metrics.MetricRegistry.name; import static org.jooq.lambda.tuple.Tuple.tuple; @@ -120,8 +119,8 @@ public Messages process(Messages messages, InterpreterListener interpreterListen // message id + stream id final Set> processingBlacklist = Sets.newHashSet(); - final List fullyProcessed = Lists.newArrayList(); - List toProcess = Lists.newArrayList(messages); + final List toProcess = Lists.newArrayList(messages); + final List fullyProcessed = Lists.newArrayListWithExpectedSize(toProcess.size()); while (!toProcess.isEmpty()) { final MessageCollection currentSet = new MessageCollection(toProcess); @@ -208,9 +207,9 @@ private ImmutableSet selectPipelines(InterpreterListener interpreterLi .filter(streamId -> !processingBlacklist.contains(tuple(msgId, streamId))) .filter(streamConnection::containsKey) .collect(Collectors.toSet()); - final ImmutableSet pipelinesToRun = ImmutableSet.copyOf(streamsIds.stream() + final ImmutableSet pipelinesToRun = streamsIds.stream() .flatMap(streamId -> streamConnection.get(streamId).stream()) - .collect(Collectors.toSet())); + .collect(ImmutableSet.toImmutableSet()); interpreterListener.processStreams(message, pipelinesToRun, streamsIds); log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds); return pipelinesToRun; @@ -235,11 +234,11 @@ public List processForPipelines(Message message, Set pipelineIds, InterpreterListener interpreterListener, State state) { - final ImmutableSet pipelinesToRun = ImmutableSet.copyOf(pipelineIds - .stream() - .map(pipelineId -> state.getCurrentPipelines().get(pipelineId)) - .filter(pipeline -> pipeline != null) - .collect(Collectors.toSet())); + final Map currentPipelines = state.getCurrentPipelines(); + final ImmutableSet pipelinesToRun = pipelineIds.stream() + .map(currentPipelines::get) + .filter(Objects::nonNull) + .collect(ImmutableSet.toImmutableSet()); return processForResolvedPipelines(message, message.getId(), pipelinesToRun, interpreterListener, state); } @@ -292,24 +291,30 @@ private void evaluateStage(Stage stage, final EvaluationContext context = new EvaluationContext(message); // 3. iterate over all the stages in these pipelines and execute them in order - final ArrayList rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size()); - boolean anyRulesMatched = false; - for (Rule rule : stage.getRules()) { - anyRulesMatched |= evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener); + final List stageRules = stage.getRules(); + final List rulesToRun = new ArrayList<>(stageRules.size()); + boolean anyRulesMatched = stageRules.isEmpty(); // If there are no rules, we can simply continue to the next stage + boolean allRulesMatched = true; + for (Rule rule : stageRules) { + final boolean ruleCondition = evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener); + anyRulesMatched |= ruleCondition; + allRulesMatched &= ruleCondition; } - for (Rule rule : rulesToRun) + for (Rule rule : rulesToRun) { if (!executeRuleActions(rule, message, msgId, pipeline, context, interpreterListener)) { // if any of the rules raise an error, skip the rest of the rules break; } + } // stage needed to match all rule conditions to enable the next stage, // record that it is ok to proceed with this pipeline // OR // any rule could match, but at least one had to, // record that it is ok to proceed with the pipeline - if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size())) - || (rulesToRun.size() > 0 && anyRulesMatched)) { + final boolean matchAllSuccess = stage.matchAll() && allRulesMatched; + final boolean matchEitherSuccess = !stage.matchAll() && anyRulesMatched; + if (matchAllSuccess || matchEitherSuccess) { interpreterListener.continuePipelineExecution(pipeline, stage); log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage", msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either"); @@ -388,7 +393,7 @@ private boolean evaluateRuleCondition(Rule rule, String msgId, Pipeline pipeline, EvaluationContext context, - ArrayList rulesToRun, InterpreterListener interpreterListener) { + List rulesToRun, InterpreterListener interpreterListener) { interpreterListener.evaluateRule(rule, pipeline); final GeneratedRule generatedRule = rule.generatedRule(); boolean matched = generatedRule != null ? generatedRule.when(context) : rule.when().evaluateBool(context); @@ -442,19 +447,16 @@ public static class State { private final ImmutableMap currentPipelines; private final ImmutableSetMultimap streamPipelineConnections; private final LoadingCache, StageIterator.Configuration> cache; - private final ClassLoader commonClassLoader; private final boolean cachedIterators; @AssistedInject public State(@Assisted ImmutableMap currentPipelines, @Assisted ImmutableSetMultimap streamPipelineConnections, - @Nullable @Assisted ClassLoader commonClassLoader, MetricRegistry metricRegistry, @Named("processbuffer_processors") int processorCount, @Named("cached_stageiterators") boolean cachedIterators) { this.currentPipelines = currentPipelines; this.streamPipelineConnections = streamPipelineConnections; - this.commonClassLoader = commonClassLoader; this.cachedIterators = cachedIterators; cache = CacheBuilder.newBuilder() @@ -468,7 +470,7 @@ public StageIterator.Configuration load(@Nonnull Set pipelines) throws }); // we have to remove the metrics, because otherwise we leak references to the cache (and the register call with throw) - metricRegistry.removeMatching((name, metric) -> name.startsWith(name(PipelineInterpreter.class,"stage-cache"))); + metricRegistry.removeMatching((name, metric) -> name.startsWith(name(PipelineInterpreter.class, "stage-cache"))); MetricUtils.safelyRegisterAll(metricRegistry, new CacheStatsSet(name(PipelineInterpreter.class, "stage-cache"), cache)); } @@ -496,8 +498,7 @@ public StageIterator getStageIterator(Set pipelines) { public interface Factory { State newState(ImmutableMap currentPipelines, - ImmutableSetMultimap streamPipelineConnections, - @Nullable ClassLoader commonClassLoader); + ImmutableSetMultimap streamPipelineConnections); } } } diff --git a/plugin/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java b/plugin/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java index cbe627e..6f6b4e1 100644 --- a/plugin/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java +++ b/plugin/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java @@ -16,13 +16,13 @@ */ package org.graylog.plugins.pipelineprocessor.processors; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; - import org.graylog.plugins.pipelineprocessor.ast.Pipeline; import org.graylog.plugins.pipelineprocessor.ast.Rule; import org.graylog.plugins.pipelineprocessor.ast.functions.Function; @@ -41,6 +41,7 @@ import org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbRuleService; import org.graylog.plugins.pipelineprocessor.functions.conversion.StringConversion; import org.graylog.plugins.pipelineprocessor.functions.messages.CreateMessage; +import org.graylog.plugins.pipelineprocessor.functions.messages.SetField; import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry; import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser; import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections; @@ -52,12 +53,12 @@ import org.junit.Test; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.concurrent.Executors; import static com.codahale.metrics.MetricRegistry.name; -import static com.google.common.collect.Sets.newHashSet; import static org.assertj.core.api.Assertions.assertThat; import static org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID; import static org.junit.Assert.assertEquals; @@ -65,6 +66,22 @@ import static org.mockito.Mockito.when; public class PipelineInterpreterTest { + private static final RuleDao RULE_TRUE = RuleDao.create("true", "true", "true", + "rule \"true\"\n" + + "when true\n" + + "then\n" + + "end", null, null); + private static final RuleDao RULE_FALSE = RuleDao.create("false", "false", "false", + "rule \"false\"\n" + + "when false\n" + + "then\n" + + "end", null, null); + private static final RuleDao RULE_ADD_FOOBAR = RuleDao.create("add_foobar", "add_foobar", "add_foobar", + "rule \"add_foobar\"\n" + + "when true\n" + + "then\n" + + " set_field(\"foobar\", \"covfefe\");\n" + + "end", null, null); @Test public void testCreateMessage() { @@ -84,7 +101,7 @@ public void testCreateMessage() { final PipelineService pipelineService = mock(MongoDbPipelineService.class); when(pipelineService.loadAll()).thenReturn(Collections.singleton( - PipelineDao.create("cde", "title", "description", + PipelineDao.create("p1", "title", "description", "pipeline \"pipeline\"\n" + "stage 0 match all\n" + " rule \"creates message\";\n" + @@ -93,18 +110,145 @@ public void testCreateMessage() { null) )); - final PipelineStreamConnectionsService pipelineStreamConnectionsService = mock( - MongoDbPipelineStreamConnectionsService.class); - final PipelineConnections pipelineConnections = PipelineConnections.create(null, - DEFAULT_STREAM_ID, - newHashSet("cde")); - when(pipelineStreamConnectionsService.loadAll()).thenReturn( - newHashSet(pipelineConnections) - ); + final Map> functions = ImmutableMap.of( + CreateMessage.NAME, new CreateMessage(), + StringConversion.NAME, new StringConversion()); + + final PipelineInterpreter interpreter = createPipelineInterpreter(ruleService, pipelineService, functions); + + Message msg = messageInDefaultStream("original message", "test"); + final Messages processed = interpreter.process(msg); + + final Message[] messages = Iterables.toArray(processed, Message.class); + assertEquals(2, messages.length); + } + + @Test + public void testMatchAllContinuesIfAllRulesMatched() { + final RuleService ruleService = mock(MongoDbRuleService.class); + when(ruleService.loadAll()).thenReturn(ImmutableList.of(RULE_TRUE, RULE_FALSE, RULE_ADD_FOOBAR)); + + final PipelineService pipelineService = mock(MongoDbPipelineService.class); + when(pipelineService.loadAll()).thenReturn(Collections.singleton( + PipelineDao.create("p1", "title", "description", + "pipeline \"pipeline\"\n" + + "stage 0 match all\n" + + " rule \"true\";\n" + + "stage 1 match either\n" + + " rule \"add_foobar\";\n" + + "end\n", + Tools.nowUTC(), + null) + )); + + final Map> functions = ImmutableMap.of(SetField.NAME, new SetField()); + final PipelineInterpreter interpreter = createPipelineInterpreter(ruleService, pipelineService, functions); + + final Messages processed = interpreter.process(messageInDefaultStream("message", "test")); + + final List messages = ImmutableList.copyOf(processed); + assertThat(messages).hasSize(1); + + final Message actualMessage = messages.get(0); + assertThat(actualMessage.getFieldAs(String.class, "foobar")).isEqualTo("covfefe"); + } + + @Test + public void testMatchAllDoesNotContinueIfNotAllRulesMatched() { + final RuleService ruleService = mock(MongoDbRuleService.class); + when(ruleService.loadAll()).thenReturn(ImmutableList.of(RULE_TRUE, RULE_FALSE, RULE_ADD_FOOBAR)); + + final PipelineService pipelineService = mock(MongoDbPipelineService.class); + when(pipelineService.loadAll()).thenReturn(Collections.singleton( + PipelineDao.create("p1", "title", "description", + "pipeline \"pipeline\"\n" + + "stage 0 match all\n" + + " rule \"true\";\n" + + " rule \"false\";\n" + + "stage 1 match either\n" + + " rule \"add_foobar\";\n" + + "end\n", + Tools.nowUTC(), + null) + )); + + final Map> functions = ImmutableMap.of(SetField.NAME, new SetField()); + final PipelineInterpreter interpreter = createPipelineInterpreter(ruleService, pipelineService, functions); + + final Messages processed = interpreter.process(messageInDefaultStream("message", "test")); - final Map> functions = Maps.newHashMap(); - functions.put(CreateMessage.NAME, new CreateMessage()); - functions.put(StringConversion.NAME, new StringConversion()); + final List messages = ImmutableList.copyOf(processed); + assertThat(messages).hasSize(1); + + final Message actualMessage = messages.get(0); + assertThat(actualMessage.hasField("foobar")).isFalse(); + } + + @Test + public void testMatchEitherContinuesIfOneRuleMatched() { + final RuleService ruleService = mock(MongoDbRuleService.class); + when(ruleService.loadAll()).thenReturn(ImmutableList.of(RULE_TRUE, RULE_FALSE, RULE_ADD_FOOBAR)); + + final PipelineService pipelineService = mock(MongoDbPipelineService.class); + when(pipelineService.loadAll()).thenReturn(Collections.singleton( + PipelineDao.create("p1", "title", "description", + "pipeline \"pipeline\"\n" + + "stage 0 match either\n" + + " rule \"true\";\n" + + " rule \"false\";\n" + + "stage 1 match either\n" + + " rule \"add_foobar\";\n" + + "end\n", + Tools.nowUTC(), + null) + )); + + final Map> functions = ImmutableMap.of(SetField.NAME, new SetField()); + final PipelineInterpreter interpreter = createPipelineInterpreter(ruleService, pipelineService, functions); + + final Messages processed = interpreter.process(messageInDefaultStream("message", "test")); + + final List messages = ImmutableList.copyOf(processed); + assertThat(messages).hasSize(1); + + final Message actualMessage = messages.get(0); + assertThat(actualMessage.getFieldAs(String.class, "foobar")).isEqualTo("covfefe"); + } + + @Test + public void testMatchEitherStopsIfNoRuleMatched() { + final RuleService ruleService = mock(MongoDbRuleService.class); + when(ruleService.loadAll()).thenReturn(ImmutableList.of(RULE_TRUE, RULE_FALSE, RULE_ADD_FOOBAR)); + + final PipelineService pipelineService = mock(MongoDbPipelineService.class); + when(pipelineService.loadAll()).thenReturn(Collections.singleton( + PipelineDao.create("p1", "title", "description", + "pipeline \"pipeline\"\n" + + "stage 0 match either\n" + + " rule \"false\";\n" + + "stage 1 match either\n" + + " rule \"add_foobar\";\n" + + "end\n", + Tools.nowUTC(), + null) + )); + + final Map> functions = ImmutableMap.of(SetField.NAME, new SetField()); + final PipelineInterpreter interpreter = createPipelineInterpreter(ruleService, pipelineService, functions); + + final Messages processed = interpreter.process(messageInDefaultStream("message", "test")); + + final List messages = ImmutableList.copyOf(processed); + assertThat(messages).hasSize(1); + + final Message actualMessage = messages.get(0); + assertThat(actualMessage.hasField("foobar")).isFalse(); + } + + private PipelineInterpreter createPipelineInterpreter(RuleService ruleService, PipelineService pipelineService, Map> functions) { + final PipelineStreamConnectionsService pipelineStreamConnectionsService = mock(MongoDbPipelineStreamConnectionsService.class); + final PipelineConnections pipelineConnections = PipelineConnections.create("p1", DEFAULT_STREAM_ID, Collections.singleton("p1")); + when(pipelineStreamConnectionsService.loadAll()).thenReturn(Collections.singleton(pipelineConnections)); final FunctionRegistry functionRegistry = new FunctionRegistry(functions); final PipelineRuleParser parser = new PipelineRuleParser(functionRegistry, new CodeGenerator(JavaCompiler::new)); @@ -117,19 +261,13 @@ public void testCreateMessage() { functionRegistry, Executors.newScheduledThreadPool(1), mock(EventBus.class), - (currentPipelines, streamPipelineConnections, classLoader) -> new PipelineInterpreter.State(currentPipelines, streamPipelineConnections, null, new MetricRegistry(), 1, true), + (currentPipelines, streamPipelineConnections) -> new PipelineInterpreter.State(currentPipelines, streamPipelineConnections, new MetricRegistry(), 1, true), false); - final PipelineInterpreter interpreter = new PipelineInterpreter( + return new PipelineInterpreter( mock(Journal.class), new MetricRegistry(), stateUpdater ); - - Message msg = messageInDefaultStream("original message", "test"); - final Messages processed = interpreter.process(msg); - - final Message[] messages = Iterables.toArray(processed, Message.class); - assertEquals(2, messages.length); } @Test @@ -161,11 +299,9 @@ public void testMetrics() { final PipelineStreamConnectionsService pipelineStreamConnectionsService = new InMemoryPipelineStreamConnectionsService(); pipelineStreamConnectionsService.save(PipelineConnections.create(null, DEFAULT_STREAM_ID, - newHashSet("cde"))); - - final Map> functions = Maps.newHashMap(); + Collections.singleton("cde"))); - final FunctionRegistry functionRegistry = new FunctionRegistry(functions); + final FunctionRegistry functionRegistry = new FunctionRegistry(Collections.emptyMap()); final PipelineRuleParser parser = new PipelineRuleParser(functionRegistry, new CodeGenerator(JavaCompiler::new)); final MetricRegistry metricRegistry = new MetricRegistry(); @@ -176,7 +312,8 @@ public void testMetrics() { metricRegistry, functionRegistry, Executors.newScheduledThreadPool(1), - mock(EventBus.class), (currentPipelines, streamPipelineConnections, commonClassLoader) -> new PipelineInterpreter.State(currentPipelines, streamPipelineConnections, null, new MetricRegistry(), 1, true), + mock(EventBus.class), + (currentPipelines, streamPipelineConnections) -> new PipelineInterpreter.State(currentPipelines, streamPipelineConnections, new MetricRegistry(), 1, true), false); final PipelineInterpreter interpreter = new PipelineInterpreter( mock(Journal.class), @@ -237,5 +374,4 @@ private Message messageInDefaultStream(String message, String source) { return msg; } - } \ No newline at end of file