Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Complete sync-blocked calls directly rather than on threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Jan 18, 2016
1 parent d3d9c8d commit e6ca887
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
7 changes: 6 additions & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void Write(ArraySegment<byte> data)
{
return;
}
WriteChunkedAsync(data, RequestAborted).GetAwaiter().GetResult();
WriteChunked(data);
}
else
{
Expand Down Expand Up @@ -468,6 +468,11 @@ public async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken c
}
}

private void WriteChunked(ArraySegment<byte> data)
{
SocketOutput.Write(data, immediate: false, chunk: true);
}

private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken);
Expand Down
56 changes: 37 additions & 19 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SocketOutput : ISocketOutput
private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<WaitingTask> _tasksPending;
private readonly Queue<WriteContext> _writeContextPool;
private readonly Queue<UvWriteReq> _writeReqPool;

Expand All @@ -68,7 +68,7 @@ public SocketOutput(
_connectionId = connectionId;
_log = log;
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksPending = new Queue<WaitingTask>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = writeReqPool;

Expand All @@ -81,7 +81,8 @@ public Task WriteAsync(
bool immediate = true,
bool chunk = false,
bool socketShutdownSend = false,
bool socketDisconnect = false)
bool socketDisconnect = false,
bool isSync = false)
{
TaskCompletionSource<object> tcs = null;
var scheduleWrite = false;
Expand Down Expand Up @@ -147,7 +148,11 @@ public Task WriteAsync(
{
// immediate write, which is not eligable for instant completion above
tcs = new TaskCompletionSource<object>(buffer.Count);
_tasksPending.Enqueue(tcs);
_tasksPending.Enqueue(new WaitingTask() {
CompletionSource = tcs,
BytesToWrite = buffer.Count,
IsSync = isSync
});
}

if (!_writePending && immediate)
Expand Down Expand Up @@ -316,21 +321,35 @@ private void OnWriteCompleted(WriteContext writeContext)
// This allows large writes to complete once they've actually finished.
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
while (_tasksPending.Count > 0 &&
(int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer)
(_tasksPending.Peek().BytesToWrite) <= bytesLeftToBuffer)
{
var tcs = _tasksPending.Dequeue();
var bytesToWrite = (int)tcs.Task.AsyncState;
var waitingTask = _tasksPending.Dequeue();
var bytesToWrite = waitingTask.BytesToWrite;

_numBytesPreCompleted += bytesToWrite;
bytesLeftToBuffer -= bytesToWrite;

if (_lastWriteError == null)
{
_threadPool.Complete(tcs);
if (waitingTask.IsSync)
{
waitingTask.CompletionSource.TrySetResult(null);
}
else
{
_threadPool.Complete(waitingTask.CompletionSource);
}
}
else
{
_threadPool.Error(tcs, _lastWriteError);
if (waitingTask.IsSync)
{
waitingTask.CompletionSource.TrySetException(_lastWriteError);
}
else
{
_threadPool.Error(waitingTask.CompletionSource, _lastWriteError);
}
}
}

Expand Down Expand Up @@ -374,16 +393,7 @@ private void PoolWriteContext(WriteContext writeContext)

void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
{
var task = WriteAsync(buffer, immediate, chunk);

if (task.Status == TaskStatus.RanToCompletion)
{
return;
}
else
{
task.GetAwaiter().GetResult();
}
WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult();
}

Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
Expand Down Expand Up @@ -634,5 +644,13 @@ public void Reset()
ShutdownSendStatus = 0;
}
}

private struct WaitingTask
{
public bool IsSync;
public int BytesToWrite;
public IDisposable CancellationRegistration;
public TaskCompletionSource<object> CompletionSource;
}
}
}

0 comments on commit e6ca887

Please sign in to comment.