From 6d58ad2a1053c203154146c8745a5c671660a94c Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 17 Jun 2019 17:10:16 +0800 Subject: [PATCH] WIP --- breakdown.go | 36 ++++++++++++++- breakdown_test.go | 96 ++++++++++++++++++++++++++++++++------- env.go | 2 +- internal/apmlog/logger.go | 6 +++ logger.go | 26 +++++++++++ tracer.go | 26 +++++++---- transaction.go | 1 + 7 files changed, 163 insertions(+), 30 deletions(-) diff --git a/breakdown.go b/breakdown.go index 8c8575644..bcd1d59d6 100644 --- a/breakdown.go +++ b/breakdown.go @@ -18,6 +18,8 @@ package apm import ( + "fmt" + "sync" "time" "go.elastic.co/apm/model" @@ -35,6 +37,14 @@ const ( appSpanType = "app" ) +var ( + breakdownMetricsLimitWarning = fmt.Sprintf(` +The limit of %d breakdown metricsets has been reached, no new metricsets will be created. +Try to name your transactions so that there are less distinct transaction names.`[1:], + breakdownMetricsLimit, + ) +) + // spanTimingsKey identifies a span type and subtype, for use as the key in // spanTimingsMap. type spanTimingsKey struct { @@ -88,7 +98,12 @@ func newBreakdownMetrics() *breakdownMetrics { // accumulates new breakdown metrics, and is swapped with the "inactive" map // just prior to when metrics gathering begins. When metrics gathering // completes, the inactive map will be empty. +// +// breakdownMetrics may be written to concurrently by the tracer, and any +// number of other goroutines when a transaction cannot be enqueued. type breakdownMetrics struct { + // mu serializes updates to active, and swapping of active/inactive. + mu sync.RWMutex active, inactive map[breakdownMetricsKey]breakdownTiming } @@ -107,7 +122,14 @@ type breakdownTiming struct { } // recordTransaction records breakdown metrics for td into m. +// +// recordTransaction returns true if breakdown metrics were +// completely recorded, and false if any metrics were not +// recorded due to the limit being reached. 
func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool { + m.mu.Lock() + defer m.mu.Unlock() + k := breakdownMetricsKey{ transactionType: td.Type, transactionName: td.Name, @@ -121,11 +143,17 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool { endTime := td.timestamp.Add(td.Duration) transactionSelfTime := td.Duration - td.childrenTimer.finalDuration(endTime) - ok := m.record(k, breakdownTiming{ + if !m.record(k, breakdownTiming{ transaction: spanTiming{count: 1, duration: td.Duration}, breakdownCount: breakdownCount, span: spanTiming{count: 1, duration: transactionSelfTime}, - }) + }) { + // We couldn't record the transaction's metricset, so we won't + // be able to record spans for that transaction either. + return false + } + + ok := true for sk, timing := range td.spanTimings { k.spanTimingsKey = sk ok = ok && m.record(k, breakdownTiming{span: timing}) @@ -149,11 +177,15 @@ func (m *breakdownMetrics) record(k breakdownMetricsKey, bt breakdownTiming) boo func (m *breakdownMetrics) swap() { // m.inactive is empty, having been cleared by m.gather. + m.mu.Lock() m.active, m.inactive = m.inactive, m.active + m.mu.Unlock() } // gather is called by builtinMetricsGatherer to gather breakdown metrics. func (m *breakdownMetrics) gather(out *Metrics) { + // It is not necessary to hold m.mu, since nothing concurrently + // accesses m.inactive while the gatherer is iterating over it. for k, d := range m.inactive { if d.transaction.count > 0 { out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{ diff --git a/breakdown_test.go b/breakdown_test.go index 28d5500bb..72ce80379 100644 --- a/breakdown_test.go +++ b/breakdown_test.go @@ -1,7 +1,25 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package apm_test import ( "context" + "fmt" "testing" "time" @@ -34,7 +52,7 @@ func TestBreakdownMetrics_NonSampled(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 1, 10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -65,12 +83,56 @@ func TestBreakdownMetrics_SpanDropped(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond), }, payloadsBreakdownMetrics(transport)) } +func TestBreakdownMetrics_MetricsLimit(t *testing.T) { + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + + for i := 0; i < 3000; i++ { + tx := tracer.StartTransaction(fmt.Sprintf("%d", i), "request") + tx.End() + } + tracer.Flush(nil) + tracer.SendMetrics(nil) + + // There should be 1000 breakdown metrics keys buckets retained + // in-memory. 
Transaction count and duration metrics piggy-back + // on the self_time for the "app" bucket, so some of those buckets + // may generate multiple metricsets on the wire. + metrics := payloadsBreakdownMetrics(transport) + assert.Len(t, metrics, 2000) +} + +func TestBreakdownMetrics_TransactionDropped(t *testing.T) { + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + + // Send transactions until one gets dropped, so we can check that + // the dropped transactions are included in the breakdown. + var count int + for tracer.Stats().TransactionsDropped == 0 { + for i := 0; i < 1000; i++ { + tx := tracer.StartTransaction("test", "request") + tx.Duration = 10 * time.Millisecond + tx.End() + count++ + } + } + + tracer.Flush(nil) + tracer.SendMetrics(nil) + + assertBreakdownMetrics(t, []model.Metrics{ + transactionDurationMetrics("test", "request", count, time.Duration(count)*10*time.Millisecond), + spanSelfTimeMetrics("test", "request", "app", "", count, time.Duration(count)*10*time.Millisecond), + }, payloadsBreakdownMetrics(transport)) +} + // total self type // ██████████████████████████████ 30 30 transaction // 10 20 30 @@ -86,7 +148,7 @@ func TestBreakdownMetrics_AcceptanceTest1(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 30*time.Millisecond), }, payloadsBreakdownMetrics(transport)) } @@ -111,7 +173,7 @@ func TestBreakdownMetrics_AcceptanceTest2(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 1, 
10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -139,7 +201,7 @@ func TestBreakdownMetrics_AcceptanceTest3(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 2, 30*time.Millisecond), }, payloadsBreakdownMetrics(transport)) } @@ -168,7 +230,7 @@ func TestBreakdownMetrics_AcceptanceTest4(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -198,7 +260,7 @@ func TestBreakdownMetrics_AcceptanceTest5(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 15*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -228,7 +290,7 @@ func TestBreakdownMetrics_AcceptanceTest6(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 20*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -258,7 +320,7 @@ func TestBreakdownMetrics_AcceptanceTest7(t *testing.T) { 
tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 2, 10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -290,7 +352,7 @@ func TestBreakdownMetrics_AcceptanceTest8(t *testing.T) { tracer.SendMetrics(nil) assertBreakdownMetrics(t, []model.Metrics{ - transactionDurationMetrics("test", "request", 30*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 30*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 2, 25*time.Millisecond), spanSelfTimeMetrics("test", "request", "db", "mysql", 1, 10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) @@ -327,7 +389,7 @@ func TestBreakdownMetrics_AcceptanceTest9(t *testing.T) { // "app" span should not be included in the self_time value, // it should only have been used for subtracting from the // transaction's duration. - transactionDurationMetrics("test", "request", 20*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) } @@ -354,7 +416,7 @@ func TestBreakdownMetrics_AcceptanceTest10(t *testing.T) { assertBreakdownMetrics(t, []model.Metrics{ // The db.mysql span should not be included in breakdown, // as it ended after the transaction ended. 
- transactionDurationMetrics("test", "request", 20*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 20*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) } @@ -381,19 +443,19 @@ func TestBreakdownMetrics_AcceptanceTest11(t *testing.T) { assertBreakdownMetrics(t, []model.Metrics{ // The db.mysql span should not be included in breakdown, // as it started and ended after the transaction ended. - transactionDurationMetrics("test", "request", 10*time.Millisecond), + transactionDurationMetrics("test", "request", 1, 10*time.Millisecond), spanSelfTimeMetrics("test", "request", "app", "", 1, 10*time.Millisecond), }, payloadsBreakdownMetrics(transport)) } -func transactionDurationMetrics(txName, txType string, d time.Duration) model.Metrics { +func transactionDurationMetrics(txName, txType string, count int, sum time.Duration) model.Metrics { return model.Metrics{ TransactionType: txType, TransactionName: txName, Samples: map[string]model.Metric{ - "transaction.breakdown.count": {Value: 1}, - "transaction.duration.count": {Value: 1}, - "transaction.duration.sum": {Value: d.Seconds()}, + "transaction.breakdown.count": {Value: float64(count)}, + "transaction.duration.count": {Value: float64(count)}, + "transaction.duration.sum": {Value: sum.Seconds()}, }, } } diff --git a/env.go b/env.go index efdc62c7e..f4c6624b5 100644 --- a/env.go +++ b/env.go @@ -55,7 +55,7 @@ const ( defaultAPIRequestSize = 750 * apmconfig.KByte defaultAPIRequestTime = 10 * time.Second defaultAPIBufferSize = 1 * apmconfig.MByte - defaultMetricsBufferSize = 100 * apmconfig.KByte + defaultMetricsBufferSize = 750 * apmconfig.KByte defaultMetricsInterval = 30 * time.Second defaultMaxSpans = 500 defaultCaptureHeaders = true diff --git a/internal/apmlog/logger.go b/internal/apmlog/logger.go index e9284337d..d5cb16893 100644 --- a/internal/apmlog/logger.go +++ b/internal/apmlog/logger.go @@ -119,6 +119,7 @@ 
func parseLogLevel(s string) (logLevel, error) { type Logger interface { Debugf(format string, args ...interface{}) Errorf(format string, args ...interface{}) + Warningf(format string, args ...interface{}) } type levelLogger struct { @@ -136,6 +137,11 @@ func (l levelLogger) Errorf(format string, args ...interface{}) { l.logf(errorLevel, format, args...) } +// Warningf logs a message with log.Printf, with a WARNING prefix. +func (l levelLogger) Warningf(format string, args ...interface{}) { + l.logf(warnLevel, format, args...) +} + func (l levelLogger) logf(level logLevel, format string, args ...interface{}) { if level < l.level { return diff --git a/logger.go b/logger.go index f51c7f5d5..8e30e5918 100644 --- a/logger.go +++ b/logger.go @@ -26,3 +26,29 @@ type Logger interface { // Errorf logs a message at error level. Errorf(format string, args ...interface{}) } + +// WarningLogger extends Logger with a Warningf method. +// +// TODO(axw) this will be removed in v2.0.0, and the +// Warningf method will be added directly to Logger. +type WarningLogger interface { + Logger + + // Warningf logs a message at warning level. + Warningf(format string, args ...interface{}) +} + +func makeWarningLogger(l Logger) WarningLogger { + if wl, ok := l.(WarningLogger); ok { + return wl + } + return debugWarningLogger{Logger: l} +} + +type debugWarningLogger struct { + Logger +} + +func (l debugWarningLogger) Warningf(format string, args ...interface{}) { + l.Debugf(format, args...) 
+} diff --git a/tracer.go b/tracer.go index b17931fc6..d9ab75359 100644 --- a/tracer.go +++ b/tracer.go @@ -326,7 +326,7 @@ type tracerConfig struct { requestSize int requestDuration time.Duration metricsInterval time.Duration - logger Logger + logger WarningLogger metricsGatherers []MetricsGatherer contextSetter stacktrace.ContextSetter preContext, postContext int @@ -398,12 +398,15 @@ func (t *Tracer) SetContextSetter(setter stacktrace.ContextSetter) { // SetLogger sets the Logger to be used for logging the operation of // the tracer. // +// If logger implements WarningLogger, its Warningf method will be used +// for logging warnings. Otherwise, warnings will be logged using Debugf. +// // The tracer is initialized with a default logger configured with the // environment variables ELASTIC_APM_LOG_FILE and ELASTIC_APM_LOG_LEVEL. // Calling SetLogger will replace the default logger. func (t *Tracer) SetLogger(logger Logger) { t.sendConfigCommand(func(cfg *tracerConfig) { - cfg.logger = logger + cfg.logger = makeWarningLogger(logger) }) } @@ -577,6 +580,7 @@ func (t *Tracer) loop() { } }() + var breakdownMetricsLimitWarningLogged bool var stats TracerStats var metrics Metrics var sentMetrics chan<- struct{} @@ -644,10 +648,11 @@ func (t *Tracer) loop() { case event := <-t.events: switch event.eventType { case transactionEvent: - if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) && cfg.logger != nil { - // TODO(axw) use exact message from breakdown doc - // TODO(axw) introduce Logger.Warningf - cfg.logger.Debugf("could not record breakdown metrics for transaction") + if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) { + if !breakdownMetricsLimitWarningLogged && cfg.logger != nil { + cfg.logger.Warningf("%s", breakdownMetricsLimitWarning) + breakdownMetricsLimitWarningLogged = true + } } modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData) case spanEvent: @@ -685,10 +690,11 @@ func (t *Tracer) loop() { event := 
<-t.events switch event.eventType { case transactionEvent: - if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) && cfg.logger != nil { - // TODO(axw) use exact message from breakdown doc - // TODO(axw) introduce Logger.Warningf - cfg.logger.Debugf("could not record breakdown metrics for transaction") + if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) { + if !breakdownMetricsLimitWarningLogged && cfg.logger != nil { + cfg.logger.Warningf("%s", breakdownMetricsLimitWarning) + breakdownMetricsLimitWarningLogged = true + } } modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData) case spanEvent: diff --git a/transaction.go b/transaction.go index 96ace8bf3..108935e9e 100644 --- a/transaction.go +++ b/transaction.go @@ -245,6 +245,7 @@ func (tx *Transaction) enqueue() { case tx.tracer.events <- event: default: // Enqueuing a transaction should never block. + tx.tracer.breakdownMetrics.recordTransaction(tx.TransactionData) tx.tracer.statsMu.Lock() tx.tracer.stats.TransactionsDropped++ tx.tracer.statsMu.Unlock()