diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 5fd3d2d5e..43c4dcbe3 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Dec 02 15:47:21 PST 2015 +#Thu May 19 16:56:49 PDT 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle new file mode 100644 index 000000000..44a2d8e63 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle @@ -0,0 +1,24 @@ +repositories { + mavenCentral() + jcenter() + maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } +} + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +dependencies { + compile project(':hystrix-core') + + compile 'io.reactivex:rxjava-reactive-streams:latest.release' + + compile 'com.fasterxml.jackson.core:jackson-core:latest.release' + compile 'com.fasterxml.jackson.core:jackson-databind:latest.release' + compile 'com.fasterxml.jackson.core:jackson-annotations:latest.release' + compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release' + compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:latest.release' + compile 'io.reactivesocket:reactivesocket:latest.release' + + testCompile 'junit:junit-dep:4.10' + testCompile 'org.mockito:mockito-all:1.9.5' +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java new file mode 100644 index 000000000..7d82215f3 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java @@ -0,0 +1,19 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.fasterxml.jackson.dataformat.cbor.CBORFactory; +import io.reactivesocket.Payload; +import rx.Observable; +import rx.subjects.BehaviorSubject; + +import java.util.function.Supplier; + +public abstract class BasePayloadSupplier implements Supplier> { + protected final CBORFactory jsonFactory; + + protected final BehaviorSubject subject; + + protected BasePayloadSupplier() { + this.jsonFactory = new CBORFactory(); + this.subject = BehaviorSubject.create(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java new file mode 100644 index 000000000..5882e4c12 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -0,0 +1,74 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollapserMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixThreadPoolMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.requests.HystrixRequestEventsStream; +import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixConfigStream; +import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixUtilizationStream; +import io.reactivesocket.Payload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; + +import java.util.Arrays; +import java.util.function.Supplier; + +public enum EventStreamEnum implements Supplier> { + + CONFIG_STREAM(1) { + @Override + public Observable get() { + logger.info("streaming config data"); + return HystrixConfigStream.getInstance().get(); + } + }, + REQUEST_EVENT_STREAM(2) { + @Override + public Observable get() { + logger.info("streaming request events"); + return HystrixRequestEventsStream.getInstance().get(); + } + }, + UTILIZATION_EVENT_STREAM(3) { + @Override + public Observable get() { + logger.info("streaming utilization events"); + return HystrixUtilizationStream.getInstance().get(); + } + }, + METRICS_STREAM(4) { + @Override + public Observable get() { + logger.info("streaming metrics"); + return Observable.merge( + HystrixCommandMetricsStream.getInstance().get(), + HystrixThreadPoolMetricsStream.getInstance().get(), + HystrixCollapserMetricsStream.getInstance().get()); + } + } + + ; + + private static final Logger logger = LoggerFactory.getLogger(EventStreamEnum.class); + + private int typeId; + + EventStreamEnum(int typeId) { + this.typeId = typeId; + } + + public static EventStreamEnum findByTypeId(int typeId) { + return Arrays + .asList(EventStreamEnum.values()) + .stream() + .filter(t -> t.typeId == typeId) + .findAny() + .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); + } + + public int getTypeId() { + return typeId; + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java new file mode 100644 index 000000000..35b90e86d --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -0,0 +1,66 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +import io.reactivesocket.Payload; +import io.reactivesocket.RequestHandler; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.RxReactiveStreams; + +/** + * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an 32-bit integer in the {@link Payload} + * data of a ReactiveSocket {@link io.reactivesocket.Frame} which corresponds to an id in {@link EventStreamEnum}. If + * the id is found it will begin to stream the events to the subscriber. + */ +public class EventStreamRequestHandler extends RequestHandler { + private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class); + + @Override + public Publisher handleRequestResponse(Payload payload) { + return NO_REQUEST_RESPONSE_HANDLER.apply(payload); + } + + @Override + public Publisher handleRequestStream(Payload payload) { + return NO_REQUEST_STREAM_HANDLER.apply(payload); + } + + @Override + public Publisher handleSubscription(Payload payload) { + Observable defer = Observable + .defer(() -> { + try { + int typeId = payload + .getData() + .getInt(0); + + EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); + return eventStreamEnum + .get(); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + return Observable.error(t); + } + }) + .onBackpressureDrop(); + + return RxReactiveStreams + .toPublisher(defer); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return NO_FIRE_AND_FORGET_HANDLER.apply(payload); + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + return NO_REQUEST_CHANNEL_HANDLER.apply(inputs); + } + + @Override + public Publisher handleMetadataPush(Payload payload) { + return NO_METADATA_PUSH_HANDLER.apply(payload); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java new file mode 100644 index 000000000..807e2d50f --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -0,0 +1,70 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.fasterxml.jackson.core.JsonGenerator; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.functions.Func0; +import rx.schedulers.Schedulers; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public abstract class StreamingSupplier extends BasePayloadSupplier { + + protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class); + + protected StreamingSupplier() { + + Observable + .interval(500, TimeUnit.MILLISECONDS, Schedulers.computation()) + .doOnNext(i -> + getStream() + .filter(this::filter) + .map(this::getPayloadData) + .forEach(b -> { + Payload p = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + subject.onNext(p); + }) + ) + .retry() + .subscribe(); + } + + public boolean filter(T t) { + return true; + } + + @Override + public Observable get() { + return subject; + } + + protected abstract Stream getStream(); + + protected abstract byte[] getPayloadData(T t); + + protected void safelyWriteNumberField(JsonGenerator json, String name, Func0 metricGenerator) throws IOException { + try { + json.writeNumberField(name, metricGenerator.call()); + } catch (NoSuchFieldError error) { + logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!"); + json.writeNumberField(name, 0L); + } + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java new file mode 100644 index 000000000..d936c865e --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java @@ -0,0 +1,106 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import org.agrona.LangUtil; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixCollapserMetricsStream extends StreamingSupplier { + private static HystrixCollapserMetricsStream INSTANCE = new HystrixCollapserMetricsStream(); + + private HystrixCollapserMetricsStream() { + super(); + } + + public static HystrixCollapserMetricsStream getInstance() { + return INSTANCE; + } + + @Override + protected Stream getStream() { + return HystrixCollapserMetrics.getInstances().stream(); + } + + protected byte[] getPayloadData(final HystrixCollapserMetrics collapserMetrics) { + byte[] retVal = null; + try { + HystrixCollapserKey key = collapserMetrics.getCollapserKey(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + json.writeStartObject(); + + json.writeStringField("type", "HystrixCollapser"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + safelyWriteNumberField(json, "rollingCountRequestsBatched", new Func0() { + @Override + public Long call() { + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH); + } + }); + safelyWriteNumberField(json, "rollingCountBatches", new Func0() { + @Override + public Long call() { + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED); + } + }); + safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { + @Override + public Long call() { + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE); + } + }); + + // batch size percentiles + json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean()); + json.writeObjectFieldStart("batchSize"); + json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25)); + json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50)); + json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75)); + json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90)); + json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95)); + json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99)); + json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5)); + json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100)); + json.writeEndObject(); + + // shard size percentiles (commented-out for now) + //json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean()); + //json.writeObjectFieldStart("shardSize"); + //json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25)); + //json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50)); + //json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75)); + //json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90)); + //json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95)); + //json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99)); + //json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5)); + //json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100)); + //json.writeEndObject(); + + //json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get()); + json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get()); + json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java new file mode 100644 index 000000000..f426452b1 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java @@ -0,0 +1,239 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCircuitBreaker; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import org.agrona.LangUtil; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixCommandMetricsStream extends StreamingSupplier { + private static final HystrixCommandMetricsStream INSTANCE = new HystrixCommandMetricsStream(); + + private HystrixCommandMetricsStream() { + super(); + } + + public static HystrixCommandMetricsStream getInstance() { + return INSTANCE; + } + + @Override + protected Stream getStream() { + return HystrixCommandMetrics.getInstances().stream(); + } + + protected byte[] getPayloadData(final HystrixCommandMetrics commandMetrics) { + byte[] retVal = null; + + try + + { + HystrixCommandKey key = commandMetrics.getCommandKey(); + HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + JsonGenerator json = jsonFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixCommand"); + json.writeStringField("name", key.name()); + json.writeStringField("group", commandMetrics.getCommandGroup().name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + // circuit breaker + if (circuitBreaker == null) { + // circuit breaker is disabled and thus never open + json.writeBooleanField("isCircuitBreakerOpen", false); + } else { + json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen()); + } + HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts(); + json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage()); + json.writeNumberField("errorCount", healthCounts.getErrorCount()); + json.writeNumberField("requestCount", healthCounts.getTotalRequests()); + + // rolling counters + safelyWriteNumberField(json, "rollingCountBadRequests", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST); + } + }); + safelyWriteNumberField(json, "rollingCountCollapsedRequests", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.COLLAPSED); + } + }); + safelyWriteNumberField(json, "rollingCountEmit", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.EMIT); + } + }); + safelyWriteNumberField(json, "rollingCountExceptionsThrown", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN); + } + }); + safelyWriteNumberField(json, "rollingCountFailure", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FAILURE); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackEmit", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackFailure", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackMissing", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackRejection", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackSuccess", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS); + } + }); + safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE); + } + }); + safelyWriteNumberField(json, "rollingCountSemaphoreRejected", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED); + } + }); + safelyWriteNumberField(json, "rollingCountShortCircuited", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED); + } + }); + safelyWriteNumberField(json, "rollingCountSuccess", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.SUCCESS); + } + }); + safelyWriteNumberField(json, "rollingCountThreadPoolRejected", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED); + } + }); + safelyWriteNumberField(json, "rollingCountTimeout", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.TIMEOUT); + } + }); + + json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount()); + json.writeNumberField("rollingMaxConcurrentExecutionCount", commandMetrics.getRollingMaxConcurrentExecutions()); + + // latency percentiles + json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean()); + json.writeObjectFieldStart("latencyExecute"); + json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0)); + json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25)); + json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50)); + json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75)); + json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90)); + json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95)); + json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99)); + json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5)); + json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100)); + json.writeEndObject(); + // + json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean()); + json.writeObjectFieldStart("latencyTotal"); + json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0)); + json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25)); + json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50)); + json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75)); + json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90)); + json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95)); + json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99)); + json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5)); + json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100)); + json.writeEndObject(); + + // property values for reporting what is actually seen by the command rather than what was set somewhere + HystrixCommandProperties commandProperties = commandMetrics.getProperties(); + + json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get()); + json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get()); + json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get()); + json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get()); + json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get()); + json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get()); + + json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name()); + json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); + json.writeNumberField("propertyValue_executionTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); + json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get()); + json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get()); + json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get()); + json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get()); + + /* + * The following are commented out as these rarely change and are verbose for streaming for something people don't change. + * We could perhaps allow a property or request argument to include these. + */ + + // json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get()); + // json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get()); + // json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get()); + // json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get()); + // json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get()); + json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name()); + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception t) { + LangUtil.rethrowUnchecked(t); + } + + return retVal; + } +} + + diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java new file mode 100644 index 000000000..c2407934d --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java @@ -0,0 +1,94 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixThreadPoolMetricsStream extends StreamingSupplier { + private static HystrixThreadPoolMetricsStream INSTANCE = new HystrixThreadPoolMetricsStream(); + + private HystrixThreadPoolMetricsStream() { + super(); + } + + public static HystrixThreadPoolMetricsStream getInstance() { + return INSTANCE; + } + + @Override + public boolean filter(HystrixThreadPoolMetrics threadPoolMetrics) { + return threadPoolMetrics.getCurrentCompletedTaskCount().intValue() > 0; + } + + @Override + public Observable get() { + return super.get(); + } + + @Override + protected byte[] getPayloadData(HystrixThreadPoolMetrics threadPoolMetrics) { + byte[] retVal = null; + + try { + HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + json.writeStartObject(); + + json.writeStringField("type", "HystrixThreadPool"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); + json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); + json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); + json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); + json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); + json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); + json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); + json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); + safelyWriteNumberField(json, "rollingCountThreadsExecuted", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED); + } + }); + json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); + safelyWriteNumberField(json, "rollingCountCommandRejections", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED); + } + }); + + json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + @Override + protected Stream getStream() { + return HystrixThreadPoolMetrics.getInstances().stream(); + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java new file mode 100644 index 000000000..7cf85a3d8 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java @@ -0,0 +1,118 @@ +package com.netflix.hystrix.contrib.reactivesocket.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.ExecutionResult; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; +import com.netflix.hystrix.metric.HystrixRequestEvents; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; +import rx.schedulers.Schedulers; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +public class HystrixRequestEventsStream extends BasePayloadSupplier { + private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); + + private HystrixRequestEventsStream() { + super(); + + com.netflix.hystrix.metric.HystrixRequestEventsStream.getInstance() + .observe() + .observeOn(Schedulers.computation()) + .map(this::getPayloadData) + .map(b -> + new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }) + .subscribe(subject); + } + + public static HystrixRequestEventsStream getInstance() { + return INSTANCE; + } + + @Override + public Observable get() { + return subject; + } + + public byte[] getPayloadData(HystrixRequestEvents requestEvents) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + writeRequestAsJson(json, requestEvents); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + private void writeRequestAsJson(JsonGenerator json, HystrixRequestEvents request) throws IOException { + json.writeStartArray(); + + for (Map.Entry> entry: request.getExecutionsMappedToLatencies().entrySet()) { + convertExecutionToJson(json, entry.getKey(), entry.getValue()); + } + + json.writeEndArray(); + } + + private void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List latencies) throws IOException { + json.writeStartObject(); + json.writeStringField("name", executionSignature.getCommandName()); + json.writeArrayFieldStart("events"); + ExecutionResult.EventCounts eventCounts = executionSignature.getEventCounts(); + for (HystrixEventType eventType: HystrixEventType.values()) { + if (!eventType.equals(HystrixEventType.COLLAPSED)) { + if (eventCounts.contains(eventType)) { + int eventCount = eventCounts.getCount(eventType); + if (eventCount > 1) { + json.writeStartObject(); + json.writeStringField("name", eventType.name()); + json.writeNumberField("count", eventCount); + json.writeEndObject(); + } else { + json.writeString(eventType.name()); + } + } + } + } + json.writeEndArray(); + json.writeArrayFieldStart("latencies"); + for (int latency: latencies) { + json.writeNumber(latency); + } + json.writeEndArray(); + if (executionSignature.getCachedCount() > 0) { + json.writeNumberField("cached", executionSignature.getCachedCount()); + } + if (executionSignature.getEventCounts().contains(HystrixEventType.COLLAPSED)) { + json.writeObjectFieldStart("collapsed"); + json.writeStringField("name", executionSignature.getCollapserKey().name()); + json.writeNumberField("count", executionSignature.getCollapserBatchSize()); + json.writeEndObject(); + } + json.writeEndObject(); + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java new file mode 100644 index 000000000..64f780f3d --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java @@ -0,0 +1,165 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCollapserConfiguration; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +public class HystrixConfigStream extends BasePayloadSupplier { + private static final HystrixConfigStream INSTANCE = new HystrixConfigStream(); + + private HystrixConfigStream() { + super(); + + HystrixConfigurationStream stream = new HystrixConfigurationStream(100); + stream + .observe() + .map(this::getPayloadData) + .map(b -> new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }) + .subscribe(subject); + } + + public static HystrixConfigStream getInstance() { + return INSTANCE; + } + + @Override + public Observable get() { + return subject; + } + + public byte[] getPayloadData(HystrixConfiguration config) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixConfig"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandConfiguration commandConfig = entry.getValue(); + writeCommandConfigJson(json, key, commandConfig); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); + writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); + } + json.writeEndObject(); + + json.writeObjectFieldStart("collapsers"); + for (Map.Entry entry: config.getCollapserConfig().entrySet()) { + final HystrixCollapserKey collapserKey = entry.getKey(); + final HystrixCollapserConfiguration collapserConfig = entry.getValue(); + writeCollapserConfigJson(json, collapserKey, collapserConfig); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); + json.writeStringField("groupKey", commandConfig.getGroupKey().name()); + json.writeObjectFieldStart("execution"); + HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); + json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); + json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); + json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); + json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); + json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); + json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); + json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); + json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); + json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); + json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); + json.writeEndObject(); + json.writeObjectFieldStart("metrics"); + HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); + json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeObjectFieldStart("circuitBreaker"); + HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); + json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); + json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); + json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); + json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); + json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); + json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); + json.writeEndObject(); + json.writeEndObject(); + } + + private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); + json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); + json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); + json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + } + + private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { + json.writeObjectFieldStart(collapserKey.name()); + json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); + json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); + json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); + json.writeObjectFieldStart("metrics"); + HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeEndObject(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java new file mode 100644 index 000000000..b5f9257da --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java @@ -0,0 +1,104 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; +import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; +import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilization; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +public class HystrixUtilizationStream extends BasePayloadSupplier { + private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(); + + private HystrixUtilizationStream() { + super(); + + com.netflix.hystrix.metric.sample.HystrixUtilizationStream stream + = new com.netflix.hystrix.metric.sample.HystrixUtilizationStream(100); + stream + .observe() + .map(this::getPayloadData) + .map(b -> new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }) + .subscribe(subject); + } + + public static HystrixUtilizationStream getInstance() { + return INSTANCE; + } + + @Override + public Observable get() { + return subject; + } + + public byte[] getPayloadData(HystrixUtilization utilization) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixUtilization"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandUtilization commandUtilization = entry.getValue(); + writeCommandUtilizationJson(json, key, commandUtilization); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); + writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + private static void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); + json.writeEndObject(); + } + + private static void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); + json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); + json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); + json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); + json.writeEndObject(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java new file mode 100644 index 000000000..8c7a55855 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java @@ -0,0 +1,182 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.BitUtil; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import rx.schedulers.Schedulers; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class EventStreamRequestHandlerTest { + @Test(timeout = 10_000) + public void testEventStreamRequestN() throws Exception { + Payload payload = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer + .allocate(BitUtil.SIZE_OF_INT) + .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + Schedulers + .io() + .createWorker() + .schedulePeriodically(() -> { + TestCommand testCommand = new TestCommand(); + testCommand.execute(); + }, 0, 1, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch1 = new CountDownLatch(5); + CountDownLatch latch2 = new CountDownLatch(15); + + AtomicReference subscriptionAtomicReference = new AtomicReference<>(); + + EventStreamRequestHandler handler = new EventStreamRequestHandler(); + Publisher payloadPublisher = handler.handleSubscription(payload); + + payloadPublisher + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionAtomicReference.set(s); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + + latch1.countDown(); + latch2.countDown(); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + }); + + latch.await(); + + Subscription subscription = subscriptionAtomicReference.get(); + subscription.request(5); + + latch1.await(); + + long count = latch2.getCount(); + Assert.assertTrue(count < 15); + + subscription.request(100); + + latch2.await(); + + } + + @Test(timeout = 10_000) + public void testEventStreamFireHose() throws Exception { + Payload payload = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer + .allocate(BitUtil.SIZE_OF_INT) + .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + Schedulers + .io() + .createWorker() + .schedulePeriodically(() -> { + TestCommand testCommand = new TestCommand(); + testCommand.execute(); + }, 0, 1, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch1 = new CountDownLatch(25); + + AtomicReference subscriptionAtomicReference = new AtomicReference<>(); + + EventStreamRequestHandler handler = new EventStreamRequestHandler(); + Publisher payloadPublisher = handler.handleSubscription(payload); + + payloadPublisher + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionAtomicReference.set(s); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + + latch1.countDown(); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + }); + + latch.await(); + + Subscription subscription = subscriptionAtomicReference.get(); + subscription.request(Long.MAX_VALUE); + + latch1.await(); + + + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + return true; + } + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java new file mode 100644 index 000000000..a65d6ff89 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java @@ -0,0 +1,49 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +public class HystrixCollapserMetricsStreamTest { + + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(21); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(); + + test.execute(); + latch.countDown(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + return true; + } + } + + +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java new file mode 100644 index 000000000..7cbe344f5 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java @@ -0,0 +1,51 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +/** + * Created by rroeser on 5/19/16. + */ +public class HystrixCommandMetricsStreamTest { + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(23); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(latch); + + test.execute(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + CountDownLatch latch; + protected TestCommand(CountDownLatch latch) { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + this.latch = latch; + } + + @Override + protected Boolean run() throws Exception { + latch.countDown(); + return true; + } + } + +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java new file mode 100644 index 000000000..5212bab00 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java @@ -0,0 +1,50 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +/** + * Created by rroeser on 5/19/16. + */ +public class HystrixConfigStreamTest { + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(); + + test.execute(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + System.out.println("IM A HYSTRIX COMMAND!!!!!"); + return true; + } + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 7405bbb1f..24d313735 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,6 +10,7 @@ include 'hystrix-core', \ 'hystrix-contrib/hystrix-codahale-metrics-publisher', \ 'hystrix-contrib/hystrix-yammer-metrics-publisher', \ 'hystrix-contrib/hystrix-network-auditor-agent', \ +'hystrix-contrib/hystrix-reactivesocket-event-stream', \ 'hystrix-contrib/hystrix-javanica', \ 'hystrix-contrib/hystrix-junit', \ 'hystrix-dashboard' @@ -20,6 +21,7 @@ project(':hystrix-contrib/hystrix-servo-metrics-publisher').name = 'hystrix-serv project(':hystrix-contrib/hystrix-metrics-event-stream').name = 'hystrix-metrics-event-stream' project(':hystrix-contrib/hystrix-rx-netty-metrics-stream').name = 'hystrix-rx-netty-metrics-stream' project(':hystrix-contrib/hystrix-codahale-metrics-publisher').name = 'hystrix-codahale-metrics-publisher' +project(':hystrix-contrib/hystrix-reactivesocket-event-stream').name = 'hystrix-reactivesocket-event-stream' project(':hystrix-contrib/hystrix-yammer-metrics-publisher').name = 'hystrix-yammer-metrics-publisher' project(':hystrix-contrib/hystrix-network-auditor-agent').name = 'hystrix-network-auditor-agent' project(':hystrix-contrib/hystrix-javanica').name = 'hystrix-javanica'