Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous opening of QuicStreams #67859

Merged
merged 25 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public override void Dispose()
stream.Dispose();
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
#if false
// Dispose the connection
// If we already waited for graceful shutdown from the client, then the connection is already closed and this will simply release the handle.
Expand All @@ -79,14 +79,14 @@ public async Task CloseAsync(long errorCode)
await _connection.CloseAsync(errorCode).ConfigureAwait(false);
}

public Http3LoopbackStream OpenUnidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenUnidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenUnidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync());
}

public Http3LoopbackStream OpenBidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenBidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenBidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync());
}

public static int GetRequestId(QuicStream stream)
Expand Down Expand Up @@ -185,10 +185,10 @@ public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()

public async Task EstablishControlStreamAsync()
{
_outboundControlStream = OpenUnidirectionalStream();
_outboundControlStream = await OpenUnidirectionalStreamAsync();
await _outboundControlStream.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream);
await _outboundControlStream.SendSettingsFrameAsync();
}
}

public override async Task<byte[]> ReadRequestBodyAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,40 +174,49 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
// Allocate an active request
QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;
ValueTask waitTask = default;

try
{
try
{
while (true)
if (_connection != null)
{
ValueTask<QuicStream> openTask;
bool synchronous = false;

// unfortunately, the compiler cannot infer that the task is consumed only once
#pragma warning disable CA2012 // ValueTasks instances should only be consumed once
lock (SyncObj)
{
if (_connection == null)
{
break;
}
openTask = _connection.OpenBidirectionalStreamAsync(cancellationToken);

if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0)
if (openTask.IsCompleted)
{
quicStream = _connection.OpenBidirectionalStream();
// hot path for synchronous completion: finish while still holding the lock
synchronous = true;
quicStream = openTask.Result;
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
break;
}
rzikm marked this conversation as resolved.
Show resolved Hide resolved

waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken);
}

if (HttpTelemetry.Log.IsEnabled() && !waitTask.IsCompleted && queueStartingTimestamp == 0)
if (!synchronous)
{
// We avoid logging RequestLeftQueue if a stream was available immediately (synchronously)
queueStartingTimestamp = Stopwatch.GetTimestamp();
}
// cold path: waiting until a stream is available
if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0)
{
queueStartingTimestamp = Stopwatch.GetTimestamp();
}

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.
await waitTask.ConfigureAwait(false);
quicStream = await openTask.ConfigureAwait(false);
requestStream = new Http3RequestStream(request, this, quicStream);

lock (SyncObj)
{
_activeRequests.Add(quicStream, requestStream);
}
}
#pragma warning restore CA2021
}
}
finally
Expand Down Expand Up @@ -377,7 +386,7 @@ private async Task SendSettingsAsync()
{
try
{
_clientControl = _connection!.OpenUnidirectionalStream();
_clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false);
await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
10 changes: 5 additions & 5 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { }
public void Dispose() { }
public int GetRemoteAvailableBidirectionalStreamCount() { throw null; }
public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; }
public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; }
public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } }
public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
rzikm marked this conversation as resolved.
Show resolved Hide resolved
}
public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException
{
public QuicConnectionAbortedException(string message, long errorCode) : base (default(string)) { }
public QuicConnectionAbortedException(string message, long errorCode) : base(default(string)) { }
rzikm marked this conversation as resolved.
Show resolved Hide resolved
public long ErrorCode { get { throw null; } }
}
public partial class QuicException : System.Exception
Expand Down Expand Up @@ -71,7 +71,7 @@ public QuicListenerOptions() { }
}
public partial class QuicOperationAbortedException : System.Net.Quic.QuicException
{
public QuicOperationAbortedException(string message) : base (default(string)) { }
public QuicOperationAbortedException(string message) : base(default(string)) { }
}
public partial class QuicOptions
{
Expand Down Expand Up @@ -125,7 +125,7 @@ public override void WriteByte(byte value) { }
}
public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
{
public QuicStreamAbortedException(string message, long errorCode) : base (default(string)) { }
public QuicStreamAbortedException(string message, long errorCode) : base(default(string)) { }
public long ErrorCode { get { throw null; } }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,17 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati
return streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken);
}

internal override QuicStreamProvider OpenUnidirectionalStream()
internal async override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

if (!streamLimit.Unidirectional.TryIncrement())
while (!streamLimit.Unidirectional.TryIncrement())
{
throw new QuicException("No available unidirectional stream");
await WaitForAvailableUnidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false);
}

long streamId;
Expand All @@ -207,17 +207,17 @@ internal override QuicStreamProvider OpenUnidirectionalStream()
return OpenStream(streamId, false);
}

internal override QuicStreamProvider OpenBidirectionalStream()
internal async override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

if (!streamLimit.Bidirectional.TryIncrement())
while (!streamLimit.Bidirectional.TryIncrement())
{
throw new QuicException("No available bidirectional stream");
await WaitForAvailableBidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false);
}

long streamId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ internal enum QUIC_STREAM_EVENT_TYPE : uint
SEND_SHUTDOWN_COMPLETE = 6,
SHUTDOWN_COMPLETE = 7,
IDEAL_SEND_BUFFER_SIZE = 8,
PEER_ACCEPTED = 9
}

