Skip to content

Commit

Permalink
Merge pull request #218 from benjchristensen/hystrix-1-4-non-blocking…
Browse files Browse the repository at this point in the history
…-merge

Hystrix 1.4 - Async/Non-Blocking
  • Loading branch information
benjchristensen committed Mar 11, 2014
2 parents 678c0d3 + 005a344 commit 2118664
Show file tree
Hide file tree
Showing 53 changed files with 20,127 additions and 12,074 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@
(wait-for-observable o)))))
(testing "observes command with a Scheduler"
(let [o (observe-later-on (normalize base-def)
(rx.concurrency.Schedulers/newThread)
(rx.schedulers.Schedulers/newThread)
75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
Expand Down Expand Up @@ -317,7 +317,7 @@
(is (= 103 (wait-for-observable (observe #'my-fn-command 90 13))))
(is (= 105 (wait-for-observable (observe-later #'my-fn-command 91 14))))
(is (= 107 (wait-for-observable (observe-later-on #'my-fn-command
(rx.concurrency.Schedulers/newThread)
(rx.schedulers.Schedulers/newThread)
92 15)))))))

(defcollapser my-collapser
Expand Down
4 changes: 2 additions & 2 deletions hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ apply plugin: 'idea'

dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'com.netflix.rxjava:rxjava-core:0.16.1'
compile 'com.netflix.rxjava:rxjava-core:0.17.0'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit-dep:4.10'
testCompile 'junit:junit-dep:4.10'
}

javadoc {
Expand Down
190 changes: 22 additions & 168 deletions hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package com.netflix.hystrix;

import static org.junit.Assert.*;

import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.hystrix.HystrixCommand.Setter;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import rx.functions.Action0;

/**
* Lifecycle management of Hystrix.
Expand Down Expand Up @@ -85,18 +81,35 @@ public static HystrixCommandKey getCurrentThreadExecutingCommand() {
return currentCommand.get().peek();
}

/* package */static void startCurrentThreadExecutingCommand(HystrixCommandKey key) {
/**
*
* @return Action0 to perform the same work as `endCurrentThreadExecutingCommand()` but can be done from any thread
*/
/* package */static Action0 startCurrentThreadExecutingCommand(HystrixCommandKey key) {
final LinkedList<HystrixCommandKey> list = currentCommand.get();
try {
currentCommand.get().push(key);
list.push(key);
} catch (Exception e) {
logger.warn("Unable to record command starting", e);
}
return new Action0() {

@Override
public void call() {
endCurrentThreadExecutingCommand(list);
}

};
}

/* package */static void endCurrentThreadExecutingCommand() {
endCurrentThreadExecutingCommand(currentCommand.get());
}

private static void endCurrentThreadExecutingCommand(LinkedList<HystrixCommandKey> list) {
try {
if (!currentCommand.get().isEmpty()) {
currentCommand.get().pop();
if (!list.isEmpty()) {
list.pop();
}
} catch (NoSuchElementException e) {
// this shouldn't be possible since we check for empty above and this is thread-isolated
Expand All @@ -106,163 +119,4 @@ public static HystrixCommandKey getCurrentThreadExecutingCommand() {
}
}

public static class UnitTest {
@Test
public void testNotInThread() {
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideHystrixThread() {

assertNull(getCurrentThreadExecutingCommand());

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CommandName"))) {

@Override
protected Boolean run() {
assertEquals("CommandName", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

assertTrue(command.execute());
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideNestedHystrixThread() {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("OuterCommand"))) {

@Override
protected Boolean run() {

assertEquals("OuterCommand", getCurrentThreadExecutingCommand().name());

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("BEFORE expected it to run inside a thread");
}

HystrixCommand<Boolean> command2 = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("InnerCommand"))) {

@Override
protected Boolean run() {
assertEquals("InnerCommand", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("AFTER expected it to run inside a thread");
}

return command2.execute();
}

};

assertTrue(command.execute());

assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideHystrixSemaphoreExecute() {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreIsolatedCommandName"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))) {

@Override
protected Boolean run() {
assertEquals("SemaphoreIsolatedCommandName", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

// it should be true for semaphore isolation as well
assertTrue(command.execute());
// and then be null again once done
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideHystrixSemaphoreQueue() throws Exception {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreIsolatedCommandName"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))) {

@Override
protected Boolean run() {
assertEquals("SemaphoreIsolatedCommandName", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

// it should be true for semaphore isolation as well
assertTrue(command.queue().get());
// and then be null again once done
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testThreadNestedInsideHystrixSemaphore() {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("OuterSemaphoreCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))) {

@Override
protected Boolean run() {

assertEquals("OuterSemaphoreCommand", getCurrentThreadExecutingCommand().name());

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("BEFORE expected it to run inside a semaphore");
}

HystrixCommand<Boolean> command2 = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("InnerCommand"))) {

@Override
protected Boolean run() {
assertEquals("InnerCommand", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("AFTER expected it to run inside a semaphore");
}

return command2.execute();
}

};

assertTrue(command.execute());

assertNull(getCurrentThreadExecutingCommand());
}
}
}
Loading

0 comments on commit 2118664

Please sign in to comment.