Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Make BufferSegment pool lock-free
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Feb 13, 2019
1 parent e778d5b commit 2852371
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 42 deletions.
4 changes: 4 additions & 0 deletions src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
<Compile Include="System\IO\Pipelines\FlushResult.cs" />
<Compile Include="System\IO\Pipelines\InlineScheduler.cs" />
<Compile Include="System\IO\Pipelines\IDuplexPipe.cs" />
<Compile Include="$(CommonPath)\Internal\Padding.cs">
<Link>Common\Internal\Padding.cs</Link>
</Compile>
<Compile Include="System\IO\Pipelines\Pipe.DefaultPipeReader.cs" />
<Compile Include="System\IO\Pipelines\Pipe.DefaultPipeWriter.cs" />
<Compile Include="System\IO\Pipelines\Pipe.cs" />
Expand All @@ -24,6 +27,7 @@
<Compile Include="System\IO\Pipelines\PipeWriter.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
<Compile Include="System\IO\Pipelines\SingleProducerSingleConsumerPool.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
<Compile Include="System\IO\Pipelines\ThrowHelper.cs" />
</ItemGroup>
Expand Down
47 changes: 5 additions & 42 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ public sealed partial class Pipe
private readonly PipeScheduler _readerScheduler;
private readonly PipeScheduler _writerScheduler;

// Segment Pool
private int _pooledSegmentCount;
private readonly BufferSegment[] _bufferSegmentPool;
// Temporary list to hold Segments return while being reset
private readonly BufferSegment[] _bufferSegmentsToReturn;
private readonly SingleProducerSingleConsumerPool<BufferSegment> _bufferSegmentPool;

private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer;
Expand Down Expand Up @@ -99,8 +95,7 @@ public Pipe(PipeOptions options)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
}

_bufferSegmentPool = new BufferSegment[SegmentPoolSize];
_bufferSegmentsToReturn = new BufferSegment[SegmentPoolSize];
_bufferSegmentPool = new SingleProducerSingleConsumerPool<BufferSegment>(SegmentPoolSize);

_operationState = default;
_readerCompletion = default;
Expand Down Expand Up @@ -242,15 +237,9 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)

private BufferSegment CreateSegment()
{
BufferSegment[] segmentPool = _bufferSegmentPool;
lock (segmentPool)
if (_bufferSegmentPool.TryDequeue(out BufferSegment segment))
{
int index = _pooledSegmentCount - 1;
if ((uint)index < (uint)segmentPool.Length)
{
_pooledSegmentCount = index;
return segmentPool[index];
}
return segment;
}

return new BufferSegment();
Expand All @@ -261,43 +250,17 @@ private void ReturnSegments(BufferSegment from, BufferSegment toExclusive)
Debug.Assert(from != null);
Debug.Assert(from != toExclusive);

// Reset the Segments and return their data out of lock
BufferSegment[] segmentToReturn = _bufferSegmentsToReturn;
int count = 0;
do
{
BufferSegment next = from.NextSegment;
Debug.Assert(next != null || toExclusive == null);

from.ResetMemory();

if ((uint)count < (uint)segmentToReturn.Length)
{
// Store in temporary list while preforming expensive resets
segmentToReturn[count] = from;
count++;
}
_bufferSegmentPool.TryEnqueue(from);

from = next;
} while (from != toExclusive);

// Add the Segments back to pool from the temporary list under lock
BufferSegment[] segmentPool = _bufferSegmentPool;
lock (segmentPool)
{
int index = _pooledSegmentCount;
for (int i = 0; i < count; i++)
{
if ((uint)index < (uint)segmentPool.Length)
{
segmentPool[index] = segmentToReturn[i];
index++;
}
segmentToReturn[i] = null;
}

_pooledSegmentCount = index;
}
}

internal bool CommitUnsynchronized()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.Runtime.InteropServices;

using Internal;

namespace System.IO.Pipelines
{
internal sealed class SingleProducerSingleConsumerPool<T>
{
// Adapted from SingleProducerSingleConsumerQueue

/// <summary>The maximum size to use for segments (in number of elements).</summary>
private const int MaxSegmentSize = 0x1000000; // this could be made as large as Int32.MaxValue / 2

/// <summary>The data stored in this segment.</summary>
private readonly T[] _array;
/// <summary>Details about the segment.</summary>
private SegmentState _state; // separated out to enable StructLayout attribute to take effect

/// <summary>Initializes the queue.</summary>
public SingleProducerSingleConsumerPool(int size)
{
// Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
Debug.Assert(size > 0, "Initial segment size must be > 0.");
Debug.Assert((size & (size - 1)) == 0, "Initial segment size must be a power of 2");
Debug.Assert(size <= MaxSegmentSize, "Initial segment size should be <= maximum.");
Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");

// Initialize the pool
_array = new T[size];
}

/// <summary>Enqueues an item into the queue.</summary>
/// <param name="item">The item to enqueue.</param>
public bool TryEnqueue(T item)
{
T[] array = _array;
int last = _state._last; // local copy to avoid multiple volatile reads

// Fast path: there's obviously room
int tail2 = (last + 1) & (array.Length - 1);
if (tail2 != _state._firstCopy)
{
array[last] = item;
_state._last = tail2;
return true;
}
// Slow path: there may not be room
else
{
return TryEnqueueSlow(item);
}
}

/// <summary>Enqueues an item into the queue.</summary>
/// <param name="item">The item to enqueue.</param>
private bool TryEnqueueSlow(T item)
{
if (_state._firstCopy != _state._first)
{
_state._firstCopy = _state._first;
return TryEnqueue(item); // will only recur once for this enqueue operation
}

return false;
}

/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
public bool TryDequeue(out T result)
{
T[] array = _array;
int first = _state._first; // local copy to avoid multiple volatile reads

// Fast path: there's obviously data available
if (first != _state._lastCopy)
{
result = array[first];
array[first] = default; // Clear the slot to release the element
_state._first = (first + 1) & (array.Length - 1);
return true;
}
// Slow path: there may not be data available
else
{
return TryDequeueSlow(out result);
}
}

/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
private bool TryDequeueSlow(out T result)
{
T[] array = _array;
if (_state._last != _state._lastCopy)
{
_state._lastCopy = _state._last;
return TryDequeue(out result); // will only recur once for this dequeue operation
}

int first = _state._first; // local copy to avoid extraneous volatile reads

if (first == _state._last)
{
result = default;
return false;
}

result = array[first];
array[first] = default; // Clear the slot to release the element
_state._first = (first + 1) & (_array.Length - 1);
_state._lastCopy = _state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy

return true;
}

/// <summary>Stores information about a segment.</summary>
[StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
private struct SegmentState
{
/// <summary>Padding to reduce false sharing between the segment's array and _first.</summary>
internal PaddingFor32 _pad0;

/// <summary>The index of the current head in the segment.</summary>
internal volatile int _first;
/// <summary>A copy of the current tail index.</summary>
internal int _lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there _lastCopy is only read after reading the volatile _first

/// <summary>Padding to reduce false sharing between the first and last.</summary>
internal PaddingFor32 _pad1;

/// <summary>A copy of the current head index.</summary>
internal int _firstCopy; // not volatile as only read and written by the consumer thread
/// <summary>The index of the current tail in the segment.</summary>
internal volatile int _last;

/// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
internal PaddingFor32 _pad2;
}
}
}

0 comments on commit 2852371

Please sign in to comment.