Skip to content

Commit

Permalink
feat: migrate metadata decoding to protobuf (#11014)
Browse files Browse the repository at this point in the history
* feat: migrate metadata decoding to protobuf

* build: bump apm-data

* build: generate notice file
  • Loading branch information
kruskall committed Jun 19, 2023
1 parent 4894ff3 commit a36fb4b
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 82 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/elastic/apm-data
Version: v0.1.1-0.20230618124432-0acd047d5d2c
Version: v0.1.1-0.20230619091308-3ff577541509
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/apm-data@v0.1.1-0.20230618124432-0acd047d5d2c/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/apm-data@v0.1.1-0.20230619091308-3ff577541509/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b
github.com/dustin/go-humanize v1.0.1
github.com/elastic/apm-data v0.1.1-0.20230618124432-0acd047d5d2c
github.com/elastic/apm-data v0.1.1-0.20230619091308-3ff577541509
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20230615211647-139a8bf9e199
github.com/elastic/elastic-agent-client/v7 v7.1.2
github.com/elastic/elastic-agent-libs v0.3.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/apm-data v0.1.1-0.20230618124432-0acd047d5d2c h1:742/YkvnG+0PXqH2wR5ewlNLVB9g6XCPNrQcGBYaZAU=
github.com/elastic/apm-data v0.1.1-0.20230618124432-0acd047d5d2c/go.mod h1:2ZtuB/ipMCv+732o4ad/5MadDHSko4Z4CRFCtx2CfSw=
github.com/elastic/apm-data v0.1.1-0.20230619091308-3ff577541509 h1:u6By1eNybxB1OQ12PN5sx/xfwwpxIfLSPWlIjXqs9LA=
github.com/elastic/apm-data v0.1.1-0.20230619091308-3ff577541509/go.mod h1:2ZtuB/ipMCv+732o4ad/5MadDHSko4Z4CRFCtx2CfSw=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20230615211647-139a8bf9e199 h1:Ixr8BuP4eus39MPl+kLcDRvQqibWwKkNQJWpOE0QoHI=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20230615211647-139a8bf9e199/go.mod h1:lAPR+jtv6hHCRgDaKkUUL4xRRWVIIgb3ZQRC52cyVZo=
github.com/elastic/elastic-agent-autodiscover v0.6.1 h1:vXR+3QVDL7Ij7IMKul13iIiDmM66HsX6MS6I0T4O8gw=
Expand Down
21 changes: 2 additions & 19 deletions internal/beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package intake

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
Expand All @@ -31,7 +29,6 @@ import (
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
Expand Down Expand Up @@ -59,27 +56,13 @@ var (
errInvalidContentType = errors.New("invalid content type")
)

// StreamHandler is an interface for handling an Elastic APM agent ND-JSON event
// stream, implemented by processor/stream.
type StreamHandler interface {
HandleStream(
ctx context.Context,
async bool,
base model.APMEvent,
stream io.Reader,
batchSize int,
processor modelpb.BatchProcessor,
out *elasticapm.Result,
) error
}

// RequestMetadataFunc is a function type supplied to Handler for extracting
// metadata from the request. This is used for conditionally injecting the
// source IP address as `client.ip` for RUM.
type RequestMetadataFunc func(*request.Context) model.APMEvent
type RequestMetadataFunc func(*request.Context) *modelpb.APMEvent

// Handler returns a request.Handler for managing intake requests for backend and rum events.
func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
return func(c *request.Context) {
if err := validateRequest(c); err != nil {
writeError(c, err)
Expand Down
11 changes: 5 additions & 6 deletions internal/beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"golang.org/x/sync/semaphore"

"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/beater/headers"
Expand Down Expand Up @@ -178,7 +177,7 @@ func TestIntakeHandlerMonitoring(t *testing.T) {
streamHandler := streamHandlerFunc(func(
ctx context.Context,
async bool,
base model.APMEvent,
base *modelpb.APMEvent,
stream io.Reader,
batchSize int,
processor modelpb.BatchProcessor,
Expand Down Expand Up @@ -290,14 +289,14 @@ func compressedRequest(t *testing.T, compressionType string, compressPayload boo
return req
}

func emptyRequestMetadata(*request.Context) model.APMEvent {
return model.APMEvent{}
func emptyRequestMetadata(*request.Context) *modelpb.APMEvent {
return &modelpb.APMEvent{}
}

type streamHandlerFunc func(
ctx context.Context,
async bool,
base model.APMEvent,
base *modelpb.APMEvent,
stream io.Reader,
batchSize int,
processor modelpb.BatchProcessor,
Expand All @@ -307,7 +306,7 @@ type streamHandlerFunc func(
func (f streamHandlerFunc) HandleStream(
ctx context.Context,
async bool,
base model.APMEvent,
base *modelpb.APMEvent,
stream io.Reader,
batchSize int,
processor modelpb.BatchProcessor,
Expand Down
52 changes: 30 additions & 22 deletions internal/beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@ package api
import (
"net/http"
httppprof "net/http/pprof"
"net/netip"
"regexp"
"runtime/pprof"

"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-data/input"
"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-data/model/modelprocessor"
"github.com/elastic/apm-server/internal/agentcfg"
Expand Down Expand Up @@ -296,43 +295,52 @@ func rootMiddleware(cfg *config.Config, authenticator *auth.Authenticator) []mid
)
}

func baseRequestMetadata(c *request.Context) model.APMEvent {
return model.APMEvent{
Timestamp: c.Timestamp,
func baseRequestMetadata(c *request.Context) *modelpb.APMEvent {
return &modelpb.APMEvent{
Timestamp: timestamppb.New(c.Timestamp),
}
}

func backendRequestMetadataFunc(cfg *config.Config) func(c *request.Context) model.APMEvent {
func backendRequestMetadataFunc(cfg *config.Config) func(c *request.Context) *modelpb.APMEvent {
if !cfg.AugmentEnabled {
return baseRequestMetadata
}
return func(c *request.Context) model.APMEvent {
var hostIP []netip.Addr
if c.ClientIP.IsValid() {
hostIP = []netip.Addr{c.ClientIP}
return func(c *request.Context) *modelpb.APMEvent {
e := modelpb.APMEvent{
Timestamp: timestamppb.New(c.Timestamp),
}
return model.APMEvent{
Host: model.Host{IP: hostIP},
Timestamp: c.Timestamp,

if c.ClientIP.IsValid() {
e.Host = &modelpb.Host{Ip: []string{c.ClientIP.String()}}
}
return &e
}
}

func rumRequestMetadataFunc(cfg *config.Config) func(c *request.Context) model.APMEvent {
func rumRequestMetadataFunc(cfg *config.Config) func(c *request.Context) *modelpb.APMEvent {
if !cfg.AugmentEnabled {
return baseRequestMetadata
}
return func(c *request.Context) model.APMEvent {
e := model.APMEvent{
Client: model.Client{IP: c.ClientIP},
Source: model.Source{IP: c.SourceIP, Port: c.SourcePort},
Timestamp: c.Timestamp,
UserAgent: model.UserAgent{Original: c.UserAgent},
return func(c *request.Context) *modelpb.APMEvent {
e := modelpb.APMEvent{
Timestamp: timestamppb.New(c.Timestamp),
}
if c.UserAgent != "" {
e.UserAgent = &modelpb.UserAgent{Original: c.UserAgent}
}
if c.ClientIP.IsValid() {
e.Client = &modelpb.Client{Ip: c.ClientIP.String()}
}
if c.SourcePort != 0 || c.SourceIP.IsValid() {
e.Source = &modelpb.Source{Port: uint32(c.SourcePort)}
if c.SourceIP.IsValid() {
e.Source.Ip = c.SourceIP.String()
}
}
if c.SourceNATIP.IsValid() {
e.Source.NAT = &model.NAT{IP: c.SourceNATIP}
e.Source.Nat = &modelpb.NAT{Ip: c.SourceNATIP.String()}
}
return e
return &e
}
}

Expand Down
29 changes: 14 additions & 15 deletions internal/beater/api/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/auth"
Expand All @@ -44,36 +43,36 @@ import (
)

func TestBackendRequestMetadata(t *testing.T) {
tNow := time.Now()
tNow := time.Now().UTC()
c := &request.Context{Timestamp: tNow}
cfg := &config.Config{AugmentEnabled: true}
event := backendRequestMetadataFunc(cfg)(c)
assert.Equal(t, tNow, event.Timestamp)
assert.Equal(t, model.Host{}, event.Host)
assert.Equal(t, tNow, event.Timestamp.AsTime())
assert.Nil(t, nil, event.Host)

c.ClientIP = netip.MustParseAddr("127.0.0.1")
event = backendRequestMetadataFunc(cfg)(c)
assert.Equal(t, tNow, event.Timestamp)
assert.NotEqual(t, model.Host{}, event.Host)
assert.Equal(t, tNow, event.Timestamp.AsTime())
assert.Equal(t, &modelpb.Host{Ip: []string{c.ClientIP.String()}}, event.Host)
}

func TestRUMRequestMetadata(t *testing.T) {
tNow := time.Now()
tNow := time.Now().UTC()
c := &request.Context{Timestamp: tNow}
cfg := &config.Config{AugmentEnabled: true}
event := rumRequestMetadataFunc(cfg)(c)
assert.Equal(t, tNow, event.Timestamp)
assert.Equal(t, model.Client{}, event.Client)
assert.Equal(t, model.Source{}, event.Source)
assert.Equal(t, model.UserAgent{}, event.UserAgent)
assert.Equal(t, tNow, event.Timestamp.AsTime())
assert.Nil(t, event.Client)
assert.Nil(t, event.Source)
assert.Nil(t, event.UserAgent)

ip := netip.MustParseAddr("127.0.0.1")
c = &request.Context{Timestamp: tNow, ClientIP: ip, SourceIP: ip, UserAgent: "firefox"}
event = rumRequestMetadataFunc(cfg)(c)
assert.Equal(t, tNow, event.Timestamp)
assert.NotEqual(t, model.Client{}, event.Client)
assert.NotEqual(t, model.Source{}, event.Source)
assert.NotEqual(t, model.UserAgent{}, event.UserAgent)
assert.Equal(t, tNow, event.Timestamp.AsTime())
assert.Equal(t, &modelpb.Client{Ip: c.ClientIP.String()}, event.Client)
assert.Equal(t, &modelpb.Source{Ip: c.SourceIP.String()}, event.Source)
assert.Equal(t, &modelpb.UserAgent{Original: c.UserAgent}, event.UserAgent)
}

func requestToMuxerWithPattern(cfg *config.Config, pattern string) (*httptest.ResponseRecorder, error) {
Expand Down
23 changes: 8 additions & 15 deletions internal/sourcemap/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
Expand Down Expand Up @@ -227,12 +226,6 @@ func TestBatchProcessor(t *testing.T) {
}, error3.Error, protocmp.Transform()))
}

func toPb(e *model.APMEvent) *modelpb.APMEvent {
var out modelpb.APMEvent
e.ToModelProtobuf(&out)
return &out
}

func TestBatchProcessorElasticsearchUnavailable(t *testing.T) {
client := newUnavailableElasticsearchClient(t)
fetcher := NewElasticsearchFetcher(client, "index")
Expand Down Expand Up @@ -296,19 +289,19 @@ func TestBatchProcessorTimeout(t *testing.T) {
require.NoError(t, err)
fetcher := NewElasticsearchFetcher(client, "index")

frame := model.StacktraceFrame{
frame := modelpb.StacktraceFrame{
AbsPath: "bundle.js",
Lineno: newInt(0),
Colno: newInt(0),
Function: "original function",
}
span := model.APMEvent{
Service: model.Service{
span := modelpb.APMEvent{
Service: &modelpb.Service{
Name: "service_name",
Version: "service_version",
},
Span: &model.Span{
Stacktrace: model.Stacktrace{cloneFrame(frame)},
Span: &modelpb.Span{
Stacktrace: []*modelpb.StacktraceFrame{cloneFrame(&frame)},
},
}

Expand All @@ -318,14 +311,14 @@ func TestBatchProcessorTimeout(t *testing.T) {
Timeout: 100 * time.Millisecond,
Logger: logp.NewLogger(logs.Stacktrace),
}
err = processor.ProcessBatch(context.Background(), &modelpb.Batch{toPb(&span)})
err = processor.ProcessBatch(context.Background(), &modelpb.Batch{&span})
assert.NoError(t, err)
taken := time.Since(before)
assert.Less(t, taken, time.Second)
}

func cloneFrame(frame model.StacktraceFrame) *model.StacktraceFrame {
return &frame
func cloneFrame(frame *modelpb.StacktraceFrame) *modelpb.StacktraceFrame {
return proto.Clone(frame).(*modelpb.StacktraceFrame)
}

func newInt(v uint32) *uint32 {
Expand Down

0 comments on commit a36fb4b

Please sign in to comment.