Skip to content

Commit

Permalink
Fix for IllegalStateException on Thread Rejection with Response Caching
Browse files Browse the repository at this point in the history
With the additional logging I was able to write a unit test that replicated the issue and fix it.
Rejected exceptions were not being handled correctly.

Netflix#113
  • Loading branch information
benjchristensen committed Feb 28, 2013
1 parent 447095f commit 7b046ca
Showing 1 changed file with 215 additions and 22 deletions.
237 changes: 215 additions & 22 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,17 +423,17 @@ public R execute() {
return getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited");
}

if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
return queueInThread().get();
} else {
try {
try {
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
return queueInThread().get();
} else {
return executeWithSemaphore();
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}

} catch (Exception e) {
Expand Down Expand Up @@ -504,12 +504,13 @@ private R executeWithSemaphore() {
* We don't throw an exception but just flip to synchronous execution so code doesn't need to change in order to switch a command from running on a separate thread to the calling thread.
*
* @return {@code Future<R>} Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Future.get()} in {@link ExecutionException#getCause()} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited or thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Future.get()} in {@link ExecutionException#getCause()} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited or thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Future.get()} in {@link ExecutionException#getCause()} if invalid arguments or state were used representing a user failure, not a system failure
*/
Expand Down Expand Up @@ -739,8 +740,8 @@ public R call() throws Exception {
}
}

// start execution
future.start();
// start execution and throw an exception if rejection occurs
future.start(true);

return future;
}
Expand Down Expand Up @@ -1336,6 +1337,7 @@ private class QueuedExecutionFuture implements CommandFuture<R> {
private final AtomicBoolean actualFutureExecuted = new AtomicBoolean(false);
private volatile R result; // the result of the get()
private volatile ExecutionException executionException; // in case an exception is thrown
private volatile HystrixRuntimeException rejectedException;
private volatile Future<R> actualFuture = null;
private volatile boolean isInterrupted = false;
private final CountDownLatch futureStarted = new CountDownLatch(1);
Expand All @@ -1346,10 +1348,17 @@ public QueuedExecutionFuture(HystrixCommand<R> command, ThreadPoolExecutor execu
this.callable = callable;
}

private void start() {
start(false);
}

/**
* Start execution of Callable<K> on ThreadPoolExecutor
*
* @param throwIfRejected
* since we want an exception thrown in the main queue() path but not via cached responses
*/
private void start() {
private void start(boolean throwIfRejected) {
// make sure we only start once
if (started.compareAndSet(false, true)) {
try {
Expand All @@ -1363,11 +1372,22 @@ private void start() {
// mark on counter
metrics.markThreadPoolRejection();
// use a fallback instead (or throw exception if not implemented)
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e));
try {
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e));
} catch (HystrixRuntimeException hre) {
actualFuture = asFuture(hre);
// store this so it can be thrown to queue()
rejectedException = hre;
}
} catch (Exception e) {
// unknown exception
logger.error(getLogMessagePrefix() + ": Unexpected exception while submitting to queue.", e);
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "had unexpected exception while attempting to queue for execution.", e));
try {
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "had unexpected exception while attempting to queue for execution.", e));
} catch (HystrixRuntimeException hre) {
actualFuture = asFuture(hre);
throw hre;
}
} finally {
futureStarted.countDown();
}
Expand All @@ -1390,6 +1410,10 @@ private void start() {
actualFuture = asFuture(getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "Unexpected interruption while waiting on other thread submitting to queue.", e));
}
}

if (throwIfRejected && rejectedException != null) {
throw rejectedException;
}
}

/**
Expand Down Expand Up @@ -1420,6 +1444,7 @@ public R get(long timeout, TimeUnit unit) throws CancellationException, Interrup
* execute even though the application as a whole continues.
*/
}

