Skip to content

Commit

Permalink
Introduce OutcomeResilienceStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk committed Jun 20, 2023
1 parent 47aef87 commit 36f47bc
Show file tree
Hide file tree
Showing 47 changed files with 465 additions and 619 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ Job=MediumRun Toolchain=InProcessEmitToolchain IterationCount=15
LaunchCount=2 WarmupCount=10

```
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Allocated | Alloc Ratio |
|------------------------------------- |---------:|----------:|----------:|------:|--------:|-------:|----------:|------------:|
| ExecuteStrategyPipeline_V7 | 2.220 μs | 0.0164 μs | 0.0236 μs | 1.00 | 0.00 | 0.1106 | 2824 B | 1.00 |
| ExecuteStrategyPipeline_V8 | 1.901 μs | 0.0089 μs | 0.0127 μs | 0.86 | 0.01 | - | 40 B | 0.01 |
| ExecuteStrategyPipeline_Telemetry_V8 | 2.947 μs | 0.0077 μs | 0.0115 μs | 1.33 | 0.02 | - | 40 B | 0.01 |
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Allocated | Alloc Ratio |
|-------------------------------------- |---------:|----------:|----------:|------:|--------:|-------:|----------:|------------:|
| ExecuteStrategyPipeline_V7 | 2.488 μs | 0.0316 μs | 0.0463 μs | 1.00 | 0.00 | 0.1106 | 2824 B | 1.00 |
| ExecuteStrategyPipeline_V8 | 1.913 μs | 0.0066 μs | 0.0093 μs | 0.77 | 0.01 | - | 40 B | 0.01 |
| ExecuteStrategyPipeline_Telemetry_V8 | 2.431 μs | 0.0088 μs | 0.0129 μs | 0.98 | 0.02 | - | 40 B | 0.01 |
| ExecuteStrategyPipeline_NonGeneric_V8 | 2.207 μs | 0.0068 μs | 0.0102 μs | 0.89 | 0.01 | - | 40 B | 0.01 |
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
``` ini

BenchmarkDotNet=v0.13.5, OS=Windows 11 (10.0.22621.1702/22H2/2022Update/SunValley2)
BenchmarkDotNet=v0.13.5, OS=Windows 11 (10.0.22621.1848/22H2/2022Update/SunValley2)
Intel Core i9-10885H CPU 2.40GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.203
[Host] : .NET 7.0.5 (7.0.523.17405), X64 RyuJIT AVX2
.NET SDK=7.0.304
[Host] : .NET 7.0.7 (7.0.723.27404), X64 RyuJIT AVX2

Job=MediumRun Toolchain=InProcessEmitToolchain IterationCount=15
LaunchCount=2 WarmupCount=10

