Skip to content

Commit

Permalink
Exponential Bucket Histogram - part 5 (#3482)
Browse files Browse the repository at this point in the history
  • Loading branch information
reyang authored Jul 25, 2022
1 parent a55e339 commit 6f2b1a0
Show file tree
Hide file tree
Showing 4 changed files with 475 additions and 15 deletions.
31 changes: 31 additions & 0 deletions src/OpenTelemetry/Internal/MathHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// </copyright>

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;

namespace OpenTelemetry.Internal;
Expand Down Expand Up @@ -96,6 +97,36 @@ public static int LeadingZero64(long value)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int PositiveModulo32(int value, int divisor)
{
Debug.Assert(divisor > 0, $"{nameof(divisor)} must be a positive integer.");

value %= divisor;

if (value < 0)
{
value += divisor;
}

return value;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static long PositiveModulo64(long value, long divisor)
{
Debug.Assert(divisor > 0, $"{nameof(divisor)} must be a positive integer.");

value %= divisor;

if (value < 0)
{
value += divisor;
}

return value;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool IsFinite(double value)
{
Expand Down
155 changes: 143 additions & 12 deletions src/OpenTelemetry/Metrics/CircularBufferBuckets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.
// </copyright>

using System.Diagnostics;
using System.Runtime.CompilerServices;
using OpenTelemetry.Internal;

Expand All @@ -40,6 +41,11 @@ public CircularBufferBuckets(int capacity)
/// </summary>
public int Capacity { get; }

/// <summary>
/// Gets the offset of the start index for the <see cref="CircularBufferBuckets"/>.
/// </summary>
public int Offset => this.begin;

/// <summary>
/// Gets the size of the <see cref="CircularBufferBuckets"/>.
/// </summary>
Expand All @@ -60,20 +66,20 @@ public long this[int index]
}

/// <summary>
/// Attempts to increment the value of <c>Bucket[index]</c>.
/// Attempts to increment the value of <c>Bucket[index]</c> by <c>value</c>.
/// </summary>
/// <param name="index">The index of the bucket.</param>
/// <param name="value">The increment.</param>
/// <returns>
/// Returns <c>0</c> if the increment attempt succeeded;
/// Returns a positive integer <c>Math.Ceiling(log_2(X))</c> if the
/// underlying buffer is running out of capacity, and the buffer has to
/// increase to <c>X * Capacity</c> at minimum.
/// Returns a positive integer indicating the minimum scale reduction level
/// if the increment attempt failed.
/// </returns>
/// <remarks>
/// The "index" value can be positive, zero or negative.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int TryIncrement(int index)
public int TryIncrement(int index, long value = 1)
{
var capacity = this.Capacity;

Expand Down Expand Up @@ -107,7 +113,7 @@ public int TryIncrement(int index)
this.begin = index;
}

this.trait[this.ModuloIndex(index)] += 1;
this.trait[this.ModuloIndex(index)] += value;

return 0;

Expand All @@ -130,16 +136,141 @@ static int CalculateScaleReduction(int size, int capacity)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int ModuloIndex(int value)
public void ScaleDown(int level = 1)
{
value %= this.Capacity;
Debug.Assert(level > 0, "The scale down level must be a positive integer.");

if (this.trait == null)
{
return;
}

// 0 <= offset < capacity <= 2147483647
uint capacity = (uint)this.Capacity;
var offset = (uint)this.ModuloIndex(this.begin);

var currentBegin = this.begin;
var currentEnd = this.end;

for (int i = 0; i < level; i++)
{
var newBegin = currentBegin >> 1;
var newEnd = currentEnd >> 1;

if (currentBegin != currentEnd)
{
if (currentBegin % 2 == 0)
{
ScaleDownInternal(this.trait, offset, currentBegin, currentEnd, capacity);
}
else
{
currentBegin++;

if (currentBegin != currentEnd)
{
ScaleDownInternal(this.trait, offset + 1, currentBegin, currentEnd, capacity);
}
}
}

currentBegin = newBegin;
currentEnd = newEnd;
}

this.begin = currentBegin;
this.end = currentEnd;

if (capacity > 1)
{
AdjustPosition(this.trait, offset, (uint)this.ModuloIndex(currentBegin), (uint)(currentEnd - currentBegin + 1), capacity);
}

return;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void ScaleDownInternal(long[] array, uint offset, int begin, int end, uint capacity)
{
for (var index = begin + 1; index < end; index++)
{
Consolidate(array, (offset + (uint)(index - begin)) % capacity, (offset + (uint)((index >> 1) - (begin >> 1))) % capacity);
}

Consolidate(array, (offset + (uint)(end - begin)) % capacity, (offset + (uint)((end >> 1) - (begin >> 1))) % capacity);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void AdjustPosition(long[] array, uint src, uint dst, uint size, uint capacity)
{
var advancement = (dst + capacity - src) % capacity;

if (advancement == 0)
{
return;
}

if (size - 1 == advancement && advancement << 1 == capacity)
{
Exchange(array, src++, dst++);
size -= 2;
}
else if (advancement < size)
{
src = src + size - 1;
dst = dst + size - 1;

while (size-- != 0)
{
Move(array, src-- % capacity, dst-- % capacity);
}

return;
}

while (size-- != 0)
{
Move(array, src++ % capacity, dst++ % capacity);
}
}

if (value < 0)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void Consolidate(long[] array, uint src, uint dst)
{
array[dst] += array[src];
array[src] = 0;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void Exchange(long[] array, uint src, uint dst)
{
var value = array[dst];
array[dst] = array[src];
array[src] = value;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void Move(long[] array, uint src, uint dst)
{
value += this.Capacity;
array[dst] = array[src];
array[src] = 0;
}
}

public override string ToString()
{
return nameof(CircularBufferBuckets)
+ "{"
+ nameof(this.Capacity) + "=" + this.Capacity + ", "
+ nameof(this.Size) + "=" + this.Size + ", "
+ nameof(this.begin) + "=" + this.begin + ", "
+ nameof(this.end) + "=" + this.end + ", "
+ (this.trait == null ? "null" : "{" + string.Join(", ", this.trait) + "}")
+ "}";
}

return value;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int ModuloIndex(int value)
{
return MathHelper.PositiveModulo32(value, this.Capacity);
}
}
22 changes: 20 additions & 2 deletions src/OpenTelemetry/Metrics/ExponentialBucketHistogram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,29 @@ public void Record(double value)

if (c > 0)
{
this.PositiveBuckets.TryIncrement(this.MapToIndex(value));
var index = this.MapToIndex(value);
var n = this.PositiveBuckets.TryIncrement(index);

if (n != 0)
{
this.PositiveBuckets.ScaleDown(n);
this.NegativeBuckets.ScaleDown(n);
n = this.PositiveBuckets.TryIncrement(index);
Debug.Assert(n == 0, "Increment should always succeed after scale down.");
}
}
else if (c < 0)
{
this.NegativeBuckets.TryIncrement(this.MapToIndex(-value));
var index = this.MapToIndex(-value);
var n = this.NegativeBuckets.TryIncrement(index);

if (n != 0)
{
this.PositiveBuckets.ScaleDown(n);
this.NegativeBuckets.ScaleDown(n);
n = this.NegativeBuckets.TryIncrement(index);
Debug.Assert(n == 0, "Increment should always succeed after scale down.");
}
}
else
{
Expand Down
Loading

0 comments on commit 6f2b1a0

Please sign in to comment.