Skip to content

Commit

Permalink
[Traceql metrics] Better trace ID sharding (grafana#3399)
Browse files Browse the repository at this point in the history
* Extend trace ID shard to more bits, add test for quality of sharding on a real block

* test

* lint and cleanup
  • Loading branch information
mdisibio authored and kvrhdn committed Feb 26, 2024
1 parent 3d82387 commit 0d79a1e
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 37 deletions.
99 changes: 74 additions & 25 deletions pkg/util/traceidboundary/traceidboundary.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package traceidboundary

import (
"bytes"
"encoding/binary"

"github.com/grafana/tempo/pkg/blockboundary"
)

const defaultBitSharding = 12

type Boundary struct {
Min, Max []byte
}
Expand All @@ -20,32 +23,23 @@ type Boundary struct {
// - Trace IDs can be 16 or 8 bytes. If we naively sharded only in 16-byte space it would
// be unbalanced because all 8-byte IDs would land in the first shard. Therefore we
// divide in both 16- and 8-byte spaces and a single shard covers a range in each.
// - Technically 8-byte IDs are only 63 bits, so we account for this
// - The boundaries are inclusive/exclusive: [min, max), except the max of the last shard
// is the valid ID FFFFF... and inclusive/inclusive.
func Pairs(shard, of uint32) (boundaries []Boundary, upperInclusive bool) {
// First pair is 63-bit IDs left-padded with zeroes to make 16-byte divisions
// that matches the 16-byte layout in the block.
// To create 63-bit boundaries we create twice as many as needed,
// then only use the first half. i.e. shaving off the top-most bit.
int63bounds := blockboundary.CreateBlockBoundaries(int(of * 2))

// Adjust last boundary to be inclusive so it matches the other pair.
int63bounds[of] = []byte{0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
return PairsWithBitSharding(shard, of, defaultBitSharding)
}

boundaries = append(boundaries, Boundary{
Min: append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, int63bounds[shard-1][0:8]...),
Max: append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, int63bounds[shard][0:8]...),
})
// PairsWithBitSharding allows choosing a specific level of sub-sharding.
func PairsWithBitSharding(shard, of uint32, bits int) (boundaries []Boundary, upperInclusive bool) {
if bits > 0 {
boundaries = append(boundaries, complicatedShardingFor8ByteIDs(shard, of, bits)...)
}

// Second pair is normal full precision 16-byte IDs.
// Final pair is the normal full precision 16-byte IDs.
int128bounds := blockboundary.CreateBlockBoundaries(int(of))

// However there is one caveat - We adjust the very first boundary to ensure it doesn't
// overlap with the 63-bit precision ones. I.e. a minimum of 0x0000.... would
// unintentionally include all 63-bit IDs.
// The first 64-bit ID starts here:
int128bounds[0] = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0x80, 0, 0, 0, 0, 0, 0, 0}
if bits > 0 {
// Avoid overlap with the 64-bit precision ones. I.e. a minimum of 0x0000.... would
// unintentionally include all 64-bit IDs. The first 65-bit ID starts here:
int128bounds[0] = []byte{0, 0, 0, 0, 0, 0, 0, 0x01, 0, 0, 0, 0, 0, 0, 0, 0}
}

boundaries = append(boundaries, Boundary{
Min: int128bounds[shard-1],
Expand All @@ -60,7 +54,12 @@ func Pairs(shard, of uint32) (boundaries []Boundary, upperInclusive bool) {

// Funcs returns helper functions that match trace IDs in the given shard.
func Funcs(shard, of uint32) (testSingle func([]byte) bool, testRange func([]byte, []byte) bool) {
pairs, upperInclusive := Pairs(shard, of)
return FuncsWithBitSharding(shard, of, defaultBitSharding)
}

// FuncsWithBitSharding is like Funcs but allows choosing a specific level of sub-sharding.
func FuncsWithBitSharding(shard, of uint32, bits int) (testSingle func([]byte) bool, testRange func([]byte, []byte) bool) {
pairs, upperInclusive := PairsWithBitSharding(shard, of, bits)

upper := -1
if upperInclusive {
Expand All @@ -70,11 +69,9 @@ func Funcs(shard, of uint32) (testSingle func([]byte) bool, testRange func([]byt
isMatch := func(id []byte) bool {
for _, p := range pairs {
if bytes.Compare(p.Min, id) <= 0 && bytes.Compare(id, p.Max) <= upper {
// fmt.Printf("TraceID: %16X true\n", id)
return true
}
}
// fmt.Printf("TraceID: %16X false\n", id)
return false
}

Expand All @@ -89,3 +86,55 @@ func Funcs(shard, of uint32) (testSingle func([]byte) bool, testRange func([]byt

return isMatch, withinRange
}

// complicatedShardingFor8ByteIDs generates a list of trace ID boundaries that is subdividing
// the 64-bit space by the given number of bits. This seems like overkill but in practice
// 8-byte IDs are unevenly weighted towards lower values starting with zeros. The benefit of
// this approach is better fairness across shards, and also *invariance* across workloads,
// no matter if your instrumentation is generating 8-byte or 16-byte trace IDs.
func complicatedShardingFor8ByteIDs(shard, of uint32, bits int) []Boundary {
// This function takes a trace ID boundary and shifts it down to the
// same space by the given number of bits.
// For example shard 2 of 4 has the boundary:
// 0x40 b0100
// 0x80 b1000
// Shifting by 1 bit gives shard 2 of 4 in 64-bit-only space:
// |
// v
// 0xA0 b1010
// 0xC0 b1100
// Shifting by 2 bits gives shard 2 of 4 in 63-bit-only space:
// |
// v
// 0x50 b0101
// 0x60 b0110
// ... and so on
cloneRotateAndSet := func(v []byte, right int) []byte {
v2 := binary.BigEndian.Uint64(v)
v2 >>= right
v2 |= 0x01 << (64 - right)

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, v2)
return buf
}

var boundaries []Boundary
original := blockboundary.CreateBlockBoundaries(int(of))

for i := bits; i >= 1; i-- {
min := cloneRotateAndSet(original[shard-1], i)
max := cloneRotateAndSet(original[shard], i)

if i == bits && shard == 1 {
// We don't shard below this, so its minimum is absolute zero.
clear(min)
}
boundaries = append(boundaries, Boundary{
Min: append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, min[0:8]...),
Max: append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, max[0:8]...),
})
}

return boundaries
}
63 changes: 51 additions & 12 deletions pkg/util/traceidboundary/traceidboundary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,70 @@ import (
func TestPairs(t *testing.T) {
testCases := []struct {
shard, of uint32
bits int
expectedPairs []Boundary
expectedUpper bool
}{
//---------------------------------------------
// Simplest case, no sub-sharding,
// 4 shards all at the top level.
//---------------------------------------------
{
1, 2,
[]Boundary{
1, 4, 0, []Boundary{
{
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // Min 63-bit value
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x40, 0, 0, 0, 0, 0, 0, 0}, // Half of 63-bit space (exlusive)
[]byte{0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
[]byte{0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
},
}, false,
},
{
2, 4, 0, []Boundary{
{
[]byte{0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
},
}, false,
},
{
3, 4, 0, []Boundary{
{
[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
[]byte{0xC0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
},
}, false,
},
{
4, 4, 0, []Boundary{
{
[]byte{0xC0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
[]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
},
}, true,
},

//---------------------------------------------
// Sub-sharding of 1 bit down into 64-bit IDs.
//---------------------------------------------
{
1, 2, 1, []Boundary{
{
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // Min value overall is always zero
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0xC0, 0, 0, 0, 0, 0, 0, 0}, // Half of 64-bit space (exlusive)
},
{
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x80, 0, 0, 0, 0, 0, 0, 0}, // Min 64-bit value (not overlapping with max 63-bit value)
[]byte{0, 0, 0, 0, 0, 0, 0, 0x01, 0, 0, 0, 0, 0, 0, 0, 0}, // Min 65-bit value (not overlapping with max 64-bit value)
[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // Half of 128-bit space (exlusive)
},
},
false,
}, false,
},
{
2, 2, []Boundary{
2, 2, 1, []Boundary{
{
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x40, 0, 0, 0, 0, 0, 0, 0}, // Half of 63-bit space
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, // Max 63-bit space (inclusive)
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0xC0, 0, 0, 0, 0, 0, 0, 0}, // Half of 64-bit space
[]byte{0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, // Max 64-bit space (inclusive)
},
{
[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // Half of 128-bit space (not overlapping with max 63-bit value)
[]byte{0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // Half of 128-bit space (not overlapping with max 64-bit value)
[]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, // Max 128-bit value (inclusive)
},
}, true,
Expand All @@ -43,7 +82,7 @@ func TestPairs(t *testing.T) {

for _, tc := range testCases {
t.Run(fmt.Sprintf("%d of %d", tc.shard, tc.of), func(t *testing.T) {
pairs, upper := Pairs(tc.shard, tc.of)
pairs, upper := PairsWithBitSharding(tc.shard, tc.of, tc.bits)
require.Equal(t, tc.expectedPairs, pairs)
require.Equal(t, tc.expectedUpper, upper)
})
Expand Down
107 changes: 107 additions & 0 deletions tempodb/encoding/vparquet3/block_traceql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"os"
"path"
"strconv"
"testing"
Expand All @@ -19,6 +21,7 @@ import (
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/traceqlmetrics"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/pkg/util/traceidboundary"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
Expand Down Expand Up @@ -756,3 +759,107 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {
})
}
}

func TestTraceIDShardingQuality(t *testing.T) {
// Use debug=1 go test -v -run=TestTraceIDShardingQuality
if os.Getenv("debug") != "1" {
t.Skip()
}

var (
ctx = context.TODO()
opts = common.DefaultSearchOptions()
tenantID = "1"
// blockID = uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
blockID = uuid.MustParse("18364616-f80d-45a6-b2a3-cb63e203edff")
path = "/Users/marty/src/tmp/"
)

r, _, _, err := local.New(&local.Config{
Path: path,
})
require.NoError(t, err)

rr := backend.NewReader(r)
meta, err := rr.BlockMeta(ctx, blockID, tenantID)
require.NoError(t, err)
require.Equal(t, VersionString, meta.Version)

block := newBackendBlock(meta, rr)
_, _, err = block.openForSearch(ctx, opts)
require.NoError(t, err)

fetchReq := traceql.FetchSpansRequest{
AllConditions: true,
Conditions: []traceql.Condition{
{Attribute: traceql.IntrinsicTraceIDAttribute},
},
}

f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return block.Fetch(ctx, req, opts)
})

shardsToTest := []int{2, 5, 10 /*, 100, 500, 1000, 2000*/}
bitsToTest := []int{4, 8, 12 /*16, 20, 24 /*28, 32*/}

for _, shards := range shardsToTest {
t.Run(strconv.Itoa(shards), func(t *testing.T) {
for _, bits := range bitsToTest {
t.Run(strconv.Itoa(bits), func(t *testing.T) {
// Create all shard-ownership functions
counts := make([]int, shards)
funcs := make([]func([]byte) bool, shards)
for s := 1; s <= shards; s++ {
funcs[s-1], _ = traceidboundary.FuncsWithBitSharding(uint32(s), uint32(shards), bits)
}

resp, err := f.Fetch(ctx, fetchReq)
require.NoError(t, err)
defer resp.Results.Close()

for {
ss, err := resp.Results.Next(ctx)
require.NoError(t, err)

if ss == nil {
break
}

// Match the trace ID against every shard
matches := 0
for i := 0; i < shards; i++ {
if funcs[i](ss.TraceID) {
counts[i]++
matches++
// fmt.Println("TraceID:", ss.TraceID, "is bucket", i)
}
}

if matches > 1 {
fmt.Println("TraceID matched mutiple shards:", ss.TraceID)
panic("trace matched multiple shards")
} else if matches == 0 {
fmt.Println("TraceID not matched by any shard:", ss.TraceID)
panic("trace id not matched by any shard")
}

ss.Release()
}

fmt.Println(counts)

sum := 0
minv := math.MaxInt
maxv := math.MinInt
for _, v := range counts {
sum += v
minv = min(minv, v)
maxv = max(maxv, v)
}
fmt.Printf("Shards:%d Bits:%d Total:%d spans Min:%d Max:%d Quality:%.1f %%\n", shards, bits, sum, minv, maxv, float64(minv)/float64(maxv)*100.0)
})
}
})
}
}

0 comments on commit 0d79a1e

Please sign in to comment.