Skip to content

Commit

Permalink
Race condition exists when constructing a HystrixThreadPool where the…
Browse files Browse the repository at this point in the history
… ThreadPoolExecutor used by HystrixThreadPool.Factory is not the same instance as the one that is associated with the HystrixMetricsPublisherThreadPool.

This causes a disconnect between the ThreadPoolExecutor's metrics and those supplied by HystrixMetricsThreadPool.

Now the ThreadPoolExecutor itself is retrieved from the HystrixMetricsThreadPool object, because it is protected behind the pattern of a ConcurrentMap#putIfAbsent. This seemed much more difficult to do with the ConcurrencyStrategy
as then any further custom implementations would not have similar concurrency guarantees.

A test has been added to show that the ThreadPoolExecutor construct is indeed correct and the same as the one associated with the metrics publisher. It was a bit difficult to construct a test to prove the bad case and good case at the same time. Once the solution was in place, I had to turn the failing test case into a validation test case.

In addition this exposed some static state that was kept around between some tests that needed to be cleaned up. In particular the HystrixMetricsPublisherFactory has a singleton object that needed to be reset (even though it's a singleton) in order to validate the test. I'm not particularly happy with this approach, so I'd be happy for any help here.

Conflicts:
	hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java
  • Loading branch information
cgray authored and Matt Jacobs committed Dec 15, 2014
1 parent 8b0b71f commit 585d385
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static void reset(long time, TimeUnit unit) {
private static void _reset() {
// clear metrics
HystrixCommandMetrics.reset();
HystrixThreadPoolMetrics.reset();
// clear collapsers
HystrixCollapser.reset();
// clear circuit breakers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queue = concurrencyStrategy.getBlockingQueue(properties.maxQueueSize().get());
this.threadPool = concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue);
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, threadPool, properties);
this.metrics = HystrixThreadPoolMetrics.getInstance(
threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();
this.scheduler = new HystrixContextScheduler(concurrencyStrategy, this);

/* strategy: HystrixMetricsPublisherThreadPool */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public static Collection<HystrixThreadPoolMetrics> getInstances() {
return Collections.unmodifiableCollection(metrics.values());
}

/**
* Clears all state from metrics. If new requests come in instances will be recreated and metrics started from scratch.
*
*/
/* package */ static void reset() {
metrics.clear();
}

private final HystrixThreadPoolKey threadPoolKey;
private final HystrixRollingNumber counter;
private final ThreadPoolExecutor threadPool;
Expand All @@ -102,6 +110,15 @@ private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolE
this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds(), properties.metricsRollingStatisticalWindowBuckets());
}

/**
* {@link ThreadPoolExecutor} this executor represents.
*
* @return ThreadPoolExecutor
*/
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}

/**
* {@link HystrixThreadPoolKey} these metrics represent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ public class HystrixMetricsPublisherFactory {

/**
* Get an instance of {@link HystrixMetricsPublisherThreadPool} with the given factory {@link HystrixMetricsPublisher} implementation for each {@link HystrixThreadPool} instance.
*
* @param metricsPublisher
* Implementation of {@link HystrixMetricsPublisher} to use.
* <p>
* See {@link HystrixMetricsPublisher} class header JavaDocs for precedence of how this is retrieved.
*
* @param threadPoolKey
* Pass-thru to {@link HystrixMetricsPublisher#getMetricsPublisherForThreadPool} implementation
* @param metrics
Expand Down Expand Up @@ -83,6 +79,23 @@ public static HystrixMetricsPublisherCommand createOrRetrievePublisherForCommand
return SINGLETON.getPublisherForCommand(commandKey, commandOwner, metrics, circuitBreaker, properties);
}

/**
* Resets the SINGLETON object.
*
*/
/* package */ static void reset() {
SINGLETON = new HystrixMetricsPublisherFactory();
}

/**
* Clears all state from publishers. If new requests come in instances will be recreated.
*
*/
/* package */ void _reset() {
commandPublishers.clear();
threadPoolPublishers.clear();
}

private final HystrixMetricsPublisher strategy;

private HystrixMetricsPublisherFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.hamcrest.core.Is.is;

import java.util.concurrent.TimeUnit;

import com.netflix.hystrix.HystrixThreadPool.Factory;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.HystrixPluginsTest;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactoryTest;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool;
import org.junit.Before;
import org.junit.Test;

import com.netflix.hystrix.HystrixThreadPool.Factory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HystrixThreadPoolTest {
@Before
public void setup() {
Hystrix.reset();
}

@Test
public void testShutdown() {
Expand Down Expand Up @@ -47,4 +60,50 @@ public void testShutdownWithWait() {
assertEquals(0, Factory.threadPools.size());
assertTrue(pool.getExecutor().isShutdown());
}

private static class HystrixMetricsPublisherThreadPoolContainer implements HystrixMetricsPublisherThreadPool {
private final HystrixThreadPoolMetrics hystrixThreadPoolMetrics;

private HystrixMetricsPublisherThreadPoolContainer(HystrixThreadPoolMetrics hystrixThreadPoolMetrics) {
this.hystrixThreadPoolMetrics = hystrixThreadPoolMetrics;
}

@Override
public void initialize() {
}

public HystrixThreadPoolMetrics getHystrixThreadPoolMetrics() {
return hystrixThreadPoolMetrics;
}
}

@Test
public void ensureThreadPoolInstanceIsTheOneRegisteredWithMetricsPublisherAndThreadPoolCache() throws IllegalAccessException, NoSuchFieldException {
new HystrixPluginsTest().reset();
HystrixPlugins.getInstance().registerMetricsPublisher(new HystrixMetricsPublisher() {
@Override
public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
return new HystrixMetricsPublisherThreadPoolContainer(metrics);
}
});
new HystrixMetricsPublisherFactoryTest().reset();
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("threadPoolFactoryConcurrencyTest");
HystrixThreadPool poolOne = new HystrixThreadPool.HystrixThreadPoolDefault(
threadPoolKey, HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder());
HystrixThreadPool poolTwo = new HystrixThreadPool.HystrixThreadPoolDefault(
threadPoolKey, HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder());

assertThat(poolOne.getExecutor(), is(poolTwo.getExecutor())); //Now that we get the threadPool from the metrics object, this will always be equal
HystrixMetricsPublisherThreadPoolContainer hystrixMetricsPublisherThreadPool =
(HystrixMetricsPublisherThreadPoolContainer)HystrixMetricsPublisherFactory
.createOrRetrievePublisherForThreadPool(threadPoolKey, null, null);
ThreadPoolExecutor threadPoolExecutor = hystrixMetricsPublisherThreadPool.getHystrixThreadPoolMetrics().getThreadPool();

//assert that both HystrixThreadPools share the same ThreadPoolExecutor as the one in HystrixMetricsPublisherThreadPool
assertTrue(threadPoolExecutor.equals(poolOne.getExecutor()) && threadPoolExecutor.equals(poolTwo.getExecutor()));
assertFalse(threadPoolExecutor.isShutdown());

//Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which
//wins to be inserted into the HystrixThreadPool.Factory.threadPools cache.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;

import com.netflix.hystrix.HystrixCircuitBreaker;
Expand All @@ -17,6 +18,11 @@
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class HystrixMetricsPublisherFactoryTest {
@Before
public void reset() {
HystrixMetricsPublisherFactory.reset();
}

/**
* Assert that we only call a publisher once for a given Command or ThreadPool key.
*/
Expand Down

0 comments on commit 585d385

Please sign in to comment.