Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Fix handling of "match all"/"match either" #193

Merged
merged 1 commit into from
Jun 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private synchronized PipelineInterpreter.State reloadAndSave() {
}
ImmutableSetMultimap<String, Pipeline> streamPipelineConnections = ImmutableSetMultimap.copyOf(connections);

final PipelineInterpreter.State newState = stateFactory.newState(currentPipelines, streamPipelineConnections, commonClassLoader);
final PipelineInterpreter.State newState = stateFactory.newState(currentPipelines, streamPipelineConnections);
latestState.set(newState);
return newState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.graylog.plugins.pipelineprocessor.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand All @@ -27,11 +30,6 @@
import com.google.common.collect.Sets;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.ast.Rule;
Expand All @@ -54,17 +52,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;

import static com.codahale.metrics.MetricRegistry.name;
import static org.jooq.lambda.tuple.Tuple.tuple;

Expand Down Expand Up @@ -120,8 +119,8 @@ public Messages process(Messages messages, InterpreterListener interpreterListen
// message id + stream id
final Set<Tuple2<String, String>> processingBlacklist = Sets.newHashSet();

final List<Message> fullyProcessed = Lists.newArrayList();
List<Message> toProcess = Lists.newArrayList(messages);
final List<Message> toProcess = Lists.newArrayList(messages);
final List<Message> fullyProcessed = Lists.newArrayListWithExpectedSize(toProcess.size());

while (!toProcess.isEmpty()) {
final MessageCollection currentSet = new MessageCollection(toProcess);
Expand Down Expand Up @@ -208,9 +207,9 @@ private ImmutableSet<Pipeline> selectPipelines(InterpreterListener interpreterLi
.filter(streamId -> !processingBlacklist.contains(tuple(msgId, streamId)))
.filter(streamConnection::containsKey)
.collect(Collectors.toSet());
final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(streamsIds.stream()
final ImmutableSet<Pipeline> pipelinesToRun = streamsIds.stream()
.flatMap(streamId -> streamConnection.get(streamId).stream())
.collect(Collectors.toSet()));
.collect(ImmutableSet.toImmutableSet());
interpreterListener.processStreams(message, pipelinesToRun, streamsIds);
log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
return pipelinesToRun;
Expand All @@ -235,11 +234,11 @@ public List<Message> processForPipelines(Message message,
Set<String> pipelineIds,
InterpreterListener interpreterListener,
State state) {
final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(pipelineIds
.stream()
.map(pipelineId -> state.getCurrentPipelines().get(pipelineId))
.filter(pipeline -> pipeline != null)
.collect(Collectors.toSet()));
final Map<String, Pipeline> currentPipelines = state.getCurrentPipelines();
final ImmutableSet<Pipeline> pipelinesToRun = pipelineIds.stream()
.map(currentPipelines::get)
.filter(Objects::nonNull)
.collect(ImmutableSet.toImmutableSet());

return processForResolvedPipelines(message, message.getId(), pipelinesToRun, interpreterListener, state);
}
Expand Down Expand Up @@ -292,24 +291,30 @@ private void evaluateStage(Stage stage,
final EvaluationContext context = new EvaluationContext(message);

// 3. iterate over all the stages in these pipelines and execute them in order
final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
boolean anyRulesMatched = false;
for (Rule rule : stage.getRules()) {
anyRulesMatched |= evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener);
final List<Rule> stageRules = stage.getRules();
final List<Rule> rulesToRun = new ArrayList<>(stageRules.size());
boolean anyRulesMatched = stageRules.isEmpty(); // If there are no rules, we can simply continue to the next stage
boolean allRulesMatched = true;
for (Rule rule : stageRules) {
final boolean ruleCondition = evaluateRuleCondition(rule, message, msgId, pipeline, context, rulesToRun, interpreterListener);
anyRulesMatched |= ruleCondition;
allRulesMatched &= ruleCondition;
}

for (Rule rule : rulesToRun)
for (Rule rule : rulesToRun) {
if (!executeRuleActions(rule, message, msgId, pipeline, context, interpreterListener)) {
// if any of the rules raise an error, skip the rest of the rules
break;
}
}
// stage needed to match all rule conditions to enable the next stage,
// record that it is ok to proceed with this pipeline
// OR
// any rule could match, but at least one had to,
// record that it is ok to proceed with the pipeline
if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
|| (rulesToRun.size() > 0 && anyRulesMatched)) {
final boolean matchAllSuccess = stage.matchAll() && allRulesMatched;
final boolean matchEitherSuccess = !stage.matchAll() && anyRulesMatched;
if (matchAllSuccess || matchEitherSuccess) {
interpreterListener.continuePipelineExecution(pipeline, stage);
log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
Expand Down Expand Up @@ -388,7 +393,7 @@ private boolean evaluateRuleCondition(Rule rule,
String msgId,
Pipeline pipeline,
EvaluationContext context,
ArrayList<Rule> rulesToRun, InterpreterListener interpreterListener) {
List<Rule> rulesToRun, InterpreterListener interpreterListener) {
interpreterListener.evaluateRule(rule, pipeline);
final GeneratedRule generatedRule = rule.generatedRule();
boolean matched = generatedRule != null ? generatedRule.when(context) : rule.when().evaluateBool(context);
Expand Down Expand Up @@ -442,19 +447,16 @@ public static class State {
private final ImmutableMap<String, Pipeline> currentPipelines;
private final ImmutableSetMultimap<String, Pipeline> streamPipelineConnections;
private final LoadingCache<Set<Pipeline>, StageIterator.Configuration> cache;
private final ClassLoader commonClassLoader;
private final boolean cachedIterators;

@AssistedInject
public State(@Assisted ImmutableMap<String, Pipeline> currentPipelines,
@Assisted ImmutableSetMultimap<String, Pipeline> streamPipelineConnections,
@Nullable @Assisted ClassLoader commonClassLoader,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks the server startup. How did this work in your setup?

2017-06-29 18:06:20,707 ERROR: org.graylog2.bootstrap.CmdLineTool - Guice error (more detail on log level debug): org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State has @AssistedInject constructors, but none of them match the parameters in method org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State$Factory.newState().  Unable to create AssistedInject factory.
Disconnected from the target VM, address: '127.0.0.1:44391', transport: 'socket'
com.google.inject.CreationException: Unable to create injector, see the following errors:

1) No implementation for org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State$Factory was bound.
  while locating org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State$Factory
    for the 9th parameter of org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater.<init>(ConfigurationStateUpdater.java:74)
  while locating org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater
    for the 3rd parameter of org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.<init>(PipelineInterpreter.java:83)
  at org.graylog2.plugin.PluginModule.addMessageProcessor(PluginModule.java:152) (via modules: org.graylog2.shared.bindings.PluginBindings -> org.graylog.plugins.pipelineprocessor.PipelineProcessorModule)

2) org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State has @AssistedInject constructors, but none of them match the parameters in method org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State$Factory.newState().  Unable to create AssistedInject factory.
  while locating org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State
  at org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter$State$Factory.newState(PipelineInterpreter.java:1)

2 errors
	at com.google.inject.internal.Errors.throwCreationExceptionIfErrorsExist(Errors.java:470)
	at com.google.inject.internal.InternalInjectorCreator.initializeStatically(InternalInjectorCreator.java:155)
	at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:107)
	at com.google.inject.Guice.createInjector(Guice.java:99)
	at org.graylog2.shared.bindings.GuiceInjectorHolder.createInjector(GuiceInjectorHolder.java:34)
	at org.graylog2.bootstrap.CmdLineTool.setupInjector(CmdLineTool.java:379)
	at org.graylog2.bootstrap.CmdLineTool.run(CmdLineTool.java:193)
	at org.graylog2.bootstrap.Main.main(Main.java:44)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were some uncommitted changes in my working directory. 😞

MetricRegistry metricRegistry,
@Named("processbuffer_processors") int processorCount,
@Named("cached_stageiterators") boolean cachedIterators) {
this.currentPipelines = currentPipelines;
this.streamPipelineConnections = streamPipelineConnections;
this.commonClassLoader = commonClassLoader;
this.cachedIterators = cachedIterators;

cache = CacheBuilder.newBuilder()
Expand All @@ -468,7 +470,7 @@ public StageIterator.Configuration load(@Nonnull Set<Pipeline> pipelines) throws
});

// we have to remove the metrics, because otherwise we leak references to the cache (and the register call with throw)
metricRegistry.removeMatching((name, metric) -> name.startsWith(name(PipelineInterpreter.class,"stage-cache")));
metricRegistry.removeMatching((name, metric) -> name.startsWith(name(PipelineInterpreter.class, "stage-cache")));
MetricUtils.safelyRegisterAll(metricRegistry, new CacheStatsSet(name(PipelineInterpreter.class, "stage-cache"), cache));
}

Expand Down Expand Up @@ -496,8 +498,7 @@ public StageIterator getStageIterator(Set<Pipeline> pipelines) {

public interface Factory {
State newState(ImmutableMap<String, Pipeline> currentPipelines,
ImmutableSetMultimap<String, Pipeline> streamPipelineConnections,
@Nullable ClassLoader commonClassLoader);
ImmutableSetMultimap<String, Pipeline> streamPipelineConnections);
}
}
}
Loading