Actually bound the generic thread pool #17017
Conversation
@bleskes Can you please review very carefully? 😄
@@ -66,6 +67,12 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon
        return executor;
    }

    public static EsThreadPoolExecutor newBoundedCached(String name, int poolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
        EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, poolSize, poolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory, new EsAbortPolicy(), contextHolder);
why did you decide to change the queue type?
also we can't use the EsAbortPolicy - we may never reject. We should use ForceQueuePolicy
why did you decide to change the queue type?
@bleskes The synchronous queue that was used previously will block producers until there is a consumer available; it is basically a handoff queue.
When the generic thread pool was unbounded, this property did not matter because if a producer tried to submit to the queue when all the consumers were occupied, a new consumer would be created and thus the queue backing the generic thread pool was effectively non-blocking (a producer would only have to wait as long as it took to spin up a new consumer).
However, when we bound the generic thread pool, this property becomes very important because we do not want to block producers.
A linked blocking queue does not have this property; producers will not block waiting for a consumer.
Thus, the switch.
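The behavioral difference between the two queue types is easy to see directly. The following is a standalone sketch (not code from this PR): SynchronousQueue.offer only succeeds if a consumer is already blocked waiting for a handoff, while LinkedBlockingQueue.offer buffers the task immediately.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueHandoffDemo {
    // A SynchronousQueue has no capacity: offer() succeeds only if a
    // consumer is already waiting in take()/poll() (a pure handoff).
    // A LinkedBlockingQueue buffers tasks, so offer() succeeds immediately.
    public static boolean offerWithoutConsumer(Queue<Runnable> queue) {
        return queue.offer(() -> {});
    }

    public static void main(String[] args) {
        // No consumer waiting: the handoff queue rejects, the linked queue accepts.
        System.out.println(offerWithoutConsumer(new SynchronousQueue<>()));    // false
        System.out.println(offerWithoutConsumer(new LinkedBlockingQueue<>())); // true
    }
}
```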
also we can't use the EsAbortPolicy - we may never reject. We should use ForceQueuePolicy
@bleskes I think that we should use a caller runs policy. The only case in which the rejected execution policy matters is if the queue fills up. But the queue has a capacity of Integer.MAX_VALUE; if we ever hit that, we are in deep trouble and a force queue policy will not help (the queue is full, no amount of forcing is going to get the task in the queue). Thus, caller runs.
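To illustrate what a caller-runs policy does on rejection, here is a standalone sketch (not code from this PR) using the JDK's built-in CallerRunsPolicy: with a deliberately tiny pool and queue, the overflowing task executes on the submitting thread itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    // Returns the name of the thread that ran the overflowing task.
    public static String submitterOfOverflowTask() throws InterruptedException {
        // One worker, one queue slot: the third task overflows and, under
        // CallerRunsPolicy, runs synchronously on the submitting thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch block = new CountDownLatch(1);
        // occupy the single worker
        pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) {} });
        // fill the single queue slot
        pool.execute(() -> {});
        String[] ranOn = new String[1];
        // rejected -> runs here, on the caller's thread
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        block.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0];
    }
}
```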
I started going through this again and, looking at the cached type now, I think we can get rid of it altogether and just use scaling (and other defaults). I'm not even sure we need to add setting validation - the generic thread pool is not different from any other scaling thread pool - if you mess up the settings you're in big trouble. I will go along if you want to keep those restrictions but I'm afraid it will feel artificial.
@bleskes I agree and I pushed 51ecabe13de8c7de674d4837277c4fe8290d60e1 to remove the cached thread pool type.
@@ -60,16 +61,11 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name,

    public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
        ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
        // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
Can we add a comment about using the ForceQueuePolicy capturing our research? Will be a shame to have to do it again :)
How I wish I wrote down what we found out at the time. Now that I look at the ExecutorScalingQueue, I see:
@Override
public boolean offer(E e) {
    if (!tryTransfer(e)) {
        int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); <-- ***
        if (left > 0) {
            return false;
        } else {
            return super.offer(e);
        }
    } else {
        return true;
    }
}
The place I marked with *** feels weird as it effectively returns false all the time, which is fine because we want to prefer adding threads first and then, if that fails, queue. Which is why we need the rejection policy. My guess is that this is there for the case where people make a "fixed" thread pool out of a scaling one by having min == max (feels like an unneeded optimization to me, btw).
Do you agree, and if so, can we add a comment documenting?
Do you agree, and if so, can we add a comment documenting?
Yes, your explanation is basically correct, except I don't agree with the part about it being there for the case when people make a fixed thread pool out of a scaling one. It's there for the opposite case, when there could be spare capacity in the thread pool. This is needed because the executor prefers queueing to creating new worker threads once the pool has reached core pool size.
I added comments explaining the behavior.
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m"));
int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).max(genericThreadPoolMax).keepAlive("30s"));
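For reference, a sketch of what this sizing works out to, assuming boundedBy(value, min, max) clamps the value into [min, max] (the helper's name and usage suggest this; the sketch is illustrative, not the actual Elasticsearch code):

```java
public class SizingSketch {
    // Assumed semantics of boundedBy: clamp value into [min, max].
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // Max size of the generic pool: 4x processors, floored at 128, capped at 512.
    static int genericThreadPoolMax(int availableProcessors) {
        return boundedBy(4 * availableProcessors, 128, 512);
    }

    public static void main(String[] args) {
        System.out.println(genericThreadPoolMax(8));   // 32 clamped up to the floor: 128
        System.out.println(genericThreadPoolMax(64));  // 256, within [128, 512]
        System.out.println(genericThreadPoolMax(200)); // 800 clamped down to the cap: 512
    }
}
```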
why did we change the min to 4, while it used to be 0?
I wonder about this too.
@bleskes @ywelsch Always keeping some threads alive will help minimize latency from thread creation.

Each cached thread pool has a core pool size (the min) and a maximum pool size. When fewer than min worker threads are running, the executor will create new threads. When min worker threads are running, the executor prefers to queue requests rather than creating new worker threads. It will only resort to creating new worker threads above min if the queue saturates.

This also explains why we use the ExecutorScalingQueue. We first try to transfer to an active thread. If there is not one, we see if the pool has excess capacity. If it does, then we fake that the queue rejects the task to force the executor to create a new worker thread rather than queuing the task. We only queue if there is no excess capacity in the thread pool.
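This queueing preference is easy to observe with a plain ThreadPoolExecutor and an ordinary unbounded queue (a standalone sketch, not code from this PR): even with a max pool size of 4, the pool never grows past its core size of 1, because an unbounded queue's offer always succeeds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CorePoolQueueingDemo {
    public static int poolSizeAfterBurst() throws InterruptedException {
        // core=1, max=4, unbounded queue: once the single core thread is
        // busy, additional tasks are queued and NO extra threads are created,
        // because offer() on an unbounded queue never reports "full".
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch block = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) {} });
        }
        int size = pool.getPoolSize(); // stays at the core size
        block.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return size;
    }
}
```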
    });
}
public void testGenericThreadPoolIsBounded() throws InterruptedException { |
I think what we test here are properties of the scaling thread pool, not only the one named generic? Can we generalize these tests?
Sure, that's an artifact of how initially the generic thread pool was the only cached thread pool (and thus separate from scaling); during the evolution of this PR the cached thread pool was removed and the generic thread pool became just another scaling thread pool.
@ywelsch I have updated this pull request as we discussed and I think that this is ready for your review.
if (!tryTransfer(e)) {
    // check if there might be spare capacity in the thread
    // pool executor
I don't think this is what is happening here. AFAICS the expression executor.getMaximumPoolSize() - executor.getCorePoolSize() is fixed once we have constructed the threadpool and only changed by setCorePoolSize() and setMaximumPoolSize().
What I think this code effectively does is to behave like a fixed size thread pool with unbounded queue in case max pool size is configured to be equal to core pool size (which is what @bleskes was essentially saying, I guess). If that's the case, I also concur with @bleskes that we should get rid of this case distinction. This means that we should require min < max for scaling threadpool and otherwise require to explicitly specify fixed thread pool.
AFAICS the expression executor.getMaximumPoolSize() - executor.getCorePoolSize() is fixed once we have constructed the threadpool
@ywelsch This is correct and was already taken into consideration in my assessment of the situation.
From java.util.concurrent.ThreadPoolExecutor#execute:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);
The first thing that the ThreadPoolExecutor does is check if there are fewer than core pool size threads. If there are, it tries to add a new worker to handle the submitted task. If this fails (it's racy!), the ThreadPoolExecutor then tries to place the task on the queue. This is where we turn to the ExecutorScalingQueue:
public boolean offer(E e) {
    if (!tryTransfer(e)) {
        int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
        if (left > 0) {
            return false;
        } else {
            return super.offer(e);
        }
    } else {
        return true;
    }
}
We first try to transfer the task to an available worker thread. If this fails, we come to the code in question. We check if there might be spare capacity in the thread pool, which can only be the case if the thread pool was configured with max pool size greater than core pool size. If there might be spare capacity, we return false. This causes the ThreadPoolExecutor to create a new worker thread, even if core pool size threads are already running. Again, the ThreadPoolExecutor prefers queueing to creating new workers when there are core pool size threads running, and will only resort to creating a new thread above core pool size threads if the queue is saturated, which we faked by returning false. If this fails (again, it's racy), the ThreadPoolExecutor will reject the task. This is where the ForceQueuePolicy rejection handler comes into play to give us a chance to just drop the task on the queue.

I continue to disagree with your and @bleskes's assessment of the situation. It is not an optimization, it is a trick for the common case to make the ThreadPoolExecutor spin up threads all the way up to max pool size instead of queuing.
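The combined trick can be sketched in miniature. The following is a simplified illustration; the names mirror ExecutorScalingQueue and ForceQueuePolicy but this is not the actual Elasticsearch implementation:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScalingSketch {
    // The queue pretends to be full while the pool can still grow, which
    // forces ThreadPoolExecutor to add workers up to max pool size; a
    // force-queue rejection handler then queues the task once the pool is full.
    static class ScalingQueue<E> extends LinkedTransferQueue<E> {
        volatile ThreadPoolExecutor executor; // must be set before first use

        @Override
        public boolean offer(E e) {
            if (tryTransfer(e)) {
                return true; // handed directly to an idle worker
            }
            // If max > core, report "full" so the executor adds a worker
            // instead of queueing; otherwise fall back to really queueing.
            if (executor.getMaximumPoolSize() - executor.getCorePoolSize() > 0) {
                return false;
            }
            return super.offer(e);
        }
    }

    public static ThreadPoolExecutor newScaling(int min, int max) {
        ScalingQueue<Runnable> queue = new ScalingQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(min, max, 30L, TimeUnit.SECONDS, queue,
                // force-queue rejection handler: never reject, just queue the task
                (r, e) -> {
                    try {
                        e.getQueue().put(r);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                });
        queue.executor = pool;
        return pool;
    }
}
```

Submitting four blocking tasks to newScaling(1, 4) grows the pool to four workers (rather than queueing after the first), and a fifth task lands on the queue via the rejection handler.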
I'm not sure if we're just missing each other's point. I wasn't arguing for getting rid of ExecutorScalingQueue. I just think that a scaling threadpool should always be defined with min < max. This would mean that the condition if (left > 0) can be removed and we end up with:

public boolean offer(E e) {
    return tryTransfer(e);
}
I just think that a scaling threadpool should be always defined with min < max.
I'm not sure that this is the right thing to do. Consider the refresh thread pool with its default settings of min = 1 and max = half the number of processors (with a ceiling of 10); on a two-processor system this will have min = 1, max = 1 and maybe that's the right thing? Either way, we should address this in a follow-up.
This commit actually bounds the size of the generic thread pool. The generic thread pool was of type cached, a thread pool with an unbounded number of workers and an unbounded work queue. With this commit, the generic thread pool is now of type scaling. As such, the cached thread pool type has been removed. By default, the generic thread pool is constructed with a core pool size of four, a max pool size of 128 and idle workers can be reaped after a keep-alive time of thirty seconds expires. The work queue for this thread pool remains unbounded.
This commit removes two unused imports and applies a few other formatting cleanups to EsExecutors.java.
This commit clarifies an error message that is produced when an attempt is made to resize the backing queue for a scaling executor. As this queue is unbounded, resizing the backing queue does not make sense. The clarification here is to specify that this restriction is because the executor is a scaling executor.
This commit expands the configuration test for scaling executors to include the case where the min pool size is set to zero.
@ywelsch I think this is ready for another review cycle.
final int expectedSize;
if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) {
    expectedSize = randomIntBetween(min + 1, 16);
also test size == min?
Left two minor comments. Feel free to push after addressing them. Thanks @jasontedor, LGTM.
This commit slightly expands the scaling thread pool configuration test coverage. In particular, the test testScalingThreadPoolConfiguration is expanded to include the case when min is equal to size, and the test testDynamicThreadPoolSize is expanded to include all possible cases when size is greater than or equal to min.
Thanks for a helpful and thorough review @ywelsch!