diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index 65edb99b4..0e25bd189 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -7,10 +7,13 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Pipelines.Sockets.Unofficial.Threading; -using static Pipelines.Sockets.Unofficial.Threading.MutexSlim; using static StackExchange.Redis.ConnectionMultiplexer; using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState; +#if !NETCOREAPP +using Pipelines.Sockets.Unofficial.Threading; +using static Pipelines.Sockets.Unofficial.Threading.MutexSlim; +#endif + namespace StackExchange.Redis { @@ -46,6 +49,12 @@ internal sealed class PhysicalBridge : IDisposable private volatile int state = (int)State.Disconnected; +#if NETCOREAPP + private readonly SemaphoreSlim _singleWriterMutex = new(1,1); +#else + private readonly MutexSlim _singleWriterMutex; +#endif + internal string PhysicalName => physical?.ToString(); public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int timeoutMilliseconds) { @@ -54,7 +63,9 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti Multiplexer = serverEndPoint.Multiplexer; Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString(); TimeoutMilliseconds = timeoutMilliseconds; +#if !NETCOREAPP _singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds); +#endif } private readonly int TimeoutMilliseconds; @@ -309,7 +320,11 @@ internal readonly struct BridgeStatus internal BridgeStatus GetStatus() => new() { MessagesSinceLastHeartbeat = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)), +#if NETCOREAPP + IsWriterActive = _singleWriterMutex.CurrentCount == 0, +#else IsWriterActive = !_singleWriterMutex.IsAvailable, +#endif BacklogMessagesPending = _backlog.Count, BacklogStatus = _backlogStatus, Connection = physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default, @@ -633,8 +648,6 @@ internal bool TryEnqueue(List messages, bool isReplica) return true; } - private readonly MutexSlim _singleWriterMutex; - private Message _activeMessage; private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message message) @@ -716,11 +729,20 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical return WriteResult.Success; // queued counts as success } +#if NETCOREAPP + bool gotLock = false; +#else LockToken token = default; +#endif try { +#if NETCOREAPP + gotLock = _singleWriterMutex.Wait(0); + if (!gotLock) +#else token = _singleWriterMutex.TryWait(WaitOptions.NoDelay); if (!token.Success) +#endif { // we can't get it *instantaneously*; is there // perhaps a backlog and active backlog processor? @@ -729,8 +751,13 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical // no backlog... try to wait with the timeout; // if we *still* can't get it: that counts as // an actual timeout +#if NETCOREAPP + gotLock = _singleWriterMutex.Wait(TimeoutMilliseconds); + if (!gotLock) return TimedOutBeforeWrite(message); +#else token = _singleWriterMutex.TryWait(); if (!token.Success) return TimedOutBeforeWrite(message); +#endif } var result = WriteMessageInsideLock(physical, message); @@ -747,7 +774,14 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical finally { UnmarkActiveMessage(message); +#if NETCOREAPP + if (gotLock) + { + _singleWriterMutex.Release(); + } +#else token.Dispose(); +#endif } } @@ -863,7 +897,11 @@ internal enum BacklogStatus : byte private volatile BacklogStatus _backlogStatus; private async Task ProcessBacklogAsync() { +#if NETCOREAPP + bool gotLock = false; +#else LockToken token = default; +#endif try { #if DEBUG @@ -878,8 +916,13 @@ private async Task ProcessBacklogAsync() if (_backlog.IsEmpty) return; // nothing to do // try and get the lock; if unsuccessful, retry +#if NETCOREAPP + gotLock = await _singleWriterMutex.WaitAsync(TimeoutMilliseconds).ConfigureAwait(false); + if (gotLock) break; // got the lock; now go do something with it +#else token = await _singleWriterMutex.TryWaitAsync().ConfigureAwait(false); if (token.Success) break; // got the lock; now go do something with it +#endif #if DEBUG failureCount++; @@ -962,8 +1005,15 @@ private async Task ProcessBacklogAsync() _backlogStatus = BacklogStatus.Faulted; } finally - { + { +#if NETCOREAPP + if (gotLock) + { + _singleWriterMutex.Release(); + } +#else token.Dispose(); +#endif // Do this in finally block, so that thread aborts can't convince us the backlog processor is running forever if (Interlocked.CompareExchange(ref _backlogProcessorIsRunning, 0, 1) != 1) @@ -994,7 +1044,7 @@ private WriteResult TimedOutBeforeWrite(Message message) } /// - /// This writes a message to the output stream + /// This writes a message to the output stream. /// /// The physical connection to write to. /// The message to be written. @@ -1025,13 +1075,22 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect bool releaseLock = true; // fine to default to true, as it doesn't matter until token is a "success" int lockTaken = 0; +#if NETCOREAPP + bool gotLock = false; +#else LockToken token = default; +#endif try { // try to acquire it synchronously // note: timeout is specified in mutex-constructor +#if NETCOREAPP + gotLock = _singleWriterMutex.Wait(0); + if (!gotLock) +#else token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay); if (!token.Success) +#endif { // we can't get it *instantaneously*; is there // perhaps a backlog and active backlog processor? @@ -1041,11 +1100,19 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect // no backlog... try to wait with the timeout; // if we *still* can't get it: that counts as // an actual timeout +#if NETCOREAPP + var pending = _singleWriterMutex.WaitAsync(TimeoutMilliseconds); + if (pending.Status != TaskStatus.RanToCompletion) return WriteMessageTakingWriteLockAsync_Awaited(pending, physical, message); + + gotLock = pending.Result; // fine since we know we got a result + if (!gotLock) return new ValueTask(TimedOutBeforeWrite(message)); +#else var pending = _singleWriterMutex.TryWaitAsync(options: WaitOptions.DisableAsyncContext); if (!pending.IsCompletedSuccessfully) return WriteMessageTakingWriteLockAsync_Awaited(pending, physical, message); token = pending.Result; // fine since we know we got a result if (!token.Success) return new ValueTask(TimedOutBeforeWrite(message)); +#endif } lockTaken = Environment.TickCount; @@ -1057,7 +1124,11 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect if (!flush.IsCompletedSuccessfully) { releaseLock = false; // so we don't release prematurely +#if NETCOREAPP + return CompleteWriteAndReleaseLockAsync(flush, message, lockTaken); +#else return CompleteWriteAndReleaseLockAsync(token, flush, message, lockTaken); +#endif } result = flush.Result; // we know it was completed, this is fine @@ -1073,7 +1144,11 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect } finally { +#if NETCOREAPP + if (gotLock) +#else if (token.Success) +#endif { UnmarkActiveMessage(message); @@ -1082,7 +1157,11 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect #if DEBUG RecordLockDuration(lockTaken); #endif +#if NETCOREAPP + _singleWriterMutex.Release(); +#else token.Dispose(); +#endif } } } @@ -1097,30 +1176,42 @@ private void RecordLockDuration(int lockTaken) volatile int _maxLockDuration = -1; #endif - private async ValueTask WriteMessageTakingWriteLockAsync_Awaited(ValueTask pending, PhysicalConnection physical, Message message) + private async ValueTask WriteMessageTakingWriteLockAsync_Awaited( +#if NETCOREAPP + Task pending, +#else + ValueTask pending, +#endif + PhysicalConnection physical, Message message) { +#if NETCOREAPP + bool gotLock = false; +#endif + try { - using (var token = await pending.ForAwait()) - { - if (!token.Success) return TimedOutBeforeWrite(message); +#if NETCOREAPP + gotLock = await pending.ForAwait(); + if (!gotLock) return TimedOutBeforeWrite(message); +#else + using var token = await pending.ForAwait(); +#endif #if DEBUG - int lockTaken = Environment.TickCount; + int lockTaken = Environment.TickCount; #endif - var result = WriteMessageInsideLock(physical, message); + var result = WriteMessageInsideLock(physical, message); - if (result == WriteResult.Success) - { - result = await physical.FlushAsync(false).ForAwait(); - } + if (result == WriteResult.Success) + { + result = await physical.FlushAsync(false).ForAwait(); + } - physical.SetIdle(); + physical.SetIdle(); #if DEBUG - RecordLockDuration(lockTaken); + RecordLockDuration(lockTaken); #endif - return result; - } + return result; } catch (Exception ex) { @@ -1129,22 +1220,43 @@ private async ValueTask WriteMessageTakingWriteLockAsync_Awaited(Va finally { UnmarkActiveMessage(message); +#if NETCOREAPP + if (gotLock) + { + _singleWriterMutex.Release(); + } +#endif } } - private async ValueTask CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask flush, Message message, int lockTaken) + private async ValueTask CompleteWriteAndReleaseLockAsync( +#if !NETCOREAPP + LockToken lockToken, +#endif + ValueTask flush, + Message message, + int lockTaken) { +#if !NETCOREAPP using (lockToken) +#endif + try + { + var result = await flush.ForAwait(); + physical.SetIdle(); + return result; + } + catch (Exception ex) + { + return HandleWriteException(message, ex); + } + finally { - try - { - var result = await flush.ForAwait(); - physical.SetIdle(); - return result; - } - catch (Exception ex) { return HandleWriteException(message, ex); } #if DEBUG - finally { RecordLockDuration(lockTaken); } + RecordLockDuration(lockTaken); +#endif +#if NETCOREAPP + _singleWriterMutex.Release(); #endif } }