Skip to content

Commit

Permalink
Writer: switch back to SemaphoreSlim (#1949)
Browse files Browse the repository at this point in the history
Since Semaphore slim has been fixed on all the platforms we're building for these days, this tests moving back. Getting some test run comparison data, but all synthetic benchmarks are looking good. See mgravell/Pipelines.Sockets.Unofficial#64 for details

Here's the main contention test metrics from those benchmarks:

|                        Method |    Runtime |         Mean |      Error |     StdDev | Allocated |
|------------------------------ |----------- |-------------:|-----------:|-----------:|----------:|
|            SemaphoreSlim_Sync |   .NET 6.0 |    18.246 ns |  0.2540 ns |  0.2375 ns |         - |
|                MutexSlim_Sync |   .NET 6.0 |    22.292 ns |  0.1948 ns |  0.1627 ns |         - |
|            SemaphoreSlim_Sync | .NET 4.7.2 |    65.291 ns |  0.5218 ns |  0.4357 ns |         - |
|                MutexSlim_Sync | .NET 4.7.2 |    43.145 ns |  0.3944 ns |  0.3689 ns |         - |
|                               |            |              |            |            |           |
|           SemaphoreSlim_Async |   .NET 6.0 |    20.920 ns |  0.2461 ns |  0.2302 ns |         - |
|               MutexSlim_Async |   .NET 6.0 |    42.810 ns |  0.4583 ns |  0.4287 ns |         - |
|           SemaphoreSlim_Async | .NET 4.7.2 |    57.513 ns |  0.5600 ns |  0.5238 ns |         - |
|               MutexSlim_Async | .NET 4.7.2 |    76.444 ns |  0.3811 ns |  0.3379 ns |         - |
|                               |            |              |            |            |           |
|   SemaphoreSlim_Async_HotPath |   .NET 6.0 |    15.182 ns |  0.1708 ns |  0.1598 ns |         - |
|       MutexSlim_Async_HotPath |   .NET 6.0 |    29.913 ns |  0.5884 ns |  0.6776 ns |         - |
|   SemaphoreSlim_Async_HotPath | .NET 4.7.2 |    50.912 ns |  0.8782 ns |  0.8215 ns |         - |
|       MutexSlim_Async_HotPath | .NET 4.7.2 |    55.409 ns |  0.7513 ns |  0.6660 ns |         - |
|                               |            |              |            |            |           |
| SemaphoreSlim_ConcurrentAsync |   .NET 6.0 | 2,084.316 ns |  4.5909 ns |  4.0698 ns |     547 B |
|     MutexSlim_ConcurrentAsync |   .NET 6.0 | 2,120.714 ns | 28.5866 ns | 26.7399 ns |     125 B |
| SemaphoreSlim_ConcurrentAsync | .NET 4.7.2 | 3,812.444 ns | 42.4014 ns | 37.5877 ns |   1,449 B |
|     MutexSlim_ConcurrentAsync | .NET 4.7.2 | 2,883.994 ns | 46.5535 ns | 41.2685 ns |     284 B |

We don't have high contention tests, but sanity checking against our test suite (where we don't expect this to matter much):

Main branch:
![main test speed](https://user-images.githubusercontent.com/454813/149633288-0b1fb4ac-44f8-4151-92e1-610b678610d2.png)

PR branch:
![pr test speed](https://user-images.githubusercontent.com/454813/149633275-0cc20e3a-ba6b-49b6-bfed-563ed8e343d0.png)

We could scope this back to .NET 6.0+ only, but the code's a lot more `#ifdef` and complicated (because `LockTokens` aren't a thing - it's just a bool "did I get it?")...thoughts?
  • Loading branch information
NickCraver authored Jan 19, 2022
1 parent 92fdfa3 commit 11943b3
Showing 1 changed file with 141 additions and 29 deletions.
170 changes: 141 additions & 29 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
using static StackExchange.Redis.ConnectionMultiplexer;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
#if !NETCOREAPP
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
#endif


namespace StackExchange.Redis
{
Expand Down Expand Up @@ -46,6 +49,12 @@ internal sealed class PhysicalBridge : IDisposable

private volatile int state = (int)State.Disconnected;

#if NETCOREAPP
private readonly SemaphoreSlim _singleWriterMutex = new(1,1);
#else
private readonly MutexSlim _singleWriterMutex;
#endif

internal string PhysicalName => physical?.ToString();
public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int timeoutMilliseconds)
{
Expand All @@ -54,7 +63,9 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
Multiplexer = serverEndPoint.Multiplexer;
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
TimeoutMilliseconds = timeoutMilliseconds;
#if !NETCOREAPP
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds);
#endif
}

private readonly int TimeoutMilliseconds;
Expand Down Expand Up @@ -309,7 +320,11 @@ internal readonly struct BridgeStatus
internal BridgeStatus GetStatus() => new()
{
MessagesSinceLastHeartbeat = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)),
#if NETCOREAPP
IsWriterActive = _singleWriterMutex.CurrentCount == 0,
#else
IsWriterActive = !_singleWriterMutex.IsAvailable,
#endif
BacklogMessagesPending = _backlog.Count,
BacklogStatus = _backlogStatus,
Connection = physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default,
Expand Down Expand Up @@ -633,8 +648,6 @@ internal bool TryEnqueue(List<Message> messages, bool isReplica)
return true;
}

