Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUIC] Adding CompleteWritesAsync #104032

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public async Task SendResponseBodyAsync(byte[] content, bool isFinal = true)

if (isFinal)
{
_stream.CompleteWrites();
await _stream.CompleteWritesAsync().ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ private async Task SendContentAsync(HttpContent content, CancellationToken cance
}
else
{
_stream.CompleteWrites();
await _stream.CompleteWritesAsync().ConfigureAwait(false);
}

if (HttpTelemetry.Log.IsEnabled()) HttpTelemetry.Log.RequestContentStop(bytesWritten);
Expand Down
2 changes: 2 additions & 0 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ internal QuicStream() { }
public void Abort(System.Net.Quic.QuicAbortDirection abortDirection, long errorCode) { }
public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
public override System.IAsyncResult BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
[System.ObsoleteAttribute("Will be removed soon, use CompleteWritesAsync instead.")]
public void CompleteWrites() { }
public System.Threading.Tasks.ValueTask CompleteWritesAsync() { throw null; }
protected override void Dispose(bool disposing) { }
public override System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
public override int EndRead(System.IAsyncResult asyncResult) { throw null; }
Expand Down
71 changes: 55 additions & 16 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace System.Net.Quic;
/// <description>Allows to close the writing side of the stream as a single operation with the write itself.</description>
/// </item>
/// <item>
/// <term><see cref="CompleteWrites"/></term>
/// <term><see cref="CompleteWritesAsync"/></term>
/// <description>Close the writing side of the stream.</description>
/// </item>
/// <item>
Expand Down Expand Up @@ -147,7 +147,7 @@ public sealed partial class QuicStream

/// <summary>
/// A <see cref="Task"/> that will get completed once writing side has been closed.
/// Which might be by closing the write side via <see cref="CompleteWrites"/>
/// Which might be by closing the write side via <see cref="CompleteWritesAsync"/>
/// or <see cref="WriteAsync(System.ReadOnlyMemory{byte},bool,System.Threading.CancellationToken)"/> with <c>completeWrites: true</c> and getting acknowledgement from the peer for it,
/// or when <see cref="Abort"/> for <see cref="QuicAbortDirection.Write"/> is called,
/// or when the peer called <see cref="Abort"/> for <see cref="QuicAbortDirection.Read"/>.
Expand Down Expand Up @@ -360,51 +360,50 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
/// <param name="buffer">The region of memory to write data from.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <param name="completeWrites">Notifies the peer about gracefully closing the write side, i.e.: sends FIN flag with the data.</param>
public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool completeWrites, CancellationToken cancellationToken = default)
public async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool completeWrites, CancellationToken cancellationToken = default)
{
if (_disposed == 1)
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(QuicStream))));
}
ObjectDisposedException.ThrowIf(_disposed == 1, this);

if (!_canWrite)
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.net_quic_writing_notallowed)));
throw new InvalidOperationException(SR.net_quic_writing_notallowed);
}

if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(this, $"{this} Stream writing memory of '{buffer.Length}' bytes while {(completeWrites ? "completing" : "not completing")} writes.");
}

if (_sendTcs.IsCompleted && cancellationToken.IsCancellationRequested)
if (_sendTcs.IsCompleted)
{
// Special case exception type for pre-canceled token while we've already transitioned to a final state and don't need to abort write.
// It must happen before we try to get the value task, since the task source is versioned and each instance must be awaited.
return ValueTask.FromCanceled(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
}

// Concurrent call, this one lost the race.
if (!_sendTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken))
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.Format(SR.net_io_invalidnestedcall, "write"))));
throw new InvalidOperationException(SR.Format(SR.net_io_invalidnestedcall, "write"));
}

// No need to call anything since we already have a result, most likely an exception.
if (valueTask.IsCompleted)
{
return valueTask;
await valueTask.ConfigureAwait(false);
return;
}

