Skip to content

Commit

Permalink
[QUIC] API QuicStream (dotnet#71969)
Browse files Browse the repository at this point in the history
* Quic stream API surface

* Fixed test compilation

* Fixed http test compilation

* HttpLoopbackConnection Dispose -> DisposeAsync

* QuicStream implementation

* Fixed some tests

* Fixed all QUIC and HTTP tests

* Fixed exception type for stream closed by connection close

* Feedback

* Fixed WebSocket.Client test build

* Feedback, test fixes

* Fixed build on framework and windows

* Fixed winhandler test

* Swap variable based on order in defining class

* Post merge fixes

* Feedback and build

* Reverted connection state to pass around abort error code

* Fixed exception type.
  • Loading branch information
ManickaP committed Jul 13, 2022
1 parent 912f2ed commit b09e73e
Show file tree
Hide file tree
Showing 50 changed files with 1,524 additions and 2,487 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ private void CloseWebSocket()
}
}

public abstract class GenericLoopbackConnection : IDisposable
public abstract class GenericLoopbackConnection : IAsyncDisposable
{
public abstract void Dispose();
public abstract ValueTask DisposeAsync();

public abstract Task InitializeConnectionAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,12 +838,12 @@ public async Task SendResponseBodyAsync(int streamId, ReadOnlyMemory<byte> respo
await SendResponseDataAsync(streamId, responseBody, isFinal).ConfigureAwait(false);
}

public override void Dispose()
public override async ValueTask DisposeAsync()
{
// Might have been already shutdown manually via WaitForConnectionShutdownAsync which nulls the _connectionStream.
if (_connectionStream != null)
{
ShutdownIgnoringErrorsAsync(_lastStreamId).GetAwaiter().GetResult();
await ShutdownIgnoringErrorsAsync(_lastStreamId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,15 @@ public override void Dispose()

public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
using (Http2LoopbackConnection connection = await EstablishConnectionAsync().ConfigureAwait(false))
await using (Http2LoopbackConnection connection = await EstablishConnectionAsync().ConfigureAwait(false))
{
return await connection.HandleRequestAsync(statusCode, headers, content).ConfigureAwait(false);
}
}

public override async Task AcceptConnectionAsync(Func<GenericLoopbackConnection, Task> funcAsync)
{
using (Http2LoopbackConnection connection = await EstablishConnectionAsync().ConfigureAwait(false))
await using (Http2LoopbackConnection connection = await EstablishConnectionAsync().ConfigureAwait(false))
{
await funcAsync(connection).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ public Http3LoopbackConnection(QuicConnection connection)

public long MaxHeaderListSize { get; private set; } = -1;

public override void Dispose()
public override async ValueTask DisposeAsync()
{
// Close any remaining request streams (but NOT control streams, as these should not be closed while the connection is open)
foreach (Http3LoopbackStream stream in _openStreams.Values)
{
stream.Dispose();
await stream.DisposeAsync().ConfigureAwait(false);
}

foreach (QuicStream stream in _delayedStreams)
{
stream.Dispose();
await stream.DisposeAsync().ConfigureAwait(false);
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
Expand All @@ -79,8 +79,8 @@ public override void Dispose()
_connection.Dispose();

// Dispose control streams so that we release their handles too.
_inboundControlStream?.Dispose();
_outboundControlStream?.Dispose();
await _inboundControlStream?.DisposeAsync().ConfigureAwait(false);
await _outboundControlStream?.DisposeAsync().ConfigureAwait(false);
#endif
}

Expand All @@ -104,7 +104,7 @@ public static int GetRequestId(QuicStream stream)
Debug.Assert(stream.CanRead && stream.CanWrite, "Stream must be a request stream.");

// TODO: QUIC streams can have IDs larger than int.MaxValue; update all our tests to use long rather than int.
return checked((int)stream.StreamId + 1);
return checked((int)stream.Id + 1);
}

public Http3LoopbackStream GetOpenRequest(int requestId = 0)
Expand Down Expand Up @@ -172,9 +172,9 @@ public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()

Assert.True(quicStream.CanWrite, "Expected writeable stream.");

_openStreams.Add(checked((int)quicStream.StreamId), stream);
_openStreams.Add(checked((int)quicStream.Id), stream);
_currentStream = stream;
_currentStreamId = quicStream.StreamId;
_currentStreamId = quicStream.Id;

return stream;
}
Expand Down Expand Up @@ -293,9 +293,9 @@ public async Task WaitForClientDisconnectAsync(bool refuseNewRequests = true)
break;
}

using (stream)
await using (stream)
{
await stream.AbortAndWaitForShutdownAsync(H3_REQUEST_REJECTED);
stream.Abort(H3_REQUEST_REJECTED);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ public override async Task<GenericLoopbackConnection> EstablishGenericConnection

public override async Task AcceptConnectionAsync(Func<GenericLoopbackConnection, Task> funcAsync)
{
using Http3LoopbackConnection con = await EstablishHttp3ConnectionAsync().ConfigureAwait(false);
await using Http3LoopbackConnection con = await EstablishHttp3ConnectionAsync().ConfigureAwait(false);
await funcAsync(con).ConfigureAwait(false);
await con.ShutdownAsync();
}

public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
using var con = (Http3LoopbackConnection)await EstablishGenericConnectionAsync().ConfigureAwait(false);
await using Http3LoopbackConnection con = (Http3LoopbackConnection)await EstablishGenericConnectionAsync().ConfigureAwait(false);
return await con.HandleRequestAsync(statusCode, headers, content).ConfigureAwait(false);
}
}
Expand Down
21 changes: 7 additions & 14 deletions src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace System.Net.Test.Common
{

internal sealed class Http3LoopbackStream : IDisposable
internal sealed class Http3LoopbackStream : IAsyncDisposable
{
private const int MaximumVarIntBytes = 8;
private const long VarIntMax = (1L << 62) - 1;
Expand All @@ -40,12 +40,9 @@ public Http3LoopbackStream(QuicStream stream)
_stream = stream;
}

public void Dispose()
{
_stream.Dispose();
}
public ValueTask DisposeAsync() => _stream.DisposeAsync();

public long StreamId => _stream.StreamId;
public long StreamId => _stream.Id;

public async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
Expand Down Expand Up @@ -285,9 +282,7 @@ public async Task SendResponseBodyAsync(byte[] content, bool isFinal = true)

if (isFinal)
{
_stream.Shutdown();
await _stream.ShutdownCompleted().ConfigureAwait(false);
Dispose();
_stream.CompleteWrites();
}
}

Expand Down Expand Up @@ -389,7 +384,7 @@ async Task WaitForWriteCancellation()
{
try
{
await _stream.WaitForWriteCompletionAsync();
await _stream.WritesClosed;
}
catch (QuicException ex) when (ex.QuicError == QuicError.StreamAborted && ex.ApplicationErrorCode == Http3LoopbackConnection.H3_REQUEST_CANCELLED)
{
Expand Down Expand Up @@ -424,11 +419,9 @@ private async Task DrainResponseData()
}
}

public async Task AbortAndWaitForShutdownAsync(long errorCode)
public void Abort(long errorCode)
{
_stream.AbortRead(errorCode);
_stream.AbortWrite(errorCode);
await _stream.ShutdownCompleted();
_stream.Abort(QuicAbortDirection.Both, errorCode);
}

public async Task<(long? frameType, byte[] payload)> ReadFrameAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,18 @@ public override async Task<GenericLoopbackConnection> EstablishGenericConnection
{
return connection = await Http2LoopbackServerFactory.Singleton.CreateConnectionAsync(new SocketWrapper(socket), stream, options).ConfigureAwait(false);
}
else
else
{
throw new Exception($"Invalid ClearTextVersion={_options.ClearTextVersion} specified");
}
}
catch
{
connection?.Dispose();
connection = null;
{
if (connection is not null)
{
await connection.DisposeAsync();
connection = null;
}
stream.Dispose();
throw;
}
Expand All @@ -132,15 +135,15 @@ public override async Task<GenericLoopbackConnection> EstablishGenericConnection

public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
using (GenericLoopbackConnection connection = await EstablishGenericConnectionAsync().ConfigureAwait(false))
await using (GenericLoopbackConnection connection = await EstablishGenericConnectionAsync().ConfigureAwait(false))
{
return await connection.HandleRequestAsync(statusCode, headers, content).ConfigureAwait(false);
}
}

public override async Task AcceptConnectionAsync(Func<GenericLoopbackConnection, Task> funcAsync)
{
using (GenericLoopbackConnection connection = await EstablishGenericConnectionAsync().ConfigureAwait(false))
await using (GenericLoopbackConnection connection = await EstablishGenericConnectionAsync().ConfigureAwait(false))
{
await funcAsync(connection).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ await LoopbackServer.CreateClientAndServerAsync(
Assert.Equal(0, requestData.GetHeaderValueCount("Authorization"));
// Establish a session connection
using var connection = await server.EstablishConnectionAsync();
await using LoopbackServer.Connection connection = await server.EstablishConnectionAsync();
requestData = await connection.ReadRequestDataAsync();
string authHeaderValue = requestData.GetSingleHeaderValue("Authorization");
Assert.Contains("NTLM", authHeaderValue);
Expand Down
11 changes: 8 additions & 3 deletions src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public async Task<Connection> EstablishConnectionAsync()

public async Task AcceptConnectionAsync(Func<Connection, Task> funcAsync)
{
using (Connection connection = await EstablishConnectionAsync().ConfigureAwait(false))
await using (Connection connection = await EstablishConnectionAsync().ConfigureAwait(false))
{
await funcAsync(connection).ConfigureAwait(false);
}
Expand Down Expand Up @@ -654,7 +654,7 @@ private async Task<byte[]> ReadLineBytesAsync()
return null;
}

public override void Dispose()
public override async ValueTask DisposeAsync()
{
try
{
Expand All @@ -666,7 +666,12 @@ public override void Dispose()
}
catch (Exception) { }

#if !NETSTANDARD2_0 && !NETFRAMEWORK
await _stream.DisposeAsync().ConfigureAwait(false);
#else
_stream.Dispose();
await Task.CompletedTask.ConfigureAwait(false);
#endif
_socket?.Dispose();
}

Expand Down Expand Up @@ -1076,7 +1081,7 @@ public override Task WaitForCloseAsync(CancellationToken cancellationToken)

public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
using (Connection connection = await EstablishConnectionAsync().ConfigureAwait(false))
await using (Connection connection = await EstablishConnectionAsync().ConfigureAwait(false))
{
return await connection.HandleRequestAsync(statusCode, headers, content).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ await LoopbackServer.CreateClientAndServerAsync(
},
async s =>
{
using (LoopbackServer.Connection connection = await s.EstablishConnectionAsync().ConfigureAwait(false))
await using (LoopbackServer.Connection connection = await s.EstablishConnectionAsync().ConfigureAwait(false))
{
SslStream sslStream = connection.Stream as SslStream;
Assert.NotNull(sslStream);
Expand Down Expand Up @@ -76,7 +76,7 @@ await Http2LoopbackServer.CreateClientAndServerAsync(
},
async s =>
{
using (Http2LoopbackConnection connection = await s.EstablishConnectionAsync().ConfigureAwait(false))
await using (Http2LoopbackConnection connection = await s.EstablishConnectionAsync().ConfigureAwait(false))
{
SslStream sslStream = connection.Stream as SslStream;
Assert.NotNull(sslStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
throw new HttpRequestException(SR.net_http_request_aborted, null, RequestRetryType.RetryOnConnectionFailure);
}

requestStream!.StreamId = quicStream.StreamId;
requestStream!.StreamId = quicStream.Id;

bool goAway;
lock (SyncObj)
Expand Down Expand Up @@ -542,7 +542,7 @@ private async Task ProcessServerStreamAsync(QuicStream stream)
NetEventSource.Info(this, $"Ignoring server-initiated stream of unknown type {unknownStreamType}.");
}

stream.AbortRead((long)Http3ErrorCode.StreamCreationError);
stream.Abort(QuicAbortDirection.Read, (long)Http3ErrorCode.StreamCreationError);
stream.Dispose();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ await Task.WhenAny(sendContentTask, readResponseTask).ConfigureAwait(false) == s
// We're either observing GOAWAY, or the cancellationToken parameter has been canceled.
if (cancellationToken.IsCancellationRequested)
{
_stream.AbortWrite((long)Http3ErrorCode.RequestCancelled);
_stream.Abort(QuicAbortDirection.Write, (long)Http3ErrorCode.RequestCancelled);
throw new TaskCanceledException(ex.Message, ex, cancellationToken);
}
else
Expand All @@ -277,7 +277,7 @@ await Task.WhenAny(sendContentTask, readResponseTask).ConfigureAwait(false) == s
}
catch (Exception ex)
{
_stream.AbortWrite((long)Http3ErrorCode.InternalError);
_stream.Abort(QuicAbortDirection.Write, (long)Http3ErrorCode.InternalError);
if (ex is HttpRequestException)
{
throw;
Expand Down Expand Up @@ -398,7 +398,7 @@ private async Task SendContentAsync(HttpContent content, CancellationToken cance
}
else
{
_stream.Shutdown();
_stream.CompleteWrites();
}

if (HttpTelemetry.Log.IsEnabled()) HttpTelemetry.Log.RequestContentStop(writeStream.BytesWritten);
Expand Down Expand Up @@ -814,7 +814,7 @@ private async ValueTask ReadHeadersAsync(long headersLength, CancellationToken c
// https://tools.ietf.org/html/draft-ietf-quic-http-24#section-4.1.1
if (headersLength > _headerBudgetRemaining)
{
_stream.AbortWrite((long)Http3ErrorCode.ExcessiveLoad);
_stream.Abort(QuicAbortDirection.Write, (long)Http3ErrorCode.ExcessiveLoad);
throw new HttpRequestException(SR.Format(SR.net_http_response_headers_exceeded_length, _connection.Pool.Settings._maxResponseHeadersLength * 1024L));
}

Expand Down Expand Up @@ -1201,12 +1201,12 @@ private void HandleReadResponseContentException(Exception ex, CancellationToken
_connection.Abort(ex);
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
case OperationCanceledException oce when oce.CancellationToken == cancellationToken:
_stream.AbortRead((long)Http3ErrorCode.RequestCancelled);
_stream.Abort(QuicAbortDirection.Read, (long)Http3ErrorCode.RequestCancelled);
ExceptionDispatchInfo.Throw(ex); // Rethrow.
return; // Never reached.
}

_stream.AbortRead((long)Http3ErrorCode.InternalError);
_stream.Abort(QuicAbortDirection.Read, (long)Http3ErrorCode.InternalError);
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
}

Expand Down Expand Up @@ -1264,12 +1264,12 @@ private void AbortStream()
// If the request body isn't completed, cancel it now.
if (_requestContentLengthRemaining != 0) // 0 is used for the end of content writing, -1 is used for unknown Content-Length
{
_stream.AbortWrite((long)Http3ErrorCode.RequestCancelled);
_stream.Abort(QuicAbortDirection.Write, (long)Http3ErrorCode.RequestCancelled);
}
// If the response body isn't completed, cancel it now.
if (_responseDataPayloadRemaining != -1) // -1 is used for EOF, 0 for consumed DATA frame payload before the next read
{
_stream.AbortRead((long)Http3ErrorCode.RequestCancelled);
_stream.Abort(QuicAbortDirection.Read, (long)Http3ErrorCode.RequestCancelled);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task AltSvc_ConnectionFrame_UpgradeFrom20_Success()
Task<HttpResponseMessage> firstResponseTask = client.GetAsync(firstServer.Address);
Task serverTask = Task.Run(async () =>
{
using Http2LoopbackConnection connection = await firstServer.EstablishConnectionAsync();
await using Http2LoopbackConnection connection = await firstServer.EstablishConnectionAsync();
int streamId = await connection.ReadRequestHeaderAsync();
await connection.WriteFrameAsync(new AltSvcFrame($"https://{firstServer.Address.IdnHost}:{firstServer.Address.Port}", $"h3=\"{secondServer.Address.IdnHost}:{secondServer.Address.Port}\"", streamId: 0));
Expand All @@ -106,7 +106,7 @@ public async Task AltSvc_ResponseFrame_UpgradeFrom20_Success()
Task<HttpResponseMessage> firstResponseTask = client.GetAsync(firstServer.Address);
Task serverTask = Task.Run(async () =>
{
using Http2LoopbackConnection connection = await firstServer.EstablishConnectionAsync();
await using Http2LoopbackConnection connection = await firstServer.EstablishConnectionAsync();
int streamId = await connection.ReadRequestHeaderAsync();
await connection.SendDefaultResponseHeadersAsync(streamId);
Expand Down
Loading

0 comments on commit b09e73e

Please sign in to comment.