Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register thread pool settings #18674

Merged
merged 19 commits into from
Jun 7, 2016
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consu
addSettingsUpdater(setting.newUpdater(consumer, logger, validator));
}

synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
public synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this back to pkg private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in a0afb92.

this.settingUpdaters.add(updater);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
ThreadPool.THREADPOOL_GROUP_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ public static String threadName(Settings settings, String ... names) {
}

public static String threadName(Settings settings, String namePrefix) {
String name = settings.get("node.name");
if (name == null) {
name = "elasticsearch";
String nodeName = settings.get("node.name");
if (nodeName == null) {
return threadName("", namePrefix);
} else {
name = "elasticsearch[" + name + "]";
return threadName(nodeName, namePrefix);
}
return name + "[" + namePrefix + "]";
}

public static String threadName(final String nodeName, final String namePrefix) {
return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]";
}

public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -210,18 +211,20 @@ protected Node(Environment tmpEnv, Version version, Collection<Class<? extends P
throw new IllegalStateException("Failed to created node environment", ex);
}
final NetworkService networkService = new NetworkService(settings);
final ThreadPool threadPool = new ThreadPool(settings);
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
boolean success = false;
try {
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new CircuitBreakerModule(settings));
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
modules.add(new PluginsModule(pluginsService));
SettingsModule settingsModule = new SettingsModule(this.settings);
modules.add(settingsModule);
Expand All @@ -232,7 +235,8 @@ protected Node(Environment tmpEnv, Version version, Collection<Class<? extends P
modules.add(scriptModule);
modules.add(new NodeEnvironmentModule(nodeEnvironment));
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool));
final ThreadPoolModule threadPoolModule = new ThreadPoolModule(threadPool);
modules.add(threadPoolModule);
modules.add(new DiscoveryModule(this.settings));
modules.add(new ClusterModule(this.settings));
modules.add(new IndicesModule());
Expand All @@ -246,11 +250,14 @@ protected Node(Environment tmpEnv, Version version, Collection<Class<? extends P
modules.add(new AnalysisModule(environment));

pluginsService.processModules(modules);

scriptModule.prepareSettings(settingsModule);

threadPoolModule.prepareSettings(settingsModule);

injector = modules.createInjector();

client = injector.getInstance(Client.class);
threadPool.setClusterSettings(injector.getInstance(ClusterSettings.class));
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* An extension point allowing to plug in custom functionality.
Expand Down Expand Up @@ -80,4 +83,8 @@ public void onIndexModule(IndexModule indexModule) {}
*/
@Deprecated
public final void onModule(IndexModule indexModule) {}

public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put some javadocs on this?

return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -261,6 +262,14 @@ public Collection<Module> nodeModules() {
return modules;
}

public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final ArrayList<ExecutorBuilder<?>> builders = new ArrayList<>();
for (final Tuple<PluginInfo, Plugin> plugin : plugins) {
builders.addAll(plugin.v2().getExecutorBuilders(settings));
}
return builders;
}

public Collection<Class<? extends LifecycleComponent>> nodeServices() {
List<Class<? extends LifecycleComponent>> services = new ArrayList<>();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.threadpool;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.List;

/**
* Base class for executor builders.
*
* @param <U> the underlying type of the executor settings
*/
public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get javadocs?

Copy link
Member Author

@jasontedor jasontedor Jun 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 070e260.


private final String name;

public ExecutorBuilder(String name) {
this.name = name;
}

protected String name() {
return name;
}

protected static String settingsKey(final String prefix, final String key) {
return String.join(".", prefix, key);
}

/**
* The list of settings this builder will register.
*
* @return the list of registered settings
*/
abstract List<Setting<?>> registeredSettings();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add get as a prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b6bcafa.


/**
* Return an executor settings object from the node-level settings.
*
* @param settings the node-level settings
* @return the executor settings object
*/
abstract U settings(Settings settings);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add get as a prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b6bcafa.


/**
* Builds the executor with the specified executor settings.
*
* @param settings the executor settings
* @param threadContext the current thread context
* @return a new executor built from the specified executor settings
*/
abstract ThreadPool.ExecutorHolder build(U settings, ThreadContext threadContext);

/**
* Format the thread pool info object for this executor.
*
* @param info the thread pool info object to format
* @return a formatted thread pool info (useful for logging)
*/
abstract String formatInfo(ThreadPool.Info info);

static abstract class ExecutorSettings {

protected final String nodeName;

public ExecutorSettings(String nodeName) {
this.nodeName = nodeName;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.threadpool;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.Node;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
* A builder for fixed executors.
*/
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {

private final Setting<Integer> sizeSetting;
private final Setting<Integer> queueSizeSetting;

/**
* Construct a fixed executor builder; the settings will have the
* key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
this(settings, name, size, queueSize, "thread_pool." + name);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting =
new Setting<>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add an overload to Setting#intSetting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure since this is the only use and I don't really see broader use?

sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting =
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}

private int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return 1 + EsExecutors.boundedNumberOfProcessors(settings);
} else {
return Integer.MAX_VALUE;
}
}

@Override
List<Setting<?>> registeredSettings() {
return Arrays.asList(sizeSetting, queueSizeSetting);
}

@Override
FixedExecutorSettings settings(Settings settings) {
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
final int size = sizeSetting.get(settings);
final int queueSize = queueSizeSetting.get(settings);
return new FixedExecutorSettings(nodeName, size, queueSize);
}

@Override
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
int size = settings.size;
int queueSize = settings.queueSize;
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
Executor executor = EsExecutors.newFixed(name(), size, queueSize, threadFactory, threadContext);
final ThreadPool.Info info =
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
return new ThreadPool.ExecutorHolder(executor, info);
}

@Override
String formatInfo(ThreadPool.Info info) {
return String.format(
Locale.ROOT,
"name [%s], size [%d], queue size [%s]",
info.getName(),
info.getMax(),
info.getQueueSize() == null ? "unbounded" : info.getQueueSize());
}

static class FixedExecutorSettings extends ExecutorBuilder.ExecutorSettings {

private final int size;
private final int queueSize;

public FixedExecutorSettings(final String nodeName, final int size, final int queueSize) {
super(nodeName);
this.size = size;
this.queueSize = queueSize;
}

}

}
Loading