private readonly MutexSlim _singleWriterMutex;

private Message _activeMessage;

private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message message)
Expand Down Expand Up @@ -716,11 +729,20 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
return WriteResult.Success; // queued counts as success
}

#if NETCOREAPP
bool gotLock = false;
#else
LockToken token = default;
#endif
try
{
#if NETCOREAPP
gotLock = _singleWriterMutex.Wait(0);
if (!gotLock)
#else
token = _singleWriterMutex.TryWait(WaitOptions.NoDelay);
if (!token.Success)
#endif
{
// we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
Expand All @@ -729,8 +751,13 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
#if NETCOREAPP
gotLock = _singleWriterMutex.Wait(TimeoutMilliseconds);
if (!gotLock) return TimedOutBeforeWrite(message);
#else
token = _singleWriterMutex.TryWait();
if (!token.Success) return TimedOutBeforeWrite(message);
#endif
}

var result = WriteMessageInsideLock(physical, message);
Expand All @@ -747,7 +774,14 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
finally
{
UnmarkActiveMessage(message);
#if NETCOREAPP
if (gotLock)
{
_singleWriterMutex.Release();
}
#else
token.Dispose();
#endif
}
}

Expand Down Expand Up @@ -863,7 +897,11 @@ internal enum BacklogStatus : byte
private volatile BacklogStatus _backlogStatus;
private async Task ProcessBacklogAsync()
{
#if NETCOREAPP
bool gotLock = false;
#else
LockToken token = default;
#endif
try
{
#if DEBUG
Expand All @@ -878,8 +916,13 @@ private async Task ProcessBacklogAsync()
if (_backlog.IsEmpty) return; // nothing to do

// try and get the lock; if unsuccessful, retry
#if NETCOREAPP
gotLock = await _singleWriterMutex.WaitAsync(TimeoutMilliseconds).ConfigureAwait(false);
if (gotLock) break; // got the lock; now go do something with it
#else
token = await _singleWriterMutex.TryWaitAsync().ConfigureAwait(false);
if (token.Success) break; // got the lock; now go do something with it
#endif

#if DEBUG
failureCount++;
Expand Down Expand Up @@ -962,8 +1005,15 @@ private async Task ProcessBacklogAsync()
_backlogStatus = BacklogStatus.Faulted;
}
finally
{
{
#if NETCOREAPP
if (gotLock)
{
_singleWriterMutex.Release();
}
#else
token.Dispose();
#endif

// Do this in finally block, so that thread aborts can't convince us the backlog processor is running forever
if (Interlocked.CompareExchange(ref _backlogProcessorIsRunning, 0, 1) != 1)
Expand Down Expand Up @@ -994,7 +1044,7 @@ private WriteResult TimedOutBeforeWrite(Message message)
}

/// <summary>
/// This writes a message to the output stream
/// This writes a message to the output stream.
/// </summary>
/// <param name="physical">The physical connection to write to.</param>
/// <param name="message">The message to be written.</param>
Expand Down Expand Up @@ -1025,13 +1075,22 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect

bool releaseLock = true; // fine to default to true, as it doesn't matter until token is a "success"
int lockTaken = 0;
#if NETCOREAPP
bool gotLock = false;
#else
LockToken token = default;
#endif
try
{
// try to acquire it synchronously
// note: timeout is specified in mutex-constructor
#if NETCOREAPP
gotLock = _singleWriterMutex.Wait(0);
if (!gotLock)
#else
token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay);
if (!token.Success)
#endif
{
// we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
Expand All @@ -1041,11 +1100,19 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
#if NETCOREAPP
var pending = _singleWriterMutex.WaitAsync(TimeoutMilliseconds);
if (pending.Status != TaskStatus.RanToCompletion) return WriteMessageTakingWriteLockAsync_Awaited(pending, physical, message);

gotLock = pending.Result; // fine since we know we got a result
if (!gotLock) return new ValueTask<WriteResult>(TimedOutBeforeWrite(message));
#else
var pending = _singleWriterMutex.TryWaitAsync(options: WaitOptions.DisableAsyncContext);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingWriteLockAsync_Awaited(pending, physical, message);

token = pending.Result; // fine since we know we got a result
if (!token.Success) return new ValueTask<WriteResult>(TimedOutBeforeWrite(message));
#endif
}
lockTaken = Environment.TickCount;

Expand All @@ -1057,7 +1124,11 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
if (!flush.IsCompletedSuccessfully)
{
releaseLock = false; // so we don't release prematurely
#if NETCOREAPP
return CompleteWriteAndReleaseLockAsync(flush, message, lockTaken);
#else
return CompleteWriteAndReleaseLockAsync(token, flush, message, lockTaken);
#endif
}

result = flush.Result; // we know it was completed, this is fine
Expand All @@ -1073,7 +1144,11 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
}
finally
{
#if NETCOREAPP
if (gotLock)
#else
if (token.Success)
#endif
{
UnmarkActiveMessage(message);

Expand All @@ -1082,7 +1157,11 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
#if DEBUG
RecordLockDuration(lockTaken);
#endif
#if NETCOREAPP
_singleWriterMutex.Release();
#else
token.Dispose();
#endif
}
}
}
Expand All @@ -1097,30 +1176,42 @@ private void RecordLockDuration(int lockTaken)
volatile int _maxLockDuration = -1;
#endif

