Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Write chunks async; unblock sync-waits directly #586

Merged
merged 2 commits into from
Jan 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,14 +473,14 @@ private void WriteChunked(ArraySegment<byte> data)
SocketOutput.Write(data, immediate: false, chunk: true);
}

private async Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
await SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken);
}

private void WriteChunkedResponseSuffix()
private Task WriteChunkedResponseSuffix()
{
SocketOutput.Write(_endChunkedResponseBytes, immediate: true);
return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true);
}

private static ArraySegment<byte> CreateAsciiByteArraySegment(string text)
Expand Down Expand Up @@ -571,27 +571,37 @@ protected Task ProduceEnd()
return ProduceEndAwaited();
}

WriteSuffix();

return TaskUtilities.CompletedTask;
return WriteSuffix();
}

private async Task ProduceEndAwaited()
{
await ProduceStart(immediate: true, appCompleted: true);

WriteSuffix();
await WriteSuffix();
}

private void WriteSuffix()
private Task WriteSuffix()
{
// _autoChunk should be checked after we are sure ProduceStart() has been called
// since ProduceStart() may set _autoChunk to true.
if (_autoChunk)
{
WriteChunkedResponseSuffix();
return WriteAutoChunkSuffixAwaited();
}

if (_keepAlive)
{
ConnectionControl.End(ProduceEndType.ConnectionKeepAlive);
}

return TaskUtilities.CompletedTask;
}

private async Task WriteAutoChunkSuffixAwaited()
{
await WriteChunkedResponseSuffix();

if (_keepAlive)
{
ConnectionControl.End(ProduceEndType.ConnectionKeepAlive);
Expand Down
55 changes: 36 additions & 19 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SocketOutput : ISocketOutput
private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<WaitingTask> _tasksPending;
private readonly Queue<WriteContext> _writeContextPool;
private readonly Queue<UvWriteReq> _writeReqPool;

Expand All @@ -68,7 +68,7 @@ public SocketOutput(
_connectionId = connectionId;
_log = log;
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksPending = new Queue<WaitingTask>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = writeReqPool;

Expand All @@ -81,7 +81,8 @@ public Task WriteAsync(
bool immediate = true,
bool chunk = false,
bool socketShutdownSend = false,
bool socketDisconnect = false)
bool socketDisconnect = false,
bool isSync = false)
{
TaskCompletionSource<object> tcs = null;
var scheduleWrite = false;
Expand Down Expand Up @@ -147,7 +148,11 @@ public Task WriteAsync(
{
// immediate write, which is not eligable for instant completion above
tcs = new TaskCompletionSource<object>(buffer.Count);
_tasksPending.Enqueue(tcs);
_tasksPending.Enqueue(new WaitingTask() {
CompletionSource = tcs,
BytesToWrite = buffer.Count,
IsSync = isSync
});
}

if (!_writePending && immediate)
Expand Down Expand Up @@ -316,21 +321,35 @@ private void OnWriteCompleted(WriteContext writeContext)
// This allows large writes to complete once they've actually finished.
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
while (_tasksPending.Count > 0 &&
(int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer)
(_tasksPending.Peek().BytesToWrite) <= bytesLeftToBuffer)
{
var tcs = _tasksPending.Dequeue();
var bytesToWrite = (int)tcs.Task.AsyncState;
var waitingTask = _tasksPending.Dequeue();
var bytesToWrite = waitingTask.BytesToWrite;

_numBytesPreCompleted += bytesToWrite;
bytesLeftToBuffer -= bytesToWrite;

if (_lastWriteError == null)
{
_threadPool.Complete(tcs);
if (waitingTask.IsSync)
{
waitingTask.CompletionSource.TrySetResult(null);
}
else
{
_threadPool.Complete(waitingTask.CompletionSource);
}
}
else
{
_threadPool.Error(tcs, _lastWriteError);
if (waitingTask.IsSync)
{
waitingTask.CompletionSource.TrySetException(_lastWriteError);
}
else
{
_threadPool.Error(waitingTask.CompletionSource, _lastWriteError);
}
}
}

Expand Down Expand Up @@ -374,16 +393,7 @@ private void PoolWriteContext(WriteContext writeContext)

void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
{
var task = WriteAsync(buffer, immediate, chunk);

if (task.Status == TaskStatus.RanToCompletion)
{
return;
}
else
{
task.GetAwaiter().GetResult();
}
WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult();
}

Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
Expand Down Expand Up @@ -634,5 +644,12 @@ public void Reset()
ShutdownSendStatus = 0;
}
}

private struct WaitingTask
{
public bool IsSync;
public int BytesToWrite;
public TaskCompletionSource<object> CompletionSource;
}
}
}