Skip to content

Commit

Permalink
Add ISocketConnectionContextFactory (#34769)
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoK authored Aug 13, 2021
1 parent c7fa6c1 commit ff51fd7
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@
*REMOVED*Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportFactory.SocketTransportFactory(Microsoft.Extensions.Options.IOptions<Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions> options, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
*REMOVED*static Microsoft.AspNetCore.Hosting.WebHostBuilderSocketExtensions.UseSockets(this Microsoft.AspNetCore.Hosting.IWebHostBuilder hostBuilder) -> Microsoft.AspNetCore.Hosting.IWebHostBuilder
*REMOVED*static Microsoft.AspNetCore.Hosting.WebHostBuilderSocketExtensions.UseSockets(this Microsoft.AspNetCore.Hosting.IWebHostBuilder hostBuilder, System.Action<Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions> configureOptions) -> Microsoft.AspNetCore.Hosting.IWebHostBuilder
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionContextFactory
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionContextFactory.Create(System.Net.Sockets.Socket! socket) -> Microsoft.AspNetCore.Connections.ConnectionContext!
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionContextFactory.Dispose() -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionContextFactory.SocketConnectionContextFactory(Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions! options, Microsoft.Extensions.Logging.ILogger! logger) -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.IOQueueCount.get -> int
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.IOQueueCount.set -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.MaxReadBufferSize.get -> long?
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.MaxReadBufferSize.set -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.MaxWriteBufferSize.get -> long?
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.MaxWriteBufferSize.set -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.SocketConnectionFactoryOptions() -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.UnsafePreferInlineScheduling.get -> bool
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.UnsafePreferInlineScheduling.set -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.WaitForDataBeforeAllocatingBuffer.get -> bool
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactoryOptions.WaitForDataBeforeAllocatingBuffer.set -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportFactory.BindAsync(System.Net.EndPoint! endpoint, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<Microsoft.AspNetCore.Connections.IConnectionListener!>
~Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportFactory.SocketTransportFactory(Microsoft.Extensions.Options.IOptions<Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions!>! options, Microsoft.Extensions.Logging.ILoggerFactory! loggerFactory) -> void
static Microsoft.AspNetCore.Hosting.WebHostBuilderSocketExtensions.UseSockets(this Microsoft.AspNetCore.Hosting.IWebHostBuilder! hostBuilder) -> Microsoft.AspNetCore.Hosting.IWebHostBuilder!
static Microsoft.AspNetCore.Hosting.WebHostBuilderSocketExtensions.UseSockets(this Microsoft.AspNetCore.Hosting.IWebHostBuilder! hostBuilder, System.Action<Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions!>! configureOptions) -> Microsoft.AspNetCore.Hosting.IWebHostBuilder!
static Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions.CreateDefaultBoundListenSocket(System.Net.EndPoint! endpoint) -> System.Net.Sockets.Socket!
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions.CreateBoundListenSocket.get -> System.Func<System.Net.EndPoint!, System.Net.Sockets.Socket!>!
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions.CreateBoundListenSocket.set -> void
Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketTransportOptions.CreateBoundListenSocket.set -> void
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.IO.Pipelines;
using System.Net.Sockets;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
/// <summary>
/// A factory for socket based connections contexts.
/// </summary>
public sealed class SocketConnectionContextFactory : IDisposable
{
private readonly MemoryPool<byte> _memoryPool;
private readonly SocketConnectionFactoryOptions _options;
private readonly ISocketsTrace _trace;
private readonly int _settingsCount;
private readonly QueueSettings[] _settings;
private int _settingsIndex;

/// <summary>
/// Creates the <see cref="SocketConnectionContextFactory"/>.
/// </summary>
/// <param name="options">The options.</param>
/// <param name="logger">The logger.</param>
public SocketConnectionContextFactory(SocketConnectionFactoryOptions options, ILogger logger)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

if (logger == null)
{
throw new ArgumentNullException(nameof(logger));
}

_options = options;
_trace = new SocketsTrace(logger);
_memoryPool = _options.MemoryPoolFactory();
_settingsCount = _options.IOQueueCount;

var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
var applicationScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;

if (_settingsCount > 0)
{
_settings = new QueueSettings[_settingsCount];

for (var i = 0; i < _settingsCount; i++)
{
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : new IOQueue();
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

_settings[i] = new QueueSettings()
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false),
SocketSenderPool = new SocketSenderPool(awaiterScheduler)
};
}
}
else
{
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;
_settings = new QueueSettings[]
{
new QueueSettings()
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false),
SocketSenderPool = new SocketSenderPool(awaiterScheduler)
}
};
_settingsCount = 1;
}
}

/// <summary>
/// Create a <see cref="ConnectionContext"/> for a socket.
/// </summary>
/// <param name="socket">The socket for the connection.</param>
/// <returns></returns>
public ConnectionContext Create(Socket socket)
{
var setting = _settings[Interlocked.Increment(ref _settingsIndex) % _settingsCount];

var connection = new SocketConnection(socket,
_memoryPool,
setting.Scheduler,
_trace,
setting.SocketSenderPool,
setting.InputOptions,
setting.OutputOptions,
waitForData: _options.WaitForDataBeforeAllocatingBuffer);

connection.Start();
return connection;
}

/// <inheritdoc />
public void Dispose()
{
// Dispose the memory pool
_memoryPool.Dispose();

// Dispose any pooled senders
foreach (var setting in _settings)
{
setting.SocketSenderPool.Dispose();
}
}