#if SOCKADDR_HAS_LENGTH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,27 +582,19 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati
return new ValueTask(tcs.Task.WaitAsync(cancellationToken));
}

internal override QuicStreamProvider OpenUnidirectionalStream()
private async ValueTask<QuicStreamProvider> OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (!Connected)
{
throw new InvalidOperationException(SR.net_quic_not_connected);
}

return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
return await MsQuicStream.CreateOutbound(_state, flags, cancellationToken).ConfigureAwait(false);
rzikm marked this conversation as resolved.
Show resolved Hide resolved
}

internal override QuicStreamProvider OpenBidirectionalStream()
{
ThrowIfDisposed();
if (!Connected)
{
throw new InvalidOperationException(SR.net_quic_not_connected);
}

return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE);
}
internal override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken);
internal override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.NONE, cancellationToken);

internal override int GetRemoteAvailableUnidirectionalStreamCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private sealed class State
public MsQuicConnection.State ConnectionState = null!; // set in ctor.
public string TraceId = null!; // set in ctor.

public uint StartStatus = MsQuicStatusCodes.Success;
public uint StartStatus = unchecked((uint)-1);
rzikm marked this conversation as resolved.
Show resolved Hide resolved

public ReadState ReadState;

Expand Down Expand Up @@ -76,6 +76,9 @@ private sealed class State
// Set once writes have been shutdown.
public readonly TaskCompletionSource ShutdownWriteCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Set once stream has been started and within peer's advertised stream limits
public readonly TaskCompletionSource StartCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

public ShutdownState ShutdownState;

// The value makes sure that we release the handles only once.
Expand Down Expand Up @@ -133,6 +136,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
catch
{
_state.StateGCHandle.Free();
// don't free the streamHandle, it will be freed by the caller
throw;
}

Expand All @@ -146,6 +150,38 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
}
}

internal static async ValueTask<MsQuicStream> CreateOutbound(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
var stream = new MsQuicStream(connectionState, flags);
State state = stream._state;

try
{
Debug.Assert(!Monitor.IsEntered(state));

cancellationToken.ThrowIfCancellationRequested();
using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) =>
{
((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token));
}, state);

// Fire of start of the stream
uint status = MsQuicApi.Api.StreamStartDelegate(state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");

// wait unit start completes.
await state.StartCompletionSource.Task.ConfigureAwait(false);
rzikm marked this conversation as resolved.
Show resolved Hide resolved
}
catch
{
stream.Dispose();
throw;
}

return stream;
}

// outbound.
internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags)
{
Expand Down Expand Up @@ -185,10 +221,6 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F
}

QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer.");

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");
}
catch
{
Expand Down Expand Up @@ -943,6 +975,9 @@ private static unsafe uint NativeCallbackHandler(
// Shutdown for both sending and receiving is completed.
case QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE:
return HandleEventShutdownComplete(state, ref *streamEvent);
// Asynchronous open finished, the stream is now within advertised stream limits.
case QUIC_STREAM_EVENT_TYPE.PEER_ACCEPTED:
return HandleEventPeerAccepted(state);
default:
return MsQuicStatusCodes.Success;
}
Expand Down Expand Up @@ -1108,8 +1143,28 @@ private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt)

private static uint HandleEventStartComplete(State state, ref StreamEvent evt)
{
// Store the start status code and check it when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL in StreamStart.
state.StartStatus = evt.Data.StartComplete.Status;
uint status = evt.Data.StartComplete.Status;

// The way we expose Open(Uni|Bi)directionalStreamAsync operations is that the stream
// is also accepted by the peer (i.e. it is within advertised stream limits). However,
// We may receive START_COMPLETE notification before the stream is accepted, so we defer
// setting state.StartStatus until the stream is accepted or we meet failure along the way.

if (status != MsQuicStatusCodes.Success)
{
// Start failed, stream not accepted. Store the start status code and check it
// when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL
// in StreamStart.
// state.StartCompletionSource will be set when handling shutdown event as well.
state.StartStatus = status;
}
else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0)
{
// Start succeeded and we were within stream limits, stream already usable.
state.StartStatus = status;
state.StartCompletionSource.TrySetResult();
}

return MsQuicStatusCodes.Success;
}

Expand Down Expand Up @@ -1222,6 +1277,13 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt
return MsQuicStatusCodes.Success;
}

private static uint HandleEventPeerAccepted(State state)
{
state.StartStatus = MsQuicStatusCodes.Success;
state.StartCompletionSource.TrySetResult();
return MsQuicStatusCodes.Success;
}

private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt)
{
bool shouldComplete = false;
Expand Down Expand Up @@ -1586,6 +1648,12 @@ private static uint HandleEventConnectionClose(State state)
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
}

if (state.StartStatus != MsQuicStatusCodes.Success)
{
state.StartCompletionSource.TrySetException(
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
}

// Dispose was called before complete event.
bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, 2) == 1;
if (releaseHandles)
Expand Down
Loading