Skip to content

Commit

Permalink
Merge pull request #1204 from mattrjacobs/add-cancelled-state
Browse files Browse the repository at this point in the history
Support unsubscription of HystrixCommands
  • Loading branch information
mattrjacobs committed May 13, 2016
2 parents dafbded + 77e9860 commit ee47e40
Show file tree
Hide file tree
Showing 18 changed files with 2,325 additions and 1,502 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public void setUp() throws Exception {
public void testGetUserByIdSuccess() {
// blocking
Observable<User> observable = userService.getUser("1", "name: ");
assertObservableExecutionMode(observable, ObservableExecutionMode.EAGER);
assertEquals("name: 1", observable.toBlocking().single().getName());

// non-blocking
Expand Down Expand Up @@ -96,7 +95,6 @@ public void call(User user) {
public void testGetUserWithRegularFallback() {
final User exUser = new User("def", "def");
Observable<User> userObservable = userService.getUserRegularFallback(" ", "");
assertObservableExecutionMode(userObservable, ObservableExecutionMode.LAZY);
// blocking
assertEquals(exUser, userObservable.toBlocking().single());
assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
Expand Down Expand Up @@ -127,7 +125,6 @@ public void testGetUserWithRxCommandFallback() {

// blocking
Observable<User> userObservable = userService.getUserRxCommandFallback(" ", "");
assertObservableExecutionMode(userObservable, ObservableExecutionMode.LAZY);
assertEquals(exUser, userObservable.toBlocking().single());
assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
com.netflix.hystrix.HystrixInvokableInfo getUserRxCommandFallback = getHystrixCommandByKey("getUserRxCommandFallback");
Expand All @@ -140,20 +137,6 @@ public void testGetUserWithRxCommandFallback() {
}


private static void assertObservableExecutionMode(Observable observable, ObservableExecutionMode mode) {
// todo find better way to figure it out
boolean eager = observable instanceof ReplaySubject;
if (ObservableExecutionMode.EAGER == mode) {
if (!eager) {
throw new AssertionError("observable must be instance of ReplaySubject");
}
} else {
if (eager) {
throw new AssertionError("observable must not be instance of ReplaySubject");
}
}
}

public static class UserService {

private User regularFallback(String id, String name) {
Expand Down
964 changes: 475 additions & 489 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ private ExecutionResult(EventCounts eventCounts, long startTimestamp, int execut
public static ExecutionResult from(HystrixEventType... eventTypes) {
boolean didExecutionOccur = false;
for (HystrixEventType eventType: eventTypes) {
/*if (isFallbackEvent(eventType)) {
}*/
if (didExecutionOccur(eventType)) {
didExecutionOccur = true;
}
Expand All @@ -217,10 +214,16 @@ private static boolean didExecutionOccur(HystrixEventType eventType) {
case FAILURE: return true;
case BAD_REQUEST: return true;
case TIMEOUT: return true;
case CANCELLED: return true;
default: return false;
}
}

public ExecutionResult setExecutionOccurred() {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, true, isExecutedInThread, collapserKey);
}

public ExecutionResult setExecutionLatency(int executionLatency) {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, isExecutedInThread, collapserKey);
Expand All @@ -246,6 +249,11 @@ public ExecutionResult setExecutedInThread() {
failedExecutionException, executionException, executionOccurred, true, collapserKey);
}

public ExecutionResult setNotExecutedInThread() {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, false, collapserKey);
}

public ExecutionResult markCollapsed(HystrixCollapserKey collapserKey, int sizeOfBatch) {
return new ExecutionResult(eventCounts.plus(HystrixEventType.COLLAPSED, sizeOfBatch), startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, isExecutedInThread, collapserKey);
Expand All @@ -270,14 +278,14 @@ public ExecutionResult markUserThreadCompletion(long userThreadLatency) {
public ExecutionResult addEvent(HystrixEventType eventType) {
return new ExecutionResult(eventCounts.plus(eventType), startTimestamp, executionLatency,
userThreadLatency, failedExecutionException, executionException,
executionOccurred ? executionOccurred : didExecutionOccur(eventType), isExecutedInThread, collapserKey);
executionOccurred, isExecutedInThread, collapserKey);
}

public ExecutionResult addEvent(int executionLatency, HystrixEventType eventType) {
if (startTimestamp >= 0 && !isResponseRejected()) {
return new ExecutionResult(eventCounts.plus(eventType), startTimestamp, executionLatency,
userThreadLatency, failedExecutionException, executionException,
executionOccurred ? executionOccurred : didExecutionOccur(eventType), isExecutedInThread, collapserKey);
executionOccurred, isExecutedInThread, collapserKey);
} else {
return addEvent(eventType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.netflix.hystrix;

import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class HystrixCachedObservable<R> {
final AbstractCommand<R> originalCommand;
final Observable<R> cachedObservable;
final Subscription originalSubscription;
final ReplaySubject<R> replaySubject = ReplaySubject.create();
final AtomicInteger outstandingSubscriptions = new AtomicInteger(0);

/* package-private */ HystrixCachedObservable(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
this.originalSubscription = originalObservable
.subscribe(replaySubject);

this.cachedObservable = replaySubject
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (outstandingSubscriptions.decrementAndGet() == 0) {
originalSubscription.unsubscribe();
}
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions.getAndIncrement();
}
});
this.originalCommand = originalCommand;
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
return new HystrixCachedObservable<R>(o, originalCommand);
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, HystrixCollapser<?, R, ?> originalCollapser) {
return new HystrixCachedObservable<R>(o, null); //???
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, HystrixObservableCollapser<?, ?, R, ?> originalCollapser) {
return new HystrixCachedObservable<R>(o, null); //???
}

public Observable<R> toObservable() {
return cachedObservable;
}

public Observable<R> toObservable(final AbstractCommand<R> commandToCopyStateInto) {
final AtomicBoolean completionLogicRun = new AtomicBoolean(false);

return cachedObservable
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandUnsubscribed(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
});
}

private void commandCompleted(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = originalCommand.executionResult;
}

private void commandUnsubscribed(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.addEvent(HystrixEventType.CANCELLED);
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.setExecutionLatency(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,10 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {

/* try from cache first */
if (isRequestCacheEnabled) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
metrics.markResponseFromCache();
return fromCache;
return fromCache.toObservable();
}
}

Expand All @@ -396,12 +396,12 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
Observable<ResponseType> o = response.cache();
Observable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response, this);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
if (fromCache == null) {
response = o;
return toCache.toObservable();
} else {
response = fromCache;
return fromCache.toObservable();
}
}
return response;
Expand Down
27 changes: 11 additions & 16 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import rx.functions.Func0;

/**
* Used to wrap code that will execute potentially risky functionality (typically meaning a service call over the network)
Expand Down Expand Up @@ -285,35 +286,29 @@ protected R getFallback() {

@Override
final protected Observable<R> getExecutionObservable() {
return Observable.create(new OnSubscribe<R>() {

return Observable.defer(new Func0<Observable<R>>() {
@Override
public void call(Subscriber<? super R> s) {
public Observable<R> call() {
try {
s.onNext(run());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}

});
}

@Override
final protected Observable<R> getFallbackObservable() {
return Observable.create(new OnSubscribe<R>() {

return Observable.defer(new Func0<Observable<R>>() {
@Override
public void call(Subscriber<? super R> s) {
public Observable<R> call() {
try {
s.onNext(getFallback());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
}
}

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,20 @@ public int getCurrentConcurrentExecutionCount() {
*
* This metrics should measure the actual health of a {@link HystrixCommand}. For that reason, the following are included:
* <p><ul>
* <li>{@link HystrixRollingNumberEvent#SUCCESS}
* <li>{@link HystrixRollingNumberEvent#FAILURE}
* <li>{@link HystrixRollingNumberEvent#TIMEOUT}
* <li>{@link HystrixRollingNumberEvent#THREAD_POOL_REJECTED}
* <li>{@link HystrixRollingNumberEvent#SEMAPHORE_REJECTED}
* <li>{@link HystrixEventType#SUCCESS}
* <li>{@link HystrixEventType#FAILURE}
* <li>{@link HystrixEventType#TIMEOUT}
* <li>{@link HystrixEventType#THREAD_POOL_REJECTED}
* <li>{@link HystrixEventType#SEMAPHORE_REJECTED}
* </ul><p>
* The following are not included in either attempts/failures:
* <p><ul>
* <li>{@link HystrixRollingNumberEvent#BAD_REQUEST} - this event denotes bad arguments to the command and not a problem with the command
* <li>{@link HystrixRollingNumberEvent#SHORT_CIRCUITED} - this event measures a health problem in the past, not a problem with the current state
* <li>{@link HystrixEventType#BAD_REQUEST} - this event denotes bad arguments to the command and not a problem with the command
* <li>{@link HystrixEventType#SHORT_CIRCUITED} - this event measures a health problem in the past, not a problem with the current state
* <li>{@link HystrixEventType#CANCELLED} - this event denotes a user-cancelled command. It's not known if it would have been a success or failure, so it shouldn't count for either
* <li>All Fallback metrics
* <li>{@link HystrixRollingNumberEvent#EMIT} - this event is not a terminal state for the command
* <li>{@link HystrixRollingNumberEvent#COLLAPSED} - this event is about the batching process, not the command execution
* <li>{@link HystrixEventType#EMIT} - this event is not a terminal state for the command
* <li>{@link HystrixEventType#COLLAPSED} - this event is about the batching process, not the command execution
* </ul><p>
*
* @return {@link HealthCounts}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum HystrixEventType {
FALLBACK_MISSING(true),
EXCEPTION_THROWN(false),
RESPONSE_FROM_CACHE(true),
CANCELLED(true),
COLLAPSED(false);

private final boolean isTerminal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {

/* try from cache first */
if (isRequestCacheEnabled) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
metrics.markResponseFromCache();
return fromCache;
return fromCache.toObservable();
}
}

Expand All @@ -457,12 +457,12 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
Observable<ResponseType> o = response.cache();
Observable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response, this);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
if (fromCache == null) {
response = o;
return toCache.toObservable();
} else {
response = fromCache;
return fromCache.toObservable();
}
}
return response;
Expand Down
Loading

0 comments on commit ee47e40

Please sign in to comment.