Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Race Condition on Timeout #256

Merged
merged 2 commits into from
May 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,26 @@
*/
package com.netflix.hystrix;

import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.lang.ref.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;

import rx.Notification;
import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.exception.*;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.concurrency.*;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.util.HystrixTimer;
import com.netflix.hystrix.util.*;
import com.netflix.hystrix.util.HystrixTimer.TimerListener;

/**
Expand All @@ -55,7 +44,7 @@
*
* @param <R>
* the return type
*
*
* @ThreadSafe
*/
public abstract class HystrixObservableCommand<R> extends HystrixExecutableBase<R> implements HystrixExecutable<R>, HystrixExecutableInfo<R> {
Expand Down Expand Up @@ -261,7 +250,7 @@ public Observable<R> toObservable() {
return toObservable(Schedulers.immediate());
}

protected ObservableCommand<R> toObservable(final Scheduler observeOn, boolean performAsyncTimeout) {
protected ObservableCommand<R> toObservable(final Scheduler observeOn, final boolean performAsyncTimeout) {
/* this is a stateful object so can only be used once */
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
Expand Down Expand Up @@ -303,7 +292,7 @@ public void call(Subscriber<? super R> observer) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));

getRunObservableDecoratedForMetricsAndErrorHandling(observeOn)
getRunObservableDecoratedForMetricsAndErrorHandling(observeOn, performAsyncTimeout)
.doOnTerminate(new Action0() {

@Override
Expand Down Expand Up @@ -337,9 +326,6 @@ public void call() {
}
});

// wrap for timeout support
o = o.lift(new HystrixObservableTimeoutOperator<R>(_this, performAsyncTimeout));

// error handling at very end (this means fallback didn't exist or failed)
o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {

Expand Down Expand Up @@ -404,9 +390,7 @@ public void call() {
*
* @return R
*/
private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final Scheduler observeOn) {
final HystrixObservableCommand<R> _cmd = this;

private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final Scheduler observeOn, final boolean performAsyncTimeout) {
final HystrixObservableCommand<R> _self = this;
// allow tracking how many concurrent threads are executing
metrics.incrementConcurrentExecutionCount();
Expand Down Expand Up @@ -468,29 +452,22 @@ public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}

}).map(new Func1<R, R>() {
}).lift(new HystrixObservableTimeoutOperator<R>(_self, performAsyncTimeout)).map(new Func1<R, R>() {

@Override
public R call(R t1) {
return executionHook.onRunSuccess(_cmd, t1);
}
/**
* If we get here it means we did not time out; otherwise this step is skipped and the error goes to onErrorResumeNext
*/

}).doOnCompleted(new Action0() {
// this must come before onErrorResumeNext as we only want successful onCompletes processed here
@Override
public void call() {
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
} else {
long duration = System.currentTimeMillis() - invocationStartTime;
metrics.addCommandExecutionTime(duration);
// report success
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
metrics.markSuccess(duration);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
}
public R call(R t1) {
long duration = System.currentTimeMillis() - invocationStartTime;
metrics.addCommandExecutionTime(duration);
// report success
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
metrics.markSuccess(duration);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
return executionHook.onRunSuccess(_self, t1);
}

}).onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
Expand All @@ -499,11 +476,27 @@ public void call() {
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
if (e instanceof RejectedExecutionException) {
// mark on counter
/**
* Rejection handling
*/
metrics.markThreadPoolRejection();
// use a fallback instead (or throw exception if not implemented)
return getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e);
} else if (t instanceof HystrixObservableTimeoutOperator.HystrixTimeoutException) {
/**
* Timeout handling
*/
Observable<R> v = getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
/*
* we subscribeOn the computation scheduler as we don't want to use the Timer thread, nor can we use the
* THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
* should not perform IO and thus we run on the computation event loops.
*/
return v.subscribeOn(new HystrixContextScheduler(concurrencyStrategy, Schedulers.computation()));
} else if (t instanceof HystrixBadRequestException) {
/**
* BadRequest handling
*/
try {
Exception decorated = executionHook.onRunError(_self, (Exception) t);

Expand All @@ -521,22 +514,16 @@ public Observable<R> call(Throwable t) {
*/
return Observable.error(t);
} else {
/**
* All other error handling
*/
try {
e = executionHook.onRunError(_self, e);
} catch (Exception hookException) {
logger.warn("Error calling ExecutionHook.endRunFailure", hookException);
}

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// http://jira/browse/API-4905 HystrixCommand: Error/Timeout Double-count if both occur
// this means we have already timed out then we don't count this error stat and we just return
// as this means the user-thread has already returned, we've already done fallback logic
// and we've already counted the timeout stat
logger.debug("Error executing HystrixCommand.run() [TimedOut]. Proceeding to fallback logic ...", e);
return Observable.empty();
} else {
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);
}
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);

// report failure
metrics.markFailure(System.currentTimeMillis() - invocationStartTime);
Expand All @@ -557,7 +544,7 @@ public void call(Notification<? super R> n) {
@Override
public R call(R t1) {
// allow transforming the results via the executionHook whether it came from success or fallback
return executionHook.onComplete(_cmd, t1);
return executionHook.onComplete(_self, t1);
}

});
Expand Down Expand Up @@ -776,6 +763,12 @@ public HystrixObservableTimeoutOperator(final HystrixObservableCommand<R> origin
this.isNonBlocking = isNonBlocking;
}

