Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TraceQL Metrics] Parser and engine #3227

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7,338 changes: 4,700 additions & 2,638 deletions pkg/tempopb/tempo.pb.go

Large diffs are not rendered by default.

123 changes: 95 additions & 28 deletions pkg/tempopb/tempo.proto
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
syntax="proto3";
syntax = "proto3";

package tempopb;

import "trace/v1/trace.proto";
import "common/v1/common.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "trace/v1/trace.proto";

service Pusher {
// different versions of PushBytes expect the trace data to be pushed in different formats
rpc PushBytes(PushBytesRequest) returns (PushResponse) {}; // ./pkg/model/v1
rpc PushBytesV2(PushBytesRequest) returns (PushResponse) {}; // ./pkg/model/v2
// different versions of PushBytes expect the trace data to be pushed in
// different formats
rpc PushBytes(PushBytesRequest) returns (PushResponse) {}
rpc PushBytesV2(PushBytesRequest) returns (PushResponse) {}
}

service MetricsGenerator {
rpc PushSpans(PushSpansRequest) returns (PushResponse) {};
rpc GetMetrics(SpanMetricsRequest) returns (SpanMetricsResponse) {};
rpc PushSpans(PushSpansRequest) returns (PushResponse) {}
rpc GetMetrics(SpanMetricsRequest) returns (SpanMetricsResponse) {}
}

service Querier {
rpc FindTraceByID(TraceByIDRequest) returns (TraceByIDResponse) {};
rpc SearchRecent(SearchRequest) returns (SearchResponse) {};
rpc SearchBlock(SearchBlockRequest) returns (SearchResponse) {};
rpc SearchTags(SearchTagsRequest) returns (SearchTagsResponse) {};
rpc SearchTagsV2(SearchTagsRequest) returns (SearchTagsV2Response) {};
rpc SearchTagValues(SearchTagValuesRequest) returns (SearchTagValuesResponse) {};
rpc SearchTagValuesV2(SearchTagValuesRequest) returns (SearchTagValuesV2Response) {};
// rpc SpanMetricsSummary(SpanMetricsSummaryRequest) returns (SpanMetricsSummaryResponse) {};
rpc FindTraceByID(TraceByIDRequest) returns (TraceByIDResponse) {}
rpc SearchRecent(SearchRequest) returns (SearchResponse) {}
rpc SearchBlock(SearchBlockRequest) returns (SearchResponse) {}
rpc SearchTags(SearchTagsRequest) returns (SearchTagsResponse) {}
rpc SearchTagsV2(SearchTagsRequest) returns (SearchTagsV2Response) {}
rpc SearchTagValues(SearchTagValuesRequest) returns (SearchTagValuesResponse) {}
rpc SearchTagValuesV2(SearchTagValuesRequest) returns (SearchTagValuesV2Response) {}
// rpc SpanMetricsSummary(SpanMetricsSummaryRequest) returns
// (SpanMetricsSummaryResponse) {};
}

