Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register thread pool settings #18674

Merged
merged 19 commits into from
Jun 7, 2016
Merged

Register thread pool settings #18674

merged 19 commits into from
Jun 7, 2016

Conversation

jasontedor
Copy link
Member

@jasontedor jasontedor commented Jun 1, 2016

This commit refactors the handling of thread pool settings so that the
individual settings can be registered rather than registering the top
level group. With this refactoring, individual plugins must now register
their own settings for custom thread pools that they need, but a
dedicated API is provided for this in the thread pool module. This
commit also renames the prefix on the thread pool settings from
"threadpool" to "thread_pool". This enables a hard break on the settings
so that:

  • some of the settings can be given more sensible names (e.g., the max
    number of threads in a scaling thread pool is now named "max" instead
    of "size")
  • change the soft limit on the number of threads in the bulk and
    indexing thread pools to a hard limit
  • the settings names for custom plugins for thread pools can be
    prefixed (e.g., "xpack.watcher.thread_pool.size")
  • thread pool settings are now node-level settings

Relates #18613, closes #9216

This commit refactors the handling of thread pool settings so that the
individual settings can be registered rather than registering the top
level group. With this refactoring, individual plugins must now register
their own settings for custom thread pools that they need, but a
dedicated API is provided for this in the thread pool module. This
commit also renames the prefix on the thread pool settings from
"threadpool" to "thread_pool". This enables a hard break on the settings
so that:
 - some of the settings can be given more sensible names (e.g., the max
   number of threads in a scaling thread pool is now named "max" instead
   of "size")
 - change the soft limit on the number of threads in the bulk and
   indexing thread pools to a hard limit
 - the settings names for custom plugins for thread pools can be
   prefixed (e.g., "xpack.watcher.thread_pool.size")
@jasontedor jasontedor added >enhancement >breaking review :Core/Infra/Settings Settings infrastructure and APIs :Core/Infra/Core Core issues without another label v5.0.0-alpha4 labels Jun 1, 2016
@jasontedor jasontedor mentioned this pull request Jun 1, 2016
7 tasks
@s1monw
Copy link
Contributor

s1monw commented Jun 1, 2016

@jasontedor I think if we are breaking things here hard we should go and make them node level settings and prevent them from being updated.

@jasontedor
Copy link
Member Author

jasontedor commented Jun 1, 2016

I think if we are breaking things here hard we should go and make them node level settings and prevent them from being updated.

I think that having them as node-level settings makes the most sense, especially when considering heterogeneous clusters. The one downside to the settings not being dynamically updatable is that right now there is an "out" if an executor or backing queue is too small, it can be dynamically increased as solution to the problem.

I'm happy to remove this though; do you think that should be done in this PR or a follow-up @s1monw?