```
| Method | Mean | Error | StdDev | Median | Ratio | RatioSD | Gen0 | Allocated | Alloc Ratio |
|---------------- |---------:|---------:|---------:|---------:|------:|--------:|-------:|----------:|------------:|
| ExecuteRetry_V7 | 264.2 ns | 10.80 ns | 15.83 ns | 255.3 ns | 1.00 | 0.00 | 0.0658 | 552 B | 1.00 |
| ExecuteRetry_V8 | 244.7 ns | 7.75 ns | 10.61 ns | 244.1 ns | 0.93 | 0.05 | - | - | 0.00 |
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Allocated | Alloc Ratio |
|---------------- |---------:|--------:|--------:|------:|--------:|-------:|----------:|------------:|
| ExecuteRetry_V7 | 210.9 ns | 4.37 ns | 6.27 ns | 1.00 | 0.00 | 0.0658 | 552 B | 1.00 |
| ExecuteRetry_V8 | 224.1 ns | 2.34 ns | 3.43 ns | 1.06 | 0.04 | - | - | 0.00 |
15 changes: 15 additions & 0 deletions bench/Polly.Core.Benchmarks/MultipleStrategiesBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class MultipleStrategiesBenchmark
private object? _strategyV7;
private object? _strategyV8;
private object? _strategyTelemetryV8;
private ResilienceStrategy? _nonGeneric;

[GlobalSetup]
public void Setup()
Expand All @@ -16,6 +17,7 @@ public void Setup()
_strategyV7 = Helper.CreateStrategyPipeline(PollyVersion.V7, false);
_strategyV8 = Helper.CreateStrategyPipeline(PollyVersion.V8, false);
_strategyTelemetryV8 = Helper.CreateStrategyPipeline(PollyVersion.V8, true);
_nonGeneric = Helper.CreateNonGenericStrategyPipeline();
}

[GlobalCleanup]
Expand All @@ -29,4 +31,17 @@ public void Setup()

[Benchmark]
public ValueTask ExecuteStrategyPipeline_Telemetry_V8() => _strategyTelemetryV8!.ExecuteAsync(PollyVersion.V8);

[Benchmark]
public async ValueTask ExecuteStrategyPipeline_NonGeneric_V8()
{
var context = ResilienceContext.Get();

await _nonGeneric!.ExecuteOutcomeAsync(
static (_, _) => new ValueTask<Outcome<string>>(new Outcome<string>("dummy")),
context,
string.Empty).ConfigureAwait(false);

ResilienceContext.Return(context);
}
}
55 changes: 50 additions & 5 deletions bench/Polly.Core.Benchmarks/Utils/Helper.MultipleStrategies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@ internal static partial class Helper
PermitLimit = 10
})
.AddTimeout(TimeSpan.FromSeconds(10))
.AddRetry(
predicate => predicate.Handle<InvalidOperationException>().HandleResult(Failure),
RetryBackoffType.Constant,
3,
TimeSpan.FromSeconds(1))
.AddRetry(new()
{
BackoffType = RetryBackoffType.Constant,
RetryCount = 3,
BaseDelay = TimeSpan.FromSeconds(1),
ShouldHandle = args => args switch
{
{ Exception: InvalidOperationException } => PredicateResult.True,
{ Result: var result } when result == Failure => PredicateResult.True,
_ => PredicateResult.False
}
})
.AddTimeout(TimeSpan.FromSeconds(1))
.AddAdvancedCircuitBreaker(new()
{
Expand All @@ -49,4 +56,42 @@ internal static partial class Helper
}),
_ => throw new NotSupportedException()
};

public static ResilienceStrategy CreateNonGenericStrategyPipeline()
{
return new ResilienceStrategyBuilder()
.AddConcurrencyLimiter(new ConcurrencyLimiterOptions
{
QueueLimit = 10,
PermitLimit = 10
})
.AddTimeout(TimeSpan.FromSeconds(10))
.AddRetry(new()
{
BackoffType = RetryBackoffType.Constant,
RetryCount = 3,
BaseDelay = TimeSpan.FromSeconds(1),
ShouldHandle = args => args switch
{
{ Exception: InvalidOperationException } => PredicateResult.True,
{ Result: string result } when result == Failure => PredicateResult.True,
_ => PredicateResult.False
}
})
.AddTimeout(TimeSpan.FromSeconds(1))
.AddAdvancedCircuitBreaker(new()
{
FailureThreshold = 0.5,
SamplingDuration = TimeSpan.FromSeconds(30),
MinimumThroughput = 10,
BreakDuration = TimeSpan.FromSeconds(5),
ShouldHandle = args => args switch
{
{ Exception: InvalidOperationException } => PredicateResult.True,
{ Result: string result } when result == Failure => PredicateResult.True,
_ => PredicateResult.False
}
})
.Build();
}
}
25 changes: 12 additions & 13 deletions src/Polly.Core/CircuitBreaker/CircuitBreakerResilienceStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
namespace Polly.CircuitBreaker;

internal sealed class CircuitBreakerResilienceStrategy : ResilienceStrategy
internal sealed class CircuitBreakerResilienceStrategy<T> : OutcomeResilienceStrategy<T>
{
private readonly PredicateInvoker<CircuitBreakerPredicateArguments> _handler;
private readonly CircuitStateController _controller;
private readonly Func<OutcomeArguments<T, CircuitBreakerPredicateArguments>, ValueTask<bool>> _handler;
private readonly CircuitStateController<T> _controller;

public CircuitBreakerResilienceStrategy(
PredicateInvoker<CircuitBreakerPredicateArguments> handler,
CircuitStateController controller,
Func<OutcomeArguments<T, CircuitBreakerPredicateArguments>, ValueTask<bool>> handler,
CircuitStateController<T> controller,
CircuitBreakerStateProvider? stateProvider,
CircuitBreakerManualControl? manualControl)
CircuitBreakerManualControl? manualControl,
bool isGeneric)
: base(isGeneric)
{
_handler = handler;
_controller = controller;
Expand All @@ -21,20 +23,17 @@ public CircuitBreakerResilienceStrategy(
_controller.Dispose);
}

protected internal override async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
protected override async ValueTask<Outcome<T>> ExecuteCallbackAsync<TState>(Func<ResilienceContext, TState, ValueTask<Outcome<T>>> callback, ResilienceContext context, TState state)
{
if (await _controller.OnActionPreExecuteAsync<TResult>(context).ConfigureAwait(context.ContinueOnCapturedContext) is Outcome<TResult> outcome)
if (await _controller.OnActionPreExecuteAsync(context).ConfigureAwait(context.ContinueOnCapturedContext) is Outcome<T> outcome)
{
return outcome;
}

outcome = await ExecuteCallbackSafeAsync(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext);

var args = new OutcomeArguments<TResult, CircuitBreakerPredicateArguments>(context, outcome, new CircuitBreakerPredicateArguments());
if (await _handler.HandleAsync(args).ConfigureAwait(context.ContinueOnCapturedContext))
var args = new OutcomeArguments<T, CircuitBreakerPredicateArguments>(context, outcome, new CircuitBreakerPredicateArguments());
if (await _handler(args).ConfigureAwait(context.ContinueOnCapturedContext))
{
await _controller.OnActionFailureAsync(outcome, context).ConfigureAwait(context.ContinueOnCapturedContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,26 @@ private static TBuilder AddSimpleCircuitBreakerCore<TBuilder, TResult>(this TBui
return builder.AddStrategy(context => CreateStrategy(context, options, new ConsecutiveFailuresCircuitBehavior(options.FailureThreshold)), options);
}

internal static CircuitBreakerResilienceStrategy CreateStrategy<TResult>(ResilienceStrategyBuilderContext context, CircuitBreakerStrategyOptions<TResult> options, CircuitBehavior behavior)
internal static CircuitBreakerResilienceStrategy<TResult> CreateStrategy<TResult>(
ResilienceStrategyBuilderContext context,
CircuitBreakerStrategyOptions<TResult> options,
CircuitBehavior behavior)
{
var controller = new CircuitStateController(
var controller = new CircuitStateController<TResult>(
options.BreakDuration,
context.CreateInvoker(options.OnOpened),
context.CreateInvoker(options.OnClosed),
options.OnOpened,
options.OnClosed,
options.OnHalfOpened,
behavior,
context.TimeProvider,
context.Telemetry);

return new CircuitBreakerResilienceStrategy(
context.CreateInvoker(options.ShouldHandle)!,
return new CircuitBreakerResilienceStrategy<TResult>(
options.ShouldHandle!,
controller,
options.StateProvider,
options.ManualControl);
options.ManualControl,
context.IsGenericBuilder);
}
}

38 changes: 19 additions & 19 deletions src/Polly.Core/CircuitBreaker/Controller/CircuitStateController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ namespace Polly.CircuitBreaker;
/// <summary>
/// Thread-safe controller that holds and manages the circuit breaker state transitions.
/// </summary>
internal sealed class CircuitStateController : IDisposable
internal sealed class CircuitStateController<T> : IDisposable
{
private readonly object _lock = new();
private readonly ScheduledTaskExecutor _executor = new();
private readonly EventInvoker<OnCircuitOpenedArguments>? _onOpened;
private readonly EventInvoker<OnCircuitClosedArguments>? _onClosed;
private readonly Func<OutcomeArguments<T, OnCircuitOpenedArguments>, ValueTask>? _onOpened;
private readonly Func<OutcomeArguments<T, OnCircuitClosedArguments>, ValueTask>? _onClosed;
private readonly Func<OnCircuitHalfOpenedArguments, ValueTask>? _onHalfOpen;
private readonly TimeProvider _timeProvider;
private readonly ResilienceStrategyTelemetry _telemetry;
Expand All @@ -24,8 +24,8 @@ internal sealed class CircuitStateController : IDisposable

public CircuitStateController(
TimeSpan breakDuration,
EventInvoker<OnCircuitOpenedArguments>? onOpened,
EventInvoker<OnCircuitClosedArguments>? onClosed,
Func<OutcomeArguments<T, OnCircuitOpenedArguments>, ValueTask>? onOpened,
Func<OutcomeArguments<T, OnCircuitClosedArguments>, ValueTask>? onClosed,
Func<OnCircuitHalfOpenedArguments, ValueTask>? onHalfOpen,
CircuitBehavior behavior,
TimeProvider timeProvider,
Expand Down Expand Up @@ -90,7 +90,7 @@ public ValueTask IsolateCircuitAsync(ResilienceContext context)
lock (_lock)
{
SetLastHandledOutcome_NeedsLock(new Outcome<VoidResult>(new IsolatedCircuitException()));
OpenCircuitFor_NeedsLock(new Outcome<VoidResult>(VoidResult.Instance), TimeSpan.MaxValue, manual: true, context, out task);
OpenCircuitFor_NeedsLock(new Outcome<T>(default(T)), TimeSpan.MaxValue, manual: true, context, out task);
_circuitState = CircuitState.Isolated;
}

Expand All @@ -107,13 +107,13 @@ public ValueTask CloseCircuitAsync(ResilienceContext context)

lock (_lock)
{
CloseCircuit_NeedsLock(new Outcome<VoidResult>(VoidResult.Instance), manual: true, context, out task);
CloseCircuit_NeedsLock(new Outcome<T>(default(T)), manual: true, context, out task);
}

return ExecuteScheduledTaskAsync(task, context);
}

public async ValueTask<Outcome<TResult>?> OnActionPreExecuteAsync<TResult>(ResilienceContext context)
public async ValueTask<Outcome<T>?> OnActionPreExecuteAsync(ResilienceContext context)
{
EnsureNotDisposed();

Expand Down Expand Up @@ -150,13 +150,13 @@ public ValueTask CloseCircuitAsync(ResilienceContext context)

if (exception is not null)
{
return new Outcome<TResult>(exception);
return new Outcome<T>(exception);
}

return null;
}

public ValueTask OnActionSuccessAsync<TResult>(Outcome<TResult> outcome, ResilienceContext context)
public ValueTask OnActionSuccessAsync(Outcome<T> outcome, ResilienceContext context)
{
EnsureNotDisposed();

Expand All @@ -182,7 +182,7 @@ public ValueTask OnActionSuccessAsync<TResult>(Outcome<TResult> outcome, Resilie
return ExecuteScheduledTaskAsync(task, context);
}

public ValueTask OnActionFailureAsync<TResult>(Outcome<TResult> outcome, ResilienceContext context)
public ValueTask OnActionFailureAsync(Outcome<T> outcome, ResilienceContext context)
{
EnsureNotDisposed();

Expand Down Expand Up @@ -251,11 +251,11 @@ private void EnsureNotDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(CircuitStateController));
throw new ObjectDisposedException(nameof(CircuitStateController<T>));
}
}

private void CloseCircuit_NeedsLock<TResult>(Outcome<TResult> outcome, bool manual, ResilienceContext context, out Task? scheduledTask)
private void CloseCircuit_NeedsLock(Outcome<T> outcome, bool manual, ResilienceContext context, out Task? scheduledTask)
{
scheduledTask = null;

Expand All @@ -269,12 +269,12 @@ private void CloseCircuit_NeedsLock<TResult>(Outcome<TResult> outcome, bool manu

if (priorState != CircuitState.Closed)
{
var args = new OutcomeArguments<TResult, OnCircuitClosedArguments>(context, outcome, new OnCircuitClosedArguments(manual));
var args = new OutcomeArguments<T, OnCircuitClosedArguments>(context, outcome, new OnCircuitClosedArguments(manual));
_telemetry.Report(CircuitBreakerConstants.OnCircuitClosed, args);

if (_onClosed is not null)
{
_executor.ScheduleTask(() => _onClosed.HandleAsync(args).AsTask(), context, out scheduledTask);
_executor.ScheduleTask(() => _onClosed(args).AsTask(), context, out scheduledTask);
}
}
}
Expand Down Expand Up @@ -310,12 +310,12 @@ private void SetLastHandledOutcome_NeedsLock<TResult>(Outcome<TResult> outcome)

private BrokenCircuitException GetBreakingException_NeedsLock() => _breakingException ?? new BrokenCircuitException();

private void OpenCircuit_NeedsLock<TResult>(Outcome<TResult> outcome, bool manual, ResilienceContext context, out Task? scheduledTask)
private void OpenCircuit_NeedsLock(Outcome<T> outcome, bool manual, ResilienceContext context, out Task? scheduledTask)
{
OpenCircuitFor_NeedsLock(outcome, _breakDuration, manual, context, out scheduledTask);
}

private void OpenCircuitFor_NeedsLock<TResult>(Outcome<TResult> outcome, TimeSpan breakDuration, bool manual, ResilienceContext context, out Task? scheduledTask)
private void OpenCircuitFor_NeedsLock(Outcome<T> outcome, TimeSpan breakDuration, bool manual, ResilienceContext context, out Task? scheduledTask)
{
scheduledTask = null;
var utcNow = _timeProvider.UtcNow;
Expand All @@ -325,12 +325,12 @@ private void OpenCircuitFor_NeedsLock<TResult>(Outcome<TResult> outcome, TimeSpa
var transitionedState = _circuitState;
_circuitState = CircuitState.Open;

var args = new OutcomeArguments<TResult, OnCircuitOpenedArguments>(context, outcome, new OnCircuitOpenedArguments(breakDuration, manual));
var args = new OutcomeArguments<T, OnCircuitOpenedArguments>(context, outcome, new OnCircuitOpenedArguments(breakDuration, manual));
_telemetry.Report(CircuitBreakerConstants.OnCircuitOpened, args);

if (_onOpened is not null)
{
_executor.ScheduleTask(() => _onOpened.HandleAsync(args).AsTask(), context, out scheduledTask);
_executor.ScheduleTask(() => _onOpened(args).AsTask(), context, out scheduledTask);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Polly.Core/CircuitBreaker/Health/HealthMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Polly.CircuitBreaker.Health;

/// <summary>
/// The health metrics for advanced circuit breaker.
/// All operations here are executed from <see cref="CircuitStateController"/> under a lock and are thread safe.
/// All operations here are executed from <see cref="CircuitStateController{T}"/> under a lock and are thread safe.
/// </summary>
internal abstract class HealthMetrics
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Polly.CircuitBreaker;

/// <inheritdoc/>
public class SimpleCircuitBreakerStrategyOptions : SimpleCircuitBreakerStrategyOptions<object>
public class SimpleCircuitBreakerStrategyOptions : SimpleCircuitBreakerStrategyOptions<int>
{
}
8 changes: 1 addition & 7 deletions src/Polly.Core/Fallback/FallbackHandler.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
namespace Polly.Fallback;

internal sealed record class FallbackHandler<T>(
PredicateInvoker<FallbackPredicateArguments> ShouldHandle,
Func<OutcomeArguments<T, FallbackPredicateArguments>, ValueTask<bool>> ShouldHandle,
Func<OutcomeArguments<T, FallbackPredicateArguments>, ValueTask<Outcome<T>>> ActionGenerator,
bool IsGeneric)
{
public bool HandlesFallback<TResult>() => IsGeneric switch
{
true => typeof(TResult) == typeof(T),
false => true
};

public async ValueTask<Outcome<TResult>> GetFallbackOutcomeAsync<TResult>(OutcomeArguments<TResult, FallbackPredicateArguments> args)
{
var copiedArgs = new OutcomeArguments<T, FallbackPredicateArguments>(
Expand Down
Loading

0 comments on commit 36f47bc

Please sign in to comment.