Skip to content

Commit

Permalink
Add vparquet4 encoding (#3368)
Browse files Browse the repository at this point in the history
* Copy vparquet3 to vparquet4 folder

* Create initial vParquet4 encoding

* vParquet4: handle unsupported / dropped attributes (#3123)

* Count dropped attributes and store them in separate column

* Remove current support for arrays and kv-lists

* Store per-trace service statistics in vparquet4 blocks (#2941)

* Add support for attributes with array values (#3221)

* Implement single values as arrays and introduce type column

* Add support for array attributes

* Tests more array values and fix attribute conversion

* Use a combination of snappy and delta for ValueType

* Vendor module google/go-cmp/cmp/cmpopts

* Use all types of array attributes in TestTraceToParquet

* Improve TestFieldsAreCleared

* links and events schema changes (#3163)

* Add attributes to instrumentation scope (#3322)

* Fix typo in vParquet4 event name enconding (#3336)

* Precalculate and reuse the vParquet4 schema before opening blocks

* Convert block in vparquet4/test-data directory

* CHANGELOG.md

* Update test cases in BenchmarkBackendBlockTraceQL

* Unsupported attribute values are no longer dropped

* Replace ValueType column with IsArray column

* Skip service stats map allocation when none are present

---------

Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com>
Co-authored-by: Andreas Gerstmayr <agerstmayr@redhat.com>
Co-authored-by: Jennie Pham <94262131+ie-pham@users.noreply.github.com>
Co-authored-by: Andreas Gerstmayr <andreas@gerstmayr.me>
  • Loading branch information
4 people authored May 7, 2024
1 parent 174b428 commit fc33ae5
Show file tree
Hide file tree
Showing 64 changed files with 15,282 additions and 224 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Reduced memory consumption in the frontend for large traces. [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* **Breaking Change** Remove trace by id hedging from the frontend. [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* **Breaking Change** Dropped meta-tag for tenant from trace by id multitenant. [#3522](https://github.com/grafana/tempo/pull/3522) (@joe-elliott)
* [FEATURE] New block encoding vParquet4 with support for links, events, and arrays [#3368](https://github.com/grafana/tempo/pull/3368) (@stoewer @ie-pham @andreasgerstmayr)
* [CHANGE] Align metrics query time ranges to the step parameter [#3490](https://github.com/grafana/tempo/pull/3490) (@mdisibio)
* [CHANGE] Change the UID and GID of the `tempo` user to avoid root [#2265](https://github.com/grafana/tempo/pull/2265) (@zalegrala)
**BREAKING CHANGE** Ownership of /var/tempo is changing. Historyically this
Expand Down
791 changes: 581 additions & 210 deletions pkg/tempopb/tempo.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/tempopb/tempo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ message TraceSearchMetadata {
uint32 durationMs = 5;
SpanSet spanSet = 6; // deprecated. use SpanSets field below
repeated SpanSet spanSets = 7;
map<string, ServiceStats> serviceStats = 8;
}

message ServiceStats {
uint32 spanCount = 1;
uint32 errorCount = 2;
}

message SpanSet {
Expand Down
13 changes: 13 additions & 0 deletions pkg/traceql/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ func combineSearchResults(existing *tempopb.TraceSearchMetadata, incoming *tempo
existing.DurationMs = incoming.DurationMs
}

// Combine service stats
// It's possible to find multiple trace fragments that satisfy a TraceQL result,
// therefore we use max() to merge the ServiceStats.
for service, incomingStats := range incoming.ServiceStats {
existingStats, ok := existing.ServiceStats[service]
if !ok {
existingStats = &tempopb.ServiceStats{}
existing.ServiceStats[service] = existingStats
}
existingStats.SpanCount = max(existingStats.SpanCount, incomingStats.SpanCount)
existingStats.ErrorCount = max(existingStats.ErrorCount, incomingStats.ErrorCount)
}

// make a map of existing Spansets
existingSS := make(map[string]*tempopb.SpanSet)
for _, ss := range existing.SpanSets {
Expand Down
27 changes: 27 additions & 0 deletions pkg/traceql/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,33 @@ func TestCombineResults(t *testing.T) {
},
},
},
{
name: "merge ServiceStats",
existing: &tempopb.TraceSearchMetadata{
ServiceStats: map[string]*tempopb.ServiceStats{
"service1": {
SpanCount: 5,
ErrorCount: 1,
},
},
},
new: &tempopb.TraceSearchMetadata{
ServiceStats: map[string]*tempopb.ServiceStats{
"service1": {
SpanCount: 3,
ErrorCount: 2,
},
},
},
expected: &tempopb.TraceSearchMetadata{
ServiceStats: map[string]*tempopb.ServiceStats{
"service1": {
SpanCount: 5,
ErrorCount: 2,
},
},
},
},
}

for _, tc := range tcs {
Expand Down
8 changes: 8 additions & 0 deletions pkg/traceql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,17 @@ func (e *Engine) asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMet
RootTraceName: spanset.RootSpanName,
StartTimeUnixNano: spanset.StartTimeUnixNanos,
DurationMs: uint32(spanset.DurationNanos / 1_000_000),
ServiceStats: make(map[string]*tempopb.ServiceStats, len(spanset.ServiceStats)),
SpanSet: &tempopb.SpanSet{},
}

for service, stats := range spanset.ServiceStats {
metadata.ServiceStats[service] = &tempopb.ServiceStats{
SpanCount: stats.SpanCount,
ErrorCount: stats.ErrorCount,
}
}

for _, span := range spanset.Spans {
tempopbSpan := &tempopb.Span{
SpanID: util.SpanIDToHexString(span.ID()),
Expand Down
32 changes: 28 additions & 4 deletions pkg/traceql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func TestEngine_Execute(t *testing.T) {
TraceID: []byte{1},
RootSpanName: "HTTP GET",
RootServiceName: "my-service",
ServiceStats: map[string]ServiceStats{
"my-service": {
SpanCount: 6,
ErrorCount: 0,
},
},
Spans: []Span{
&mockSpan{
id: []byte{1},
Expand Down Expand Up @@ -167,8 +173,14 @@ func TestEngine_Execute(t *testing.T) {
TraceID: "1",
RootServiceName: "my-service",
RootTraceName: "HTTP GET",
SpanSet: expectedSpanset,
SpanSets: []*tempopb.SpanSet{expectedSpanset},
ServiceStats: map[string]*tempopb.ServiceStats{
"my-service": {
SpanCount: 6,
ErrorCount: 0,
},
},
SpanSet: expectedSpanset,
SpanSets: []*tempopb.SpanSet{expectedSpanset},
},
}

Expand Down Expand Up @@ -205,6 +217,12 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) {
RootSpanName: "HTTP GET",
StartTimeUnixNanos: 1000,
DurationNanos: uint64(time.Second.Nanoseconds()),
ServiceStats: map[string]ServiceStats{
"service1": {
SpanCount: 2,
ErrorCount: 1,
},
},
Spans: []Span{
&mockSpan{
id: spanID1,
Expand Down Expand Up @@ -321,8 +339,14 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) {
RootTraceName: "HTTP GET",
StartTimeUnixNano: 1000,
DurationMs: uint32(time.Second.Milliseconds()),
SpanSet: expectedSpanset,
SpanSets: []*tempopb.SpanSet{expectedSpanset},
ServiceStats: map[string]*tempopb.ServiceStats{
"service1": {
SpanCount: 2,
ErrorCount: 1,
},
},
SpanSet: expectedSpanset,
SpanSets: []*tempopb.SpanSet{expectedSpanset},
}

// Ensure attributes are sorted to avoid a flaky test
Expand Down
2 changes: 2 additions & 0 deletions pkg/traceql/enum_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const (
IntrinsicTraceStartTime
IntrinsicSpanID
IntrinsicSpanStartTime

IntrinsicServiceStats
)

var (
Expand Down
7 changes: 7 additions & 0 deletions pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func SearchMetaConditions() []Condition {
{NewIntrinsic(IntrinsicSpanID), OpNone, nil},
{NewIntrinsic(IntrinsicSpanStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicDuration), OpNone, nil},
{NewIntrinsic(IntrinsicServiceStats), OpNone, nil},
}
}

Expand Down Expand Up @@ -131,6 +132,11 @@ type SpansetAttribute struct {
Val Static
}

type ServiceStats struct {
SpanCount uint32
ErrorCount uint32
}

type Spanset struct {
// these fields are actually used by the engine to evaluate queries
Scalar Static
Expand All @@ -141,6 +147,7 @@ type Spanset struct {
RootServiceName string
StartTimeUnixNanos uint64
DurationNanos uint64
ServiceStats map[string]ServiceStats
Attributes []*SpansetAttribute

// Set this function to provide upstream callers with a method to
Expand Down
2 changes: 2 additions & 0 deletions pkg/traceql/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestMetaConditionsWithout(t *testing.T) {
{NewIntrinsic(IntrinsicTraceStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicSpanID), OpNone, nil},
{NewIntrinsic(IntrinsicSpanStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicServiceStats), OpNone, nil},
},
},
{
Expand All @@ -83,6 +84,7 @@ func TestMetaConditionsWithout(t *testing.T) {
{NewIntrinsic(IntrinsicTraceStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicSpanID), OpNone, nil},
{NewIntrinsic(IntrinsicSpanStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicServiceStats), OpNone, nil},
},
},
}
Expand Down
6 changes: 5 additions & 1 deletion tempodb/encoding/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/tempo/tempodb/encoding/vparquet"
"github.com/grafana/tempo/tempodb/encoding/vparquet2"
"github.com/grafana/tempo/tempodb/encoding/vparquet3"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
)

// VersionedEncoding represents a backend block version, and the methods to
Expand Down Expand Up @@ -66,6 +67,8 @@ func FromVersion(v string) (VersionedEncoding, error) {
return vparquet2.Encoding{}, nil
case vparquet3.VersionString:
return vparquet3.Encoding{}, nil
case vparquet4.VersionString:
return vparquet4.Encoding{}, nil
default:
return nil, fmt.Errorf("%s is not a valid block version", v)
}
Expand All @@ -78,7 +81,7 @@ func DefaultEncoding() VersionedEncoding {

// LatestEncoding returns the most recent encoding.
func LatestEncoding() VersionedEncoding {
return vparquet3.Encoding{}
return vparquet4.Encoding{}
}

// AllEncodings returns all encodings
Expand All @@ -87,6 +90,7 @@ func AllEncodings() []VersionedEncoding {
v2.Encoding{},
vparquet2.Encoding{},
vparquet3.Encoding{},
vparquet4.Encoding{},
}
}

Expand Down
2 changes: 2 additions & 0 deletions tempodb/encoding/vparquet/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ var intrinsicColumnLookups = map[traceql.Intrinsic]struct {
traceql.IntrinsicTraceDuration: {intrinsicScopeTrace, traceql.TypeString, columnPathDurationNanos},
traceql.IntrinsicTraceID: {intrinsicScopeTrace, traceql.TypeDuration, columnPathTraceID},
traceql.IntrinsicTraceStartTime: {intrinsicScopeTrace, traceql.TypeDuration, columnPathStartTimeUnixNano},

traceql.IntrinsicServiceStats: {intrinsicScopeTrace, traceql.TypeNil, ""}, // Not used in vparquet, this entry is only used to assign default scope.
}

// Lookup table of all well-known attributes with dedicated columns
Expand Down
2 changes: 2 additions & 0 deletions tempodb/encoding/vparquet2/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ var intrinsicColumnLookups = map[traceql.Intrinsic]struct {
traceql.IntrinsicTraceDuration: {intrinsicScopeTrace, traceql.TypeString, columnPathDurationNanos},
traceql.IntrinsicTraceID: {intrinsicScopeTrace, traceql.TypeDuration, columnPathTraceID},
traceql.IntrinsicTraceStartTime: {intrinsicScopeTrace, traceql.TypeDuration, columnPathStartTimeUnixNano},

traceql.IntrinsicServiceStats: {intrinsicScopeTrace, traceql.TypeNil, ""}, // Not used in vparquet2, this entry is only used to assign default scope.
}

// Lookup table of all well-known attributes with dedicated columns
Expand Down
2 changes: 2 additions & 0 deletions tempodb/encoding/vparquet3/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ var intrinsicColumnLookups = map[traceql.Intrinsic]struct {
traceql.IntrinsicTraceDuration: {intrinsicScopeTrace, traceql.TypeString, columnPathDurationNanos},
traceql.IntrinsicTraceID: {intrinsicScopeTrace, traceql.TypeDuration, columnPathTraceID},
traceql.IntrinsicTraceStartTime: {intrinsicScopeTrace, traceql.TypeDuration, columnPathStartTimeUnixNano},

traceql.IntrinsicServiceStats: {intrinsicScopeTrace, traceql.TypeNil, ""}, // Not used in vparquet3, this entry is only used to assign default scope.
}

// Lookup table of all well-known attributes with dedicated columns
Expand Down
22 changes: 13 additions & 9 deletions tempodb/encoding/vparquet3/block_traceql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,22 +568,26 @@ func BenchmarkBackendBlockTraceQL(b *testing.B) {
query string
}{
// span
{"spanAttValNoMatch", "{ span.bloom = `bar` }"},
{"spanAttIntrinsicNoMatch", "{ name = `asdfasdf` }"},
{"spanAttValMatch", "{ span.component = `net/http` }"},
{"spanAttValNoMatch", "{ span.bloom = `does-not-exit-6c2408325a45` }"},
{"spanAttIntrinsicMatch", "{ name = `/cortex.Ingester/Push` }"},
{"spanAttIntrinsicNoMatch", "{ name = `does-not-exit-6c2408325a45` }"},

// resource
{"resourceAttValNoMatch", "{ resource.module.path = `bar` }"},
{"resourceAttIntrinsicMatch", "{ resource.service.name = `tempo-query-frontend` }"},
{"resourceAttValMatch", "{ resource.opencensus.exporterversion = `Jaeger-Go-2.30.0` }"},
{"resourceAttValNoMatch", "{ resource.module.path = `does-not-exit-6c2408325a45` }"},
{"resourceAttIntrinsicMatch", "{ resource.service.name = `tempo-gateway` }"},
{"resourceAttIntrinsicMatch", "{ resource.service.name = `does-not-exit-6c2408325a45` }"},

// mixed
{"mixedValNoMatch", "{ .bloom = `bar` }"},
{"mixedValNoMatch", "{ .bloom = `does-not-exit-6c2408325a45` }"},
{"mixedValMixedMatchAnd", "{ resource.foo = `bar` && name = `gcs.ReadRange` }"},
{"mixedValMixedMatchOr", "{ resource.foo = `bar` || name = `gcs.ReadRange` }"},

{"count", "{ } | count() > 1"},
{"struct", "{ resource.service.name != `loki-querier` } >> { resource.service.name = `loki-querier` && status = error }"},
{"||", "{ resource.service.name = `loki-querier` } || { resource.service.name = `loki-ingester` }"},
{"mixed", `{resource.namespace!="" && resource.service.name="loki-distributor" && duration>2s && resource.cluster=~"prod.*"}`},
{"struct", "{ resource.service.name != `loki-querier` } >> { resource.service.name = `loki-gateway` && status = error }"},
{"||", "{ resource.service.name = `loki-querier` } || { resource.service.name = `loki-gateway` }"},
{"mixed", `{resource.namespace!="" && resource.service.name="cortex-gateway" && duration>50ms && resource.cluster=~"prod.*"}`},
{"complex", `{resource.cluster=~"prod.*" && resource.namespace = "tempo-prod" && resource.container="query-frontend" && name = "HTTP GET - tempo_api_v2_search_tags" && span.http.status_code = 200 && duration > 1s}`},
}

Expand Down Expand Up @@ -728,7 +732,7 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {

for _, tc := range testCases {
b.Run(tc, func(b *testing.B) {
for _, minutes := range []int{7} {
for _, minutes := range []int{5, 7} {
b.Run(strconv.Itoa(minutes), func(b *testing.B) {
st := meta.StartTime
end := st.Add(time.Duration(minutes) * time.Minute)
Expand Down
32 changes: 32 additions & 0 deletions tempodb/encoding/vparquet4/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package vparquet4

import (
"sync"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
)

const (
DataFileName = "data.parquet"
)

type backendBlock struct {
meta *backend.BlockMeta
r backend.Reader

openMtx sync.Mutex
}

var _ common.BackendBlock = (*backendBlock)(nil)

func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock {
return &backendBlock{
meta: meta,
r: r,
}
}

func (b *backendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}
Loading

0 comments on commit fc33ae5

Please sign in to comment.