Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Jun 17, 2019
1 parent 65bc3a1 commit 6d58ad2
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 30 deletions.
36 changes: 34 additions & 2 deletions breakdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package apm

import (
"fmt"
"sync"
"time"

"go.elastic.co/apm/model"
Expand All @@ -35,6 +37,14 @@ const (
appSpanType = "app"
)

var (
breakdownMetricsLimitWarning = fmt.Sprintf(`
The limit of %d breakdown metricsets has been reached, no new metricsets will be created.
Try to name your transactions so that there are less distinct transaction names.`[1:],
breakdownMetricsLimit,
)
)

// spanTimingsKey identifies a span type and subtype, for use as the key in
// spanTimingsMap.
type spanTimingsKey struct {
Expand Down Expand Up @@ -88,7 +98,12 @@ func newBreakdownMetrics() *breakdownMetrics {
// accumulates new breakdown metrics, and is swapped with the "inactive" map
// just prior to when metrics gathering begins. When metrics gathering
// completes, the inactive map will be empty.
//
// breakdownMetrics may be written to concurrently by the tracer, and any
// number of other goroutines when a transaction cannot be enqueued.
type breakdownMetrics struct {
// mu serializes updates to active, and swapping of active/inactive.
mu sync.RWMutex
active, inactive map[breakdownMetricsKey]breakdownTiming
}

Expand All @@ -107,7 +122,14 @@ type breakdownTiming struct {
}

// recordTransaction records breakdown metrics for td into m.
//
// recordTransaction returns true if breakdown metrics were
// completely recorded, and false if any metrics were not
// recorded due to the limit being reached.
func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool {
m.mu.Lock()
defer m.mu.Unlock()

k := breakdownMetricsKey{
transactionType: td.Type,
transactionName: td.Name,
Expand All @@ -121,11 +143,17 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool {

endTime := td.timestamp.Add(td.Duration)
transactionSelfTime := td.Duration - td.childrenTimer.finalDuration(endTime)
ok := m.record(k, breakdownTiming{
if !m.record(k, breakdownTiming{
transaction: spanTiming{count: 1, duration: td.Duration},
breakdownCount: breakdownCount,
span: spanTiming{count: 1, duration: transactionSelfTime},
})
}) {
// We couldn't record the transaction's metricset, so we won't
// be able to record spans for that transaction either.
return false
}

ok := true
for sk, timing := range td.spanTimings {
k.spanTimingsKey = sk
ok = ok && m.record(k, breakdownTiming{span: timing})
Expand All @@ -149,11 +177,15 @@ func (m *breakdownMetrics) record(k breakdownMetricsKey, bt breakdownTiming) boo

func (m *breakdownMetrics) swap() {
// m.inactive is empty, having been cleared by m.gather.
m.mu.Lock()
m.active, m.inactive = m.inactive, m.active
m.mu.Unlock()
}

// gather is called by builtinMetricsGatherer to gather breakdown metrics.
func (m *breakdownMetrics) gather(out *Metrics) {
// It is not necessary to hold m.mu, since nothing concurrently
// accesses m.inactive while the gatherer is iterating over it.
for k, d := range m.inactive {
if d.transaction.count > 0 {
out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{
Expand Down
96 changes: 79 additions & 17 deletions breakdown_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apm_test

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -34,7 +52,7 @@ func TestBreakdownMetrics_NonSampled(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 1, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -65,12 +83,56 @@ func TestBreakdownMetrics_SpanDropped(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}

func TestBreakdownMetrics_MetricsLimit(t *testing.T) {
tracer, transport := transporttest.NewRecorderTracer()
defer tracer.Close()

for i := 0; i < 3000; i++ {
tx := tracer.StartTransaction(fmt.Sprintf("%d", i), "request")
tx.End()
}
tracer.Flush(nil)
tracer.SendMetrics(nil)

// There should be 1000 breakdown metrics keys buckets retained
// in-memory. Transaction count and duration metrics piggy-back
// on the self_time for the "app" bucket, so some of those buckets
// may generate multiple metricsets on the wire.
metrics := payloadsBreakdownMetrics(transport)
assert.Len(t, metrics, 2000)
}

func TestBreakdownMetrics_TransactionDropped(t *testing.T) {
tracer, transport := transporttest.NewRecorderTracer()
defer tracer.Close()

// Send transactions until one gets dropped, so we can check that
// the dropped transactions are included in the breakdown.
var count int
for tracer.Stats().TransactionsDropped == 0 {
for i := 0; i < 1000; i++ {
tx := tracer.StartTransaction("test", "request")
tx.Duration = 10 * time.Millisecond
tx.End()
count++
}
}

tracer.Flush(nil)
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", count, time.Duration(count)*10*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", count, time.Duration(count)*10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}

// total self type
// ██████████████████████████████ 30 30 transaction
// 10 20 30
Expand All @@ -86,7 +148,7 @@ func TestBreakdownMetrics_AcceptanceTest1(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 30*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}
Expand All @@ -111,7 +173,7 @@ func TestBreakdownMetrics_AcceptanceTest2(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 1, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -139,7 +201,7 @@ func TestBreakdownMetrics_AcceptanceTest3(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 2, 30*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}
Expand Down Expand Up @@ -168,7 +230,7 @@ func TestBreakdownMetrics_AcceptanceTest4(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -198,7 +260,7 @@ func TestBreakdownMetrics_AcceptanceTest5(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 15*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -228,7 +290,7 @@ func TestBreakdownMetrics_AcceptanceTest6(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -258,7 +320,7 @@ func TestBreakdownMetrics_AcceptanceTest7(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -290,7 +352,7 @@ func TestBreakdownMetrics_AcceptanceTest8(t *testing.T) {
tracer.SendMetrics(nil)

assertBreakdownMetrics(t, []model.Metrics{
transactionDurationMetrics("test", "request", 30*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 30*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 2, 25*time.Millisecond),
spanSelfTimeMetrics("test", "request", "db", "mysql", 1, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
Expand Down Expand Up @@ -327,7 +389,7 @@ func TestBreakdownMetrics_AcceptanceTest9(t *testing.T) {
// "app" span should not be included in the self_time value,
// it should only have been used for subtracting from the
// transaction's duration.
transactionDurationMetrics("test", "request", 20*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}
Expand All @@ -354,7 +416,7 @@ func TestBreakdownMetrics_AcceptanceTest10(t *testing.T) {
assertBreakdownMetrics(t, []model.Metrics{
// The db.mysql span should not be included in breakdown,
// as it ended after the transaction ended.
transactionDurationMetrics("test", "request", 20*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 20*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}
Expand All @@ -381,19 +443,19 @@ func TestBreakdownMetrics_AcceptanceTest11(t *testing.T) {
assertBreakdownMetrics(t, []model.Metrics{
// The db.mysql span should not be included in breakdown,
// as it started and ended after the transaction ended.
transactionDurationMetrics("test", "request", 10*time.Millisecond),
transactionDurationMetrics("test", "request", 1, 10*time.Millisecond),
spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond),
}, payloadsBreakdownMetrics(transport))
}

func transactionDurationMetrics(txName, txType string, d time.Duration) model.Metrics {
func transactionDurationMetrics(txName, txType string, count int, sum time.Duration) model.Metrics {
return model.Metrics{
TransactionType: txType,
TransactionName: txName,
Samples: map[string]model.Metric{
"transaction.breakdown.count": {Value: 1},
"transaction.duration.count": {Value: 1},
"transaction.duration.sum": {Value: d.Seconds()},
"transaction.breakdown.count": {Value: float64(count)},
"transaction.duration.count": {Value: float64(count)},
"transaction.duration.sum": {Value: sum.Seconds()},
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion env.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (
defaultAPIRequestSize = 750 * apmconfig.KByte
defaultAPIRequestTime = 10 * time.Second
defaultAPIBufferSize = 1 * apmconfig.MByte
defaultMetricsBufferSize = 100 * apmconfig.KByte
defaultMetricsBufferSize = 750 * apmconfig.KByte
defaultMetricsInterval = 30 * time.Second
defaultMaxSpans = 500
defaultCaptureHeaders = true
Expand Down
6 changes: 6 additions & 0 deletions internal/apmlog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func parseLogLevel(s string) (logLevel, error) {
type Logger interface {
Debugf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Warningf(format string, args ...interface{})
}

type levelLogger struct {
Expand All @@ -136,6 +137,11 @@ func (l levelLogger) Errorf(format string, args ...interface{}) {
l.logf(errorLevel, format, args...)
}

// Warningf logs a message with log.Printf, with a WARNING prefix.
func (l levelLogger) Warningf(format string, args ...interface{}) {
l.logf(warnLevel, format, args...)
}

func (l levelLogger) logf(level logLevel, format string, args ...interface{}) {
if level < l.level {
return
Expand Down
26 changes: 26 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,29 @@ type Logger interface {
// Errorf logs a message at error level.
Errorf(format string, args ...interface{})
}

// WarningLogger extends Logger with a Warningf method.
//
// TODO(axw) this will be removed in v2.0.0, and the
// Warningf method will be added directly to Logger.
type WarningLogger interface {
Logger

// Warningf logs a message at warning level.
Warningf(format string, args ...interface{})
}

func makeWarningLogger(l Logger) WarningLogger {
if wl, ok := l.(WarningLogger); ok {
return wl
}
return debugWarningLogger{Logger: l}
}

type debugWarningLogger struct {
Logger
}

func (l debugWarningLogger) Warningf(format string, args ...interface{}) {
l.Debugf(format, args...)
}
Loading

0 comments on commit 6d58ad2

Please sign in to comment.