Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New signature for HystrixObservableCollapser #261

Merged
merged 2 commits into from
May 6, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Scheduler;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

Expand Down Expand Up @@ -142,8 +143,17 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
}

@Override
public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
self.mapResponseToRequests(batchResponse, requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {

@Override
public Observable<Void> call(BatchReturnType response) {
// this is a blocking call in HystrixCollapser
self.mapResponseToRequests(response, requests);
return Observable.empty();
}

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,20 @@
*/
package com.netflix.hystrix;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.*;
import java.util.concurrent.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;

import rx.*;
import rx.Observable;
import rx.Scheduler;
import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.HystrixCollapserBridge;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.collapser.RequestCollapser;
import com.netflix.hystrix.collapser.RequestCollapserFactory;
import com.netflix.hystrix.collapser.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
Expand All @@ -52,14 +46,16 @@
* It must be stateless or else it will be non-deterministic because most instances are discarded while some are retained and become the
* "collapsers" for all the ones that are discarded.
*
* @param <K>
* The key used to match BatchReturnType and RequestArgumentType
* @param <BatchReturnType>
* The type returned from the {@link HystrixCommand} that will be invoked on batch executions.
* @param <ResponseType>
* The type returned from this command.
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
public abstract class HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {

static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);

Expand Down Expand Up @@ -120,7 +116,7 @@ protected HystrixObservableCollapser(Setter setter) {
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

final HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;

/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
Expand All @@ -143,8 +139,40 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
}

@Override
public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
self.mapResponseToRequests(batchResponse, requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
Func1<RequestArgumentType, K> requestKeySelector = self.getRequestArgumentKeySelector();
final Func1<BatchReturnType, K> batchResponseKeySelector = self.getBatchReturnTypeKeySelector();
final Func1<BatchReturnType, ResponseType> mapBatchTypeToResponseType = self.getBatchReturnTypeToResponseTypeMapper();

// index the requests by key
final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
}

// observe the responses and join with the requests by key
return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {

@Override
public Observable<Void> call(BatchReturnType r) {
K responseKey = batchResponseKeySelector.call(r);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
// now remove from map so we know what wasn't set at end
requestsByKey.remove(responseKey);
return Observable.empty();
}

}).doOnTerminate(new Action0() {

@Override
public void call() {
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
onMissingResponse(cr);
}
}

});
}

@Override
Expand Down Expand Up @@ -211,7 +239,8 @@ public Scope getScope() {
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
* @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects
* @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest}
* objects
*/
protected abstract HystrixObservableCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

Expand All @@ -235,50 +264,41 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
}

/**
* Executed after the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code <BatchReturnType>} to
* the list of {@code CollapsedRequest<ResponseType, RequestArgumentType>} objects.
* Function that returns the key used for matching returned objects against request argument types.
* <p>
* IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is:
* <p>
* <ul>
* <li>ALL {@link CollapsedRequest} objects must have either a response or exception set on them even if the response is NULL
* otherwise the user thread waiting on the response will think a response was never received and will either block indefinitely or timeout while waiting.</li>
* <ul>
* <li>Setting a response is done via {@link CollapsedRequest#setResponse(Object)}</li>
* <li>Setting an exception is done via {@link CollapsedRequest#setException(Exception)}</li>
* </ul>
* </ul>
* <p>
* Common code when {@code <BatchReturnType>} is {@code List<ResponseType>} is:
* <p>
*
* <pre>
* int count = 0;
* for ({@code CollapsedRequest<ResponseType, RequestArgumentType>} request : requests) {
* &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
* }
* </pre>
* The key returned from this function should match up with the key returned from {@link #getRequestArgumentKeySelector()};
*
* For example if the types were {@code <List<String>, String, String>}:
* @return key selector function
*/
protected abstract Func1<BatchReturnType, K> getBatchReturnTypeKeySelector();

/**
* Function that returns the key used for matching request arguments against returned objects.
* <p>
* The key returned from this function should match up with the key returned from {@link #getBatchReturnTypeKeySelector()};
*
* <pre>
* int count = 0;
* for ({@code CollapsedRequest<String, String>} request : requests) {
* &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
* }
* </pre>
* @return key selector function
*/
protected abstract Func1<RequestArgumentType, K> getRequestArgumentKeySelector();

/**
* Invoked if a {@link CollapsedRequest} in the batch does not have a response set on it.
* <p>
* This allows setting an exception (via {@link CollapsedRequest#setException(Exception)}) or a fallback response (via {@link CollapsedRequest#setResponse(Object)}).
*
* @param batchResponse
* The {@code <BatchReturnType>} returned from the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand}.
* <p>
* @param CollapsedRequest
* that needs a response or exception set on it.
*/
protected abstract void onMissingResponse(CollapsedRequest<ResponseType, RequestArgumentType> r);

/**
* Function for mapping from BatchReturnType to ResponseType.
* <p>
* Often these two types are exactly the same so it's just a pass-thru.
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
* <p>
* The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection.
* @return function for mapping from BatchReturnType to ResponseType
*/
protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
protected abstract Func1<BatchReturnType, ResponseType> getBatchReturnTypeToResponseTypeMapper();

/**
* Used for asynchronous execution with a callback by subscribing to the {@link Observable}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface HystrixCollapserBridge<BatchReturnType, ResponseType, RequestAr

public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

public HystrixCollapserKey getCollapserKey();

Expand Down
Loading