if (executionException != null) {
throw executionException;
} else {
Expand Down Expand Up @@ -1456,12 +1481,12 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
// we will countDown the latch and release threads
if (!started.get() || actualFuture == null) {
/**
* https://github.com/Netflix/Hystrix/issues/80
* https://github.com/Netflix/Hystrix/issues/113
*
* Output any extra information that can help tracking down how this failed
* as it most likely means there's a concurrency bug.
*/
throw new IllegalStateException("Future was not started. Key: "
throw new IllegalStateException("Response Not Available. Key: "
+ getCommandKey().name() + " ActualFuture: " + actualFuture
+ " Started: " + started.get() + " actualFutureExecuted: " + actualFutureExecuted.get()
+ " futureStarted: " + futureStarted.getCount()
Expand Down Expand Up @@ -1489,6 +1514,9 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
// we can't capture this in execute/queue so we do it here
metrics.markExceptionThrown();
}
} catch (ExecutionException e) {
// if the actualFuture itself throws an ExcecutionException we want to capture it
executionException = e;
} finally {
// mark that we are done and other threads can proceed
actualResponseReceived.countDown();
Expand All @@ -1515,11 +1543,17 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Override
public boolean isCancelled() {
/* in case another thread got to this (via cache) before the constructing thread started it, we'll optimistically try to start it and start() will ensure only one time wins */
start();
/* now 'actualFuture' will be set to something */
return actualFuture.isCancelled();
}

@Override
public boolean isDone() {
/* in case another thread got to this (via cache) before the constructing thread started it, we'll optimistically try to start it and start() will ensure only one time wins */
start();
/* now 'actualFuture' will be set to something */
return actualFuture.isDone();
}

Expand Down Expand Up @@ -1571,6 +1605,42 @@ public ExecutionResult getExecutionResult() {
};
}

private Future<R> asFuture(final HystrixRuntimeException e) {
return new CommandFuture<R>() {

@Override
public boolean cancel(boolean arg0) {
return false;
}

@Override
public R get() throws InterruptedException, ExecutionException {
throw new ExecutionException(e);
}

@Override
public R get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

@Override
public ExecutionResult getExecutionResult() {
return executionResult;
}

};
}

/* ******************************************************************************** */
/* ******************************************************************************** */
/* TryableSemaphore */
Expand Down Expand Up @@ -4174,6 +4244,76 @@ public void testRequestCacheOnTimeoutThrowsException() throws Exception {
assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

@Test
public void testRequestCacheOnThreadRejectionThrowsException() throws Exception {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
CountDownLatch completionLatch = new CountDownLatch(1);
RequestCacheThreadRejectionWithoutFallback r1 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
try {
System.out.println("r1: " + r1.execute());
// we should have thrown an exception
fail("expected a rejection");
} catch (HystrixRuntimeException e) {
assertTrue(r1.isResponseRejected());
// what we want
}

RequestCacheThreadRejectionWithoutFallback r2 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
try {
System.out.println("r2: " + r2.execute());
// we should have thrown an exception
fail("expected a rejection");
} catch (HystrixRuntimeException e) {
// e.printStackTrace();
assertTrue(r2.isResponseRejected());
// what we want
}

RequestCacheThreadRejectionWithoutFallback r3 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
Future<Boolean> f3 = r3.queue();
try {
System.out.println("f3: " + f3.get());
// we should have thrown an exception
fail("expected a rejection");
} catch (ExecutionException e) {
// e.printStackTrace();
assertTrue(r3.isResponseRejected());
// what we want
}

// let the command finish (only 1 should actually be blocked on this do to the response cache)
completionLatch.countDown();

// then another after the command has completed
RequestCacheThreadRejectionWithoutFallback r4 = new RequestCacheThreadRejectionWithoutFallback(circuitBreaker, completionLatch);
try {
System.out.println("r4: " + r4.execute());
// we should have thrown an exception
fail("expected a rejection");
} catch (HystrixRuntimeException e) {
// e.printStackTrace();
assertTrue(r4.isResponseRejected());
assertFalse(r4.isResponseFromFallback());
// what we want
}

assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
assertEquals(3, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));

assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage());

assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

/**
* Test that we can do basic execution without a RequestVariable being initialized.
*/
Expand Down Expand Up @@ -5778,6 +5918,59 @@ public String getCacheKey() {
}
}

private static class RequestCacheThreadRejectionWithoutFallback extends TestHystrixCommand<Boolean> {

final CountDownLatch completionLatch;

public RequestCacheThreadRejectionWithoutFallback(TestCircuitBreaker circuitBreaker, CountDownLatch completionLatch) {
super(testPropsBuilder()
.setCircuitBreaker(circuitBreaker)
.setMetrics(circuitBreaker.metrics)
.setThreadPool(new HystrixThreadPool() {

@Override
public ThreadPoolExecutor getExecutor() {
return null;
}

@Override
public void markThreadExecution() {

}

@Override
public void markThreadCompletion() {

}

@Override
public boolean isQueueSpaceAvailable() {
// always return false so we reject everything
return false;
}

}));
this.completionLatch = completionLatch;
}

@Override
protected Boolean run() {
try {
if (completionLatch.await(1000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("timed out waiting on completionLatch");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}

@Override
public String getCacheKey() {
return "A";
}
}

private static class BadRequestCommand extends TestHystrixCommand<Boolean> {

public BadRequestCommand(TestCircuitBreaker circuitBreaker, ExecutionIsolationStrategy isolationType) {
Expand Down

0 comments on commit 7b046ca

Please sign in to comment.