-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Register thread pool settings #18674
Changes from 15 commits
73b1ed2
1d30273
a219971
ae9bea0
cc935f0
08b14d0
ccabd8c
2f29247
76026f1
070e260
b2fb135
a1e3cb3
1fdc2db
26eb8b1
6b6a02b
a0afb92
b6bcafa
001c3cf
955d694
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,9 +23,12 @@ | |
import org.elasticsearch.common.inject.Module; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.IndexModule; | ||
import org.elasticsearch.threadpool.ExecutorBuilder; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
/** | ||
* An extension point allowing to plug in custom functionality. | ||
|
@@ -80,4 +83,8 @@ public void onIndexModule(IndexModule indexModule) {} | |
*/ | ||
@Deprecated | ||
public final void onModule(IndexModule indexModule) {} | ||
|
||
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you put some javadocs on this? |
||
return Collections.emptyList(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.threadpool; | ||
|
||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.util.concurrent.ThreadContext; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Base class for executor builders. | ||
* | ||
* @param <U> the underlying type of the executor settings | ||
*/ | ||
public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we get javadocs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 070e260. |
||
|
||
private final String name; | ||
|
||
public ExecutorBuilder(String name) { | ||
this.name = name; | ||
} | ||
|
||
protected String name() { | ||
return name; | ||
} | ||
|
||
protected static String settingsKey(final String prefix, final String key) { | ||
return String.join(".", prefix, key); | ||
} | ||
|
||
/** | ||
* The list of settings this builder will register. | ||
* | ||
* @return the list of registered settings | ||
*/ | ||
abstract List<Setting<?>> registeredSettings(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed b6bcafa. |
||
|
||
/** | ||
* Return an executor settings object from the node-level settings. | ||
* | ||
* @param settings the node-level settings | ||
* @return the executor settings object | ||
*/ | ||
abstract U settings(Settings settings); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed b6bcafa. |
||
|
||
/** | ||
* Builds the executor with the specified executor settings. | ||
* | ||
* @param settings the executor settings | ||
* @param threadContext the current thread context | ||
* @return a new executor built from the specified executor settings | ||
*/ | ||
abstract ThreadPool.ExecutorHolder build(U settings, ThreadContext threadContext); | ||
|
||
/** | ||
* Format the thread pool info object for this executor. | ||
* | ||
* @param info the thread pool info object to format | ||
* @return a formatted thread pool info (useful for logging) | ||
*/ | ||
abstract String formatInfo(ThreadPool.Info info); | ||
|
||
static abstract class ExecutorSettings { | ||
|
||
protected final String nodeName; | ||
|
||
public ExecutorSettings(String nodeName) { | ||
this.nodeName = nodeName; | ||
} | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.threadpool; | ||
|
||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.SizeValue; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; | ||
import org.elasticsearch.common.util.concurrent.ThreadContext; | ||
import org.elasticsearch.node.Node; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ThreadFactory; | ||
|
||
/** | ||
* A builder for fixed executors. | ||
*/ | ||
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> { | ||
|
||
private final Setting<Integer> sizeSetting; | ||
private final Setting<Integer> queueSizeSetting; | ||
|
||
/** | ||
* Construct a fixed executor builder; the settings will have the | ||
* key prefix "thread_pool." followed by the executor name. | ||
* | ||
* @param settings the node-level settings | ||
* @param name the name of the executor | ||
* @param size the fixed number of threads | ||
* @param queueSize the size of the backing queue, -1 for unbounded | ||
*/ | ||
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) { | ||
this(settings, name, size, queueSize, "thread_pool." + name); | ||
} | ||
|
||
/** | ||
* Construct a fixed executor builder. | ||
* | ||
* @param settings the node-level settings | ||
* @param name the name of the executor | ||
* @param size the fixed number of threads | ||
* @param queueSize the size of the backing queue, -1 for unbounded | ||
* @param prefix the prefix for the settings keys | ||
*/ | ||
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) { | ||
super(name); | ||
final String sizeKey = settingsKey(prefix, "size"); | ||
this.sizeSetting = | ||
new Setting<>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe add an overload to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure since this is the only use and I don't really see broader use? |
||
sizeKey, | ||
s -> Integer.toString(size), | ||
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), | ||
Setting.Property.NodeScope); | ||
final String queueSizeKey = settingsKey(prefix, "queue_size"); | ||
this.queueSizeSetting = | ||
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope); | ||
} | ||
|
||
private int applyHardSizeLimit(final Settings settings, final String name) { | ||
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { | ||
return 1 + EsExecutors.boundedNumberOfProcessors(settings); | ||
} else { | ||
return Integer.MAX_VALUE; | ||
} | ||
} | ||
|
||
@Override | ||
List<Setting<?>> registeredSettings() { | ||
return Arrays.asList(sizeSetting, queueSizeSetting); | ||
} | ||
|
||
@Override | ||
FixedExecutorSettings settings(Settings settings) { | ||
final String nodeName = Node.NODE_NAME_SETTING.get(settings); | ||
final int size = sizeSetting.get(settings); | ||
final int queueSize = queueSizeSetting.get(settings); | ||
return new FixedExecutorSettings(nodeName, size, queueSize); | ||
} | ||
|
||
@Override | ||
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) { | ||
int size = settings.size; | ||
int queueSize = settings.queueSize; | ||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); | ||
Executor executor = EsExecutors.newFixed(name(), size, queueSize, threadFactory, threadContext); | ||
final ThreadPool.Info info = | ||
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); | ||
return new ThreadPool.ExecutorHolder(executor, info); | ||
} | ||
|
||
@Override | ||
String formatInfo(ThreadPool.Info info) { | ||
return String.format( | ||
Locale.ROOT, | ||
"name [%s], size [%d], queue size [%s]", | ||
info.getName(), | ||
info.getMax(), | ||
info.getQueueSize() == null ? "unbounded" : info.getQueueSize()); | ||
} | ||
|
||
static class FixedExecutorSettings extends ExecutorBuilder.ExecutorSettings { | ||
|
||
private final int size; | ||
private final int queueSize; | ||
|
||
public FixedExecutorSettings(final String nodeName, final int size, final int queueSize) { | ||
super(nodeName); | ||
this.size = size; | ||
this.queueSize = queueSize; | ||
} | ||
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this back to pkg private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in a0afb92.