private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(ValueTask<LockToken> pending, PhysicalConnection physical, Message message)
private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(
#if NETCOREAPP
Task<bool> pending,
#else
ValueTask<LockToken> pending,
#endif
PhysicalConnection physical, Message message)
{
#if NETCOREAPP
bool gotLock = false;
#endif

try
{
using (var token = await pending.ForAwait())
{
if (!token.Success) return TimedOutBeforeWrite(message);
#if NETCOREAPP
gotLock = await pending.ForAwait();
if (!gotLock) return TimedOutBeforeWrite(message);
#else
using var token = await pending.ForAwait();
#endif
#if DEBUG
int lockTaken = Environment.TickCount;
int lockTaken = Environment.TickCount;
#endif
var result = WriteMessageInsideLock(physical, message);
var result = WriteMessageInsideLock(physical, message);

if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
}
if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
}

physical.SetIdle();
physical.SetIdle();

#if DEBUG
RecordLockDuration(lockTaken);
RecordLockDuration(lockTaken);
#endif
return result;
}
return result;
}
catch (Exception ex)
{
Expand All @@ -1129,22 +1220,43 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
finally
{
UnmarkActiveMessage(message);
#if NETCOREAPP
if (gotLock)
{
_singleWriterMutex.Release();
}
#endif
}
}

private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message, int lockTaken)
private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(
#if !NETCOREAPP
LockToken lockToken,
#endif
ValueTask<WriteResult> flush,
Message message,
int lockTaken)
{
#if !NETCOREAPP
using (lockToken)
#endif
try
{
var result = await flush.ForAwait();
physical.SetIdle();
return result;
}
catch (Exception ex)
{
return HandleWriteException(message, ex);
}
finally
{
try
{
var result = await flush.ForAwait();
physical.SetIdle();
return result;
}
catch (Exception ex) { return HandleWriteException(message, ex); }
#if DEBUG
finally { RecordLockDuration(lockTaken); }
RecordLockDuration(lockTaken);
#endif
#if NETCOREAPP
_singleWriterMutex.Release();
#endif
}
}
Expand Down

0 comments on commit 11943b3

Please sign in to comment.