Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate handler to modelpb and prepare for decoder migration #62

Merged
merged 4 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

// Values used for populating the model structs
Expand All @@ -44,6 +46,7 @@ type Values struct {
HTTPHeader http.Header
LabelVal model.LabelValue
NumericLabelVal model.NumericLabelValue
MetricType modelpb.MetricType
// N controls how many elements are added to a slice or a map
N int
}
Expand Down Expand Up @@ -129,7 +132,7 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
switch fKind := f.Kind(); fKind {
case reflect.String:
fieldVal = reflect.ValueOf(values.Str)
case reflect.Int, reflect.Int64:
case reflect.Int, reflect.Int32, reflect.Int64:
fieldVal = reflect.ValueOf(values.Int).Convert(f.Type())
case reflect.Float64:
fieldVal = reflect.ValueOf(values.Float).Convert(f.Type())
Expand Down Expand Up @@ -301,6 +304,8 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
newVal = &val
case *int:
newVal = &values.Int
case int32:
newVal = int32(values.Int)
case uint32:
newVal = uint32(values.Int)
case *uint32:
Expand All @@ -319,10 +324,12 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
newVal = &values.Bool
case http.Header:
newVal = values.HTTPHeader
case time.Time:
newVal = values.Time
case *timestamppb.Timestamp:
newVal = timestamppb.New(values.Time)
case time.Duration:
newVal = values.Duration
case modelpb.MetricType:
newVal = values
default:
// the populator recursively iterates over struct and structPtr
// calling this function for all fields;
Expand Down Expand Up @@ -356,6 +363,9 @@ func IterateStruct(i interface{}, fn func(reflect.Value, string)) {
}

func iterateStruct(v reflect.Value, key string, fn func(f reflect.Value, fKey string)) {
if v.Type().Kind() == reflect.Ptr {
v = v.Elem()
}
t := v.Type()
if t.Kind() != reflect.Struct {
panic(fmt.Sprintf("iterateStruct: invalid type %s", t.Kind()))
Expand Down
28 changes: 21 additions & 7 deletions input/elasticapm/internal/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/modeldecoderutil"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

var (
Expand Down Expand Up @@ -95,7 +96,7 @@ func DecodeNestedMetadata(d decoder.Decoder, out *model.APMEvent) error {
// DecodeNestedError decodes an error from d, appending it to batch.
//
// DecodeNestedError should be used when the stream in the decoder contains the `error` key
func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *model.Batch) error {
func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchErrorRoot()
defer releaseErrorRoot(root)
if err := d.Decode(root); err != nil && err != io.EOF {
Expand All @@ -106,15 +107,17 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode
}
event := input.Base
mapToErrorModel(&root.Error, &event)
*batch = append(*batch, event)
var out modelpb.APMEvent
event.ToModelProtobuf(&out)
*batch = append(*batch, &out)
return nil
}

// DecodeNestedTransaction a transaction and zero or more nested spans and
// metricsets, appending them to batch.
//
// DecodeNestedTransaction should be used when the decoder contains the `transaction` key
func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *model.Batch) error {
func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchTransactionRoot()
defer releaseTransactionRoot(root)
if err := d.Decode(root); err != nil && err != io.EOF {
Expand All @@ -126,7 +129,9 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch

transaction := input.Base
mapToTransactionModel(&root.Transaction, &transaction)
*batch = append(*batch, transaction)
var out modelpb.APMEvent
transaction.ToModelProtobuf(&out)
*batch = append(*batch, &out)

for _, m := range root.Transaction.Metricsets {
event := input.Base
Expand All @@ -135,7 +140,9 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
Type: transaction.Transaction.Type,
}
if mapToTransactionMetricsetModel(&m, &event) {
*batch = append(*batch, event)
var out modelpb.APMEvent
event.ToModelProtobuf(&out)
*batch = append(*batch, &out)
}
}

Expand All @@ -146,12 +153,19 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
event.Transaction = &model.Transaction{ID: transaction.Transaction.ID}
event.Parent.ID = transaction.Transaction.ID // may be overridden later
event.Trace = transaction.Trace
*batch = append(*batch, event)
var out modelpb.APMEvent
event.ToModelProtobuf(&out)
*batch = append(*batch, &out)
}
spans := (*batch)[offset:]
for i, s := range root.Transaction.Spans {
if s.ParentIndex.IsSet() && s.ParentIndex.Val >= 0 && s.ParentIndex.Val < len(spans) {
spans[i].Parent.ID = spans[s.ParentIndex.Val].Span.ID
if e := spans[s.ParentIndex.Val]; e != nil {
if spans[i].Parent == nil {
spans[i].Parent = &modelpb.Parent{}
}
spans[i].Parent.Id = e.Span.Id
}
}
}
return nil
Expand Down
24 changes: 16 additions & 8 deletions input/elasticapm/internal/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/modeldecodertest"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetErrorOnRelease(t *testing.T) {
Expand All @@ -43,27 +46,32 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now()
now := time.Now().UTC()
eventBase := initializedMetadata()
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch model.Batch
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
defaultValues := modeldecodertest.DefaultValues()
defaultValues.Update(time.Unix(1599996822, 281000000).UTC())
modeldecodertest.AssertStructValues(t, &batch[0], metadataExceptions(), defaultValues)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{
Message: "abc",
LoggerName: "default",
},
}, batch[0].Error, protocmp.Transform()))

