Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INGEST] Interrupt the current thread if evaluation grok expressions take too long #31024

Merged
merged 6 commits into from
Jun 12, 2018

Conversation

martijnvg
Copy link
Member

This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.

The thread interrupter in the background checks every 3 seconds whether there are threads
execution the org.joni.Matcher#search(...) method for longer than 5 seconds and
if so interrupts these threads.

Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns org.joni.Matcher#INTERRUPTED

PR for #28731

…take too long

This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.

The thread interrupter in the background checks every 3 seconds whether there are threads
execution the org.joni.Matcher#search() method for longer than 5 seconds and
if so interrupts these threads.

Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns org.joni.Matcher#INTERRUPTED

Closes elastic#28731
@martijnvg martijnvg added >bug review :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.0.0 v6.4.0 labels Jun 1, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-infra

Copy link
Contributor

@talevy talevy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments, looks great overall!

if (result == Matcher.INTERRUPTED) {
throw new IllegalArgumentException("grok pattern matching is too complex and takes too long to execute");
} else if (result == Matcher.FAILED) {
// I think we should throw an error here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and change current behavior?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly but I think it should be a follow-up so let's go with a // TODO: here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this should be done in a follow up.

I don't think is different behaviour, because in GrokProcessor we throw an error when null is returned.

/**
* Provides a thread pool
*/
// TODO: do we really want to expose ThreadPool here? Or a BiFunction<Long, Runnable, ScheduledFuture<?>> to just handle scheduling?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this something you want to discuss?
I see no issue with leaving the Threadpool available to processors

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a shame that we have to expose anything at all so let us at least go with the minimum necessary.

Copy link
Member Author

@martijnvg martijnvg Jun 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I had doubts about directly exposing TP to all processors, because this will send to wrong message to processor implementors, there should be no need to fork a thread. I will expose the minimum necessary.

@@ -45,6 +49,10 @@
public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin {

static final Map<String, String> GROK_PATTERNS = Grok.getBuiltinPatterns();
static final Setting<TimeValue> GUARD_INTERVAL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add documentation for these settings

static final Setting<TimeValue> GUARD_INTERVAL =
Setting.timeSetting("ingest.grok.guard.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope);
static final Setting<TimeValue> GUARD_MAX_EXECUTION_TIME =
Setting.timeSetting("ingest.grok.guard.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is discussing these timeout values worth doing?

I took a look at some other defaults of ours, like in the low-level rest client. It sets its timeout to 1sec by default. Should we be more aggressive here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I change the defaults to 1 second. I think that should be good enough for now?

Also now I think about if maybe timeout is better than max_execution_time?

Copy link
Member

@jasontedor jasontedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overall implementation looks good. I left some comment about some details.

* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
*/
static ThreadInterrupter noop() {
return new Noop();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can reference a singleton instance?

/**
* De-registers the current thread and prevents it from being interrupted.
*/
void deregister();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be unregister.

*
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks
* that that for every 30k iterations it checks if the current thread is interrupted and if so
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a double that here.

try {
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
long threadTime = entry.getValue();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This local looks unnecessary?

return new Noop();
}

class Noop implements ThreadInterrupter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it odd that the interface is named ThreadInterrupted and then we have a Noop implementation that does nothing; that is, it implements an interface that indicates that it interrupts threads yet it doesn't interrupt threads. I think a name that avoids this confusion is ThreadWatchdog. This is not an uncommon name for this idea.

}
}
} finally {
scheduler.apply(interval, this::interruptLongRunningExecutions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially risky. What if the try block throws an OutOfMemoryError and the scheduler#apply throws a RejectedExecutionException? We lose the OutOfMemoryError and miss dying with dignity. Now, the scheduler#apply should really on throw a RejectedExecutionException if the scheduler is shutdown which should only happen when we are shutting down, but let's not take any chances here that we lose a fatal error?

BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not happen so we can rethrow this as an AssertionError?

ThreadInterrupter guard = ThreadInterrupter.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here: rethrow as an AssertionError.

// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted
Thread thread = new Thread(() -> {
guard.register();
while (run.get()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe have this thread check it's interrupt status and break the loop if it detect it is interrupted? That would be a more realistic implementation of cooperative interruption?

/**
* Provides a thread pool
*/
// TODO: do we really want to expose ThreadPool here? Or a BiFunction<Long, Runnable, ScheduledFuture<?>> to just handle scheduling?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a shame that we have to expose anything at all so let us at least go with the minimum necessary.

@martijnvg
Copy link
Member Author

@talevy @jasontedor Thanks for reviewing. I've updated this PR.

@martijnvg
Copy link
Member Author

retest this please

Copy link
Member

@jasontedor jasontedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few more comments.

[[grok-watchdog]]
==== Grok watchdog

Grok expression that take too long to execute are interrupted and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expression -> expressions

==== Grok watchdog

Grok expression that take too long to execute are interrupted and
the the grok processor then fails with an exception. The grok
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the the -> the

Grok expression that take too long to execute are interrupted and
the the grok processor then fails with an exception. The grok
processor has a watchdog thread that determines when evaluation
a grok expression takes too long and is controlled by the following
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

evaluation a -> evaluation of a

* @return The maximum allowed time for a thread to invoke {@link #unregister()} after {@link #register()}
* has been invoked before this ThreadWatchDog starts to interrupting that thread.
*/
long maxExecutionTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a TimeValue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeValue is not accessible in the grok module. I don't think we want to make grok module depend on elasticsearch-core module?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's a shame. I agree. Can we make the name of this method reflect that the time is represented in milliseconds then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes: c21a408

}
if (result == Matcher.INTERRUPTED) {
throw new IllegalArgumentException("grok pattern matching was interrupted after [" +
threadWatchdog.maxExecutionTime() + "] ms");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if we keep ThreadWatchdog#maxExecutionTime as TimeValue then we can let TimeValue do the formatting for us?

threadWatchdog.unregister();
}
if (result == Matcher.INTERRUPTED) {
throw new IllegalArgumentException("grok pattern matching was interrupted after [" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IllegalArgumentException feels wrong to me? I think a generic RuntimeException is okay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that now too. We're just not able to digest a grok expression here and interrupting a thread was our last resort, so it is not the user's fault.

Copy link
Member

@jasontedor jasontedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Contributor

@talevy talevy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@martijnvg martijnvg merged commit 6030d4b into elastic:master Jun 12, 2018
martijnvg added a commit that referenced this pull request Jun 12, 2018
…take too long (#31024)

This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.

The thread interrupter in the background checks every 3 seconds whether there are threads
execution the org.joni.Matcher#search() method for longer than 5 seconds and
if so interrupts these threads.

Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns org.joni.Matcher#INTERRUPTED

Closes #28731
dnhatn added a commit that referenced this pull request Jun 14, 2018
* master:
  Remove RestGetAllAliasesAction (#31308)
  Temporary fix for broken build
  Reenable Checkstyle's unused import rule (#31270)
  Remove remaining unused imports before merging #31270
  Fix non-REST doc snippet
  [DOC] Extend SQL docs
  Immediately flush channel after writing to buffer (#31301)
  [DOCS] Shortens ML API intros
  Use quotes in the call invocation (#31249)
  move security ingest processors to a sub ingest directory (#31306)
  Add 5.6.11 version constant.
  Fix version detection.
  SQL: Whitelist SQL utility class for better scripting (#30681)
  [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (#31299)
  CCS: don't proxy requests for already connected node (#31273)
  Mute ScriptedMetricAggregatorTests testSelfReferencingAggStateAfterMap
  [test] opensuse packaging turn up debug logging
  Add unreleased version 6.3.1
  Removes experimental tag from scripted_metric aggregation (#31298)
  [Rollup] Metric config parser must use builder so validation runs (#31159)
  [ML] Check licence when datafeeds use cross cluster search  (#31247)
  Add notion of internal index settings (#31286)
  Test: Remove broken yml test feature (#31255)
  REST hl client: cluster health to default to cluster level (#31268)
  [ML] Update test thresholds to account for changes to memory control (#31289)
  Log warnings when cluster state publication failed to some nodes (#31233)
  Fix AntFixture waiting condition (#31272)
  Ignore numeric shard count if waiting for ALL (#31265)
  [ML] Implement new rules design (#31110)
  index_prefixes back-compat should test 6.3 (#30951)
  Core: Remove plain execute method on TransportAction (#30998)
  Update checkstyle to 8.10.1 (#31269)
  Set analyzer version in PreBuiltAnalyzerProviderFactory (#31202)
  Modify pipelining handlers to require full requests (#31280)
  Revert upgrade to Netty 4.1.25.Final (#31282)
  Use armored input stream for reading public key (#31229)
  Fix Netty 4 Server Transport tests. Again.
  REST hl client: adjust wait_for_active_shards param in cluster health (#31266)
  REST high-level Client: remove deprecated API methods (#31200)
  [DOCS] Mark SQL feature as experimental
  [DOCS] Updates machine learning custom URL screenshots (#31222)
  Fix naming conventions check for XPackTestCase
  Fix security Netty 4 transport tests
  Fix race in clear scroll (#31259)
  [DOCS] Clarify audit index settings when remote indexing (#30923)
  Delete typos in SAML docs (#31199)
  REST high-level client: add Cluster Health API (#29331)
  [ML][TEST] Mute tests using rules (#31204)
  Support RequestedAuthnContext (#31238)
  SyncedFlushResponse to implement ToXContentObject (#31155)
  Add Get Aliases API to the high-level REST client (#28799)
  Remove some line length supressions (#31209)
  Validate xContentType in PutWatchRequest. (#31088)
  [INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024)
  Suppress extras FS on caching directory tests
  Revert "[DOCS] Added 6.3 info & updated the upgrade table. (#30940)"
  Revert "Fix snippets in upgrade docs"
  Fix snippets in upgrade docs
  [DOCS] Added 6.3 info & updated the upgrade table. (#30940)
  LLClient: Support host selection (#30523)
  Upgrade to Netty 4.1.25.Final (#31232)
  Enable custom credentials for core REST tests (#31235)
  Move ESIndexLevelReplicationTestCase to test framework (#31243)
  Encapsulate Translog in Engine (#31220)
  HLRest: Add get index templates API (#31161)
  Remove all unused imports and fix CRLF (#31207)
  [Tests] Fix self-referencing tests
  [TEST] Fix testRecoveryAfterPrimaryPromotion
  [Docs] Remove mention pattern files in Grok processor (#31170)
  Use stronger write-once semantics for Azure repository (#30437)
  Don't swallow exceptions on replication (#31179)
  Limit the number of concurrent requests per node (#31206)
  Call ensureNoSelfReferences() on _agg state variable after scripted metric agg script executions (#31044)
  Move java version checker back to its own jar (#30708)
  [test] add fix for rare virtualbox error (#31212)
dnhatn added a commit that referenced this pull request Jun 14, 2018
* 6.x:
  SQL: Fix build on Java 10
  [Tests] Mutualize fixtures code in BaseHttpFixture (#31210)
  [TEST] Fix RemoteClusterClientTests#testEnsureWeReconnect
  [ML] Update test thresholds to account for changes to memory control (#31289)
  Reenable Checkstyle's unused import rule (#31270)
  [ML] Check licence when datafeeds use cross cluster search  (#31247)
  Fix non-REST doc snippet
  [DOC] Extend SQL docs
  [DOCS] Shortens ML API intros
  Use quotes in the call invocation (#31249)
  move security ingest processors to a sub ingest directory (#31306)
  SQL: Whitelist SQL utility class for better scripting (#30681)
  Add 5.6.11 version constant.
  Fix version detection.
  [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (#31299)
  Add missing release notes.
  Security: fix token bwc with pre 6.0.0-beta2 (#31254)
  Fix compilation error in UpdateSettingsIT (#31304)
  Test: Remove broken yml test feature (#31255)
  Add unreleased version 6.3.1
  [Rollup] Metric config parser must use builder so validation runs (#31159)
  Removes experimental tag from scripted_metric aggregation (#31298)
  [DOCS] Removes coming tag from 6.3.0 release notes
  6.3 release notes.
  Add notion of internal index settings (#31286)
  REST high-level client: add Cluster Health API (#29331)
  Remove leftover usage of deprecated client API
  SyncedFlushResponse to implement ToXContentObject (#31155)
  Add Get Aliases API to the high-level REST client (#28799)
  HLRest: Add get index templates API (#31161)
  Log warnings when cluster state publication failed to some nodes (#31233)
  Fix AntFixture waiting condition (#31272)
  [TEST] Mute RecoveryIT.testHistoryUUIDIsGenerated
  Ignore numeric shard count if waiting for ALL (#31265)
  Update checkstyle to 8.10.1 (#31269)
  Set analyzer version in PreBuiltAnalyzerProviderFactory (#31202)
  Revert upgrade to Netty 4.1.25.Final (#31282)
  Use armored input stream for reading public key (#31229)
  [DOCS] Added 'fail_on_unsupported_field' param to MLT. Closes #28008 (#31160)
  Fix Netty 4 Server Transport tests. Again.
  [DOCS] Fixed typo.
  [DOCS] Added release highlights for 6.3 (#31256)
  [DOCS] Mark SQL feature as experimental
  [DOCS] Updates machine learning custom URL screenshots (#31222)
  Fix naming conventions check for XPackTestCase
  Fix security Netty 4 transport tests
  Fix race in clear scroll (#31259)
  [DOCS] Clarify audit index settings when remote indexing (#30923)
  [ML][TEST] Mute tests using rules (#31204)
  Support RequestedAuthnContext (#31238)
  Validate xContentType in PutWatchRequest. (#31088)
  [INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024)
  Upgrade to Netty 4.1.25.Final (#31232)
  Suppress extras FS on caching directory tests
  Revert "[DOCS] Added 6.3 info & updated the upgrade table. (#30940)"
  Revert "Fix snippets in upgrade docs"
  Fix snippets in upgrade docs
  [DOCS] Added 6.3 info & updated the upgrade table. (#30940)
  Enable custom credentials for core REST tests (#31235)
  Move ESIndexLevelReplicationTestCase to test framework (#31243)
  Encapsulate Translog in Engine (#31220)
  [DOCS] Adds machine learning 6.3.0 release notes (#31217)
  Remove all unused imports and fix CRLF (#31207)
  [TEST] Fix testRecoveryAfterPrimaryPromotion
  [Docs] Remove mention pattern files in Grok processor (#31170)
  Use stronger write-once semantics for Azure repository (#30437)
  Don't swallow exceptions on replication (#31179)
  Compliant SAML Response destination check (#31175)
  Move java version checker back to its own jar (#30708)
  TEST:  Retry synced-flush if ongoing ops on primary (#30978)
  [test] add fix for rare virtualbox error (#31212)
@jimczi jimczi added v7.0.0-beta1 and removed v7.0.0 labels Feb 7, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>bug :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v6.4.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants