Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unsubscription of HystrixCommands #1204

Merged
merged 5 commits into from
May 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public void setUp() throws Exception {
public void testGetUserByIdSuccess() {
// blocking
Observable<User> observable = userService.getUser("1", "name: ");
assertObservableExecutionMode(observable, ObservableExecutionMode.EAGER);
assertEquals("name: 1", observable.toBlocking().single().getName());

// non-blocking
Expand Down Expand Up @@ -96,7 +95,6 @@ public void call(User user) {
public void testGetUserWithRegularFallback() {
final User exUser = new User("def", "def");
Observable<User> userObservable = userService.getUserRegularFallback(" ", "");
assertObservableExecutionMode(userObservable, ObservableExecutionMode.LAZY);
// blocking
assertEquals(exUser, userObservable.toBlocking().single());
assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
Expand Down Expand Up @@ -127,7 +125,6 @@ public void testGetUserWithRxCommandFallback() {

// blocking
Observable<User> userObservable = userService.getUserRxCommandFallback(" ", "");
assertObservableExecutionMode(userObservable, ObservableExecutionMode.LAZY);
assertEquals(exUser, userObservable.toBlocking().single());
assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
com.netflix.hystrix.HystrixInvokableInfo getUserRxCommandFallback = getHystrixCommandByKey("getUserRxCommandFallback");
Expand All @@ -140,20 +137,6 @@ public void testGetUserWithRxCommandFallback() {
}


private static void assertObservableExecutionMode(Observable observable, ObservableExecutionMode mode) {
// todo find better way to figure it out
boolean eager = observable instanceof ReplaySubject;
if (ObservableExecutionMode.EAGER == mode) {
if (!eager) {
throw new AssertionError("observable must be instance of ReplaySubject");
}
} else {
if (eager) {
throw new AssertionError("observable must not be instance of ReplaySubject");
}
}
}

public static class UserService {

private User regularFallback(String id, String name) {
Expand Down
964 changes: 475 additions & 489 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ private ExecutionResult(EventCounts eventCounts, long startTimestamp, int execut
public static ExecutionResult from(HystrixEventType... eventTypes) {
boolean didExecutionOccur = false;
for (HystrixEventType eventType: eventTypes) {
/*if (isFallbackEvent(eventType)) {

}*/
if (didExecutionOccur(eventType)) {
didExecutionOccur = true;
}
Expand All @@ -217,10 +214,16 @@ private static boolean didExecutionOccur(HystrixEventType eventType) {
case FAILURE: return true;
case BAD_REQUEST: return true;
case TIMEOUT: return true;
case CANCELLED: return true;
default: return false;
}
}

public ExecutionResult setExecutionOccurred() {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, true, isExecutedInThread, collapserKey);
}

public ExecutionResult setExecutionLatency(int executionLatency) {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, isExecutedInThread, collapserKey);
Expand All @@ -246,6 +249,11 @@ public ExecutionResult setExecutedInThread() {
failedExecutionException, executionException, executionOccurred, true, collapserKey);
}

public ExecutionResult setNotExecutedInThread() {
return new ExecutionResult(eventCounts, startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, false, collapserKey);
}

public ExecutionResult markCollapsed(HystrixCollapserKey collapserKey, int sizeOfBatch) {
return new ExecutionResult(eventCounts.plus(HystrixEventType.COLLAPSED, sizeOfBatch), startTimestamp, executionLatency, userThreadLatency,
failedExecutionException, executionException, executionOccurred, isExecutedInThread, collapserKey);
Expand All @@ -270,14 +278,14 @@ public ExecutionResult markUserThreadCompletion(long userThreadLatency) {
public ExecutionResult addEvent(HystrixEventType eventType) {
return new ExecutionResult(eventCounts.plus(eventType), startTimestamp, executionLatency,
userThreadLatency, failedExecutionException, executionException,
executionOccurred ? executionOccurred : didExecutionOccur(eventType), isExecutedInThread, collapserKey);
executionOccurred, isExecutedInThread, collapserKey);
}

public ExecutionResult addEvent(int executionLatency, HystrixEventType eventType) {
if (startTimestamp >= 0 && !isResponseRejected()) {
return new ExecutionResult(eventCounts.plus(eventType), startTimestamp, executionLatency,
userThreadLatency, failedExecutionException, executionException,
executionOccurred ? executionOccurred : didExecutionOccur(eventType), isExecutedInThread, collapserKey);
executionOccurred, isExecutedInThread, collapserKey);
} else {
return addEvent(eventType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.netflix.hystrix;

import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class HystrixCachedObservable<R> {
final AbstractCommand<R> originalCommand;
final Observable<R> cachedObservable;
final Subscription originalSubscription;
final ReplaySubject<R> replaySubject = ReplaySubject.create();
final AtomicInteger outstandingSubscriptions = new AtomicInteger(0);

/* package-private */ HystrixCachedObservable(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
this.originalSubscription = originalObservable
.subscribe(replaySubject);

this.cachedObservable = replaySubject
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (outstandingSubscriptions.decrementAndGet() == 0) {
originalSubscription.unsubscribe();
}
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions.getAndIncrement();
}
});
this.originalCommand = originalCommand;
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
return new HystrixCachedObservable<R>(o, originalCommand);
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, HystrixCollapser<?, R, ?> originalCollapser) {
return new HystrixCachedObservable<R>(o, null); //???
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, HystrixObservableCollapser<?, ?, R, ?> originalCollapser) {
return new HystrixCachedObservable<R>(o, null); //???
}

public Observable<R> toObservable() {
return cachedObservable;
}

public Observable<R> toObservable(final AbstractCommand<R> commandToCopyStateInto) {
final AtomicBoolean completionLogicRun = new AtomicBoolean(false);

return cachedObservable
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandUnsubscribed(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
});
}

private void commandCompleted(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = originalCommand.executionResult;
}

private void commandUnsubscribed(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.addEvent(HystrixEventType.CANCELLED);
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.setExecutionLatency(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,10 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {

/* try from cache first */
if (isRequestCacheEnabled) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
metrics.markResponseFromCache();
return fromCache;
return fromCache.toObservable();
}
}

Expand All @@ -396,12 +396,12 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
Observable<ResponseType> o = response.cache();
Observable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response, this);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
if (fromCache == null) {
response = o;
return toCache.toObservable();
} else {
response = fromCache;
return fromCache.toObservable();
}
}
return response;
Expand Down
27 changes: 11 additions & 16 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import rx.functions.Func0;

/**
* Used to wrap code that will execute potentially risky functionality (typically meaning a service call over the network)
Expand Down Expand Up @@ -285,35 +286,29 @@ protected R getFallback() {

@Override
final protected Observable<R> getExecutionObservable() {
return Observable.create(new OnSubscribe<R>() {

return Observable.defer(new Func0<Observable<R>>() {
@Override
public void call(Subscriber<? super R> s) {
public Observable<R> call() {
try {
s.onNext(run());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}

});
}

@Override
final protected Observable<R> getFallbackObservable() {
return Observable.create(new OnSubscribe<R>() {

return Observable.defer(new Func0<Observable<R>>() {
@Override
public void call(Subscriber<? super R> s) {
public Observable<R> call() {
try {
s.onNext(getFallback());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
}
}

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,20 @@ public int getCurrentConcurrentExecutionCount() {
*
* This metrics should measure the actual health of a {@link HystrixCommand}. For that reason, the following are included:
* <p><ul>
* <li>{@link HystrixRollingNumberEvent#SUCCESS}
* <li>{@link HystrixRollingNumberEvent#FAILURE}
* <li>{@link HystrixRollingNumberEvent#TIMEOUT}
* <li>{@link HystrixRollingNumberEvent#THREAD_POOL_REJECTED}
* <li>{@link HystrixRollingNumberEvent#SEMAPHORE_REJECTED}
* <li>{@link HystrixEventType#SUCCESS}
* <li>{@link HystrixEventType#FAILURE}
* <li>{@link HystrixEventType#TIMEOUT}
* <li>{@link HystrixEventType#THREAD_POOL_REJECTED}
* <li>{@link HystrixEventType#SEMAPHORE_REJECTED}
* </ul><p>
* The following are not included in either attempts/failures:
* <p><ul>
* <li>{@link HystrixRollingNumberEvent#BAD_REQUEST} - this event denotes bad arguments to the command and not a problem with the command
* <li>{@link HystrixRollingNumberEvent#SHORT_CIRCUITED} - this event measures a health problem in the past, not a problem with the current state
* <li>{@link HystrixEventType#BAD_REQUEST} - this event denotes bad arguments to the command and not a problem with the command
* <li>{@link HystrixEventType#SHORT_CIRCUITED} - this event measures a health problem in the past, not a problem with the current state
* <li>{@link HystrixEventType#CANCELLED} - this event denotes a user-cancelled command. It's not known if it would have been a success or failure, so it shouldn't count for either
* <li>All Fallback metrics
* <li>{@link HystrixRollingNumberEvent#EMIT} - this event is not a terminal state for the command
* <li>{@link HystrixRollingNumberEvent#COLLAPSED} - this event is about the batching process, not the command execution
* <li>{@link HystrixEventType#EMIT} - this event is not a terminal state for the command
* <li>{@link HystrixEventType#COLLAPSED} - this event is about the batching process, not the command execution
* </ul><p>
*
* @return {@link HealthCounts}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum HystrixEventType {
FALLBACK_MISSING(true),
EXCEPTION_THROWN(false),
RESPONSE_FROM_CACHE(true),
CANCELLED(true),
COLLAPSED(false);

private final boolean isTerminal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {

/* try from cache first */
if (isRequestCacheEnabled) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
metrics.markResponseFromCache();
return fromCache;
return fromCache.toObservable();
}
}

Expand All @@ -457,12 +457,12 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
Observable<ResponseType> o = response.cache();
Observable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response, this);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
if (fromCache == null) {
response = o;
return toCache.toObservable();
} else {
response = fromCache;
return fromCache.toObservable();
}
}
return response;
Expand Down
Loading