From a36fb4b1a02f7a6a706bfbce33bfe0e6e8cdba87 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Mon, 19 Jun 2023 11:31:44 +0200 Subject: [PATCH] feat: migrate metadata decoding to protobuf (#11014) * feat: migrate metadata decoding to protobuf * build: bump apm-data * build: generate notice file --- NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- internal/beater/api/intake/handler.go | 21 +-------- internal/beater/api/intake/handler_test.go | 11 +++-- internal/beater/api/mux.go | 52 +++++++++++++--------- internal/beater/api/mux_test.go | 29 ++++++------ internal/sourcemap/processor_test.go | 23 ++++------ 8 files changed, 64 insertions(+), 82 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 7a36dfba15d..93d69cbb33d 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -288,11 +288,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/elastic/apm-data -Version: v0.1.1-0.20230618124432-0acd047d5d2c +Version: v0.1.1-0.20230619091308-3ff577541509 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/apm-data@v0.1.1-0.20230618124432-0acd047d5d2c/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/apm-data@v0.1.1-0.20230619091308-3ff577541509/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 954072aebd2..3cad0daf712 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b github.com/dustin/go-humanize v1.0.1 - github.com/elastic/apm-data v0.1.1-0.20230618124432-0acd047d5d2c + github.com/elastic/apm-data v0.1.1-0.20230619091308-3ff577541509 github.com/elastic/beats/v7 v7.0.0-alpha2.0.20230615211647-139a8bf9e199 github.com/elastic/elastic-agent-client/v7 v7.1.2 
github.com/elastic/elastic-agent-libs v0.3.9 diff --git a/go.sum b/go.sum index c0785ec5b18..abc1a7904b5 100644 --- a/go.sum +++ b/go.sum @@ -120,8 +120,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9 github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/elastic/apm-data v0.1.1-0.20230618124432-0acd047d5d2c h1:742/YkvnG+0PXqH2wR5ewlNLVB9g6XCPNrQcGBYaZAU= -github.com/elastic/apm-data v0.1.1-0.20230618124432-0acd047d5d2c/go.mod h1:2ZtuB/ipMCv+732o4ad/5MadDHSko4Z4CRFCtx2CfSw= +github.com/elastic/apm-data v0.1.1-0.20230619091308-3ff577541509 h1:u6By1eNybxB1OQ12PN5sx/xfwwpxIfLSPWlIjXqs9LA= +github.com/elastic/apm-data v0.1.1-0.20230619091308-3ff577541509/go.mod h1:2ZtuB/ipMCv+732o4ad/5MadDHSko4Z4CRFCtx2CfSw= github.com/elastic/beats/v7 v7.0.0-alpha2.0.20230615211647-139a8bf9e199 h1:Ixr8BuP4eus39MPl+kLcDRvQqibWwKkNQJWpOE0QoHI= github.com/elastic/beats/v7 v7.0.0-alpha2.0.20230615211647-139a8bf9e199/go.mod h1:lAPR+jtv6hHCRgDaKkUUL4xRRWVIIgb3ZQRC52cyVZo= github.com/elastic/elastic-agent-autodiscover v0.6.1 h1:vXR+3QVDL7Ij7IMKul13iIiDmM66HsX6MS6I0T4O8gw= diff --git a/internal/beater/api/intake/handler.go b/internal/beater/api/intake/handler.go index 111dc120824..9e1cf391fad 100644 --- a/internal/beater/api/intake/handler.go +++ b/internal/beater/api/intake/handler.go @@ -18,10 +18,8 @@ package intake import ( - "context" "errors" "fmt" - "io" "net/http" "strconv" "strings" @@ -31,7 +29,6 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/apm-data/input/elasticapm" - "github.com/elastic/apm-data/model" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-server/internal/beater/auth" "github.com/elastic/apm-server/internal/beater/headers" @@ 
-59,27 +56,13 @@ var ( errInvalidContentType = errors.New("invalid content type") ) -// StreamHandler is an interface for handling an Elastic APM agent ND-JSON event -// stream, implemented by processor/stream. -type StreamHandler interface { - HandleStream( - ctx context.Context, - async bool, - base model.APMEvent, - stream io.Reader, - batchSize int, - processor modelpb.BatchProcessor, - out *elasticapm.Result, - ) error -} - // RequestMetadataFunc is a function type supplied to Handler for extracting // metadata from the request. This is used for conditionally injecting the // source IP address as `client.ip` for RUM. -type RequestMetadataFunc func(*request.Context) model.APMEvent +type RequestMetadataFunc func(*request.Context) *modelpb.APMEvent // Handler returns a request.Handler for managing intake requests for backend and rum events. -func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler { +func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler { return func(c *request.Context) { if err := validateRequest(c); err != nil { writeError(c, err) diff --git a/internal/beater/api/intake/handler_test.go b/internal/beater/api/intake/handler_test.go index 75962fb8e9c..cf15c2e7928 100644 --- a/internal/beater/api/intake/handler_test.go +++ b/internal/beater/api/intake/handler_test.go @@ -35,7 +35,6 @@ import ( "golang.org/x/sync/semaphore" "github.com/elastic/apm-data/input/elasticapm" - "github.com/elastic/apm-data/model" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-server/internal/beater/config" "github.com/elastic/apm-server/internal/beater/headers" @@ -178,7 +177,7 @@ func TestIntakeHandlerMonitoring(t *testing.T) { streamHandler := streamHandlerFunc(func( ctx context.Context, async bool, - base model.APMEvent, + base *modelpb.APMEvent, stream io.Reader, batchSize int, 
processor modelpb.BatchProcessor, @@ -290,14 +289,14 @@ func compressedRequest(t *testing.T, compressionType string, compressPayload boo return req } -func emptyRequestMetadata(*request.Context) model.APMEvent { - return model.APMEvent{} +func emptyRequestMetadata(*request.Context) *modelpb.APMEvent { + return &modelpb.APMEvent{} } type streamHandlerFunc func( ctx context.Context, async bool, - base model.APMEvent, + base *modelpb.APMEvent, stream io.Reader, batchSize int, processor modelpb.BatchProcessor, @@ -307,7 +306,7 @@ type streamHandlerFunc func( func (f streamHandlerFunc) HandleStream( ctx context.Context, async bool, - base model.APMEvent, + base *modelpb.APMEvent, stream io.Reader, batchSize int, processor modelpb.BatchProcessor, diff --git a/internal/beater/api/mux.go b/internal/beater/api/mux.go index 1b4bf6fd99a..f009059f446 100644 --- a/internal/beater/api/mux.go +++ b/internal/beater/api/mux.go @@ -20,20 +20,19 @@ package api import ( "net/http" httppprof "net/http/pprof" - "net/netip" "regexp" "runtime/pprof" "github.com/gorilla/mux" "github.com/pkg/errors" "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/apm-data/input" "github.com/elastic/apm-data/input/elasticapm" - "github.com/elastic/apm-data/model" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-data/model/modelprocessor" "github.com/elastic/apm-server/internal/agentcfg" @@ -296,43 +295,52 @@ func rootMiddleware(cfg *config.Config, authenticator *auth.Authenticator) []mid ) } -func baseRequestMetadata(c *request.Context) model.APMEvent { - return model.APMEvent{ - Timestamp: c.Timestamp, +func baseRequestMetadata(c *request.Context) *modelpb.APMEvent { + return &modelpb.APMEvent{ + Timestamp: timestamppb.New(c.Timestamp), } } -func backendRequestMetadataFunc(cfg *config.Config) func(c *request.Context) model.APMEvent { +func 
backendRequestMetadataFunc(cfg *config.Config) func(c *request.Context) *modelpb.APMEvent { if !cfg.AugmentEnabled { return baseRequestMetadata } - return func(c *request.Context) model.APMEvent { - var hostIP []netip.Addr - if c.ClientIP.IsValid() { - hostIP = []netip.Addr{c.ClientIP} + return func(c *request.Context) *modelpb.APMEvent { + e := modelpb.APMEvent{ + Timestamp: timestamppb.New(c.Timestamp), } - return model.APMEvent{ - Host: model.Host{IP: hostIP}, - Timestamp: c.Timestamp, + + if c.ClientIP.IsValid() { + e.Host = &modelpb.Host{Ip: []string{c.ClientIP.String()}} } + return &e } } -func rumRequestMetadataFunc(cfg *config.Config) func(c *request.Context) model.APMEvent { +func rumRequestMetadataFunc(cfg *config.Config) func(c *request.Context) *modelpb.APMEvent { if !cfg.AugmentEnabled { return baseRequestMetadata } - return func(c *request.Context) model.APMEvent { - e := model.APMEvent{ - Client: model.Client{IP: c.ClientIP}, - Source: model.Source{IP: c.SourceIP, Port: c.SourcePort}, - Timestamp: c.Timestamp, - UserAgent: model.UserAgent{Original: c.UserAgent}, + return func(c *request.Context) *modelpb.APMEvent { + e := modelpb.APMEvent{ + Timestamp: timestamppb.New(c.Timestamp), + } + if c.UserAgent != "" { + e.UserAgent = &modelpb.UserAgent{Original: c.UserAgent} + } + if c.ClientIP.IsValid() { + e.Client = &modelpb.Client{Ip: c.ClientIP.String()} + } + if c.SourcePort != 0 || c.SourceIP.IsValid() || c.SourceNATIP.IsValid() { + e.Source = &modelpb.Source{Port: uint32(c.SourcePort)} + if c.SourceIP.IsValid() { + e.Source.Ip = c.SourceIP.String() + } } if c.SourceNATIP.IsValid() { - e.Source.NAT = &model.NAT{IP: c.SourceNATIP} + e.Source.Nat = &modelpb.NAT{Ip: c.SourceNATIP.String()} } - return e + return &e } } diff --git a/internal/beater/api/mux_test.go b/internal/beater/api/mux_test.go index 27f4b3be07b..a5e449f7f90 100644 --- a/internal/beater/api/mux_test.go +++ b/internal/beater/api/mux_test.go @@ -31,7 +31,6 @@ import ( "github.com/stretchr/testify/require" 
"golang.org/x/sync/semaphore" - "github.com/elastic/apm-data/model" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-server/internal/agentcfg" "github.com/elastic/apm-server/internal/beater/auth" @@ -44,36 +43,36 @@ import ( ) func TestBackendRequestMetadata(t *testing.T) { - tNow := time.Now() + tNow := time.Now().UTC() c := &request.Context{Timestamp: tNow} cfg := &config.Config{AugmentEnabled: true} event := backendRequestMetadataFunc(cfg)(c) - assert.Equal(t, tNow, event.Timestamp) - assert.Equal(t, model.Host{}, event.Host) + assert.Equal(t, tNow, event.Timestamp.AsTime()) + assert.Nil(t, event.Host) c.ClientIP = netip.MustParseAddr("127.0.0.1") event = backendRequestMetadataFunc(cfg)(c) - assert.Equal(t, tNow, event.Timestamp) - assert.NotEqual(t, model.Host{}, event.Host) + assert.Equal(t, tNow, event.Timestamp.AsTime()) + assert.Equal(t, &modelpb.Host{Ip: []string{c.ClientIP.String()}}, event.Host) } func TestRUMRequestMetadata(t *testing.T) { - tNow := time.Now() + tNow := time.Now().UTC() c := &request.Context{Timestamp: tNow} cfg := &config.Config{AugmentEnabled: true} event := rumRequestMetadataFunc(cfg)(c) - assert.Equal(t, tNow, event.Timestamp) - assert.Equal(t, model.Client{}, event.Client) - assert.Equal(t, model.Source{}, event.Source) - assert.Equal(t, model.UserAgent{}, event.UserAgent) + assert.Equal(t, tNow, event.Timestamp.AsTime()) + assert.Nil(t, event.Client) + assert.Nil(t, event.Source) + assert.Nil(t, event.UserAgent) ip := netip.MustParseAddr("127.0.0.1") c = &request.Context{Timestamp: tNow, ClientIP: ip, SourceIP: ip, UserAgent: "firefox"} event = rumRequestMetadataFunc(cfg)(c) - assert.Equal(t, tNow, event.Timestamp) - assert.NotEqual(t, model.Client{}, event.Client) - assert.NotEqual(t, model.Source{}, event.Source) - assert.NotEqual(t, model.UserAgent{}, event.UserAgent) + assert.Equal(t, tNow, event.Timestamp.AsTime()) + assert.Equal(t, &modelpb.Client{Ip: c.ClientIP.String()}, event.Client) + 
assert.Equal(t, &modelpb.Source{Ip: c.SourceIP.String()}, event.Source) + assert.Equal(t, &modelpb.UserAgent{Original: c.UserAgent}, event.UserAgent) } func requestToMuxerWithPattern(cfg *config.Config, pattern string) (*httptest.ResponseRecorder, error) { diff --git a/internal/sourcemap/processor_test.go b/internal/sourcemap/processor_test.go index effe139a2be..07185d15b9d 100644 --- a/internal/sourcemap/processor_test.go +++ b/internal/sourcemap/processor_test.go @@ -31,7 +31,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/apm-data/model" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/internal/logs" @@ -227,12 +226,6 @@ func TestBatchProcessor(t *testing.T) { }, error3.Error, protocmp.Transform())) } -func toPb(e *model.APMEvent) *modelpb.APMEvent { - var out modelpb.APMEvent - e.ToModelProtobuf(&out) - return &out -} - func TestBatchProcessorElasticsearchUnavailable(t *testing.T) { client := newUnavailableElasticsearchClient(t) fetcher := NewElasticsearchFetcher(client, "index") @@ -296,19 +289,19 @@ func TestBatchProcessorTimeout(t *testing.T) { require.NoError(t, err) fetcher := NewElasticsearchFetcher(client, "index") - frame := model.StacktraceFrame{ + frame := modelpb.StacktraceFrame{ AbsPath: "bundle.js", Lineno: newInt(0), Colno: newInt(0), Function: "original function", } - span := model.APMEvent{ - Service: model.Service{ + span := modelpb.APMEvent{ + Service: &modelpb.Service{ Name: "service_name", Version: "service_version", }, - Span: &model.Span{ - Stacktrace: model.Stacktrace{cloneFrame(frame)}, + Span: &modelpb.Span{ + Stacktrace: []*modelpb.StacktraceFrame{cloneFrame(&frame)}, }, } @@ -318,14 +311,14 @@ func TestBatchProcessorTimeout(t *testing.T) { Timeout: 100 * time.Millisecond, Logger: logp.NewLogger(logs.Stacktrace), } - err = processor.ProcessBatch(context.Background(), &modelpb.Batch{toPb(&span)}) + err = 
processor.ProcessBatch(context.Background(), &modelpb.Batch{&span}) assert.NoError(t, err) taken := time.Since(before) assert.Less(t, taken, time.Second) } -func cloneFrame(frame model.StacktraceFrame) *model.StacktraceFrame { - return &frame +func cloneFrame(frame *modelpb.StacktraceFrame) *modelpb.StacktraceFrame { + return proto.Clone(frame).(*modelpb.StacktraceFrame) } func newInt(v uint32) *uint32 {