* master:
  Update reindex.asciidoc (#18687)
  Add more logging to reindex rethrottle
  [TEST] Increase timeout to wait on folder deletion in IndexWithShadowReplicasIT
  [TEST] Fix tests that rely on assumption that data dirs are removed after index deletion (#18681)
  Acknowledge index deletion requests based on standard cluster state acknowledgment (#18602)
  Register "cloud.node.auto_attributes" setting (#18678)
  Fix StoreRecoveryTests after 6.0.1 upgrade.
  Upgrade to Lucene 6.0.1.
  ingest: added `ignore_failure` option to all processors
  Throw IllegalStateException when handshake fails due to version or cluster mismatch (#18676)
  AggregatorBuilder and PipelineAggregatorBuilder do not need generics. #18368
  Move PageCacheRecycler into BigArrays (#18666)
  Index Template: parse and validate mappings in template when template puts
  [TEST] Set BWC version to 5.0.0-SNAP since this is it's min compat version
  Make cluster health classes immutable and have them implement Writeable instead of Streamable
  Fix javadoc that stated a throws clause that didn't exist.
  Clarify the semantics of the BlobContainer interface
  Build: Rework eclipse settings copy so it does not get automatically cleaned
This commit removes the ability for thread pool settings to be adjusted
dynamically. With this commit, thread pool settings are now node-level
settings and can not be modified via the cluster settings API.
This commit removes some unused imports that are no longer needed after
refactoring thread pool settings.
This commit modifies settings prefixes so that the name is not
duplicated (e.g., "xpack.watcher.thread_pool.watcher" under the previous
code).
This commit removes the now unnecessary retired executors field from
ThreadPool after retiring executors are no longer a thing due to remove
dynamic thread pool settings.
Setting.Property.Dynamic, Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, name, "queue_size");
this.queueSizeSetting =
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.Dynamic, Setting.Property.NodeScope);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting.Property.Dynamic, can go away

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a mistake, I removed them from the scaling executor settings but forget them here. I'm glad that you noticed. Removed in 76026f1.

This commit removes the volatile keyword from the executors field; this
is no longer needed as the executors can no longer be updated.
}

}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newlines FTW?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You get a newline, and you get a newline, and you get a newline, everyone gets a newline! 😄

Removed in 2f29247.

This commit removes some unnecessary trailing newlines from
o/e/t/ScalingExecutorBuilder.java.

/**
*
*/
public class ThreadPool extends AbstractComponent implements Closeable {
public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to extend this class? I wanna get rid of it? We should set things up in a ctor and impl. Closeable IMO

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's tricky. I want to construct the executors map once, making it effectively final (although it can not be actually final right now since we do not build it in the constructor). Given this desire, we can not build it in the constructor because we need to wait until all plugins have had a chance to register their custom thread pool settings. But the thread pool instance is needed earlier than this happens because it's needed for the MonitorService which is in turn needed for the NodeModule which must be built before we start processing plugins. The timeline is thus:

constructor runs < inject into MonitorService < inject into NodeModule < plugins process modules < start thread pool to create executors

I looked at options around this but I didn't like any of them, they were all super hacky and involved registering suppliers that used the injector.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just get rid of custom threadpools instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this patch could work just fine?

diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 8a1df50..94a0ab1 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -98,6 +98,7 @@ import org.elasticsearch.search.SearchService;
 import org.elasticsearch.snapshots.SnapshotShardsService;
 import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.tasks.TaskResultsService;
+import org.elasticsearch.threadpool.ExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.elasticsearch.transport.TransportService;
@@ -210,12 +211,12 @@ public class Node implements Closeable {
             throw new IllegalStateException("Failed to created node environment", ex);
         }
         final NetworkService networkService = new NetworkService(settings);
-        final ThreadPool threadPool = new ThreadPool(settings);
+        final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders();
+        final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

         NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
         boolean success = false;
         try {
-            final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
             ModulesBuilder modules = new ModulesBuilder();
             modules.add(new Version.Module(version));
             modules.add(new CircuitBreakerModule(settings));
@@ -223,6 +224,7 @@ public class Node implements Closeable {
             for (Module pluginModule : pluginsService.nodeModules()) {
                 modules.add(pluginModule);
             }
+            final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
             modules.add(new PluginsModule(pluginsService));
             SettingsModule settingsModule = new SettingsModule(this.settings);
             modules.add(settingsModule);
diff --git a/core/src/main/java/org/elasticsearch/plugins/Plugin.java b/core/src/main/java/org/elasticsearch/plugins/Plugin.java
index 1efc151..695a255 100644
--- a/core/src/main/java/org/elasticsearch/plugins/Plugin.java
+++ b/core/src/main/java/org/elasticsearch/plugins/Plugin.java
@@ -23,9 +23,12 @@ import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.threadpool.ThreadPool;

 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;

 /**
  * An extension point allowing to plug in custom functionality.
@@ -80,4 +83,8 @@ public abstract class Plugin {
      */
     @Deprecated
     public final void onModule(IndexModule indexModule) {}
+
+    public List<ExecutorBuilder<?>> getExecutorBuilders() {
+        return Collections.emptyList();
+    }
 }
diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginsService.java b/core/src/main/java/org/elasticsearch/plugins/PluginsService.java
index f373da6..bb22854 100644
--- a/core/src/main/java/org/elasticsearch/plugins/PluginsService.java
+++ b/core/src/main/java/org/elasticsearch/plugins/PluginsService.java
@@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.threadpool.ExecutorBuilder;

 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -261,6 +262,14 @@ public class PluginsService extends AbstractComponent {
         return modules;
     }

+    public List<ExecutorBuilder<?>> getExecutorBuilders() {
+        ArrayList<ExecutorBuilder<?>> builders = new ArrayList<>();
+        for (Tuple<PluginInfo, Plugin> plugin : plugins) {
+            builders.addAll(plugin.v2().getExecutorBuilders());
+        }
+        return getExecutorBuilders();
+    }
+
     public Collection<Class<? extends LifecycleComponent>> nodeServices() {
         List<Class<? extends LifecycleComponent>> services = new ArrayList<>();
         for (Tuple<PluginInfo, Plugin> plugin : plugins) {
diff --git a/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
index 31f3f31..61e5141 100644
--- a/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
+++ b/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
@@ -30,7 +30,7 @@ import java.util.List;
  *
  * @param <U> the underlying type of the executor settings
  */
-abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {
+public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {

     private final String name;

diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 0b564b2..1d641aa 100644
--- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -151,7 +151,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
         return Collections.unmodifiableCollection(builders.values());
     }

-    public ThreadPool(Settings settings) {
+    public ThreadPool(Settings settings, ExecutorBuilder<?>... customBuilders) {
         super(settings);

         final Map<String, ExecutorBuilder> builders = new HashMap<>();
@@ -175,7 +175,13 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
         builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
         builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
         builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
-        this.builders = builders;
+        for (ExecutorBuilder<?> builder : customBuilders) {
+            if (builders.containsKey(builder.name())) {
+                throw new IllegalArgumentException("builder with name: " + builder.name() + " already exists");
+            }
+            builders.put(builder.name(), builder);
+        }
+        this.builders = Collections.unmodifiableMap(builders);

         assert Node.NODE_NAME_SETTING.exists(settings);
         threadContext = new ThreadContext(settings);
@@ -190,10 +196,6 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
         this.estimatedTimeThread.start();
     }

-    void add(ExecutorBuilder builder) {
-        builders.put(builder.name(), builder);
-    }
-
     @Override
     protected void doStart() {
         final Map<String, ExecutorHolder> executors = new HashMap<>();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw I pushed 1fdc2db.

This commit removes the dynamic properties from fixed executor settings.
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecutorBuilder.ScalingExecutorSettings> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these look awesome. Can we get some java docs?

Copy link
Member Author

@jasontedor jasontedor Jun 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 070e260.

info.getQueueSize() == null ? "unbounded" : info.getQueueSize());
}

private static SizeValue queueSizeValue(int queueSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only used once so maybe a local var is easier ie. inline

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b2fb135.

This commit cleans up some simple issues for executor builders:
 - marks the implementations as final
 - restricts the visibilty of some methods
 - adds Javadocs
 - changes a couple of method names for clarity
This commit inlines the queue size utility method in the fixed executor
builder as it was only used in one place.
This commit increases the hard size limit on bulk and indexing threads
to one plus the bounded number of processors. This allows for an extra
schedulable thread in case one of the others from the pool is blocked on
I/O or a lock.
This commit refactors custom thread pool registration. Rather than
processing the thread pool module in an onModule method on plugins, we
simply ask the plugins for their custom thread pool registration. This
simplifies custom thread pool registration and ultimately thread pool
construction.
This commit reverts an inadvertent import order chance in
AbstractClient.java.
* master: (22 commits)
  Fix recovery throttling to properly handle relocating non-primary shards (#18701)
  Fix merge stats rendering in RestIndicesAction (#18720)
  [TEST] mute RandomAllocationDeciderTests.testRandomDecisions
  Reworked docs for index-shrink API (#18705)
  Improve painless compile-time exceptions
  Adds UUIDs to snapshots
  Add test rethrottle test case for delete-by-query
  Do not start scheduled pings until transport start
  Adressing review comments
  Only filter intial recovery (post API) when shrinking an index (#18661)
  Add tests to check that toQuery() doesn't return null
  Removing handling of null lucene query where we catch this at parse time
  Handle empty query bodies at parse time and remove EmptyQueryBuilder
  Mute failing assertions in IndexWithShadowReplicasIT until fix
  Remove allow running as root
  Add upgrade-not-supported warning to alpha release notes
  remove unrecognized javadoc tag from matrix aggregation module
  set ValuesSourceConfig fields as private
  Adding MultiValuesSource support classes and documentation to matrix stats agg module
  New Matrix Stats Aggregation module
  ...
@@ -187,7 +187,7 @@ public synchronized Settings applySettings(Settings newSettings) {
addSettingsUpdater(setting.newUpdater(consumer, logger, validator));
}

synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
public synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this back to pkg private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in a0afb92.

This commit reverts the method AbstractScopedSettings#addSettingsUpdater
to being package-private. This method was previously made public for an
earlier refactoring but that refactoring has been undone after making
thread pool settings non-dynamic.
@@ -80,4 +83,8 @@ public void onIndexModule(IndexModule indexModule) {}
*/
@Deprecated
public final void onModule(IndexModule indexModule) {}

public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put some javadocs on this?

@s1monw
Copy link
Contributor

s1monw commented Jun 3, 2016

I added some minors LGTM otehrwise - no need for another round of review.

This commit renames two methods in ExecutorBuilder to add "get" prefixes
to their method names.
This commit adds Javadocs for the extension point for plugins to
register custom thread pools.
@s1monw
Copy link
Contributor

s1monw commented Jun 3, 2016

LGTM thanks @jasontedor

This commit registers the estimated time interval setting, the interval
at which the estimated time thread updates the estimated time.
@jasontedor jasontedor merged commit da74323 into elastic:master Jun 7, 2016
@jasontedor jasontedor deleted the thread-pool-refactor branch June 7, 2016 02:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>breaking-java :Core/Infra/Core Core issues without another label :Core/Infra/Settings Settings infrastructure and APIs v5.0.0-alpha4
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ThreadPool.java looks for a config setting it itself prohibits
3 participants