
Actually bound the generic thread pool #17017

Merged
merged 5 commits into from
Apr 26, 2016

Conversation

jasontedor
Member

@jasontedor jasontedor commented Mar 8, 2016

This commit actually bounds the size of the generic thread pool. The
generic thread pool was of type cached, a thread pool with an unbounded
number of workers and an unbounded work queue. With this commit, the
generic thread pool is now of type scaling. As such, the cached thread
pool type has been removed. By default, the generic thread pool is
constructed with a core pool size of four, a max pool size of 128 and
idle workers can be reaped after a keep-alive time of thirty seconds
expires. The work queue for this thread pool remains unbounded.
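The defaults described above can be sketched with plain `java.util.concurrent` types. This is a minimal illustration only, not the actual `EsThreadPoolExecutor` (which layers a custom queue, rejection policy, and thread context on top):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScalingPoolSketch {
    // Hypothetical illustration: core pool size 4, max pool size 128,
    // idle workers reaped after a 30-second keep-alive, mirroring the
    // generic thread pool defaults described in this commit message.
    public static ThreadPoolExecutor newGenericLike() {
        return new ThreadPoolExecutor(
                4,                    // core pool size (min)
                128,                  // max pool size
                30, TimeUnit.SECONDS, // keep-alive for idle workers above core
                new SynchronousQueue<>()); // handoff; the real pool uses a transfer queue
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newGenericLike();
        System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```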

@jasontedor
Member Author

@bleskes Can you please review very carefully? 😄

@@ -66,6 +67,12 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon
return executor;
}

public static EsThreadPoolExecutor newBoundedCached(String name, int poolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, poolSize, poolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory, new EsAbortPolicy(), contextHolder);
Contributor

why did you decide to change the queue type?

Contributor

also we can't use the EsAbortPolicy - we may never reject. We should use ForceQueuePolicy

Member Author

why did you decide to change the queue type?

@bleskes The synchronous queue that was used previously will block producers until there is a consumer available; it is basically a handoff queue.

When the generic thread pool was unbounded, this property did not matter because if a producer tried to submit to the queue when all the consumers were occupied, a new consumer would be created and thus the queue backing the generic thread pool was effectively non-blocking (a producer would only have to wait as long as it took to spin up a new consumer).

However, when we bound the generic thread pool, this property becomes very important because we do not want to block producers.

A linked blocking queue does not have this property; producers will not block waiting for a consumer.

Thus, the switch.
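The handoff-vs-buffering distinction described above can be demonstrated directly with the two queue types. A small standalone sketch (illustrative only, not project code):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueHandoffDemo {
    // SynchronousQueue has no capacity: offer succeeds only if a consumer
    // is already blocked in take(), i.e. it is a pure handoff queue.
    public static boolean offerWithNoConsumer(SynchronousQueue<Runnable> q) {
        return q.offer(() -> {});
    }

    public static void main(String[] args) {
        boolean acceptedBySync = offerWithNoConsumer(new SynchronousQueue<>());

        // LinkedBlockingQueue buffers the task; the producer never waits
        // for a consumer to show up.
        boolean acceptedByLinked = new LinkedBlockingQueue<Runnable>().offer(() -> {});

        System.out.println(acceptedBySync + " " + acceptedByLinked); // prints "false true"
    }
}
```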

Member Author

also we can't use the EsAbortPolicy - we may never reject. We should use ForceQueuePolicy

@bleskes I think that we should use a caller runs policy. The only case in which the rejected execution policy matters is if the queue fills up. But the queue has capacity of Integer.MAX_VALUE; if we ever hit that we are in deep trouble and a force queue policy will not help (the queue is full, no amount of forcing is going to get the task in the queue). Thus, caller runs.
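The effect of a caller-runs handler can be seen with the stock `ThreadPoolExecutor.CallerRunsPolicy`: when the pool rejects a submission, the submitting thread runs the task itself. A minimal sketch with deliberately tiny, hypothetical pool sizes (not the generic pool's settings):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    // Returns the name of the thread that ends up running a task submitted
    // while the pool's single worker is busy and the handoff queue rejects it.
    public static String whereSecondTaskRuns() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the only worker so the next submission cannot be handed off.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        });
        String[] ranOn = new String[1];
        // Rejected by the saturated pool, so CallerRunsPolicy runs it right here,
        // on the submitting thread.
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second task ran on: " + whereSecondTaskRuns());
    }
}
```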

@bleskes
Contributor

bleskes commented Mar 10, 2016

I started going through this again, and looking at the cached type now, I think we can get rid of it altogether and just use scaling (and the other defaults). I'm not even sure we need to add setting validation: the generic thread pool is no different from any other scaling thread pool, and if you mess up the settings you're in big trouble anyway. I will go along if you want to keep those restrictions, but I'm afraid it will feel artificial.

@jasontedor
Member Author

I started going through this again and looking at the cached type now, I think we can get rid of it all together and just use scaling (and other defaults).

@bleskes I agree and I pushed 51ecabe13de8c7de674d4837277c4fe8290d60e1 to remove the cached thread pool type.

@@ -60,16 +61,11 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name,

public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
Contributor

can we add a comment about using the ForceQueuePolicy capturing our research? will be a shame to have to do it again :)

Contributor

How I wish I had written down what we found out at the time. Now that I look at the ExecutorScalingQueue, I see:

@Override
public boolean offer(E e) {
    if (!tryTransfer(e)) {
        int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); // <-- ****
        if (left > 0) {
            return false;
        } else {
            return super.offer(e);
        }
    } else {
        return true;
    }
}

The place I marked with **** feels weird, as it effectively returns false all the time. That is fine because we want to prefer adding threads first and then, only if that fails, queue, which is why we need the rejection policy. My guess is that this is there for the case where people make a "fixed" thread pool out of a scaling one by setting min == max (which feels like an unneeded optimization to me, btw).

Do you agree, and if so, can we add a comment documenting?

Member Author

@jasontedor jasontedor Apr 23, 2016

Do you agree, and if so, can we add a comment documenting?

Yes, your explanation is basically correct, except I don't agree with the part about it being there for the case when people make a fixed thread pool out of a scaling one. It's there for the opposite case, when there could be spare capacity in the thread pool. It's because the JVM prefers queueing to creating new worker threads when the thread pool has reached core pool size that this is needed.

I added comments explaining the behavior.

add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m"));
int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).max(genericThreadPoolMax).keepAlive("30s"));
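The `boundedBy` helper referenced in this hunk is not shown; a sketch under the assumption that `boundedBy(value, min, max)` clamps its first argument into `[min, max]` (the helper name and signature are inferred from the call site, not confirmed by this diff):

```java
public class BoundedBySketch {
    // Assumed semantics: clamp value into the inclusive range [min, max].
    public static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    public static void main(String[] args) {
        // e.g. on a 4-processor machine: 4 * 4 = 16, clamped up to the floor of 128
        System.out.println(boundedBy(16, 128, 512));   // 128
        // on a 48-processor machine: 4 * 48 = 192, already in range
        System.out.println(boundedBy(192, 128, 512));  // 192
        // on a 256-processor machine: 4 * 256 = 1024, clamped down to the ceiling
        System.out.println(boundedBy(1024, 128, 512)); // 512
    }
}
```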
Contributor

why did we change the min to 4 , while it used to be 0?

Contributor

I wonder about this too.

Member Author

@bleskes @ywelsch Always keeping some threads alive will help minimize latency from thread creation.

Each scaling thread pool has a core pool size (the min) and a maximum pool size. When fewer than min worker threads are running, the JVM will create new threads. When min worker threads are running, the JVM prefers to queue requests rather than create new worker threads. The JVM will only resort to creating new worker threads above min if the queue saturates.

This also explains why we use the ExecutorScalingQueue. We first try to transfer to an active thread. If there is not one, we see if the pool has excess capacity. If it does, then we fake that the queue rejects the task to force the JVM to create a new worker thread rather than queuing the task. We only queue if there is no excess capacity in the thread pool.
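The transfer-then-fake-rejection behavior described above can be condensed into a simplified sketch. This is not the actual `ExecutorScalingQueue` (the real class lives in `EsExecutors` and differs in detail); it only illustrates the technique:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Simplified sketch: first try to hand the task to an idle worker; if none,
// pretend the queue is full whenever the pool can still grow, so that
// ThreadPoolExecutor creates a new worker instead of queueing; only queue
// when the pool cannot grow (min == max).
public class ScalingQueueSketch<E> extends LinkedTransferQueue<E> {
    private ThreadPoolExecutor executor; // must be set before first use

    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public boolean offer(E e) {
        if (tryTransfer(e)) {
            return true; // an idle worker picked the task up directly
        }
        int spare = executor.getMaximumPoolSize() - executor.getCorePoolSize();
        if (spare > 0) {
            return false; // fake "queue full" so the executor adds a worker
        }
        return super.offer(e); // effectively fixed-size pool: just queue
    }
}
```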

});
}

public void testGenericThreadPoolIsBounded() throws InterruptedException {
Contributor

I think what we test here are properties of the scaling thread pool, not only the one named generic? Can we generalize these tests?

Member Author

@jasontedor jasontedor Apr 22, 2016

Sure. That's an artifact of how, initially, the generic thread pool was the only cached thread pool (and thus separate from scaling), but during the evolution of this PR the cached thread pool type was removed and the generic thread pool became just another scaling thread pool.

@jasontedor
Member Author

@ywelsch I have updated this pull request as we discussed and I think that this is ready for your review.

@jasontedor jasontedor assigned ywelsch and unassigned bleskes Apr 24, 2016
if (!tryTransfer(e)) {
// check if there might be spare capacity in the thread
// pool executor
Contributor

I don't think this is what is happening here. AFAICS the expression executor.getMaximumPoolSize() - executor.getCorePoolSize() is fixed once we have constructed the threadpool and only changed by setCorePoolSize() and setMaximumPoolSize().

What I think this code effectively does is behave like a fixed-size thread pool with an unbounded queue in case max pool size is configured to be equal to core pool size (which is what @bleskes was essentially saying, I guess). If that's the case, I also concur with @bleskes that we should get rid of this case distinction. This means that we should require min < max for a scaling threadpool, and otherwise require explicitly specifying a fixed thread pool.

Member Author

@jasontedor jasontedor Apr 24, 2016

AFAICS the expression executor.getMaximumPoolSize() - executor.getCorePoolSize() is fixed once we have constructed the threadpool

@ywelsch This is correct and was already taken into consideration in my assessment of the situation.

From java.util.concurrent.ThreadPoolExecutor#execute:

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);

The first thing that the ThreadPoolExecutor does is check whether there are fewer than core pool size threads. If there are, it tries to add a new worker to handle the submitted task. If this fails (it's racy!), ThreadPoolExecutor then tries to place the task on the queue. This is where we turn to the ExecutorScalingQueue:

        public boolean offer(E e) {
            if (!tryTransfer(e)) {
                int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
                if (left > 0) {
                    return false;
                } else {
                    return super.offer(e);
                }
            } else {
                return true;
            }
        }

We first try to transfer the task to an available worker thread. If this fails, we come to the code in question. We check whether there might be spare capacity in the thread pool, which can only be the case if the thread pool was configured with max pool size greater than core pool size. If there might be spare capacity, we return false. This causes the ThreadPoolExecutor to create a new worker thread, even if core pool size threads are already running. Again, the ThreadPoolExecutor prefers queueing to creating new workers when there are core pool size threads running, and will only resort to creating a new thread above core pool size threads if the queue is saturated, which we faked by returning false. If this fails (again, it's racy), ThreadPoolExecutor will reject the task. This is where the ForceQueuePolicy rejection handler comes into play to give us a chance to just drop the task on the queue.
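The rejection handler's role in this race can be sketched as follows. This is a simplified illustration of the idea, not the actual ForceQueuePolicy class (which operates on the project's own queue type):

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Sketch of the ForceQueuePolicy idea: when the fake "queue full" signal
// makes addWorker race and fail (the pool reached max pool size in the
// meantime), the handler simply puts the task on the unbounded queue
// instead of rejecting it, so tasks are never dropped.
public class ForceQueuePolicySketch implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        try {
            // The queue is unbounded, so put() does not block in practice.
            executor.getQueue().put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while force-queueing", e);
        }
    }
}
```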

I continue to disagree with your and @bleskes's assessment of the situation. It is not an optimization; it is a trick for the common case to make the ThreadPoolExecutor spin up threads all the way up to max pool size instead of queueing.

Contributor

I'm not sure if we're just missing each other's point. I wasn't arguing for getting rid of ExecutorScalingQueue. I just think that a scaling threadpool should be always defined with min < max. This would mean that the condition if (left > 0) { can be removed and we end up with:

public boolean offer(E e) {
    return tryTransfer(e);
}

Member Author

@jasontedor jasontedor Apr 24, 2016

I just think that a scaling threadpool should be always defined with min < max.

I'm not sure that this is the right thing to do. Consider the refresh thread pool with its default settings of min = 1 and max = half the number of processors (with a ceiling of 10); on a two-processor system this will have min = 1, max = 1 and maybe that's the right thing? Either way, we should address this in a follow-up.

This commit actually bounds the size of the generic thread pool. The
generic thread pool was of type cached, a thread pool with an unbounded
number of workers and an unbounded work queue. With this commit, the
generic thread pool is now of type scaling. As such, the cached thread
pool type has been removed. By default, the generic thread pool is
constructed with a core pool size of four, a max pool size of 128 and
idle workers can be reaped after a keep-alive time of thirty seconds
expires. The work queue for this thread pool remains unbounded.
This commit removes two unused imports and applies a few other
formatting cleanups to EsExecutors.java.
This commit clarifies an error message that is produced when an attempt
is made to resize the backing queue for a scaling executor. As this
queue is unbounded, resizing the backing queue does not make sense. The
clarification here is to specify that this restriction is because the
executor is a scaling executor.
This commit expands the configuration test for scaling executors to
include the case where the min pool size is set to zero.
@jasontedor
Member Author

@ywelsch I think this is ready for another review cycle.


final int expectedSize;
if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) {
expectedSize = randomIntBetween(min + 1, 16);
Contributor

also test size == min?

@ywelsch
Contributor

ywelsch commented Apr 26, 2016

Left two minor comments. Feel free to push after addressing them. Thanks @jasontedor, LGTM.

This commit slightly expands the scaling thread pool configuration test
coverage. In particular, the test testScalingThreadPoolConfiguration is
expanded to include the case when min is equal to size, and the test
testDynamicThreadPoolSize is expanded to include all possible cases when
size is greater than or equal to min.
@jasontedor jasontedor merged commit efeec4d into elastic:master Apr 26, 2016
@jasontedor jasontedor deleted the generic-thread-pool branch April 26, 2016 12:27
@jasontedor
Member Author

Thanks for a helpful and thorough review @ywelsch!
