Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frontend Refactor #3400

Merged
merged 28 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9591c52
async handlers
joe-elliott Jan 19, 2024
c8f47f5
search pipeline
joe-elliott Feb 1, 2024
5883ab4
cleanup
joe-elliott Feb 15, 2024
37cb548
lint
joe-elliott Feb 15, 2024
753d959
fixed api tests
joe-elliott Feb 15, 2024
42d22e9
make err last param - lint
joe-elliott Feb 15, 2024
d924132
lint
joe-elliott Feb 15, 2024
4ae78ca
remove test for dropped metrics
joe-elliott Feb 16, 2024
82a2439
pipeline readme
joe-elliott Feb 16, 2024
25fb5ad
cleanup
joe-elliott Feb 20, 2024
e40432b
Merge branch 'main' into frontend-o7-beta
joe-elliott Feb 20, 2024
2a8aff0
responses cleanup
joe-elliott Feb 20, 2024
d2ec088
review comments
joe-elliott Feb 26, 2024
a1c8833
Merge branch 'main' into frontend-o7-beta
joe-elliott Feb 26, 2024
87edbca
remove unnecessary goroutine
joe-elliott Feb 26, 2024
3904855
async sharder takes len
joe-elliott Feb 26, 2024
78f71eb
nolint
joe-elliott Feb 26, 2024
4b01755
AsyncSharder rename/cleanup
joe-elliott Feb 26, 2024
6676b4e
lint
joe-elliott Feb 27, 2024
8da6169
fix integration tests
joe-elliott Feb 27, 2024
5140496
lint >.<
joe-elliott Feb 27, 2024
6c7eeaf
remove special context handling in the grpc collector
joe-elliott Feb 27, 2024
cf260d0
prevent goroutine leak when sending to a full channel
joe-elliott Feb 27, 2024
fd90fe0
buffer error to guarantee at least one makes it
joe-elliott Feb 27, 2024
2222855
responses in error should stay in error
joe-elliott Feb 28, 2024
752ad3d
test for returning the same error
joe-elliott Feb 28, 2024
0589996
don't pass requests in error forward
joe-elliott Feb 28, 2024
7375e3b
log http results even on err
joe-elliott Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/tempo-cli/cmd-query-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"path"
Expand Down Expand Up @@ -113,7 +112,7 @@ func (cmd *querySearchCmd) searchHTTP(req *tempopb.SearchRequest) error {
return err
}

fmt.Println(httpReq)
// fmt.Println(httpReq)
httpResp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return err
Expand All @@ -126,7 +125,7 @@ func (cmd *querySearchCmd) searchHTTP(req *tempopb.SearchRequest) error {
}

if httpResp.StatusCode != http.StatusOK {
return errors.New("failed to query: " + string(body))
return errors.New("failed to query. body: " + string(body) + " status: " + httpResp.Status)
}

resp := &tempopb.SearchResponse{}
Expand Down
16 changes: 1 addition & 15 deletions integration/e2e/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,6 @@ func TestMultiTenantSearch(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, tagsValuesV2Resp.TagValues)

// assert tenant federation metrics
if tc.tenantSize > 1 {
for _, ta := range tenants {
matcher, err := labels.NewMatcher(labels.MatchEqual, "tenant", ta)
require.NoError(t, err)
// we should have 8 requests for each tenant
err = tempo.WaitSumMetricsWithOptions(e2e.Equals(8),
[]string{"tempo_query_frontend_multitenant_success_total"},
e2e.WithLabelMatchers(matcher),
)
require.NoError(t, err)
}
}