/**
 * Marker exception used to signal that the command timed out.
 * <p>
 * The timeout runnable passes a new instance to {@code child.onError(...)}, and the command's
 * {@code onErrorResumeNext} handler recognises it via {@code instanceof} to run the
 * TIMEOUT fallback path. It declares no constructors, so it carries no message or cause.
 */
public static class HystrixTimeoutException extends Exception {

// explicit serialVersionUID because Exception is Serializable
private static final long serialVersionUID = 7460860948388895401L;

}

@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
Expand All @@ -790,17 +783,7 @@ public Subscriber<? super R> call(final Subscriber<? super R> child) {

@Override
public void run() {
try {
Observable<R> v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
/*
* we subscribeOn the computation scheduler as we don't want to use the Timer thread, nor can we use the
* THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
* should not perform IO and thus we run on the computation event loops.
*/
v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).unsafeSubscribe(child);
} catch (HystrixRuntimeException re) {
child.onError(re);
}
child.onError(new HystrixTimeoutException());
}
});

Expand All @@ -818,11 +801,12 @@ public void tick() {
// we record execution time because we are returning before
originalCommand.recordTotalExecutionTime(originalCommand.invocationStartTime);

// shut down the original request
s.unsubscribe();

timeoutRunnable.run();
}

// shut down the original request
s.unsubscribe();
}

@Override
Expand Down Expand Up @@ -852,33 +836,42 @@ public int getIntervalTimeInMilliseconds() {
// set externally so execute/queue can see this
originalCommand.timeoutTimer.set(tl);

/**
* If this subscriber receives values it means the parent succeeded/completed
*/
return new Subscriber<R>(s) {

@Override
public void onCompleted() {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}

@Override
public void onNext(R v) {
// TODO does this need to compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.NOT_EXECUTED)
// to be thread-safe, and does that even work?
if (originalCommand.isCommandTimedOut.get().equals(TimedOutStatus.NOT_EXECUTED)) {
if (isNotTimedOut()) {
child.onNext(v);
}
}

/**
 * Decides whether a notification from the parent may be forwarded to the child.
 *
 * @return {@code true} if the command status is already COMPLETED, or if this call
 *         managed to move it from NOT_EXECUTED to COMPLETED; {@code false} otherwise
 *         (presumably because the timer has already claimed the status)
 */
private boolean isNotTimedOut() {
    // an earlier notification (e.g. onNext) already marked the command COMPLETED
    if (originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED) {
        return true;
    }
    // otherwise try to claim completion; this only succeeds from NOT_EXECUTED
    return originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}

};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.netflix.hystrix;
import org.junit.Test;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

/**
 * Stress test for the race between a command finishing and its timeout firing.
 * <p>
 * Each iteration runs two {@link TestCommand}s (configured with a 1ms timeout) inside a fresh
 * request context and checks that a value was always returned and that any command reporting
 * a timeout also recorded more than one execution event (i.e. the fallback status is present).
 */
public class HystrixCommandTimeoutConcurrencyTesting {

    /** Number of request contexts to run; the race is timing dependent so many attempts are made. */
    private static final int ITERATIONS = 2000;

    /**
     * Repeatedly executes commands that sit right on the timeout boundary and fails as soon as
     * a null result or a timed-out command without a second execution event is observed.
     */
    @Test
    public void testTimeoutRace() {
        try {
            for (int i = 0; i < ITERATIONS; i++) {
                String a = null;
                String b = null;
                // keep the returned context so cleanup does not depend on a thread-local lookup
                // that could yield null and mask the real failure with a NullPointerException
                final HystrixRequestContext context = HystrixRequestContext.initializeContext();
                try {
                    a = new TestCommand().execute();
                    b = new TestCommand().execute();
                    if (a == null || b == null) {
                        System.err.println("Received NULL!");
                        throw new RuntimeException("Received NULL");
                    }

                    for (HystrixExecutableInfo<?> hi : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
                        // a timed-out command with a single event means the fallback status was lost
                        if (hi.isResponseTimedOut() && hi.getExecutionEvents().size() == 1) {
                            System.err.println("Missing fallback status!");
                            throw new RuntimeException("Missing fallback status on timeout.");
                        }
                    }

                } catch (Exception e) {
                    System.err.println("Error: " + e.getMessage());
                    e.printStackTrace();
                    throw new RuntimeException(e);
                } finally {
                    System.out.println(a + " " + b + " ==> " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
                    context.shutdown();
                }
            }
        } finally {
            // always reset, even when an iteration fails, so state does not leak into other tests
            Hystrix.reset();
        }
    }

    /**
     * Command that returns immediately but is configured with a 1ms timeout, so completion
     * and timeout can race. Returns "hello" from {@link #run()} and "failed" from the fallback.
     */
    public static class TestCommand extends HystrixCommand<String> {

        protected TestCommand() {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("testTimeoutConcurrency"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("testTimeoutConcurrencyCommand"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionIsolationThreadTimeoutInMilliseconds(1)));
        }

        @Override
        protected String run() throws Exception {
            // NOTE(review): the two lines below look like manual toggles for forcing the
            // failure / timeout paths while debugging; left disabled as in the original.
            // throw new RuntimeException("test");
            // Thread.sleep(5);
            return "hello";
        }

        @Override
        protected String getFallback() {
            return "failed";
        }

    }
}