From 480e967df35e5a19e85a1a191e11b42a4aa2074e Mon Sep 17 00:00:00 2001 From: Mario Date: Fri, 21 Jun 2024 15:11:17 +0200 Subject: [PATCH] Add caching to QueryRange requests (#3796) * Add caching to QueryRange requests * fmt * chlog --- CHANGELOG.md | 3 ++- modules/frontend/cache_keys.go | 5 ++++ .../frontend/metrics_query_range_sharder.go | 27 +++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ee8deebc85..799efdfcdb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,8 @@ * [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno) * [ENHANCEMENT] Use multiple goroutines to unmarshal responses in parallel in the query frontend. [#3713](https://github.com/grafana/tempo/pull/3713) (@joe-elliott) * [ENHANCEMENT] Protect ingesters from panics by adding defer/recover to all read path methods. [#3790](https://github.com/grafana/tempo/pull/3790) (@joe-elliott) -* [ENHANCEMENT] Added a boolean flag to enable or disable dualstack mode on Storage block config for S3 [#3721](https://github.com/grafana/tempo/pull/3721) (@sid-jar, @mapno) +* [ENHANCEMENT] Added a boolean flag to enable or disable dualstack mode on Storage block config for S3 [#3721](https://github.com/grafana/tempo/pull/3721) (@sid-jar, @mapno) +* [ENHANCEMENT] Add caching to query range queries [#3796](https://github.com/grafana/tempo/pull/3796) (@mapno) * [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio) * [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio) * [BUGFIX] max_global_traces_per_user: take into account ingestion.tenant_shard_size when converting to local limit [#3618](https://github.com/grafana/tempo/pull/3618) (@kvrhdn) diff 
--git a/modules/frontend/cache_keys.go b/modules/frontend/cache_keys.go index e0fd5dd4bc1..a5692542b24 100644 --- a/modules/frontend/cache_keys.go +++ b/modules/frontend/cache_keys.go @@ -11,12 +11,17 @@ const ( cacheKeyPrefixSearchJob = "sj:" cacheKeyPrefixSearchTag = "st:" cacheKeyPrefixSearchTagValues = "stv:" + cacheKeyPrefixQueryRange = "qr:" ) func searchJobCacheKey(tenant string, queryHash uint64, start int64, end int64, meta *backend.BlockMeta, startPage, pagesToSearch int) string { return cacheKey(cacheKeyPrefixSearchJob, tenant, queryHash, start, end, meta, startPage, pagesToSearch) } +func queryRangeCacheKey(tenant string, queryHash uint64, start int64, end int64, meta *backend.BlockMeta, startPage, pagesToSearch int) string { + return cacheKey(cacheKeyPrefixQueryRange, tenant, queryHash, start, end, meta, startPage, pagesToSearch) +} + // cacheKey returns a string that can be used as a cache key for a backend search job. if a valid key cannot be calculated // it returns an empty string. 
func cacheKey(prefix string, tenant string, queryHash uint64, start int64, end int64, meta *backend.BlockMeta, startPage, pagesToSearch int) string { diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go index 65eae768421..fbf62e57d52 100644 --- a/modules/frontend/metrics_query_range_sharder.go +++ b/modules/frontend/metrics_query_range_sharder.go @@ -12,6 +12,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/grafana/dskit/user" "github.com/opentracing/opentracing-go" + "github.com/segmentio/fasthash/fnv1a" "github.com/grafana/tempo/modules/frontend/combiner" "github.com/grafana/tempo/modules/frontend/pipeline" @@ -352,6 +353,8 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- *http.Request) { defer close(reqCh) + queryHash := hashForQueryRangeRequest(&searchReq) + for _, m := range metas { pages := pagesPerRequest(m, targetBytesPerRequest) if pages == 0 { @@ -393,6 +396,10 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query()) // TODO: Handle sampling rate + key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch)) + if len(key) > 0 { + subR = pipeline.ContextAddCacheKey(key, subR) + } select { case reqCh <- subR: @@ -518,3 +525,23 @@ func (s *queryRangeSharder) jobInterval(expr *traceql.RootExpr, allowUnsafe bool // Else use configured value return s.cfg.Interval } + +func hashForQueryRangeRequest(req *tempopb.QueryRangeRequest) uint64 { + if req.Query == "" { + return 0 + } + + ast, err := 
traceql.Parse(req.Query) + if err != nil { // this should never occur. if we've made this far we've already validated the query can parse. however, for sanity, just fail to cache if we can't parse + return 0 + } + + // forces the query into a canonical form + query := ast.String() + + // add the canonical query and step to the hash + hash := fnv1a.HashString64(query) + hash = fnv1a.AddUint64(hash, req.Step) + + return hash +}