Query-frontend: Parallelize unmarshalling of responses #3713

Merged · 13 commits · Jun 4, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
* [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
* [ENHANCEMENT] Use multiple goroutines to unmarshal responses in parallel in the query frontend. [#3713](https://github.com/grafana/tempo/pull/3713) (@joe-elliott)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
* [BUGFIX] max_global_traces_per_user: take into account ingestion.tenant_shard_size when converting to local limit [#3618](https://github.com/grafana/tempo/pull/3618) (@kvrhdn)

5 changes: 5 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -433,6 +433,11 @@ query_frontend:
    # (default: 2)
    [max_retries: <int>]

    # The number of goroutines dedicated to consuming, unmarshalling and recombining responses per request. This
    # same parameter is used for all endpoints.
    # (default: 10)
    [response_consumers: <int>]

    # Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.
    # (default: 2000)
    [max_outstanding_per_tenant: <int>]
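For illustration, a minimal sketch of how the new option slots into an existing query_frontend block (the value 20 is arbitrary, not a recommendation):

query_frontend:
  max_retries: 2
  # fan out consuming/unmarshalling of responses to 20 goroutines per request
  response_consumers: 20

In principle, raising the count buys more parallel unmarshalling when many subquery responses arrive at once, at the cost of extra goroutines per in-flight request.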
8 changes: 7 additions & 1 deletion integration/e2e/limits_test.go
@@ -375,7 +375,13 @@ func TestQueryRateLimits(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	_, err = resp.Recv()
+	// loop until we get io.EOF or an error
+	for {
+		_, err = resp.Recv()
+		if err != nil {
+			break
+		}
+	}
 	require.ErrorContains(t, err, "job queue full")
 	require.ErrorContains(t, err, "code = ResourceExhausted")
 }
50 changes: 34 additions & 16 deletions modules/frontend/combiner/common.go
@@ -41,9 +41,6 @@ type genericCombiner[T TResponse] struct {
 
 // AddResponse is used to add a http response to the combiner.
 func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
 	res := r.HTTPResponse()
 	if res == nil {
 		return nil
@@ -52,21 +49,33 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
 	// todo: reevaluate this. should the caller own the lifecycle of the http.response body?
 	defer func() { _ = res.Body.Close() }()
 
-	if c.shouldQuit() {
-		return nil
-	}
-
-	c.httpStatusCode = res.StatusCode
-	if res.StatusCode != http.StatusOK {
-		bytesMsg, err := io.ReadAll(res.Body)
-		if err != nil {
-			return fmt.Errorf("error reading response body: %w", err)
-		}
-		c.httpRespBody = string(bytesMsg)
-		// don't return error. the error path is reserved for unexpected errors.
-		// http pipeline errors should be returned through the final response. (Complete/TypedComplete/TypedDiff)
-		return nil
-	}
+	// test shouldQuit and set the response under the same lock. this prevents race conditions where
+	// two responses can both make it past shouldQuit() with different results.
+	shouldQuitAndSetResponse := func() (bool, error) {
+		c.mu.Lock()
+		defer c.mu.Unlock()
+
+		if c.shouldQuit() {
+			return true, nil
+		}
+
+		c.httpStatusCode = res.StatusCode
+		if res.StatusCode != http.StatusOK {
+			bytesMsg, err := io.ReadAll(res.Body)
+			if err != nil {
+				return true, fmt.Errorf("error reading response body: %w", err)
+			}
+			c.httpRespBody = string(bytesMsg)
+			c.httpStatusCode = res.StatusCode
+			// don't return error. the error path is reserved for unexpected errors.
+			// http pipeline errors should be returned through the final response. (Complete/TypedComplete/TypedDiff)
+			return true, nil
+		}
+
+		return false, nil
+	}
+
+	if quit, err := shouldQuitAndSetResponse(); quit {
+		return err
+	}
 
 	partial := c.new() // instantiating directly requires additional type constraints. this seemed cleaner: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
@@ -87,6 +96,15 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
 		}
 	}
 
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// test again for shouldQuit. it's possible that another response came in while we were unmarshalling that would make us quit.
+	if c.shouldQuit() {
+		return nil
+	}
+
+	c.httpStatusCode = res.StatusCode
 	if err := c.combine(partial, c.current, r); err != nil {
 		c.httpRespBody = internalErrorMsg
 		return fmt.Errorf("error combining in combiner: %w", err)
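The restructuring above narrows the combiner's critical sections: a response first checks shouldQuit and records any error status under the lock, then runs the expensive protobuf unmarshal with no lock held, and finally re-checks under the lock before combining. Below is a self-contained sketch of that check/unmarshal/re-check pattern; the types and names are illustrative, not Tempo's actual combiner.

package main

import (
	"fmt"
	"sync"
)

// combiner is a toy stand-in for the generic combiner: quit checks and all
// state writes happen under mu, while the expensive decode runs lock-free.
type combiner struct {
	mu    sync.Mutex
	done  bool
	total int
}

func (c *combiner) add(raw []byte, decode func([]byte) (int, error)) error {
	// first check: bail out early, under the lock, before paying for the decode.
	c.mu.Lock()
	if c.done {
		c.mu.Unlock()
		return nil
	}
	c.mu.Unlock()

	// the expensive work runs outside the lock, so many goroutines can decode in parallel.
	n, err := decode(raw)
	if err != nil {
		return fmt.Errorf("decode failed: %w", err)
	}

	// second check: another goroutine may have completed the combiner while we decoded.
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.done {
		return nil
	}
	c.total += n
	return nil
}

func main() {
	c := &combiner{}
	wg := sync.WaitGroup{}
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.add([]byte("spans"), func(b []byte) (int, error) { return len(b), nil })
		}()
	}
	wg.Wait()
	fmt.Println(c.total) // prints 20: 4 goroutines x len("spans")
}

The second check matters because another response may have errored (and completed the combiner) while this one was unmarshalling; dropping the late partial is cheaper and simpler than combining it.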
173 changes: 173 additions & 0 deletions modules/frontend/combiner/common_test.go
@@ -3,9 +3,13 @@ package combiner
import (
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/gogo/protobuf/jsonpb"
	"github.com/gogo/status"
	"github.com/grafana/tempo/pkg/tempopb"
	"github.com/stretchr/testify/require"
@@ -79,3 +83,172 @@ func TestErroredResponse(t *testing.T) {
		})
	}
}

func TestGenericCombiner(t *testing.T) {
	combiner := newTestCombiner()
	wg := sync.WaitGroup{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 10000; j++ {
				err := combiner.AddResponse(newTestResponse(t))
				require.NoError(t, err)
			}
		}()
	}

	wg.Wait()
	actual, err := combiner.finalize(nil)
	require.NoError(t, err)

	expected := 10 * 10000
	require.Equal(t, expected, int(actual.SpanCount))
	require.Equal(t, expected, int(actual.ErrorCount))
}

func TestGenericCombinerHoldsErrors(t *testing.T) {
	// slam a combiner with successful responses and just one error. confirm that the error is returned
	combiner := newTestCombiner()
	wg := sync.WaitGroup{}

	for j := 0; j < 10; j++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for i := 0; i < 10000; i++ {
				err := combiner.AddResponse(newTestResponse(t))
				require.NoError(t, err)
			}
		}()
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Millisecond)
		err := combiner.AddResponse(newFailedTestResponse())
		require.NoError(t, err)
	}()

	wg.Wait()
	resp, err := combiner.HTTPFinal()
	require.NoError(t, err)
	require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
}

func TestGenericCombinerDoesntRace(t *testing.T) {
	combiner := newTestCombiner()
	end := make(chan struct{})

	concurrent := func(f func()) {
		for {
			select {
			case <-end:
				return
			default:
				f()
			}
		}
	}

	go concurrent(func() {
		_ = combiner.AddResponse(newTestResponse(t))
	})

	go concurrent(func() {
		// this test is going to add a failed response which cuts off certain code paths. just wait a bit to test the other paths
		time.Sleep(10 * time.Millisecond)
		_ = combiner.AddResponse(newFailedTestResponse())
	})

	go concurrent(func() {
		combiner.ShouldQuit()
	})

	go concurrent(func() {
		combiner.StatusCode()
	})

	go concurrent(func() {
		_, _ = combiner.HTTPFinal()
	})

	go concurrent(func() {
		_, _ = combiner.GRPCFinal()
	})

	go concurrent(func() {
		_, _ = combiner.GRPCDiff()
	})

	time.Sleep(100 * time.Millisecond)
	close(end)
}

type testPipelineResponse struct {
	r *http.Response
}

func newTestResponse(t *testing.T) *testPipelineResponse {
	serviceStats := &tempopb.ServiceStats{
		SpanCount:  1,
		ErrorCount: 1,
	}

	rec := httptest.NewRecorder()
	err := (&jsonpb.Marshaler{}).Marshal(rec, serviceStats)
	require.NoError(t, err)

	return &testPipelineResponse{
		r: rec.Result(),
	}
}

func newFailedTestResponse() *testPipelineResponse {
	rec := httptest.NewRecorder()
	rec.WriteHeader(http.StatusInternalServerError)

	return &testPipelineResponse{
		r: rec.Result(),
	}
}

func (p *testPipelineResponse) HTTPResponse() *http.Response {
	return p.r
}

func (p *testPipelineResponse) AdditionalData() any {
	return nil
}

func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] {
	count := 0

	return &genericCombiner[*tempopb.ServiceStats]{
		httpStatusCode: 200,
		new:            func() *tempopb.ServiceStats { return &tempopb.ServiceStats{} },
		current:        nil,
		combine: func(_, _ *tempopb.ServiceStats, _ PipelineResponse) error {
			count++
			return nil
		},
		finalize: func(_ *tempopb.ServiceStats) (*tempopb.ServiceStats, error) {
			return &tempopb.ServiceStats{
				SpanCount:  uint32(count),
				ErrorCount: uint32(count),
			}, nil
		},
		quit: func(_ *tempopb.ServiceStats) bool {
			return false
		},
		diff: func(_ *tempopb.ServiceStats) (*tempopb.ServiceStats, error) {
			return &tempopb.ServiceStats{
				SpanCount:  uint32(count),
				ErrorCount: uint32(count),
			}, nil
		},
	}
}
5 changes: 3 additions & 2 deletions modules/frontend/combiner/search_tag_values.go
@@ -47,8 +47,9 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
 	d := util.NewDistinctValueCollector(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })
 
 	return &genericCombiner[*tempopb.SearchTagValuesV2Response]{
-		current: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{}},
-		new:     func() *tempopb.SearchTagValuesV2Response { return &tempopb.SearchTagValuesV2Response{} },
+		httpStatusCode: 200,
+		current:        &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{}},
+		new:            func() *tempopb.SearchTagValuesV2Response { return &tempopb.SearchTagValuesV2Response{} },
 		combine: func(partial, final *tempopb.SearchTagValuesV2Response, _ PipelineResponse) error {
 			for _, v := range partial.TagValues {
 				d.Collect(*v)
2 changes: 2 additions & 0 deletions modules/frontend/config.go
@@ -22,6 +22,7 @@ type Config struct {
	TraceByID                 TraceByIDConfig `yaml:"trace_by_id"`
	Metrics                   MetricsConfig   `yaml:"metrics"`
	MultiTenantQueriesEnabled bool            `yaml:"multi_tenant_queries_enabled"`
	ResponseConsumers         int             `yaml:"response_consumers"`

	// the maximum time limit that tempo will work on an api request. this includes both
	// grpc and http requests and applies to all "api" frontend query endpoints such as
@@ -61,6 +62,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
	cfg.Config.MaxOutstandingPerTenant = 2000
	cfg.Config.MaxBatchSize = 5
	cfg.MaxRetries = 2
	cfg.ResponseConsumers = 10
	cfg.Search = SearchConfig{
		Sharder: SearchSharderConfig{
			QueryBackendAfter: 15 * time.Minute,
16 changes: 8 additions & 8 deletions modules/frontend/frontend.go
@@ -135,10 +135,10 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
 
 	traces := newTraceIDHandler(cfg, o, tracePipeline, logger)
 	search := newSearchHTTPHandler(cfg, searchPipeline, logger)
-	searchTags := newTagHTTPHandler(searchTagsPipeline, o, combiner.NewSearchTags, logger)
-	searchTagsV2 := newTagHTTPHandler(searchTagsPipeline, o, combiner.NewSearchTagsV2, logger)
-	searchTagValues := newTagHTTPHandler(searchTagValuesPipeline, o, combiner.NewSearchTagValues, logger)
-	searchTagValuesV2 := newTagHTTPHandler(searchTagValuesPipeline, o, combiner.NewSearchTagValuesV2, logger)
+	searchTags := newTagHTTPHandler(cfg, searchTagsPipeline, o, combiner.NewSearchTags, logger)
+	searchTagsV2 := newTagHTTPHandler(cfg, searchTagsPipeline, o, combiner.NewSearchTagsV2, logger)
+	searchTagValues := newTagHTTPHandler(cfg, searchTagValuesPipeline, o, combiner.NewSearchTagValues, logger)
+	searchTagValuesV2 := newTagHTTPHandler(cfg, searchTagValuesPipeline, o, combiner.NewSearchTagValuesV2, logger)
 	metrics := newMetricsSummaryHandler(metricsPipeline, logger)
 	queryrange := newMetricsQueryRangeHTTPHandler(cfg, queryRangePipeline, logger)
 
@@ -155,10 +155,10 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
 
 		// grpc/streaming
 		streamingSearch:      newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, logger),
-		streamingTags:        newTagStreamingGRPCHandler(searchTagsPipeline, apiPrefix, o, logger),
-		streamingTagsV2:      newTagV2StreamingGRPCHandler(searchTagsPipeline, apiPrefix, o, logger),
-		streamingTagValues:   newTagValuesStreamingGRPCHandler(searchTagValuesPipeline, apiPrefix, o, logger),
-		streamingTagValuesV2: newTagValuesV2StreamingGRPCHandler(searchTagValuesPipeline, apiPrefix, o, logger),
+		streamingTags:        newTagStreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
+		streamingTagsV2:      newTagV2StreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
+		streamingTagValues:   newTagValuesStreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
+		streamingTagValuesV2: newTagValuesV2StreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
 		streamingQueryRange:  newQueryRangeStreamingGRPCHandler(cfg, queryRangePipeline, apiPrefix, logger),
 
 		cacheProvider: cacheProvider,
4 changes: 2 additions & 2 deletions modules/frontend/metrics_query_range_handler.go
@@ -42,7 +42,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
 		return err
 	}
 
-	collector := pipeline.NewGRPCCollector(next, c, func(qrr *tempopb.QueryRangeResponse) error {
+	collector := pipeline.NewGRPCCollector(next, cfg.ResponseConsumers, c, func(qrr *tempopb.QueryRangeResponse) error {
 		finalResponse = qrr // sadly we can't pass srv.Send directly into the collector. we need bytesProcessed for the SLO calculations
 		return srv.Send(qrr)
 	})
@@ -92,7 +92,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
 			Body:       io.NopCloser(strings.NewReader(err.Error())),
 		}, nil
 	}
-	rt := pipeline.NewHTTPCollector(next, combiner)
+	rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, combiner)
 
 	resp, err := rt.RoundTrip(req)
 
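The collector constructors above now receive a consumer count, but their implementations are outside this diff. A rough sketch of the fan-out that response_consumers implies, using hypothetical names (the internals of Tempo's pipeline.NewHTTPCollector and pipeline.NewGRPCCollector may differ):

package main

import (
	"fmt"
	"sync"
)

// consumeAll drains responses with n worker goroutines, handing each one to
// add (think of a combiner's AddResponse). This sketches the fan-out implied
// by response_consumers; it is not Tempo's actual collector code.
func consumeAll(responses <-chan []byte, n int, add func([]byte) error) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range responses {
				if err := add(r); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}
			}
		}()
	}
	wg.Wait()
	return firstErr
}

func main() {
	ch := make(chan []byte, 8)
	for i := 0; i < 8; i++ {
		ch <- []byte("resp")
	}
	close(ch)

	var mu sync.Mutex
	total := 0
	err := consumeAll(ch, 3, func(b []byte) error {
		mu.Lock()
		defer mu.Unlock()
		total += len(b)
		return nil
	})
	fmt.Println(total, err) // prints 32 <nil>
}

Because every worker funnels into a shared combiner, the double-checked locking added in modules/frontend/combiner/common.go is what keeps this fan-out safe.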