Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Schedule completion prior to pooling segments
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Feb 12, 2019
1 parent caba0ef commit 5129e3a
Showing 1 changed file with 45 additions and 26 deletions.
71 changes: 45 additions & 26 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private void AllocateWriteHeadUnsynchronized(int sizeHint)

private BufferSegment AllocateSegment(int sizeHint)
{
BufferSegment newSegment = CreateSegmentUnsynchronized();
BufferSegment newSegment = CreateSegment();

if (_pool is null)
{
Expand Down Expand Up @@ -236,23 +236,46 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
return adjustedToMaximumSize;
}

private BufferSegment CreateSegmentUnsynchronized()
private BufferSegment CreateSegment()
{
if (_pooledSegmentCount > 0)
BufferSegment[] segmentPool = _bufferSegmentPool;
lock (segmentPool)
{
_pooledSegmentCount--;
return _bufferSegmentPool[_pooledSegmentCount];
int index = _pooledSegmentCount - 1;
if ((uint)index < (uint)segmentPool.Length)
{
_pooledSegmentCount = index;
return segmentPool[index];
}
}

return new BufferSegment();
}

private void ReturnSegmentUnsynchronized(BufferSegment segment)
private void ReturnSegments(BufferSegment from, BufferSegment toExclusive)
{
if (_pooledSegmentCount < _bufferSegmentPool.Length)
Debug.Assert(from != null);
Debug.Assert(from != toExclusive);

BufferSegment[] segmentPool = _bufferSegmentPool;
lock (segmentPool)
{
_bufferSegmentPool[_pooledSegmentCount] = segment;
_pooledSegmentCount++;
int index = _pooledSegmentCount;
do
{
BufferSegment next = from.NextSegment;
from.ResetMemory();

if ((uint)index < (uint)segmentPool.Length)
{
segmentPool[index] = from;
index++;
}

from = next;
} while (from != toExclusive);

_pooledSegmentCount = index;
}
}

Expand Down Expand Up @@ -494,18 +517,15 @@ private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, Buf
_readerAwaitable.SetUncompleted();
}

while (returnStart != null && returnStart != returnEnd)
{
BufferSegment next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
}

_operationState.EndRead();
}

TrySchedule(_writerScheduler, completionData);

if (returnStart != null && returnStart != returnEnd)
{
ReturnSegments(returnStart, returnEnd);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -735,6 +755,7 @@ private static void ExecuteWithExecutionContext(object state)

private void CompletePipe()
{
BufferSegment segment;
lock (_sync)
{
if (_disposed)
Expand All @@ -743,22 +764,20 @@ private void CompletePipe()
}

_disposed = true;
// Return all segments
// Get segments chain to return
// if _readHead is null we need to try return _commitHead
// because there might be a block allocated for writing
BufferSegment segment = _readHead ?? _readTail;
while (segment != null)
{
BufferSegment returnSegment = segment;
segment = segment.NextSegment;

returnSegment.ResetMemory();
}
segment = _readHead ?? _readTail;

_writingHead = null;
_readHead = null;
_readTail = null;
}

if (segment != null)
{
ReturnSegments(segment, toExclusive: null);
}
}

internal ValueTaskSourceStatus GetReadAsyncStatus()
Expand Down

0 comments on commit 5129e3a

Please sign in to comment.