Skip to content

Commit

Permalink
Optimize hot path on QuicStream.ReadAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
rzikm committed Jun 24, 2024
1 parent d719019 commit 4b8db0e
Showing 1 changed file with 45 additions and 30 deletions.
75 changes: 45 additions & 30 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ internal ValueTask StartAsync(CancellationToken cancellationToken = default)
}

/// <inheritdoc />
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(_disposed == 1, this);

Expand All @@ -283,21 +283,41 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
cancellationToken.ThrowIfCancellationRequested();
}

// The following loop will repeat at most twice depending whether some data are readily available in the buffer (one iteration) or not.
// In which case, it'll wait on RECEIVE or any of PEER_SEND_(SHUTDOWN|ABORTED) event and attempt to copy data in the second iteration.
int totalCopied = 0;
do
if (TryReadAsyncInternal(buffer, out int totalCopied, out ValueTask valueTask, cancellationToken))
{
// hot path: there was data available.
Debug.Assert(valueTask.IsCompletedSuccessfully, "No pending read expected.");
valueTask.GetAwaiter().GetResult();

FinalizeRead(totalCopied);
return ValueTask.FromResult(totalCopied);
}

// cold path: no data available, await pending read
return ReadAsyncCold(buffer, valueTask, cancellationToken);

async ValueTask<int> ReadAsyncCold(Memory<byte> buffer, ValueTask pendingTask, CancellationToken cancellationToken)
{
await pendingTask.ConfigureAwait(false);

bool success = TryReadAsyncInternal(buffer, out int totalCopied, out pendingTask, cancellationToken);
Debug.Assert(success, "TryReadAsyncInternal should succeed after the await.");
Debug.Assert(pendingTask.IsCompletedSuccessfully, "No pending read expected.");
pendingTask.GetAwaiter().GetResult();

FinalizeRead(totalCopied);
return totalCopied;
}

bool TryReadAsyncInternal(Memory<byte> buffer, out int read, out ValueTask valueTask, CancellationToken cancellationToken)
{
// Concurrent call, this one lost the race.
if (!_receiveTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken))
if (!_receiveTcs.TryGetValueTask(out valueTask, this, cancellationToken))
{
throw new InvalidOperationException(SR.Format(SR.net_io_invalidnestedcall, "read"));
}

// Copy data from the buffer, reduce target and increment total.
int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty);
buffer = buffer.Slice(copied);
totalCopied += copied;
read = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty);

// Make sure the task transitions into final state before the method finishes.
if (complete)
Expand All @@ -306,38 +326,33 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
}

// Unblock the next await to end immediately, i.e. there were/are any data in the buffer.
if (totalCopied > 0 || !empty)
if (read > 0 || !empty)
{
_receiveTcs.TrySetResult();
}

// This will either wait for RECEIVE event (no data in buffer) or complete immediately and reset the task.
await valueTask.ConfigureAwait(false);
return read > 0 || !empty || complete;
}

// This is the last read, finish even despite not copying anything.
if (complete)
void FinalizeRead(int totalCopied)
{
if (totalCopied > 0 && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1)
{
break;
unsafe
{
ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamReceiveSetEnabled(
_handle,
1),
"StreamReceivedSetEnabled failed");
}
}
} while (!buffer.IsEmpty && totalCopied == 0); // Exit the loop if target buffer is full we at least copied something.

if (totalCopied > 0 && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1)
{
unsafe
if (NetEventSource.Log.IsEnabled())
{
ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamReceiveSetEnabled(
_handle,
1),
"StreamReceivedSetEnabled failed");
NetEventSource.Info(this, $"{this} Stream read '{totalCopied}' bytes.");
}
}

if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(this, $"{this} Stream read '{totalCopied}' bytes.");
}

return totalCopied;
}

/// <inheritdoc />
Expand Down

0 comments on commit 4b8db0e

Please sign in to comment.