// check metrics for all routes
routeTable := []struct {
route string
Expand Down Expand Up @@ -215,7 +201,7 @@ func TestMultiTenantSearch(t *testing.T) {

time.Sleep(2 * time.Second) // ensure that blocklist poller has built the blocklist
now := time.Now()
util.SearchStreamAndAssertTrace(t, grpcCtx, grpcClient, info, now.Add(-5*time.Minute).Unix(), now.Add(5*time.Minute).Unix())
util.SearchStreamAndAssertTrace(t, grpcCtx, grpcClient, info, now.Add(-10*time.Minute).Unix(), now.Add(10*time.Minute).Unix())
assertRequestCountMetric(t, tempo, "/tempopb.StreamingQuerier/Search", 1)

// test unsupported endpoint
Expand Down
69 changes: 0 additions & 69 deletions modules/frontend/cache.go

This file was deleted.

145 changes: 118 additions & 27 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
@@ -1,74 +1,104 @@
package combiner

import (
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"google.golang.org/grpc/codes"
)

type TResponse interface {
*tempopb.SearchResponse | *tempopb.SearchTagsResponse | *tempopb.SearchTagsV2Response | *tempopb.SearchTagValuesResponse | *tempopb.SearchTagValuesV2Response
proto.Message
}

type genericCombiner[R TResponse] struct {
type genericCombiner[T TResponse] struct {
mu sync.Mutex

final R
current T // todo: state mgmt is mixed between the combiner and the various implementations. put it in one spot.

combine func(body io.ReadCloser, final R) error
result func(R) (string, error)
new func() T
combine func(partial T, final T) error
finalize func(T) (T, error)
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
quit func(T) bool

code int
statusMessage string
err error
//
httpStatusCode int
httpRespBody string
}

func (c *genericCombiner[R]) AddRequest(res *http.Response, _ string) error {
// AddResponse is used to add a http response to the combiner.
func (c *genericCombiner[T]) AddResponse(res *http.Response, _ string) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.shouldQuit() {
if res == nil {
return nil
}

c.code = res.StatusCode
// todo: reevaluate this. should the caller owner the lifecycle of the http.response body?
defer func() { _ = res.Body.Close() }()

if c.shouldQuit() {
return nil
}

c.httpStatusCode = res.StatusCode
if res.StatusCode != http.StatusOK {
bytesMsg, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("error reading response body: %w", err)
}
c.statusMessage = string(bytesMsg)
return errors.New(c.statusMessage)
c.httpRespBody = string(bytesMsg)
c.httpStatusCode = res.StatusCode
// don't return error. the error path is reserved for unexpected errors.
// http pipeline errors should be returned through the final response. (Complete/TypedComplete/TypedDiff)
return nil
}

defer func() { _ = res.Body.Close() }()
if err := c.combine(res.Body, c.final); err != nil {
c.statusMessage = internalErrorMsg
c.err = fmt.Errorf("error unmarshalling response body: %w", err)
return c.err
partial := c.new() // instantiating directly requires additional type constraints. this seemed cleaner: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
if err := jsonpb.Unmarshal(res.Body, partial); err != nil {
return fmt.Errorf("error unmarshalling response body: %w", err)
}

if err := c.combine(partial, c.current); err != nil {
c.httpRespBody = internalErrorMsg
return fmt.Errorf("error combining in combiner: %w", err)
}

return nil
}

func (c *genericCombiner[R]) Complete() (*http.Response, error) {
// HTTPFinal, GRPCComplete, and GRPCDiff are all responsible for returning something
// usable in grpc streaming/http response.
func (c *genericCombiner[T]) HTTPFinal() (*http.Response, error) {
c.mu.Lock()
defer c.mu.Unlock()

bodyString, err := c.result(c.final)
httpErr, _ := c.erroredResponse()
if httpErr != nil {
return httpErr, nil
}

final, err := c.finalize(c.current)
if err != nil {
return nil, err
}

bodyString, err := new(jsonpb.Marshaler).MarshalToString(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
}

return &http.Response{
StatusCode: c.code,
StatusCode: c.httpStatusCode,
Header: http.Header{
api.HeaderContentType: {api.HeaderAcceptJSON},
},
Expand All @@ -77,11 +107,68 @@ func (c *genericCombiner[R]) Complete() (*http.Response, error) {
}, nil
}

func (c *genericCombiner[T]) GRPCFinal() (T, error) {
c.mu.Lock()
defer c.mu.Unlock()

var empty T
_, grpcErr := c.erroredResponse()
if grpcErr != nil {
return empty, grpcErr
}

final, err := c.finalize(c.current)
if err != nil {
return empty, err
}

return final, nil
}

func (c *genericCombiner[T]) GRPCDiff() (T, error) {
c.mu.Lock()
defer c.mu.Unlock()

var empty T
_, grpcErr := c.erroredResponse()
if grpcErr != nil {
return empty, grpcErr
}

diff, err := c.diff(c.current)
if err != nil {
return empty, err
}

return diff, nil
}

func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
if c.httpStatusCode == http.StatusOK {
return nil, nil
}

// build grpc error and http response
var grpcErr error
if c.httpStatusCode/100 == 5 {
grpcErr = status.Error(codes.Internal, c.httpRespBody)
} else {
grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody)
}
httpResp := &http.Response{
StatusCode: c.httpStatusCode,
Status: http.StatusText(c.httpStatusCode),
Body: io.NopCloser(strings.NewReader(c.httpRespBody)),
}

return httpResp, grpcErr
}

func (c *genericCombiner[R]) StatusCode() int {
c.mu.Lock()
defer c.mu.Unlock()

return c.code
return c.httpStatusCode
}

func (c *genericCombiner[R]) ShouldQuit() bool {
Expand All @@ -92,14 +179,18 @@ func (c *genericCombiner[R]) ShouldQuit() bool {
}

func (c *genericCombiner[R]) shouldQuit() bool {
if c.err != nil {
if c.httpStatusCode/100 == 5 { // Bail on 5xx
return true
}

if c.httpStatusCode/100 == 4 { // Bail on 4xx
return true
}

if c.code/100 == 5 { // Bail on 5xx
if c.quit != nil && c.quit(c.current) {
return true
}

// 2xx and 404 are OK
// 2xx
return false
}
Loading
Loading