service StreamingQuerier {
Expand All @@ -34,6 +36,7 @@ service StreamingQuerier {

service Metrics {
rpc SpanMetricsSummary(SpanMetricsSummaryRequest) returns (SpanMetricsSummaryResponse) {}
rpc QueryRange(QueryRangeRequest) returns (QueryRangeResponse) {}
}

// Read
Expand All @@ -49,8 +52,7 @@ message TraceByIDResponse {
TraceByIDMetrics metrics = 2;
}

message TraceByIDMetrics {
}
message TraceByIDMetrics {}

// SearchRequest takes no block parameters and implies a "recent traces" search
message SearchRequest {
Expand All @@ -66,8 +68,8 @@ message SearchRequest {
uint32 SpansPerSpanSet = 9;
}

// SearchBlockRequest takes SearchRequest parameters as well as all information necessary
// to search a block in the backend.
// SearchBlockRequest takes SearchRequest parameters as well as all information
// necessary to search a block in the backend.
message SearchBlockRequest {
SearchRequest searchReq = 1;
string blockID = 2;
Expand Down Expand Up @@ -175,18 +177,27 @@ message Trace {
}

// Write
message PushResponse {
}
message PushResponse {}

// PushBytesRequest pushes slices of traces, ids and searchdata. Traces are encoded using the
// PushBytesRequest pushes slices of traces, ids and searchdata. Traces are
// encoded using the
// current BatchDecoder in ./pkg/model
message PushBytesRequest {
// pre-marshalled Traces. length must match ids
repeated bytes traces = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"];
repeated bytes traces = 2 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "PreallocBytes"
];
// trace ids. length must match traces
repeated bytes ids = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"];
repeated bytes ids = 3 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "PreallocBytes"
];
// search data, length must match traces
repeated bytes searchData = 4 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"];
repeated bytes searchData = 4 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "PreallocBytes"
];
}

message PushSpansRequest {
Expand All @@ -199,9 +210,10 @@ message TraceBytes {
repeated bytes traces = 1;
}

// this message exists for marshalling/unmarshalling convenience to/from parquet. in parquet we proto encode
// links to a column. unfortunately you can't encode a slice directly so we use this wrapper to generate
// the required marshalling/unmarshalling functions.
// this message exists for marshalling/unmarshalling convenience to/from
// parquet. in parquet we proto encode links to a column. unfortunately you
// can't encode a slice directly so we use this wrapper to generate the required
// marshalling/unmarshalling functions.
message LinkSlice {
repeated tempopb.trace.v1.Span.Link links = 1;
}
Expand Down Expand Up @@ -239,6 +251,11 @@ message KeyValue {
TraceQLStatic value = 2;
}

// RawExemplar links a metric sample back to a concrete trace that
// contributed to it.
message RawExemplar {
  bytes traceID = 1; // traceID
  uint64 val = 2;    // duration ns
}

message SpanMetrics {
repeated RawHistogram latency_histogram = 1;
repeated KeyValue series = 2;
Expand Down Expand Up @@ -269,3 +286,53 @@ message TraceQLStatic {
int32 status = 7;
int32 kind = 8;
}

// SpanMetricsData is the top-level container for span-metrics query results.
message SpanMetricsData {
  string resultType = 1;                   // kind of result contained in `result`
  repeated SpanMetricsResult result = 2;   // one entry per series
}

// SpanMetricsResult is a single labeled series of span-metrics points.
message SpanMetricsResult {
  string labelName = 1; // if these are empty it is the primary trend
  string labelValue = 2;
  repeated SpanMetricsResultPoint ts = 3; // the series' data points
}

// SpanMetricsResultPoint is one (time, value) sample, optionally carrying an
// exemplar trace.
message SpanMetricsResultPoint {
  uint32 time = 1;
  double val = 2;
  bytes exemplarTraceID = 3;   // trace backing this point; may be empty
  uint64 exemplarDuration = 4; // NOTE(review): presumably nanoseconds like RawExemplar.val — confirm
}

// QueryRangeRequest evaluates a TraceQL metrics query over a time range.
message QueryRangeRequest {
  string query = 1; // TraceQL metrics query text
  uint64 start = 2; // range start — NOTE(review): units (unix nanos?) not stated here, confirm
  uint64 end = 3;   // range end, same units as start
  uint64 step = 4;  // interval between output samples
  uint32 shard = 5; // shard/of split the work across parallel jobs
  uint32 of = 6;    // total number of shards
}

// QueryRangeResponse carries the time series produced by a range query.
message QueryRangeResponse {
  repeated TimeSeries series = 1;
}

// Sample is a single (timestamp, value) pair of a time series.
// The tag numbers are intentionally out of declaration order — see below.
message Sample {
  // Fields order MUST match promql.FPoint so that we can cast types between them.
  int64 timestamp_ms = 2;
  double value = 1;
}

// TimeSeries is a labeled series of samples, modeled after Mimir's:
// https://github.com/grafana/mimir/blob/main/pkg/mimirpb/mimir.proto#L53
message TimeSeries {
  // repeated LabelPair labels = 1 [(gogoproto.nullable) = false,
  // (gogoproto.customtype) = "LabelAdapter"];
  repeated tempopb.common.v1.KeyValue labels = 1 [(gogoproto.nullable) = false];
  // Sorted by time, oldest sample first.
  repeated Sample samples = 2 [(gogoproto.nullable) = false];
  // repeated Exemplar exemplars = 3 [ (gogoproto.nullable) = false ];
  // repeated Histogram histograms = 4 [ (gogoproto.nullable) = false ];
  // TODO: review the LabelAdapter and migrate the use of this string
  string prom_labels = 3;
}
98 changes: 94 additions & 4 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@ import (
"math"
"regexp"
"time"

"github.com/grafana/tempo/pkg/tempopb"
)

// Element is the base interface implemented by every node of the TraceQL AST.
// Every node can render itself back to query text (fmt.Stringer) and check
// its own validity.
type Element interface {
	fmt.Stringer
	validate() error
}

// metricsFirstStageElement is the first stage of a metrics query pipeline:
// it is fed spans one at a time and aggregates them into time series.
type metricsFirstStageElement interface {
	Element
	// extractConditions adds the storage-layer conditions this element
	// needs to the fetch request.
	extractConditions(request *FetchSpansRequest)
	// init prepares the element's aggregators for the query's time
	// range and step.
	init(*tempopb.QueryRangeRequest)
	observe(Span) // TODO - batching?
	// result returns the aggregated series after all spans are observed.
	result() SeriesSet
}

type pipelineElement interface {
Element
extractConditions(request *FetchSpansRequest)
Expand All @@ -23,7 +33,8 @@ type typedExpression interface {
}

type RootExpr struct {
Pipeline Pipeline
Pipeline Pipeline
MetricsPipeline metricsFirstStageElement
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the metrics pipeline split from the spanset filter pipeline isn't ideal, but best for now. We send the eval callback of the spanset pipeline into the storage SecondPass, and the metrics pipeline has a different signature (output is time-series). Long-term this break needs to be closed and end up with one pipeline, but this current form will carry the functionality a long way. This will work until we add cross time-series arithmetic like:

  ({a} | rate()) / 
  ({b} | rate())

}

func newRootExpr(e pipelineElement) *RootExpr {
Expand All @@ -37,6 +48,18 @@ func newRootExpr(e pipelineElement) *RootExpr {
}
}

// newRootExprWithMetrics wraps a span-filtering element and a metrics
// first-stage element into a RootExpr. If e is not already a Pipeline it is
// promoted to a single-element pipeline first.
func newRootExprWithMetrics(e pipelineElement, m metricsFirstStageElement) *RootExpr {
	pipeline, isPipeline := e.(Pipeline)
	if !isPipeline {
		pipeline = newPipeline(e)
	}

	return &RootExpr{
		Pipeline:        pipeline,
		MetricsPipeline: m,
	}
}

// **********************
// Pipeline
// **********************
Expand Down Expand Up @@ -130,12 +153,12 @@ func (o CoalesceOperation) extractConditions(*FetchSpansRequest) {
}

type SelectOperation struct {
exprs []FieldExpression
attrs []Attribute
}

func newSelectOperation(exprs []FieldExpression) SelectOperation {
func newSelectOperation(exprs []Attribute) SelectOperation {
return SelectOperation{
exprs: exprs,
attrs: exprs,
}
}

Expand Down Expand Up @@ -718,3 +741,70 @@ var (
_ pipelineElement = (*ScalarFilter)(nil)
_ pipelineElement = (*GroupOperation)(nil)
)

// MetricsAggregate is the AST node for a metrics aggregation such as rate()
// or count_over_time(), optionally grouped by attributes.
type MetricsAggregate struct {
	op  MetricsAggregateOp // which aggregation to perform
	by  []Attribute        // group-by attributes
	agg SpanAggregator     // runtime aggregator, created by init()
}

// newMetricsAggregate creates a MetricsAggregate for the given operation and
// group-by attributes. The runtime aggregator is built later by init().
func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggregate {
	return &MetricsAggregate{
		op: agg,
		by: by,
	}
}

// extractConditions adds the storage conditions this aggregation needs to
// the fetch request. Conditions required by the group-by attributes are
// forwarded as second-pass conditions.
func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
	switch a.op {
	case metricsAggregateRate, metricsAggregateCountOverTime:
		// No extra conditions, start time is already enough
	}

	// Gather each group-by attribute's conditions into a scratch request,
	// then copy them into the real request's second pass.
	scratch := &FetchSpansRequest{}
	for _, attr := range a.by {
		attr.extractConditions(scratch)
	}
	request.SecondPassConditions = append(request.SecondPassConditions, scratch.Conditions...)
}

// init builds the runtime aggregator chain for the requested range: a
// grouping aggregator over per-step aggregators, each backed by the vector
// aggregator matching the operation (count-over-time or rate).
func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest) {
	var newVector func() VectorAggregator

	switch a.op {
	case metricsAggregateCountOverTime:
		newVector = func() VectorAggregator { return NewCountOverTimeAggregator() }
	case metricsAggregateRate:
		// Rate is normalized by the step width in seconds.
		newVector = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
	}

	newRange := func() RangeAggregator {
		return NewStepAggregator(q.Start, q.End, q.Step, newVector)
	}

	a.agg = NewGroupingAggregator(a.op.String(), newRange, a.by)
}

// observe feeds a single span into the underlying aggregator.
func (a *MetricsAggregate) observe(span Span) {
	a.agg.Observe(span)
}

// result returns the time series accumulated by the aggregator so far.
func (a *MetricsAggregate) result() SeriesSet {
	return a.agg.Series()
}

// validate checks that this aggregation is supported by the engine: the
// operation must be a known one and the number of group-by attributes must
// not exceed maxGroupBys.
func (a *MetricsAggregate) validate() error {
	switch a.op {
	case metricsAggregateCountOverTime, metricsAggregateRate:
		// supported
	default:
		return newUnsupportedError(fmt.Sprintf("metrics aggregate operation (%v)", a.op))
	}

	if groupBys := len(a.by); groupBys > maxGroupBys {
		return newUnsupportedError(fmt.Sprintf("metrics group by %v values", groupBys))
	}

	return nil
}

var _ metricsFirstStageElement = (*MetricsAggregate)(nil)
9 changes: 8 additions & 1 deletion pkg/traceql/ast_conditions.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package traceql

// extractConditions collects storage conditions from the span pipeline and,
// when one is present, from the metrics pipeline as well.
func (r RootExpr) extractConditions(request *FetchSpansRequest) {
	r.Pipeline.extractConditions(request)
	if r.MetricsPipeline != nil {
		r.MetricsPipeline.extractConditions(request)
	}
}

func (f SpansetFilter) extractConditions(request *FetchSpansRequest) {
f.Expression.extractConditions(request)

Expand All @@ -24,7 +31,7 @@ func (f SpansetFilter) extractConditions(request *FetchSpansRequest) {
// extractConditions on Select puts its conditions into the SecondPassConditions
func (o SelectOperation) extractConditions(request *FetchSpansRequest) {
selectR := &FetchSpansRequest{}
for _, expr := range o.exprs {
for _, expr := range o.attrs {
expr.extractConditions(selectR)
}
// copy any conditions to the normal request's SecondPassConditions
Expand Down
10 changes: 7 additions & 3 deletions pkg/traceql/ast_stringer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (o CoalesceOperation) String() string {
}

func (o SelectOperation) String() string {
s := make([]string, 0, len(o.exprs))
for _, e := range o.exprs {
s := make([]string, 0, len(o.attrs))
for _, e := range o.attrs {
s = append(s, e.String())
}
return "select(" + strings.Join(s, ", ") + ")"
Expand Down Expand Up @@ -118,13 +118,17 @@ func (a Attribute) String() string {
}

// Top-level attributes get a "." but top-level intrinsics don't
if scope == "" && a.Intrinsic == IntrinsicNone {
if scope == "" && a.Intrinsic == IntrinsicNone && len(att) > 0 {
scope += "."
}

return scope + att
}

// String renders the aggregation back to query text.
// NOTE(review): the group-by attributes are not included in the output —
// confirm whether that is intentional.
func (a MetricsAggregate) String() string {
	return a.op.String()
}

func binaryOp(op Operator, lhs Element, rhs Element) string {
return wrapElement(lhs) + " " + op.String() + " " + wrapElement(rhs)
}
Expand Down
Loading
Loading