// if no timestamp is provided, leave base event timestamp unmodified
input = modeldecoder.Input{Base: eventBase}
str = `{"e":{"id":"a-b-c","log":{"mg":"abc"}}}`
dec = decoder.NewJSONDecoder(strings.NewReader(str))
batch = model.Batch{}
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp)
assert.Equal(t, now, batch[0].Timestamp.AsTime())

// test decode
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
Expand All @@ -72,7 +80,7 @@ func TestDecodeNestedError(t *testing.T) {
})

t.Run("validate", func(t *testing.T) {
var batch model.Batch
var batch modelpb.Batch
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &batch)
require.Error(t, err)
assert.Contains(t, err.Error(), "validation")
Expand Down
107 changes: 54 additions & 53 deletions input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/modeldecodertest"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetTransactionOnRelease(t *testing.T) {
Expand All @@ -44,13 +47,13 @@ func TestResetTransactionOnRelease(t *testing.T) {

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now()
now := time.Now().UTC()
eventBase := initializedMetadata()
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"ysc":{"v":5}},"y":{"t":"span_type","su":"span_subtype"}}]}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch model.Batch
var batch modelpb.Batch
require.NoError(t, DecodeNestedTransaction(dec, &input, &batch))
require.Len(t, batch, 3) // 1 transaction, 1 metricset, 1 span
require.NotNil(t, batch[0].Transaction)
Expand All @@ -59,38 +62,30 @@ func TestDecodeNestedTransaction(t *testing.T) {

assert.Equal(t, "request", batch[0].Transaction.Type)
// fall back to request time
assert.Equal(t, now, batch[0].Timestamp)
assert.Equal(t, now, batch[0].Timestamp.AsTime())

// Ensure nested metricsets are decoded. RUMv3 only sends
// breakdown metrics, so the Metricsets will be empty and
// metrics will be recorded on the Transaction and Span
// fields.
assert.Equal(t, &model.Metricset{}, batch[1].Metricset)
assert.Equal(t, &model.Transaction{
assert.Empty(t, cmp.Diff(&modelpb.Metricset{}, batch[1].Metricset, protocmp.Transform()))
assert.Empty(t, cmp.Diff(&modelpb.Transaction{
Name: "tr-a",
Type: "request",
}, batch[1].Transaction)
assert.Equal(t, &model.Span{
}, batch[1].Transaction, protocmp.Transform()))
assert.Empty(t, cmp.Diff(&modelpb.Span{
Type: "span_type",
Subtype: "span_subtype",
SelfTime: model.AggregatedDuration{Count: 5},
}, batch[1].Span)
assert.Equal(t, now, batch[1].Timestamp)
SelfTime: &modelpb.AggregatedDuration{Count: 5},
}, batch[1].Span, protocmp.Transform()))
assert.Equal(t, now, batch[1].Timestamp.AsTime())

// ensure nested spans are decoded
start := time.Duration(20 * 1000 * 1000)
assert.Equal(t, now.Add(start), batch[2].Timestamp) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.ID)
assert.Equal(t, "1", batch[2].Trace.ID)
assert.Equal(t, "100", batch[2].Parent.ID)

