diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index df1288d4fd291..2d45a6fecffdf 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -26,16 +26,12 @@ import java.util.Arrays; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -/** - * - */ public class EsExecutors { /** @@ -62,16 +58,11 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); - // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder); queue.executor = executor; return executor; } - public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { - return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy(), contextHolder); - } - public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueue queue; if (queueCapacity < 0) { @@ -114,6 +105,7 @@ public static ThreadFactory daemonThreadFactory(String namePrefix) { } static class EsThreadFactory implements ThreadFactory { + final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; @@ -133,6 +125,7 @@ public Thread newThread(Runnable r) { t.setDaemon(true); return t; } + } /** @@ -141,7 +134,6 @@ public Thread newThread(Runnable r) { private EsExecutors() { } - static class ExecutorScalingQueue extends LinkedTransferQueue { ThreadPoolExecutor executor; @@ -151,9 +143,17 @@ public ExecutorScalingQueue() { @Override public boolean offer(E e) { + // first try to transfer to a waiting worker thread if (!tryTransfer(e)) { + // check if there might be spare capacity in the thread + // pool executor int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); if (left > 0) { + // reject queuing the task to force the thread pool + // executor to add a worker if it can; combined + // with ForceQueuePolicy, this causes the thread + // pool to always scale up to max pool size and we + // only queue when there is no spare capacity return false; } else { return super.offer(e); @@ -162,6 +162,7 @@ public boolean offer(E e) { return true; } } + } /** @@ -184,4 +185,5 @@ public long rejected() { return 0; } } + } diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 04860d1f84fb7..1f5baec1040b4 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -91,7 +91,6 @@ public static class Names { } public enum ThreadPoolType { - CACHED("cached"), DIRECT("direct"), FIXED("fixed"), SCALING("scaling"); @@ -125,12 +124,12 @@ public static ThreadPoolType fromType(String type) { } } - public static Map THREAD_POOL_TYPES; + public static final Map THREAD_POOL_TYPES; static { HashMap map = new HashMap<>(); map.put(Names.SAME, ThreadPoolType.DIRECT); - map.put(Names.GENERIC, ThreadPoolType.CACHED); + map.put(Names.GENERIC, ThreadPoolType.SCALING); map.put(Names.LISTENER, ThreadPoolType.FIXED); map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.INDEX, ThreadPoolType.FIXED); @@ -153,33 +152,67 @@ private static void add(Map executorSettings, ExecutorSettings executorSettings.put(name, settings); } - private static class ExecutorSettingsBuilder { - Map settings = new HashMap<>(); + private static abstract class ExecutorSettingsBuilder> { - public ExecutorSettingsBuilder(String name) { - settings.put("name", name); - settings.put("type", THREAD_POOL_TYPES.get(name).getType()); + private final Settings.Builder builder; + + protected ExecutorSettingsBuilder(String name, ThreadPoolType threadPoolType) { + if (THREAD_POOL_TYPES.get(name) != threadPoolType) { + throw new IllegalArgumentException("thread pool [" + name + "] must be of type [" + threadPoolType + "]"); + } + builder = Settings.builder(); + builder.put("name", name); + builder.put("type", threadPoolType.getType()); } - public ExecutorSettingsBuilder size(int availableProcessors) { - return add("size", Integer.toString(availableProcessors)); + public T keepAlive(String keepAlive) { + return add("keep_alive", keepAlive); } - public ExecutorSettingsBuilder queueSize(int queueSize) { - return add("queue_size", Integer.toString(queueSize)); + public T queueSize(int queueSize) { + return add("queue_size", queueSize); } - public ExecutorSettingsBuilder keepAlive(String keepAlive) { - return add("keep_alive", keepAlive); + protected T add(String setting, int value) { + return add(setting, Integer.toString(value)); + } + + + protected T add(String setting, String value) { + builder.put(setting, value); + @SuppressWarnings("unchecked") final T executor = (T)this; + return executor; + } + + public final Settings build() { return builder.build(); } + + } + + private static class FixedExecutorSettingsBuilder extends ExecutorSettingsBuilder { + + public FixedExecutorSettingsBuilder(String name) { + super(name, ThreadPoolType.FIXED); } - private ExecutorSettingsBuilder add(String key, String value) { - settings.put(key, value); - return this; + public FixedExecutorSettingsBuilder size(int size) { + return add("size", Integer.toString(size)); } - public Settings build() { - return Settings.builder().put(settings).build(); + } + + private static class ScalingExecutorSettingsBuilder extends ExecutorSettingsBuilder { + + public ScalingExecutorSettingsBuilder(String name) { + super(name, ThreadPoolType.SCALING); + } + + public ScalingExecutorSettingsBuilder min(int min) { + return add("min", min); + } + + + public ScalingExecutorSettingsBuilder size(int size) { + return add("size", size); } } @@ -215,25 +248,26 @@ public ThreadPool(Settings settings) { validate(groupSettings); int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); - int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); + int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); + int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); Map defaultExecutorTypeSettings = new HashMap<>(); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).size(4 * availableProcessors).keepAlive("30s")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m")); + int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).size(genericThreadPoolMax).keepAlive("30s")); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200)); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50)); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000)); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.MANAGEMENT).min(1).size(5).keepAlive("5m")); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m")); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10)); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FLUSH).min(1).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.REFRESH).min(1).size(halfProcMaxAt10).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.WARMER).min(1).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.SNAPSHOT).min(1).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.FORCE_MERGE).size(1)); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).min(1).size(availableProcessors * 2).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).min(1).size(availableProcessors * 2).keepAlive("5m")); this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings); @@ -251,9 +285,6 @@ public ThreadPool(Settings settings) { } executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); - if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) { - throw new IllegalArgumentException("generic thread pool must be of type cached"); - } this.executors = unmodifiableMap(executors); this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -447,49 +478,23 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); if (ThreadPoolType.DIRECT == threadPoolType) { if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}]", name, type); + logger.debug("updating thread pool [{}], type [{}]", name, type); } else { - logger.debug("creating thread_pool [{}], type [{}]", name, type); + logger.debug("creating thread pool [{}], type [{}]", name, type); } return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType)); - } else if (ThreadPoolType.CACHED == threadPoolType) { - if (!Names.GENERIC.equals(name)) { - throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]"); - } - TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); - if (previousExecutorHolder != null) { - if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) { - TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); - if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { - logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); - ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null)); - } - return previousExecutorHolder; - } - if (previousInfo.getKeepAlive() != null) { - defaultKeepAlive = previousInfo.getKeepAlive(); - } - } - TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); - if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); - } else { - logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); - } - Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); - return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null)); } else if (ThreadPoolType.FIXED == threadPoolType) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null)); if (previousExecutorHolder != null) { + assert previousInfo != null; if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) { SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) { int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax())); if (previousInfo.getMax() != updatedSize) { - logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize); + logger.debug("updating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize); // if you think this code is crazy: that's because it is! if (updatedSize > previousInfo.getMax()) { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); @@ -511,20 +516,24 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize)); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); - logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); + logger.debug("creating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize)); } else if (ThreadPoolType.SCALING == threadPoolType) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); + final Integer queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", null)); + if (queueSize != null) { + throw new IllegalArgumentException("thread pool [" + name + "] of type scaling can not have its queue re-sized but was [" + queueSize + "]"); + } if (previousExecutorHolder != null) { if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) { - logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); + logger.debug("updating thread pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); } @@ -552,9 +561,9 @@ private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolde int min = settings.getAsInt("min", defaultMin); int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize)); if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + logger.debug("updating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } else { - logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + logger.debug("creating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null)); @@ -577,6 +586,32 @@ private int applyHardSizeLimit(String name, int size) { return size; } + /** + * Constrains a value between minimum and maximum values + * (inclusive). + * + * @param value the value to constrain + * @param min the minimum acceptable value + * @param max the maximum acceptable value + * @return min if value is less than min, max if value is greater + * than value, otherwise value + */ + static int boundedBy(int value, int min, int max) { + return Math.min(max, Math.max(min, value)); + } + + static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) { + return boundedBy((numberOfProcessors + 1) / 2, 1, 5); + } + + static int halfNumberOfProcessorsMaxTen(int numberOfProcessors) { + return boundedBy((numberOfProcessors + 1) / 2, 1, 10); + } + + static int twiceNumberOfProcessors(int numberOfProcessors) { + return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE); + } + private void updateSettings(Settings settings) { Map groupSettings = settings.getAsGroups(); if (groupSettings.isEmpty()) { @@ -969,4 +1004,5 @@ public void close() throws IOException { public ThreadContext getThreadContext() { return threadContext; } + } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 503daba8c2a03..1604ab24160d1 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -50,8 +50,7 @@ protected Settings nodeSettings(int nodeOrdinal) { //Have very low pool and queue sizes to overwhelm internal pools easily return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.generic.size", 1) - .put("threadpool.generic.queue_size", 1) + .put("threadpool.generic.max", 4) // don't mess with this one! It's quite sensitive to a low queue size // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected) //.put("threadpool.listener.queue_size", 1) diff --git a/core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java b/core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java new file mode 100644 index 0000000000000..7fbd3ccd31b62 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class ESThreadPoolTestCase extends ESTestCase { + + protected final ThreadPool.Info info(final ThreadPool threadPool, final String name) { + for (final ThreadPool.Info info : threadPool.info()) { + if (info.getName().equals(name)) { + return info; + } + } + throw new IllegalArgumentException(name); + } + + protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final String name) { + for (final ThreadPoolStats.Stats stats : threadPool.stats()) { + if (name.equals(stats.getName())) { + return stats; + } + } + throw new IllegalArgumentException(name); + } + + protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) throws InterruptedException { + if (threadPool != null) { + terminate(threadPool); + } + } + + static String randomThreadPool(final ThreadPool.ThreadPoolType type) { + return randomFrom( + ThreadPool.THREAD_POOL_TYPES + .entrySet().stream() + .filter(t -> t.getValue().equals(type)) + .map(Map.Entry::getKey) + .collect(Collectors.toList())); + } + +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java new file mode 100644 index 0000000000000..9c911d1e25cfd --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -0,0 +1,245 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; + +public class ScalingThreadPoolTests extends ESThreadPoolTestCase { + + public void testScalingThreadPoolConfiguration() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + final Settings.Builder builder = Settings.builder(); + + final int min; + if (randomBoolean()) { + min = randomIntBetween(0, 8); + builder.put("threadpool." + threadPoolName + ".min", min); + } else { + min = "generic".equals(threadPoolName) ? 4 : 1; // the defaults + } + + final int sizeBasedOnNumberOfProcessors; + if (randomBoolean()) { + final int processors = randomIntBetween(1, 64); + sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors); + builder.put("processors", processors); + } else { + sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, Math.min(32, Runtime.getRuntime().availableProcessors())); + } + + final int expectedSize; + if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) { + expectedSize = randomIntBetween(min, 16); + builder.put("threadpool." + threadPoolName + ".size", expectedSize); + } else { + expectedSize = sizeBasedOnNumberOfProcessors; + } + + final long keepAlive; + if (randomBoolean()) { + keepAlive = randomIntBetween(1, 300); + builder.put("threadpool." + threadPoolName + ".keep_alive", keepAlive + "s"); + } else { + keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults + } + + runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> { + final Executor executor = threadPool.executor(threadPoolName); + assertThat(executor, instanceOf(EsThreadPoolExecutor.class)); + final EsThreadPoolExecutor esThreadPoolExecutor = (EsThreadPoolExecutor)executor; + final ThreadPool.Info info = info(threadPool, threadPoolName); + + assertThat(info.getName(), equalTo(threadPoolName)); + assertThat(info.getThreadPoolType(), equalTo(ThreadPool.ThreadPoolType.SCALING)); + + assertThat(info.getKeepAlive().seconds(), equalTo(keepAlive)); + assertThat(esThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS), equalTo(keepAlive)); + + assertNull(info.getQueueSize()); + assertThat(esThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(Integer.MAX_VALUE)); + + assertThat(info.getMin(), equalTo(min)); + assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(min)); + assertThat(info.getMax(), equalTo(expectedSize)); + assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedSize)); + }); + } + + @FunctionalInterface + private interface SizeFunction { + int size(int numberOfProcessors); + } + + private int expectedSize(final String threadPoolName, final int numberOfProcessors) { + final Map sizes = new HashMap<>(); + sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512)); + sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5); + sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfNumberOfProcessorsMaxFive); + sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfNumberOfProcessorsMaxTen); + sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfNumberOfProcessorsMaxFive); + sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfNumberOfProcessorsMaxFive); + sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceNumberOfProcessors); + sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceNumberOfProcessors); + return sizes.get(threadPoolName).size(numberOfProcessors); + } + + public void testValidDynamicKeepAlive() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { + final Executor beforeExecutor = threadPool.executor(threadPoolName); + final long seconds = randomIntBetween(1, 300); + clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".keep_alive", seconds + "s")); + final Executor afterExecutor = threadPool.executor(threadPoolName); + assertSame(beforeExecutor, afterExecutor); + final ThreadPool.Info info = info(threadPool, threadPoolName); + assertThat(info.getKeepAlive().seconds(), equalTo(seconds)); + }); + } + + public void testScalingThreadPoolIsBounded() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + final int size = randomIntBetween(32, 512); + final Settings settings = Settings.builder().put("threadpool." + threadPoolName + ".size", size).build(); + runScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> { + final CountDownLatch latch = new CountDownLatch(1); + final int numberOfTasks = 2 * size; + final CountDownLatch taskLatch = new CountDownLatch(numberOfTasks); + for (int i = 0; i < numberOfTasks; i++) { + threadPool.executor(threadPoolName).execute(() -> { + try { + latch.await(); + taskLatch.countDown(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + final ThreadPoolStats.Stats stats = stats(threadPool, threadPoolName); + assertThat(stats.getQueue(), equalTo(numberOfTasks - size)); + assertThat(stats.getLargest(), equalTo(size)); + latch.countDown(); + try { + taskLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + final Settings settings = + Settings.builder() + .put("threadpool." + threadPoolName + ".size", 128) + .put("threadpool." + threadPoolName + ".keep_alive", "1ms") + .build(); + runScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> { + final CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < 128; i++) { + threadPool.executor(threadPoolName).execute(() -> { + try { + latch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + final int active = stats(threadPool, threadPoolName).getThreads(); + assertThat(active, equalTo(128)); + latch.countDown(); + do { + spinForAtLeastOneMillisecond(); + } while (stats(threadPool, threadPoolName).getThreads() > 4); + assertThat(stats(threadPool, threadPoolName).getCompleted(), equalTo(128L)); + })); + } + + public void testDynamicThreadPoolSize() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { + final Executor beforeExecutor = threadPool.executor(threadPoolName); + int expectedMin = "generic".equals(threadPoolName) ? 4 : 1; + final int size = randomIntBetween(expectedMin, Integer.MAX_VALUE); + clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".size", size)); + final Executor afterExecutor = threadPool.executor(threadPoolName); + assertSame(beforeExecutor, afterExecutor); + final ThreadPool.Info info = info(threadPool, threadPoolName); + assertThat(info.getMin(), equalTo(expectedMin)); + assertThat(info.getMax(), equalTo(size)); + + assertThat(afterExecutor, instanceOf(EsThreadPoolExecutor.class)); + final EsThreadPoolExecutor executor = (EsThreadPoolExecutor)afterExecutor; + assertThat(executor.getCorePoolSize(), equalTo(expectedMin)); + assertThat(executor.getMaximumPoolSize(), equalTo(size)); + }); + } + + public void testResizingScalingThreadPoolQueue() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { + final int size = randomIntBetween(1, Integer.MAX_VALUE); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".queue_size", size))); + assertThat(e, hasToString( + "java.lang.IllegalArgumentException: thread pool [" + threadPoolName + + "] of type scaling can not have its queue re-sized but was [" + + size + "]")); + }); + } + + public void runScalingThreadPoolTest( + final Settings settings, + final BiConsumer consumer) throws InterruptedException { + ThreadPool threadPool = null; + try { + final String test = Thread.currentThread().getStackTrace()[2].getMethodName(); + final Settings nodeSettings = Settings.builder().put(settings).put("node.name", test).build(); + threadPool = new ThreadPool(nodeSettings); + final ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + threadPool.setClusterSettings(clusterSettings); + consumer.accept(clusterSettings, threadPool); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + private static Settings settings(final String setting, final int value) { + return settings(setting, Integer.toString(value)); + } + + private static Settings settings(final String setting, final String value) { + return Settings.builder().put(setting, value).build(); + } + +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java new file mode 100644 index 0000000000000..daad1a51a085b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class ThreadPoolTests extends ESTestCase { + + public void testBoundedByBelowMin() { + int min = randomIntBetween(0, 32); + int max = randomIntBetween(min + 1, 64); + int value = randomIntBetween(Integer.MIN_VALUE, min - 1); + assertThat(ThreadPool.boundedBy(value, min, max), equalTo(min)); + } + + public void testBoundedByAboveMax() { + int min = randomIntBetween(0, 32); + int max = randomIntBetween(min + 1, 64); + int value = randomIntBetween(max + 1, Integer.MAX_VALUE); + assertThat(ThreadPool.boundedBy(value, min, max), equalTo(max)); + } + + public void testBoundedByBetweenMinAndMax() { + int min = randomIntBetween(0, 32); + int max = randomIntBetween(min + 1, 64); + int value = randomIntBetween(min, max); + assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index a3c4e46892d6a..43e8e7e7af578 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool.Names; import java.lang.reflect.Field; @@ -46,7 +45,7 @@ /** */ -public class UpdateThreadPoolSettingsTests extends ESTestCase { +public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase { public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException { String threadPoolName = randomThreadPoolName(); @@ -162,56 +161,6 @@ public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedExc } } - public void testCachedExecutorType() throws InterruptedException { - String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED); - ThreadPool threadPool = null; - try { - Settings nodeSettings = Settings.builder() - .put("node.name", "testCachedExecutorType").build(); - threadPool = new ThreadPool(nodeSettings); - ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - threadPool.setClusterSettings(clusterSettings); - - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - - Settings settings = clusterSettings.applySettings(Settings.builder() - .put("threadpool." + threadPoolName + ".keep_alive", "10m") - .build()); - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0)); - // Make sure keep alive value changed - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Make sure keep alive value reused - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); - assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - - // Change keep alive - Executor oldExecutor = threadPool.executor(threadPoolName); - settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); - // Make sure keep alive value changed - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); - - // Set the same keep alive - settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); - // Make sure keep alive value didn't change - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); - } finally { - terminateThreadPoolIfNeeded(threadPool); - } - } - private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings)); @@ -273,7 +222,7 @@ public void testFixedExecutorType() throws InterruptedException { assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); // Change queue capacity - settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500") + clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500") .build()); } finally { terminateThreadPoolIfNeeded(threadPool); @@ -290,9 +239,11 @@ public void testScalingExecutorType() throws InterruptedException { threadPool = new ThreadPool(nodeSettings); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); threadPool.setClusterSettings(clusterSettings); - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1)); + final int expectedMinimum = "generic".equals(threadPoolName) ? 4 : 1; + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedMinimum)); assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L)); + final long expectedKeepAlive = "generic".equals(threadPoolName) ? 30 : 300; + assertThat(info(threadPool, threadPoolName).getKeepAlive().seconds(), equalTo(expectedKeepAlive)); assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); @@ -358,6 +309,9 @@ public void testCustomThreadPool() throws Exception { try { Settings nodeSettings = Settings.builder() .put("threadpool.my_pool1.type", "scaling") + .put("threadpool.my_pool1.min", 1) + .put("threadpool.my_pool1.size", EsExecutors.boundedNumberOfProcessors(Settings.EMPTY)) + .put("threadpool.my_pool1.keep_alive", "1m") .put("threadpool.my_pool2.type", "fixed") .put("threadpool.my_pool2.size", "1") .put("threadpool.my_pool2.queue_size", "1") @@ -429,21 +383,6 @@ public void testCustomThreadPool() throws Exception { } } - private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException { - if (threadPool != null) { - terminate(threadPool); - } - } - - private ThreadPool.Info info(ThreadPool threadPool, String name) { - for (ThreadPool.Info info : threadPool.info()) { - if (info.getName().equals(name)) { - return info; - } - } - return null; - } - private String randomThreadPoolName() { Set threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet(); return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()])); @@ -456,7 +395,4 @@ private ThreadPool.ThreadPoolType randomIncorrectThreadPoolType(String threadPoo return randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()])); } - private String randomThreadPool(ThreadPool.ThreadPoolType type) { - return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(Map.Entry::getKey).collect(Collectors.toList())); - } } diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 199b6a9b88c08..fe9900b21f7b6 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -11,7 +11,7 @@ There are several thread pools, but the important ones include: `generic`:: For generic operations (e.g., background node discovery). - Thread pool type is `cached`. + Thread pool type is `scaling`. `index`:: For index/delete operations. Thread pool type is `fixed` @@ -72,26 +72,6 @@ NOTE: you can update thread pool settings dynamically using <> thread pool. - -The `keep_alive` parameter determines how long a thread should be kept -around in the thread pool without doing any work. - -[source,js] --------------------------------------------------- -threadpool: - generic: - keep_alive: 2m --------------------------------------------------- - [float] ==== `fixed` @@ -118,9 +98,9 @@ threadpool: [float] ==== `scaling` -The `scaling` thread pool holds a dynamic number of threads. This number is -proportional to the workload and varies between 1 and the value of the -`size` parameter. +The `scaling` thread pool holds a dynamic number of threads. This +number is proportional to the workload and varies between the value of +the `min` and `size` parameters. The `keep_alive` parameter determines how long a thread should be kept around in the thread pool without it doing any work. @@ -129,6 +109,7 @@ around in the thread pool without it doing any work. -------------------------------------------------- threadpool: warmer: + min: 1 size: 8 keep_alive: 2m -------------------------------------------------- diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index dc16bec7c19da..3199a27b9a596 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -314,7 +314,7 @@ public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); defaultSettings = builder.build(); - executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); + executor = EsExecutors.newScaling("test runner", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); } public static String configuredNodeMode() {