Skip to content

Commit

Permalink
[release/7.0] [RateLimiting] Handle Timer jitter (#74971)
Browse files Browse the repository at this point in the history
* [RateLimiting] TryReplenish handles multiple replenish periods at a time

* ignore tick on auto

* fixup

* partial

* allow TimeSpan.Zero

* no special case

* TimeSpan.Zero

* Apply suggestions from code review

Co-authored-by: Stephen Halter <halter73@gmail.com>

Co-authored-by: Brennan Conroy <brecon@microsoft.com>
Co-authored-by: Stephen Halter <halter73@gmail.com>
  • Loading branch information
3 people committed Sep 3, 2022
1 parent d2e3961 commit 8b5185a
Show file tree
Hide file tree
Showing 9 changed files with 562 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
{
throw new ArgumentException($"{nameof(options.QueueLimit)} must be set to a value greater than or equal to 0.", nameof(options));
}
if (options.Window < TimeSpan.Zero)
if (options.Window <= TimeSpan.Zero)
{
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than or equal to TimeSpan.Zero.", nameof(options));
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than TimeSpan.Zero.", nameof(options));
}

_options = new FixedWindowRateLimiterOptions
Expand Down Expand Up @@ -287,29 +287,22 @@ private void ReplenishInternal(long nowTicks)
return;
}

if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks)
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
{
return;
}

_lastReplenishmentTick = nowTicks;

int availableRequestCounters = _requestCount;
int maxPermits = _options.PermitLimit;
int resourcesToAdd;

if (availableRequestCounters < maxPermits)
{
resourcesToAdd = maxPermits - availableRequestCounters;
}
else
if (availableRequestCounters >= _options.PermitLimit)
{
// All counters available, nothing to do
return;
}

_requestCount += resourcesToAdd;
Debug.Assert(_requestCount == _options.PermitLimit);
_requestCount = _options.PermitLimit;

// Process queued requests
while (_queue.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class FixedWindowRateLimiterOptions
{
/// <summary>
/// Specifies the time window that takes in the requests.
/// Must be set to a value >= <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="FixedWindowRateLimiter"/>.
/// Must be set to a value greater than <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="FixedWindowRateLimiter"/>.
/// </summary>
public TimeSpan Window { get; set; } = TimeSpan.Zero;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public sealed class SlidingWindowRateLimiter : ReplenishingRateLimiter

private readonly Timer? _renewTimer;
private readonly SlidingWindowRateLimiterOptions _options;
private readonly TimeSpan _replenishmentPeriod;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();

// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
Expand All @@ -42,7 +43,7 @@ public sealed class SlidingWindowRateLimiter : ReplenishingRateLimiter
public override bool IsAutoReplenishing => _options.AutoReplenishment;

/// <inheritdoc />
public override TimeSpan ReplenishmentPeriod => new TimeSpan(_options.Window.Ticks / _options.SegmentsPerWindow);
public override TimeSpan ReplenishmentPeriod => _replenishmentPeriod;

/// <summary>
/// Initializes the <see cref="SlidingWindowRateLimiter"/>.
Expand All @@ -62,9 +63,9 @@ public SlidingWindowRateLimiter(SlidingWindowRateLimiterOptions options)
{
throw new ArgumentException($"{nameof(options.QueueLimit)} must be set to a value greater than or equal to 0.", nameof(options));
}
if (options.Window < TimeSpan.Zero)
if (options.Window <= TimeSpan.Zero)
{
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than or equal to TimeSpan.Zero.", nameof(options));
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than TimeSpan.Zero.", nameof(options));
}

_options = new SlidingWindowRateLimiterOptions
Expand All @@ -78,6 +79,7 @@ public SlidingWindowRateLimiter(SlidingWindowRateLimiterOptions options)
};

_requestCount = options.PermitLimit;
_replenishmentPeriod = new TimeSpan(_options.Window.Ticks / _options.SegmentsPerWindow);

// _requestsPerSegment holds the no. of acquired requests in each window segment
_requestsPerSegment = new int[options.SegmentsPerWindow];
Expand Down Expand Up @@ -287,7 +289,7 @@ private void ReplenishInternal(long nowTicks)
return;
}