// For an empty buffer complete immediately, close the writing side of the stream if necessary.
if (buffer.IsEmpty)
{
_sendTcs.TrySetResult();
await valueTask.ConfigureAwait(false);
if (completeWrites)
{
CompleteWrites();
await CompleteWritesAsync().ConfigureAwait(false);
}
return valueTask;
return;
}

// We own the lock, abort might happen, but exception will get stored instead.
Expand Down Expand Up @@ -440,7 +439,11 @@ public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool completeWrites, Ca
}
}

return valueTask;
await valueTask.ConfigureAwait(false);
if (completeWrites)
{
await _sendTcs.GetFinalTask(this).ConfigureAwait(false);
}
}

/// <summary>
Expand Down Expand Up @@ -511,6 +514,7 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode)
/// <remarks>
/// Corresponds to an empty <see href="https://www.rfc-editor.org/rfc/rfc9000.html#frame-stream">STREAM</see> frame with <c>FIN</c> flag set to <c>true</c>.
/// </remarks>
[Obsolete("Will be removed soon, use CompleteWritesAsync instead.")]
public void CompleteWrites()
{
ObjectDisposedException.ThrowIf(_disposed == 1, this);
Expand All @@ -535,6 +539,41 @@ public void CompleteWrites()
}
}

/// <summary>
/// Gracefully completes the writing side of the stream.
/// Equivalent to using <see cref="WriteAsync(System.ReadOnlyMemory{byte},bool,System.Threading.CancellationToken)"/> with <c>completeWrites: true</c>.
/// </summary>
/// <remarks>
/// Corresponds to an empty <see href="https://www.rfc-editor.org/rfc/rfc9000.html#frame-stream">STREAM</see> frame with <c>FIN</c> flag set to <c>true</c>.
/// </remarks>
public ValueTask CompleteWritesAsync()
{
ObjectDisposedException.ThrowIf(_disposed == 1, this);

// Nothing to complete, the writing side is already closed.
if (_sendTcs.IsCompleted)
{
return ValueTask.CompletedTask;
}

if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(this, $"{this} Completing writes.");
}
unsafe
{
int status = MsQuicApi.Api.StreamShutdown(
_handle,
QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL,
default);
if (StatusFailed(status))
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetExceptionForMsQuicStatus(status, message: "StreamShutdown failed")));
}
}
return new ValueTask(_sendTcs.GetFinalTask(this));
}

private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data)
{
Debug.Assert(_decrementStreamCapacity is not null);
Expand Down Expand Up @@ -709,7 +748,7 @@ private static unsafe int NativeCallback(QUIC_HANDLE* stream, void* context, QUI
/// <summary>
/// If the read side is not fully consumed, i.e.: <see cref="ReadsClosed"/> is not completed and/or <see cref="ReadAsync(Memory{byte}, CancellationToken)"/> hasn't returned <c>0</c>,
/// dispose will abort the read side with provided <see cref="QuicConnectionOptions.DefaultStreamErrorCode"/>.
/// If the write side hasn't been closed, it'll be closed gracefully as if <see cref="CompleteWrites"/> was called.
/// If the write side hasn't been closed, it'll be closed gracefully as if <see cref="CompleteWritesAsync"/> was called.
/// Finally, all resources associated with the stream will be released.
/// </summary>
/// <returns>A task that represents the asynchronous dispose operation.</returns>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ await RunClientServer(
}
}

stream.CompleteWrites();
await stream.CompleteWritesAsync();
},
async serverConnection =>
{
Expand All @@ -1046,7 +1046,7 @@ await RunClientServer(
int expectedTotalBytes = writes.SelectMany(x => x).Sum();
Assert.Equal(expectedTotalBytes, totalBytes);

stream.CompleteWrites();
await stream.CompleteWritesAsync();
});
}

Expand Down Expand Up @@ -1339,7 +1339,7 @@ public async Task BigWrite_SmallRead_Success(bool closeWithData)

if (!closeWithData)
{
serverStream.CompleteWrites();
await serverStream.CompleteWritesAsync();
}

readLength = await clientStream.ReadAsync(actual);
Expand Down
Loading
Loading