Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #2376 - avoid deadlock scenario when completing dead connections #2378

Merged
merged 4 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Current package versions:
- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script paramters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
- Fix [#2362](https://github.com/StackExchange/StackExchange.Redis/issues/2362): Set `RedisConnectionException.FailureType` to `AuthenticationFailure` on all authentication scenarios for better handling ([#2367 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2367))
- Fix [#2368](https://github.com/StackExchange/StackExchange.Redis/issues/2368): Support `RedisValue.Length()` for all storage types ([#2370 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2370))
- Fix [#2376](https://github.com/StackExchange/StackExchange.Redis/issues/2376): Avoid a (rare) deadlock scenario ([#2378 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2378))

## 2.6.90

Expand Down
12 changes: 9 additions & 3 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1959,6 +1959,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
{
var source = SimpleResultBox<T>.Get();

bool timeout = false;
lock (source)
{
#pragma warning disable CS0618 // Type or member is obsolete
Expand All @@ -1976,11 +1977,16 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
else
{
Trace("Timeout performing " + message);
Interlocked.Increment(ref syncTimeouts);
throw ExceptionFactory.Timeout(this, null, message, server);
// Very important not to return "source" to the pool here
timeout = true;
}
}

if (timeout) // note we throw *outside* of the main lock to avoid deadlock scenarios (#2376)
{
Interlocked.Increment(ref syncTimeouts);
// Very important not to return "source" to the pool here
throw ExceptionFactory.Timeout(this, null, message, server);
}
// Snapshot these so that we can recycle the box
var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it...
if (ex != null) throw ex;
Expand Down
26 changes: 25 additions & 1 deletion src/StackExchange.Redis/ExtensionMethods.Internal.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

namespace StackExchange.Redis
{
Expand All @@ -9,5 +10,28 @@ internal static bool IsNullOrEmpty([NotNullWhen(false)] this string? s) =>

internal static bool IsNullOrWhiteSpace([NotNullWhen(false)] this string? s) =>
string.IsNullOrWhiteSpace(s);

#if !NETCOREAPP3_1_OR_GREATER
internal static bool TryDequeue<T>(this Queue<T> queue, [NotNullWhen(true)] out T? result)
{
if (queue.Count == 0)
{
result = default;
return false;
}
result = queue.Dequeue()!;
return true;
}
internal static bool TryPeek<T>(this Queue<T> queue, [NotNullWhen(true)] out T? result)
{
if (queue.Count == 0)
{
result = default;
return false;
}
result = queue.Peek()!;
return true;
}
#endif
}
}
10 changes: 10 additions & 0 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1566,5 +1566,15 @@ protected override void WriteImpl(PhysicalConnection physical)
}
public override int ArgCount => 1;
}

// this is a placeholder message for use when (for example) unable to queue the
// connection queue due to a lock timeout
internal sealed class UnknownMessage : Message
{
public static UnknownMessage Instance { get; } = new();
private UnknownMessage() : base(0, CommandFlags.None, RedisCommand.UNKNOWN) { }
public override int ArgCount => 0;
protected override void WriteImpl(PhysicalConnection physical) => throw new InvalidOperationException("This message cannot be written");
}
}
}
73 changes: 44 additions & 29 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
Expand All @@ -16,6 +17,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static StackExchange.Redis.Message;

namespace StackExchange.Redis
{
Expand Down Expand Up @@ -396,9 +398,8 @@ public void RecordConnectionFailed(
lock (_writtenAwaitingResponse)
{
// find oldest message awaiting a response
if (_writtenAwaitingResponse.Count != 0)
if (_writtenAwaitingResponse.TryPeek(out var next))
{
var next = _writtenAwaitingResponse.Peek();
unansweredWriteTime = next.GetWriteTime();
}
}
Expand Down Expand Up @@ -478,34 +479,42 @@ void add(string lk, string sk, string? v)
bridge?.OnConnectionFailed(this, failureType, outerException);
}
}
// cleanup
// clean up (note: avoid holding the lock when we complete things, even if this means taking
// the lock multiple times; this is fine here - we shouldn't be fighting anyone, and we're already toast)
lock (_writtenAwaitingResponse)
{
bridge?.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count);
while (_writtenAwaitingResponse.Count != 0)
{
var next = _writtenAwaitingResponse.Dequeue();
}

if (next.Command == RedisCommand.QUIT && next.TrySetResult(true))
{
// fine, death of a socket is close enough
next.Complete();
}
else
while (TryDequeueLocked(_writtenAwaitingResponse, out var next))
{
if (next.Command == RedisCommand.QUIT && next.TrySetResult(true))
{
// fine, death of a socket is close enough
next.Complete();
}
else
{
var ex = innerException is RedisException ? innerException : outerException;
if (bridge != null)
{
var ex = innerException is RedisException ? innerException : outerException;
if (bridge != null)
{
bridge.Trace("Failing: " + next);
bridge.Multiplexer?.OnMessageFaulted(next, ex, origin);
}
next.SetExceptionAndComplete(ex!, bridge);
bridge.Trace("Failing: " + next);
bridge.Multiplexer?.OnMessageFaulted(next, ex, origin);
}
next.SetExceptionAndComplete(ex!, bridge);
}
}

// burn the socket
Shutdown();

static bool TryDequeueLocked(Queue<Message> queue, [NotNullWhen(true)] out Message? message)
{
lock (queue)
{
return queue.TryDequeue(out message);
}
}
}

internal bool IsIdle() => _writeStatus == WriteStatus.Idle;
Expand Down Expand Up @@ -1580,18 +1589,10 @@ private void MatchResult(in RawResult result)
_readStatus = ReadStatus.DequeueResult;
lock (_writtenAwaitingResponse)
{
#if NET5_0_OR_GREATER
if (!_writtenAwaitingResponse.TryDequeue(out msg))
{
throw new InvalidOperationException("Received response with no message waiting: " + result.ToString());
}
#else
if (_writtenAwaitingResponse.Count == 0)
{
throw new InvalidOperationException("Received response with no message waiting: " + result.ToString());
}
msg = _writtenAwaitingResponse.Dequeue();
#endif
}
_activeMessage = msg;

Expand Down Expand Up @@ -1632,9 +1633,23 @@ static bool TryGetPubSubPayload(in RawResult value, out RedisValue parsed, bool
internal void GetHeadMessages(out Message? now, out Message? next)
{
now = _activeMessage;
lock(_writtenAwaitingResponse)
bool haveLock = false;
try
{
// careful locking here; a: don't try too hard (this is error info only), b: avoid deadlock (see #2376)
Monitor.TryEnter(_writtenAwaitingResponse, 10, ref haveLock);
if (haveLock)
{
_writtenAwaitingResponse.TryPeek(out next);
}
else
{
next = UnknownMessage.Instance;
}
}
finally
{
next = _writtenAwaitingResponse.Count == 0 ? null : _writtenAwaitingResponse.Peek();
if (haveLock) Monitor.Exit(_writtenAwaitingResponse);
}
}

Expand Down