if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < ReplenishmentPeriod.Ticks)
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < ReplenishmentPeriod.Ticks && !_options.AutoReplenishment)
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class SlidingWindowRateLimiterOptions
{
/// <summary>
/// Specifies the minimum period between replenishments.
/// Must be set to a value >= <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="SlidingWindowRateLimiter"/>.
/// Must be set to a value greater than <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="SlidingWindowRateLimiter"/>.
/// </summary>
public TimeSpan Window { get; set; } = TimeSpan.Zero;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace System.Threading.RateLimiting
/// </summary>
public sealed class TokenBucketRateLimiter : ReplenishingRateLimiter
{
private int _tokenCount;
private double _tokenCount;
private int _queueCount;
private long _lastReplenishmentTick;
private long? _idleSince;
Expand All @@ -22,6 +22,7 @@ public sealed class TokenBucketRateLimiter : ReplenishingRateLimiter
private long _failedLeasesCount;
private long _successfulLeasesCount;

private readonly double _fillRate;
private readonly Timer? _renewTimer;
private readonly TokenBucketRateLimiterOptions _options;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
Expand Down Expand Up @@ -60,9 +61,9 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
{
throw new ArgumentException($"{nameof(options.QueueLimit)} must be set to a value greater than or equal to 0.", nameof(options));
}
if (options.ReplenishmentPeriod < TimeSpan.Zero)
if (options.ReplenishmentPeriod <= TimeSpan.Zero)
{
throw new ArgumentException($"{nameof(options.ReplenishmentPeriod)} must be set to a value greater than or equal to TimeSpan.Zero.", nameof(options));
throw new ArgumentException($"{nameof(options.ReplenishmentPeriod)} must be set to a value greater than TimeSpan.Zero.", nameof(options));
}

_options = new TokenBucketRateLimiterOptions
Expand All @@ -76,6 +77,7 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
};

_tokenCount = options.TokenLimit;
_fillRate = (double)options.TokensPerPeriod / options.ReplenishmentPeriod.Ticks;

_idleSince = _lastReplenishmentTick = Stopwatch.GetTimestamp();

Expand All @@ -91,7 +93,7 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
ThrowIfDisposed();
return new RateLimiterStatistics()
{
CurrentAvailablePermits = _tokenCount,
CurrentAvailablePermits = (long)_tokenCount,
CurrentQueuedCount = _queueCount,
TotalFailedLeases = Interlocked.Read(ref _failedLeasesCount),
TotalSuccessfulLeases = Interlocked.Read(ref _successfulLeasesCount),
Expand Down Expand Up @@ -210,7 +212,7 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int tokenCount, Ca

private RateLimitLease CreateFailedTokenLease(int tokenCount)
{
int replenishAmount = tokenCount - _tokenCount + _queueCount;
int replenishAmount = tokenCount - (int)_tokenCount + _queueCount;
// can't have 0 replenish periods, that would mean it should be a successful lease
// if TokensPerPeriod is larger than the replenishAmount needed then it would be 0
Debug.Assert(_options.TokensPerPeriod > 0);
Expand Down Expand Up @@ -278,7 +280,7 @@ private static void Replenish(object? state)
limiter!.ReplenishInternal(nowTicks);
}

// Used in tests that test behavior with specific time intervals
// Used in tests to avoid dealing with real time
private void ReplenishInternal(long nowTicks)
{
// method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
Expand All @@ -289,45 +291,43 @@ private void ReplenishInternal(long nowTicks)
return;
}

if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.ReplenishmentPeriod.Ticks)
if (_tokenCount == _options.TokenLimit)
{
return;
}

_lastReplenishmentTick = nowTicks;

int availablePermits = _tokenCount;
TokenBucketRateLimiterOptions options = _options;
int maxPermits = options.TokenLimit;
int resourcesToAdd;
double add;

if (availablePermits < maxPermits)
// Trust the timer to be close enough to when we want to replenish, this avoids issues with Timer jitter where it might be .99 seconds instead of 1, and 1.1 seconds the next time etc.
if (_options.AutoReplenishment)
{
resourcesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits);
add = _options.TokensPerPeriod;
}
else
{
// All tokens available, nothing to do
return;
add = _fillRate * (nowTicks - _lastReplenishmentTick) * TickFrequency;
}

_tokenCount = Math.Min(_options.TokenLimit, _tokenCount + add);

_lastReplenishmentTick = nowTicks;

// Process queued requests
Deque<RequestRegistration> queue = _queue;

_tokenCount += resourcesToAdd;
Debug.Assert(_tokenCount <= _options.TokenLimit);
while (queue.Count > 0)
{
RequestRegistration nextPendingRequest =
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.PeekHead()
: queue.PeekTail();

if (_tokenCount >= nextPendingRequest.Count)
{
// Request can be fulfilled
nextPendingRequest =
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.DequeueHead()
: queue.DequeueTail();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class TokenBucketRateLimiterOptions
{
/// <summary>
/// Specifies the minimum period between replenishments.
/// Must be set to a value >= <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="TokenBucketRateLimiter"/>.
/// Must be set to a value greater than <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="TokenBucketRateLimiter"/>.
/// </summary>
public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;

Expand Down
Loading

0 comments on commit 8b5185a

Please sign in to comment.