From 2852371f93974a34ac3d39a7fbeeb3228425a98e Mon Sep 17 00:00:00 2001
From: Ben Adams
Date: Wed, 13 Feb 2019 00:35:13 +0000
Subject: [PATCH] Make BufferSegment pool lock-free

---
 .../src/System.IO.Pipelines.csproj                  |   4 +
 .../src/System/IO/Pipelines/Pipe.cs                 |  47 +-----
 .../SingleProducerSingleConsumerPool.cs             | 147 ++++++++++++++++++
 3 files changed, 156 insertions(+), 42 deletions(-)
 create mode 100644 src/System.IO.Pipelines/src/System/IO/Pipelines/SingleProducerSingleConsumerPool.cs

diff --git a/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
index 2758ac3770a0..86e96c73fecd 100644
--- a/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
+++ b/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
@@ -10,6 +10,9 @@
+    <Compile Include="$(CommonPath)\Internal\Padding.cs">
+      <Link>Common\Internal\Padding.cs</Link>
+    </Compile>
@@ -24,6 +27,7 @@
+    <Compile Include="System\IO\Pipelines\SingleProducerSingleConsumerPool.cs" />
diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
index d8c609e12517..e677ed6d7ad4 100644
--- a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
+++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
@@ -42,11 +42,7 @@ public sealed partial class Pipe
         private readonly PipeScheduler _readerScheduler;
         private readonly PipeScheduler _writerScheduler;
 
-        // Segment Pool
-        private int _pooledSegmentCount;
-        private readonly BufferSegment[] _bufferSegmentPool;
-        // Temporary list to hold Segments return while being reset
-        private readonly BufferSegment[] _bufferSegmentsToReturn;
+        private readonly SingleProducerSingleConsumerPool<BufferSegment> _bufferSegmentPool;
 
         private readonly DefaultPipeReader _reader;
         private readonly DefaultPipeWriter _writer;
@@ -99,8 +95,7 @@ public Pipe(PipeOptions options)
             {
                 ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
             }
 
-            _bufferSegmentPool = new BufferSegment[SegmentPoolSize];
-            _bufferSegmentsToReturn = new BufferSegment[SegmentPoolSize];
+            _bufferSegmentPool = new SingleProducerSingleConsumerPool<BufferSegment>(SegmentPoolSize);
 
             _operationState = default;
             _readerCompletion = default;
@@ -242,15 +237,9 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
 
         private BufferSegment CreateSegment()
         {
-            BufferSegment[] segmentPool = _bufferSegmentPool;
-            lock (segmentPool)
+            if (_bufferSegmentPool.TryDequeue(out BufferSegment segment))
             {
-                int index = _pooledSegmentCount - 1;
-                if ((uint)index < (uint)segmentPool.Length)
-                {
-                    _pooledSegmentCount = index;
-                    return segmentPool[index];
-                }
+                return segment;
             }
 
             return new BufferSegment();
@@ -261,9 +250,6 @@ private void ReturnSegments(BufferSegment from, BufferSegment toExclusive)
             Debug.Assert(from != null);
             Debug.Assert(from != toExclusive);
 
-            // Reset the Segments and return their data out of lock
-            BufferSegment[] segmentToReturn = _bufferSegmentsToReturn;
-            int count = 0;
             do
             {
                 BufferSegment next = from.NextSegment;
@@ -271,33 +257,10 @@ private void ReturnSegments(BufferSegment from, BufferSegment toExclusive)
 
                 from.ResetMemory();
 
-                if ((uint)count < (uint)segmentToReturn.Length)
-                {
-                    // Store in temporary list while preforming expensive resets
-                    segmentToReturn[count] = from;
-                    count++;
-                }
+                _bufferSegmentPool.TryEnqueue(from);
 
                 from = next;
             } while (from != toExclusive);
-
-            // Add the Segments back to pool from the temporary list under lock
-            BufferSegment[] segmentPool = _bufferSegmentPool;
-            lock (segmentPool)
-            {
-                int index = _pooledSegmentCount;
-                for (int i = 0; i < count; i++)
-                {
-                    if ((uint)index < (uint)segmentPool.Length)
-                    {
-                        segmentPool[index] = segmentToReturn[i];
-                        index++;
-                    }
-                    segmentToReturn[i] = null;
-                }
-
-                _pooledSegmentCount = index;
-            }
         }
 
         internal bool CommitUnsynchronized()
diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/SingleProducerSingleConsumerPool.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/SingleProducerSingleConsumerPool.cs
new file mode 100644
index 000000000000..dfa0990392e3
--- /dev/null
+++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/SingleProducerSingleConsumerPool.cs
@@ -0,0 +1,147 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+
+using Internal;
+
+namespace System.IO.Pipelines
+{
+    internal sealed class SingleProducerSingleConsumerPool<T>
+    {
+        // Adapted from SingleProducerSingleConsumerQueue
+
+        /// <summary>The maximum size to use for segments (in number of elements).</summary>
+        private const int MaxSegmentSize = 0x1000000; // this could be made as large as Int32.MaxValue / 2
+
+        /// <summary>The data stored in this segment.</summary>
+        private readonly T[] _array;
+        /// <summary>Details about the segment.</summary>
+        private SegmentState _state; // separated out to enable StructLayout attribute to take effect
+
+        /// <summary>Initializes the pool.</summary>
+        public SingleProducerSingleConsumerPool(int size)
+        {
+            // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
+            Debug.Assert(size > 0, "Initial segment size must be > 0.");
+            Debug.Assert((size & (size - 1)) == 0, "Initial segment size must be a power of 2");
+            Debug.Assert(size <= MaxSegmentSize, "Initial segment size should be <= maximum.");
+            Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
+
+            // Initialize the pool
+            _array = new T[size];
+        }
+
+        /// <summary>Attempts to enqueue an item into the pool.</summary>
+        /// <param name="item">The item to enqueue.</param>
+        public bool TryEnqueue(T item)
+        {
+            T[] array = _array;
+            int last = _state._last; // local copy to avoid multiple volatile reads
+
+            // Fast path: there's obviously room
+            int tail2 = (last + 1) & (array.Length - 1);
+            if (tail2 != _state._firstCopy)
+            {
+                array[last] = item;
+                _state._last = tail2;
+                return true;
+            }
+            // Slow path: there may not be room
+            else
+            {
+                return TryEnqueueSlow(item);
+            }
+        }
+
+        /// <summary>Attempts to enqueue an item into the pool.</summary>
+        /// <param name="item">The item to enqueue.</param>
+        private bool TryEnqueueSlow(T item)
+        {
+            if (_state._firstCopy != _state._first)
+            {
+                _state._firstCopy = _state._first;
+                return TryEnqueue(item); // will only recur once for this enqueue operation
+            }
+
+            return false;
+        }
+
+        /// <summary>Attempts to dequeue an item from the pool.</summary>
+        /// <param name="result">The dequeued item.</param>
+        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
+        public bool TryDequeue(out T result)
+        {
+            T[] array = _array;
+            int first = _state._first; // local copy to avoid multiple volatile reads
+
+            // Fast path: there's obviously data available
+            if (first != _state._lastCopy)
+            {
+                result = array[first];
+                array[first] = default; // Clear the slot to release the element
+                _state._first = (first + 1) & (array.Length - 1);
+                return true;
+            }
+            // Slow path: there may not be data available
+            else
+            {
+                return TryDequeueSlow(out result);
+            }
+        }
+
+        /// <summary>Attempts to dequeue an item from the pool.</summary>
+        /// <param name="result">The dequeued item.</param>
+        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
+        private bool TryDequeueSlow(out T result)
+        {
+            T[] array = _array;
+            if (_state._last != _state._lastCopy)
+            {
+                _state._lastCopy = _state._last;
+                return TryDequeue(out result); // will only recur once for this dequeue operation
+            }
+
+            int first = _state._first; // local copy to avoid extraneous volatile reads
+
+            if (first == _state._last)
+            {
+                result = default;
+                return false;
+            }
+
+            result = array[first];
+            array[first] = default; // Clear the slot to release the element
+            _state._first = (first + 1) & (_array.Length - 1);
+            _state._lastCopy = _state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
+
+            return true;
+        }
+
+        /// <summary>Stores information about a segment.</summary>
+        [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
+        private struct SegmentState
+        {
+            /// <summary>Padding to reduce false sharing between the segment's array and _first.</summary>
+            internal PaddingFor32 _pad0;
+
+            /// <summary>The index of the current head in the segment.</summary>
+            internal volatile int _first;
+            /// <summary>A copy of the current tail index.</summary>
+            internal int _lastCopy; // not volatile as only read and written by the consumer thread, and only read after reading the volatile _first
+
+            /// <summary>Padding to reduce false sharing between the first and last.</summary>
+            internal PaddingFor32 _pad1;
+
+            /// <summary>A copy of the current head index.</summary>
+            internal int _firstCopy; // not volatile as only read and written by the producer thread
+            /// <summary>The index of the current tail in the segment.</summary>
+            internal volatile int _last;
+
+            /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
+            internal PaddingFor32 _pad2;
+        }
+    }
+}
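
A note on the Padding.cs file the csproj change links in: it is shared corefx code whose contents are not part of this diff. A minimal sketch of the shape SegmentState relies on, with the 128-byte cache-line size and the PaddingHelpers name as assumptions rather than quotes from the patch:

    // Sketch of Common\Internal\Padding.cs (not shown in this diff).
    using System.Runtime.InteropServices;

    namespace Internal
    {
        internal static class PaddingHelpers
        {
            // Assumed: a size >= the most common CPU cache-line sizes.
            internal const int CACHE_LINE_SIZE = 128;
        }

        // Empty struct with an explicit size, so that a 32-bit field plus this
        // padding spans a full cache line and avoids false sharing with neighbors.
        [StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(int))]
        internal struct PaddingFor32
        {
        }
    }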
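The pool is only safe with exactly one enqueuing thread and one dequeuing thread at a time: TryEnqueue touches only _last and the producer-owned _firstCopy, TryDequeue touches only _first and the consumer-owned _lastCopy, and the power-of-two size lets both sides wrap indices with a mask (& (array.Length - 1)) instead of a division. A minimal usage sketch follows; the harness name, the pool size of 16, and the item type are illustrative assumptions, not part of the patch, and it must compile in the same assembly as the internal pool type:

    using System;
    using System.Threading.Tasks;

    namespace System.IO.Pipelines
    {
        internal static class PoolUsageSketch
        {
            private static void Main()
            {
                var pool = new SingleProducerSingleConsumerPool<object>(16);

                // Producer: offers items back to the pool. TryEnqueue returns false
                // when the ring is full; the item is simply dropped for the GC to
                // reclaim, with no blocking and no lock taken.
                Task producer = Task.Run(() =>
                {
                    for (int i = 0; i < 100_000; i++)
                    {
                        pool.TryEnqueue(new object());
                    }
                });

                // Consumer: rents from the pool, falling back to allocation on a
                // miss - the same pattern CreateSegment uses above.
                Task consumer = Task.Run(() =>
                {
                    for (int i = 0; i < 100_000; i++)
                    {
                        object item = pool.TryDequeue(out object pooled) ? pooled : new object();
                        GC.KeepAlive(item);
                    }
                });

                Task.WaitAll(producer, consumer);
                Console.WriteLine("Done; safe only with a single producer and a single consumer.");
            }
        }
    }

Dropping items when the ring is full (rather than growing or blocking) is what lets TryEnqueue and TryDequeue stay lock-free: a failed enqueue just means the segment is garbage collected, which matches the old behavior when the lock-guarded array was full.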