for _, event := range batch {
modeldecodertest.AssertStructValues(
t, &event,
metadataExceptions("Timestamp"), // timestamp checked above
modeldecodertest.DefaultValues(),
)
}
assert.Equal(t, now.Add(start), batch[2].Timestamp.AsTime()) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.Id)
assert.Equal(t, "1", batch[2].Trace.Id)
assert.Equal(t, "100", batch[2].Parent.Id)

err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
require.Error(t, err)
Expand All @@ -103,46 +98,52 @@ func TestDecodeNestedTransaction(t *testing.T) {
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch model.Batch
var batch modelpb.Batch
require.NoError(t, DecodeNestedTransaction(dec, &input, &batch))
marks := model.TransactionMarks{
"agent": map[string]float64{
"domComplete": 0.1,
"domInteractive": 0.2,
"domContentLoadedEventStart": 0.3,
"domContentLoadedEventEnd": 0.4,
"timeToFirstByte": 0.5,
"firstContentfulPaint": 0.6,
"largestContentfulPaint": 0.7,
"long": 0.8,
marks := map[string]*modelpb.TransactionMark{
"agent": {
Measurements: map[string]float64{
"domComplete": 0.1,
"domInteractive": 0.2,
"domContentLoadedEventStart": 0.3,
"domContentLoadedEventEnd": 0.4,
"timeToFirstByte": 0.5,
"firstContentfulPaint": 0.6,
"largestContentfulPaint": 0.7,
"long": 0.8,
},
},
"navigationTiming": map[string]float64{
"fetchStart": 0.1,
"domainLookupStart": 0.2,
"domainLookupEnd": 0.3,
"connectStart": 0.4,
"connectEnd": 0.5,
"requestStart": 0.6,
"responseStart": 0.7,
"responseEnd": 0.8,
"domLoading": 0.9,
"domInteractive": 0.11,
"domContentLoadedEventStart": 0.21,
"domContentLoadedEventEnd": 0.31,
"domComplete": 0.41,
"loadEventStart": 0.51,
"loadEventEnd": 6,
"long": 0.99,
"navigationTiming": {
Measurements: map[string]float64{
"fetchStart": 0.1,
"domainLookupStart": 0.2,
"domainLookupEnd": 0.3,
"connectStart": 0.4,
"connectEnd": 0.5,
"requestStart": 0.6,
"responseStart": 0.7,
"responseEnd": 0.8,
"domLoading": 0.9,
"domInteractive": 0.11,
"domContentLoadedEventStart": 0.21,
"domContentLoadedEventEnd": 0.31,
"domComplete": 0.41,
"loadEventStart": 0.51,
"loadEventEnd": 6,
"long": 0.99,
},
},
"long": map[string]float64{
"long": 0.1,
"long": {
Measurements: map[string]float64{
"long": 0.1,
},
},
}
assert.Equal(t, marks, batch[0].Transaction.Marks)
})

t.Run("validate", func(t *testing.T) {
var batch model.Batch
var batch modelpb.Batch
err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &batch)
require.Error(t, err)
assert.Contains(t, err.Error(), "validation")
Expand Down
Loading