Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous opening of QuicStreams #67859

Merged
merged 25 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public override void Dispose()
stream.Dispose();
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
#if false
// Dispose the connection
// If we already waited for graceful shutdown from the client, then the connection is already closed and this will simply release the handle.
Expand All @@ -79,14 +79,14 @@ public async Task CloseAsync(long errorCode)
await _connection.CloseAsync(errorCode).ConfigureAwait(false);
}

public Http3LoopbackStream OpenUnidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenUnidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenUnidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync());
}

public Http3LoopbackStream OpenBidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenBidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenBidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync());
}

public static int GetRequestId(QuicStream stream)
Expand Down Expand Up @@ -185,10 +185,10 @@ public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()

public async Task EstablishControlStreamAsync()
{
_outboundControlStream = OpenUnidirectionalStream();
_outboundControlStream = await OpenUnidirectionalStreamAsync();
await _outboundControlStream.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream);
await _outboundControlStream.SendSettingsFrameAsync();
}
}

public override async Task<byte[]> ReadRequestBodyAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,40 +174,26 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
// Allocate an active request
QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;
ValueTask waitTask = default;

try
{
try
{
while (true)
if (_connection != null)
{
lock (SyncObj)
if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0 && _connection.GetRemoteAvailableBidirectionalStreamCount() == 0)
rzikm marked this conversation as resolved.
Show resolved Hide resolved
{
if (_connection == null)
{
break;
}

if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0)
{
quicStream = _connection.OpenBidirectionalStream();
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
break;
}

waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken);
// the call below will almost certainly block, measure waiting time for telemetry purposes
queueStartingTimestamp = Stopwatch.GetTimestamp();
}

if (HttpTelemetry.Log.IsEnabled() && !waitTask.IsCompleted && queueStartingTimestamp == 0)
quicStream = await _connection.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false);
rzikm marked this conversation as resolved.
Show resolved Hide resolved
requestStream = new Http3RequestStream(request, this, quicStream);

lock (SyncObj)
{
// We avoid logging RequestLeftQueue if a stream was available immediately (synchronously)
queueStartingTimestamp = Stopwatch.GetTimestamp();
_activeRequests.Add(quicStream, requestStream);
}

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.
await waitTask.ConfigureAwait(false);
}
}
finally
Expand Down Expand Up @@ -377,7 +363,7 @@ private async Task SendSettingsAsync()
{
try
{
_clientControl = _connection!.OpenUnidirectionalStream();
_clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false);
await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
12 changes: 5 additions & 7 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { }
public void Dispose() { }
public int GetRemoteAvailableBidirectionalStreamCount() { throw null; }
public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; }
public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; }
public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } }
public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException
{
public QuicConnectionAbortedException(string message, long errorCode) : base (default(string)) { }
public QuicConnectionAbortedException(string message, long errorCode) : base(default(string)) { }
rzikm marked this conversation as resolved.
Show resolved Hide resolved
public long ErrorCode { get { throw null; } }
}
public partial class QuicException : System.Exception
Expand Down Expand Up @@ -71,7 +69,7 @@ public QuicListenerOptions() { }
}
public partial class QuicOperationAbortedException : System.Net.Quic.QuicException
{
public QuicOperationAbortedException(string message) : base (default(string)) { }
public QuicOperationAbortedException(string message) : base(default(string)) { }
}
public partial class QuicOptions
{
Expand Down Expand Up @@ -125,7 +123,7 @@ public override void WriteByte(byte value) { }
}
public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
{
public QuicStreamAbortedException(string message, long errorCode) : base (default(string)) { }
public QuicStreamAbortedException(string message, long errorCode) : base(default(string)) { }
public long ErrorCode { get { throw null; } }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,39 +162,17 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
return ValueTask.CompletedTask;
}

internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default)
internal async override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

return streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken);
}

internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

return streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken);
}

internal override QuicStreamProvider OpenUnidirectionalStream()
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

if (!streamLimit.Unidirectional.TryIncrement())
while (!streamLimit.Unidirectional.TryIncrement())
{
throw new QuicException("No available unidirectional stream");
await streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false);
}

long streamId;
Expand All @@ -207,17 +185,17 @@ internal override QuicStreamProvider OpenUnidirectionalStream()
return OpenStream(streamId, false);
}

internal override QuicStreamProvider OpenBidirectionalStream()
internal async override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

if (!streamLimit.Bidirectional.TryIncrement())
while (!streamLimit.Bidirectional.TryIncrement())
{
throw new QuicException("No available bidirectional stream");
await streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false);
}

long streamId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ internal enum QUIC_STREAM_EVENT_TYPE : uint
SEND_SHUTDOWN_COMPLETE = 6,
SHUTDOWN_COMPLETE = 7,
IDEAL_SEND_BUFFER_SIZE = 8,
PEER_ACCEPTED = 9
}

#if SOCKADDR_HAS_LENGTH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ internal sealed class State
// TODO: only allocate these when there is an outstanding shutdown.
public readonly TaskCompletionSource<uint> ShutdownTcs = new TaskCompletionSource<uint>(TaskCreationOptions.RunContinuationsAsynchronously);