private class QueueSettings
{
public PipeScheduler Scheduler { get; init; } = default!;
public PipeOptions InputOptions { get; init; } = default!;
public PipeOptions OutputOptions { get; init; } = default!;
public SocketSenderPool SocketSenderPool { get; init; } = default!;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
/// <summary>
/// Options for <see cref="SocketConnectionContextFactory"/>.
/// </summary>
public class SocketConnectionFactoryOptions
{
/// <summary>
/// Create a new instance.
/// </summary>
public SocketConnectionFactoryOptions() { }

internal SocketConnectionFactoryOptions(SocketTransportOptions transportOptions)
{
IOQueueCount = transportOptions.IOQueueCount;
WaitForDataBeforeAllocatingBuffer = transportOptions.WaitForDataBeforeAllocatingBuffer;
MaxReadBufferSize = transportOptions.MaxReadBufferSize;
MaxWriteBufferSize = transportOptions.MaxWriteBufferSize;
UnsafePreferInlineScheduling = transportOptions.UnsafePreferInlineScheduling;
MemoryPoolFactory = transportOptions.MemoryPoolFactory;
}

/// <summary>
/// The number of I/O queues used to process requests. Set to 0 to directly schedule I/O to the ThreadPool.
/// </summary>
/// <remarks>
/// Defaults to <see cref="Environment.ProcessorCount" /> rounded down and clamped between 1 and 16.
/// </remarks>
public int IOQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16);

/// <summary>
/// Wait until there is data available to allocate a buffer. Setting this to false can increase throughput at the cost of increased memory usage.
/// </summary>
/// <remarks>
/// Defaults to true.
/// </remarks>
public bool WaitForDataBeforeAllocatingBuffer { get; set; } = true;

/// <summary>
/// Gets or sets the maximum unconsumed incoming bytes the transport will buffer.
/// </summary>
public long? MaxReadBufferSize { get; set; } = 1024 * 1024;

/// <summary>
/// Gets or sets the maximum outgoing bytes the transport will buffer before applying write backpressure.
/// </summary>
public long? MaxWriteBufferSize { get; set; } = 64 * 1024;

/// <summary>
/// Inline application and transport continuations instead of dispatching to the threadpool.
/// </summary>
/// <remarks>
/// This will run application code on the IO thread which is why this is unsafe.
/// It is recommended to set the DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS environment variable to '1' when using this setting to also inline the completions
/// at the runtime layer as well.
/// This setting can make performance worse if there is expensive work that will end up holding onto the IO thread for longer than needed.
/// Test to make sure this setting helps performance.
/// </remarks>
public bool UnsafePreferInlineScheduling { get; set; }

internal Func<MemoryPool<byte>> MemoryPoolFactory { get; set; } = PinnedBlockMemoryPoolFactory.Create;
}
}
Original file line number Diff line number Diff line change
@@ -1,87 +1,34 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Buffers;
using System.ComponentModel;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
internal sealed class SocketConnectionListener : IConnectionListener
{
private readonly MemoryPool<byte> _memoryPool;
private readonly int _settingsCount;
private readonly Settings[] _settings;
private readonly SocketConnectionContextFactory _factory;
private readonly ISocketsTrace _trace;
private Socket? _listenSocket;
private int _settingsIndex;
private readonly SocketTransportOptions _options;

public EndPoint EndPoint { get; private set; }

internal SocketConnectionListener(
EndPoint endpoint,
SocketTransportOptions options,
ISocketsTrace trace)
ILoggerFactory loggerFactory)
{
EndPoint = endpoint;
_trace = trace;
_options = options;
_memoryPool = _options.MemoryPoolFactory();
var ioQueueCount = options.IOQueueCount;

var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
var applicationScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;

if (ioQueueCount > 0)
{
_settingsCount = ioQueueCount;
_settings = new Settings[_settingsCount];

for (var i = 0; i < _settingsCount; i++)
{
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : new IOQueue();
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

_settings[i] = new Settings
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false),
SocketSenderPool = new SocketSenderPool(awaiterScheduler)
};
}
}
else
{
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

var directScheduler = new Settings[]
{
new Settings
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false),
SocketSenderPool = new SocketSenderPool(awaiterScheduler)
}
};

_settingsCount = directScheduler.Length;
_settings = directScheduler;
}
var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets");
_trace = new SocketsTrace(logger);
_factory = new SocketConnectionContextFactory(new SocketConnectionFactoryOptions(options), logger);
}

internal void Bind()
Expand Down Expand Up @@ -125,22 +72,7 @@ internal void Bind()
acceptSocket.NoDelay = _options.NoDelay;
}

var setting = _settings[_settingsIndex];

var connection = new SocketConnection(acceptSocket,
_memoryPool,
setting.Scheduler,
_trace,
setting.SocketSenderPool,
setting.InputOptions,
setting.OutputOptions,
waitForData: _options.WaitForDataBeforeAllocatingBuffer);

connection.Start();

_settingsIndex = (_settingsIndex + 1) % _settingsCount;

return connection;
return _factory.Create(acceptSocket);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -170,24 +102,9 @@ public ValueTask DisposeAsync()
{
_listenSocket?.Dispose();

// Dispose the memory pool
_memoryPool.Dispose();

// Dispose any pooled senders
foreach (var setting in _settings)
{
setting.SocketSenderPool.Dispose();
}
_factory.Dispose();

return default;
}

private class Settings
{
public PipeScheduler Scheduler { get; init; } = default!;
public PipeOptions InputOptions { get; init; } = default!;
public PipeOptions OutputOptions { get; init; } = default!;
public SocketSenderPool SocketSenderPool { get; init; } = default!;
}
}
}
Loading

0 comments on commit ff51fd7

Please sign in to comment.