Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INGEST] Interrupt the current thread if evaluation grok expressions take too long #31024

Merged
merged 6 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,24 @@ The above request will return a response body containing a key-value representat

This can be useful to reference as the built-in patterns change across versions.

[[grok-watchdog]]
==== Grok watchdog

Grok expressions that take too long to execute are interrupted and
the grok processor then fails with an exception. The grok
processor has a watchdog thread that determines when evaluation of
a grok expression takes too long and is controlled by the following
settings:

[[grok-watchdog-options]]
.Grok watchdog settings
[options="header"]
|======
| Name | Default | Description
| `ingest.grok.watchdog.interval` | 1s | How often to check whether there are grok evaluations that take longer than the maximum allowed execution time.
| `ingest.grok.watchdog.max_execution_time` | 1s | The maximum allowed execution of a grok expression evaluation.
|======

[[gsub-processor]]
=== Gsub Processor
Converts a string field by applying a regular expression and a replacement.
Expand Down
53 changes: 41 additions & 12 deletions libs/grok/src/main/java/org/elasticsearch/grok/Grok.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,24 @@ public final class Grok {
private final Map<String, String> patternBank;
private final boolean namedCaptures;
private final Regex compiledExpression;
private final ThreadWatchdog threadWatchdog;

public Grok(Map<String, String> patternBank, String grokPattern) {
this(patternBank, grokPattern, true);
this(patternBank, grokPattern, true, ThreadWatchdog.noop());
}

@SuppressWarnings("unchecked")

public Grok(Map<String, String> patternBank, String grokPattern, ThreadWatchdog threadWatchdog) {
this(patternBank, grokPattern, true, threadWatchdog);
}

Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop());
}

private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) {
this.patternBank = patternBank;
this.namedCaptures = namedCaptures;
this.threadWatchdog = threadWatchdog;

for (Map.Entry<String, String> entry : patternBank.entrySet()) {
String name = entry.getKey();
Expand Down Expand Up @@ -163,7 +172,13 @@ public String toRegex(String grokPattern) {
byte[] grokPatternBytes = grokPattern.getBytes(StandardCharsets.UTF_8);
Matcher matcher = GROK_PATTERN_REGEX.matcher(grokPatternBytes);

int result = matcher.search(0, grokPatternBytes.length, Option.NONE);
int result;
try {
threadWatchdog.register();
result = matcher.search(0, grokPatternBytes.length, Option.NONE);
} finally {
threadWatchdog.unregister();
}
if (result != -1) {
Region region = matcher.getEagerRegion();
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
Expand Down Expand Up @@ -205,7 +220,13 @@ public String toRegex(String grokPattern) {
*/
public boolean match(String text) {
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
int result = matcher.search(0, text.length(), Option.DEFAULT);
int result;
try {
threadWatchdog.register();
result = matcher.search(0, text.length(), Option.DEFAULT);
} finally {
threadWatchdog.unregister();
}
return (result != -1);
}

Expand All @@ -220,8 +241,20 @@ public Map<String, Object> captures(String text) {
byte[] textAsBytes = text.getBytes(StandardCharsets.UTF_8);
Map<String, Object> fields = new HashMap<>();
Matcher matcher = compiledExpression.matcher(textAsBytes);
int result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
if (result != -1 && compiledExpression.numberOfNames() > 0) {
int result;
try {
threadWatchdog.register();
result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
} finally {
threadWatchdog.unregister();
}
if (result == Matcher.INTERRUPTED) {
throw new RuntimeException("grok pattern matching was interrupted after [" +
threadWatchdog.maxExecutionTimeInMillis() + "] ms");
} else if (result == Matcher.FAILED) {
// TODO: I think we should throw an error here?
return null;
} else if (compiledExpression.numberOfNames() > 0) {
Region region = matcher.getEagerRegion();
for (Iterator<NameEntry> entry = compiledExpression.namedBackrefIterator(); entry.hasNext();) {
NameEntry e = entry.next();
Expand All @@ -235,13 +268,9 @@ public Map<String, Object> captures(String text) {
break;
}
}

}
return fields;
} else if (result != -1) {
return fields;
}
return null;
return fields;
}

public static Map<String, String> getBuiltinPatterns() {
Expand Down
148 changes: 148 additions & 0 deletions libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.grok;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

/**
* Protects against long running operations that happen between the register and unregister invocations.
* Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method
* will be interrupted.
*
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks
* that for every 30k iterations it checks if the current thread is interrupted and if so
* returns {@link org.joni.Matcher#INTERRUPTED}.
*/
public interface ThreadWatchdog {

/**
* Registers the current thread and interrupts the current thread
* if the takes too long for this thread to invoke {@link #unregister()}.
*/
void register();

/**
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()}
* after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread.
*/
long maxExecutionTimeInMillis();

/**
* Unregisters the current thread and prevents it from being interrupted.
*/
void unregister();

/**
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()}
* and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and
* then interrupts these threads.
*
* @param interval The fixed interval to check if there are threads to interrupt
* @param maxExecutionTime The time a thread has the execute an operation.
* @param relativeTimeSupplier A supplier that returns relative time
* @param scheduler A scheduler that is able to execute a command for each fixed interval
*/
static ThreadWatchdog newInstance(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
}

/**
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
*/
static ThreadWatchdog noop() {
return Noop.INSTANCE;
}

class Noop implements ThreadWatchdog {

private static final Noop INSTANCE = new Noop();

private Noop() {
}

@Override
public void register() {
}

@Override
public long maxExecutionTimeInMillis() {
return Long.MAX_VALUE;
}

@Override
public void unregister() {
}
}

class Default implements ThreadWatchdog {

private final long interval;
private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier;
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();

private Default(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
this.interval = interval;
this.maxExecutionTime = maxExecutionTime;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
scheduler.apply(interval, this::interruptLongRunningExecutions);
}

public void register() {
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
assert previousValue == null;
}

@Override
public long maxExecutionTimeInMillis() {
return maxExecutionTime;
}

public void unregister() {
Long previousValue = registry.remove(Thread.currentThread());
assert previousValue != null;
}

private void interruptLongRunningExecutions() {
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
entry.getKey().interrupt();
// not removing the entry here, this happens in the unregister() method.
}
}
scheduler.apply(interval, this::interruptLongRunningExecutions);
}

}

}
39 changes: 31 additions & 8 deletions libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,24 @@
package org.elasticsearch.grok;

import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;


public class GrokTests extends ESTestCase {
private Map<String, String> basePatterns;

@Before
public void setup() {
basePatterns = Grok.getBuiltinPatterns();
}
private static final Map<String, String> basePatterns = Grok.getBuiltinPatterns();

public void testMatchWithoutCaptures() {
String line = "value";
Expand Down Expand Up @@ -416,4 +412,31 @@ public void testMultipleNamedCapturesWithSameName() {
expected.put("num", "1");
assertThat(grok.captures("12"), equalTo(expected));
}

public void testExponentialExpressions() {
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed

String grokPattern = "Bonsuche mit folgender Anfrage: Belegart->\\[%{WORD:param2},(?<param5>(\\s*%{NOTSPACE})*)\\] " +
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not happen so we can rethrow this as an AssertionError?

throw new AssertionError(e);
}
Thread t = new Thread(() -> {
if (run.get()) {
command.run();
}
});
t.start();
return null;
};
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
run.set(false);
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));
}
}
Loading