// Note that there's no such thing as resetable TCS, so we cannot reuse the same instance after we've set the result.
// We also cannot use solutions like ManualResetValueTaskSourceCore, since we can have multiple waiters on the same TCS.
// As a result, we allocate a new TCS when needed, which is when someone explicitely asks for them in WaitForAvailableStreamsAsync.
public TaskCompletionSource? NewUnidirectionalStreamsAvailable;
public TaskCompletionSource? NewBidirectionalStreamsAvailable;

public bool Connected;
public long AbortErrorCode = -1;
public int StreamCount;
Expand Down Expand Up @@ -320,26 +314,6 @@ private static uint HandleEventShutdownComplete(State state, ref ConnectionEvent
// Stop accepting new streams.
state.AcceptQueue.Writer.TryComplete();

// Stop notifying about available streams.
TaskCompletionSource? unidirectionalTcs = null;
TaskCompletionSource? bidirectionalTcs = null;
lock (state)
{
unidirectionalTcs = state.NewUnidirectionalStreamsAvailable;
bidirectionalTcs = state.NewBidirectionalStreamsAvailable;
state.NewUnidirectionalStreamsAvailable = null;
state.NewBidirectionalStreamsAvailable = null;
}

if (unidirectionalTcs is not null)
{
unidirectionalTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException()));
}
if (bidirectionalTcs is not null)
{
bidirectionalTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException()));
}

return MsQuicStatusCodes.Success;
}

Expand All @@ -358,32 +332,6 @@ private static uint HandleEventNewStream(State state, ref ConnectionEvent connec

private static uint HandleEventStreamsAvailable(State state, ref ConnectionEvent connectionEvent)
{
TaskCompletionSource? unidirectionalTcs = null;
TaskCompletionSource? bidirectionalTcs = null;
lock (state)
{
if (connectionEvent.Data.StreamsAvailable.UniDirectionalCount > 0)
{
unidirectionalTcs = state.NewUnidirectionalStreamsAvailable;
state.NewUnidirectionalStreamsAvailable = null;
}

if (connectionEvent.Data.StreamsAvailable.BiDirectionalCount > 0)
{
bidirectionalTcs = state.NewBidirectionalStreamsAvailable;
state.NewBidirectionalStreamsAvailable = null;
}
}

if (unidirectionalTcs is not null)
{
unidirectionalTcs.SetResult();
}
if (bidirectionalTcs is not null)
{
bidirectionalTcs.SetResult();
}

return MsQuicStatusCodes.Success;
}

Expand Down Expand Up @@ -517,93 +465,33 @@ internal override async ValueTask<QuicStreamProvider> AcceptStreamAsync(Cancella
return stream;
}

internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default)
{
TaskCompletionSource? tcs = _state.NewUnidirectionalStreamsAvailable;
if (tcs is null)
{
// We need to avoid calling MsQuic under lock.
// This is not atomic but it won't be anyway as counts can change between when task is completed
// and before somebody may try to allocate new stream.
int count = GetRemoteAvailableUnidirectionalStreamCount();
lock (_state)
{
if (_state.NewUnidirectionalStreamsAvailable is null)
{
if (_state.ShutdownTcs.Task.IsCompleted)
{
throw new QuicOperationAbortedException();
}

if (count > 0)
{
return ValueTask.CompletedTask;
}

_state.NewUnidirectionalStreamsAvailable = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}

tcs = _state.NewUnidirectionalStreamsAvailable;
}
}

return new ValueTask(tcs.Task.WaitAsync(cancellationToken));
}

internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default)
{
TaskCompletionSource? tcs = _state.NewBidirectionalStreamsAvailable;
if (tcs is null)
{
// We need to avoid calling MsQuic under lock.
// This is not atomic but it won't be anyway as counts can change between when task is completed
// and before somebody may try to allocate new stream.
int count = GetRemoteAvailableBidirectionalStreamCount();
lock (_state)
{
if (_state.NewBidirectionalStreamsAvailable is null)
{
if (_state.ShutdownTcs.Task.IsCompleted)
{
throw new QuicOperationAbortedException();
}

if (count > 0)
{
return ValueTask.CompletedTask;
}

_state.NewBidirectionalStreamsAvailable = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}
tcs = _state.NewBidirectionalStreamsAvailable;
}
}

return new ValueTask(tcs.Task.WaitAsync(cancellationToken));
}

internal override QuicStreamProvider OpenUnidirectionalStream()
private async ValueTask<QuicStreamProvider> OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (!Connected)
{
throw new InvalidOperationException(SR.net_quic_not_connected);
}

return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
}
cancellationToken.ThrowIfCancellationRequested();
var stream = new MsQuicStream(_state, flags);

internal override QuicStreamProvider OpenBidirectionalStream()
{
ThrowIfDisposed();
if (!Connected)
try
{
throw new InvalidOperationException(SR.net_quic_not_connected);
await stream.StartAsync(cancellationToken).ConfigureAwait(false);
}
catch
{
stream.Dispose();
throw;
}

return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE);
return stream;
}

internal override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken);
internal override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.NONE, cancellationToken);

internal override int GetRemoteAvailableUnidirectionalStreamCount()
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
Expand Down
Loading