From 912be300e9c0750187a9d9215e29529213e002fa Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 18 Jan 2016 13:27:09 +0000 Subject: [PATCH] Complete sync-blocked calls directly rather than on threadpool --- .../Http/Frame.cs | 7 ++- .../Http/SocketOutput.cs | 56 ++++++++++++------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index 54cc69855..9d905384b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -421,7 +421,7 @@ public void Write(ArraySegment data) { return; } - WriteChunkedAsync(data, RequestAborted).GetAwaiter().GetResult(); + WriteChunked(data); } else { @@ -468,6 +468,11 @@ public async Task WriteAsyncAwaited(ArraySegment data, CancellationToken c } } + private void WriteChunked(ArraySegment data) + { + SocketOutput.Write(data, immediate: false, chunk: true); + } + private Task WriteChunkedAsync(ArraySegment data, CancellationToken cancellationToken) { return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index cf3f8dc52..4f252ba89 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -48,7 +48,7 @@ public class SocketOutput : ISocketOutput private int _numBytesPreCompleted = 0; private Exception _lastWriteError; private WriteContext _nextWriteContext; - private readonly Queue> _tasksPending; + private readonly Queue _tasksPending; private readonly Queue _writeContextPool; private readonly Queue _writeReqPool; @@ -68,7 +68,7 @@ public SocketOutput( _connectionId = connectionId; _log = log; _threadPool = threadPool; - _tasksPending = new Queue>(_initialTaskQueues); + _tasksPending = new Queue(_initialTaskQueues); _writeContextPool = new Queue(_maxPooledWriteContexts); _writeReqPool = writeReqPool; @@ -81,7 +81,8 @@ public Task WriteAsync( bool immediate = true, bool chunk = false, bool socketShutdownSend = false, - bool socketDisconnect = false) + bool socketDisconnect = false, + bool isSync = false) { TaskCompletionSource tcs = null; var scheduleWrite = false; @@ -147,7 +148,11 @@ public Task WriteAsync( { // immediate write, which is not eligable for instant completion above tcs = new TaskCompletionSource(buffer.Count); - _tasksPending.Enqueue(tcs); + _tasksPending.Enqueue(new WaitingTask() { + CompletionSource = tcs, + BytesToWrite = buffer.Count, + isSync = isSync + }); } if (!_writePending && immediate) @@ -316,21 +321,35 @@ private void OnWriteCompleted(WriteContext writeContext) // This allows large writes to complete once they've actually finished. var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; while (_tasksPending.Count > 0 && - (int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer) + (_tasksPending.Peek().BytesToWrite) <= bytesLeftToBuffer) { - var tcs = _tasksPending.Dequeue(); - var bytesToWrite = (int)tcs.Task.AsyncState; + var waitingTask = _tasksPending.Dequeue(); + var bytesToWrite = waitingTask.BytesToWrite; _numBytesPreCompleted += bytesToWrite; bytesLeftToBuffer -= bytesToWrite; if (_lastWriteError == null) { - _threadPool.Complete(tcs); + if (waitingTask.isSync) + { + waitingTask.CompletionSource.TrySetResult(null); + } + else + { + _threadPool.Complete(waitingTask.CompletionSource); + } } else { - _threadPool.Error(tcs, _lastWriteError); + if (waitingTask.isSync) + { + waitingTask.CompletionSource.TrySetException(_lastWriteError); + } + else + { + _threadPool.Error(waitingTask.CompletionSource, _lastWriteError); + } } } @@ -374,16 +393,7 @@ private void PoolWriteContext(WriteContext writeContext) void ISocketOutput.Write(ArraySegment buffer, bool immediate, bool chunk) { - var task = WriteAsync(buffer, immediate, chunk); - - if (task.Status == TaskStatus.RanToCompletion) - { - return; - } - else - { - task.GetAwaiter().GetResult(); - } + WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult(); } Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, bool chunk, CancellationToken cancellationToken) @@ -634,5 +644,13 @@ public void Reset() ShutdownSendStatus = 0; } } + + private struct WaitingTask + { + public bool isSync; + public int BytesToWrite; + public IDisposable CancellationRegistration; + public TaskCompletionSource CompletionSource; + } } }