diff --git a/breakdown.go b/breakdown.go index 16eef6b6a..5e6175109 100644 --- a/breakdown.go +++ b/breakdown.go @@ -20,6 +20,7 @@ package apm import ( "fmt" "sync" + "sync/atomic" "time" "go.elastic.co/apm/internal/wildcard" @@ -66,8 +67,8 @@ type spanTimingsKey struct { // has occurred (within the context of a transaction group), along with // the sum of the span durations. type spanTiming struct { - count int - duration time.Duration + duration int64 + count uintptr } // spanTimingsMap records span timings for a transaction group. @@ -78,7 +79,7 @@ func (m spanTimingsMap) add(spanType, spanSubtype string, d time.Duration) { k := spanTimingsKey{spanType: spanType, spanSubtype: spanSubtype} timing := m[k] timing.count++ - timing.duration += d + timing.duration += int64(d) m[k] = timing } @@ -99,8 +100,8 @@ type breakdownMetricsKey struct { func newBreakdownMetrics() *breakdownMetrics { return &breakdownMetrics{ - active: make(map[breakdownMetricsKey]breakdownTiming), - inactive: make(map[breakdownMetricsKey]breakdownTiming), + active: make(map[breakdownMetricsKey]*breakdownTiming), + inactive: make(map[breakdownMetricsKey]*breakdownTiming), } } @@ -114,9 +115,11 @@ func newBreakdownMetrics() *breakdownMetrics { type breakdownMetrics struct { flags breakdownMetricsFlags - // mu serializes updates to active, and swapping of active/inactive. - mu sync.RWMutex - active, inactive map[breakdownMetricsKey]breakdownTiming + // mu serializes access to active/inactive, but not concurrent updates to active. + mu sync.RWMutex + // writeMu serializes updates to active. + writeMu sync.RWMutex + active, inactive map[breakdownMetricsKey]*breakdownTiming } // breakdownTiming holds breakdown metrics. @@ -127,7 +130,7 @@ type breakdownTiming struct { // breakdownCount records the number of transactions for which we // have calculated breakdown metrics. If breakdown metrics are // enabled, this will be equal transaction.count. - breakdownCount int + breakdownCount uintptr // span holds the "span.self_time" metric values. span spanTiming @@ -139,8 +142,10 @@ type breakdownTiming struct { // completely recorded, and false if any metrics were not // recorded due to the limit being reached. func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool { - m.mu.Lock() - defer m.mu.Unlock() + // TODO(axw) consider using the HdrHistogram WriterReaderPhaser approach + // to avoid contention with the metrics gatherer. + m.mu.RLock() + defer m.mu.RUnlock() k := breakdownMetricsKey{ transactionType: td.Type, @@ -150,17 +155,17 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool { var breakdownCount int var transactionSpanTiming spanTiming - var transactionDuration = spanTiming{count: 1, duration: td.Duration} + var transactionDuration = spanTiming{count: 1, duration: int64(td.Duration)} if td.breakdownMetricsEnabled { breakdownCount = 1 endTime := td.timestamp.Add(td.Duration) transactionSelfTime := td.Duration - td.childrenTimer.finalDuration(endTime) - transactionSpanTiming = spanTiming{count: 1, duration: transactionSelfTime} + transactionSpanTiming = spanTiming{count: 1, duration: int64(transactionSelfTime)} } if !m.record(k, breakdownTiming{ transaction: transactionDuration, - breakdownCount: breakdownCount, + breakdownCount: uintptr(breakdownCount), span: transactionSpanTiming, }) { // We couldn't record the transaction's metricset, so we won't @@ -177,16 +182,34 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool { } func (m *breakdownMetrics) record(k breakdownMetricsKey, bt breakdownTiming) bool { + m.writeMu.RLock() timing, ok := m.active[k] if !ok && len(m.active) >= breakdownMetricsLimit { + m.writeMu.RUnlock() return false } - timing.transaction.count += bt.transaction.count - timing.transaction.duration += bt.transaction.duration - timing.breakdownCount += bt.breakdownCount - timing.span.count += bt.span.count - timing.span.duration += bt.span.duration - m.active[k] = timing + m.writeMu.RUnlock() + + if !ok { + m.writeMu.Lock() + timing, ok = m.active[k] + if !ok { + if len(m.active) >= breakdownMetricsLimit { + m.writeMu.Unlock() + return false + } + m.active[k] = &bt + m.writeMu.Unlock() + return true + } + m.writeMu.Unlock() + } + + atomic.AddUintptr(&timing.transaction.count, bt.transaction.count) + atomic.AddInt64(&timing.transaction.duration, bt.transaction.duration) + atomic.AddUintptr(&timing.span.count, bt.span.count) + atomic.AddInt64(&timing.span.duration, bt.span.duration) + atomic.AddUintptr(&timing.breakdownCount, bt.breakdownCount) return true } @@ -207,9 +230,15 @@ func (m *breakdownMetrics) gather(out *Metrics) { TransactionType: k.transactionType, TransactionName: k.transactionName, Samples: map[string]model.Metric{ - transactionDurationCountMetricName: {Value: float64(d.transaction.count)}, - transactionDurationSumMetricName: {Value: d.transaction.duration.Seconds()}, - transactionBreakdownCountMetricName: {Value: float64(d.breakdownCount)}, + transactionDurationCountMetricName: { + Value: float64(d.transaction.count), + }, + transactionDurationSumMetricName: { + Value: time.Duration(d.transaction.duration).Seconds(), + }, + transactionBreakdownCountMetricName: { + Value: float64(d.breakdownCount), + }, }, }) } @@ -220,8 +249,12 @@ func (m *breakdownMetrics) gather(out *Metrics) { SpanType: k.spanType, SpanSubtype: k.spanSubtype, Samples: map[string]model.Metric{ - spanSelfTimeCountMetricName: {Value: float64(d.span.count)}, - spanSelfTimeSumMetricName: {Value: d.span.duration.Seconds()}, + spanSelfTimeCountMetricName: { + Value: float64(d.span.count), + }, + spanSelfTimeSumMetricName: { + Value: time.Duration(d.span.duration).Seconds(), + }, }, }) } diff --git a/transaction.go b/transaction.go index 7b312cc2e..605536cf4 100644 --- a/transaction.go +++ b/transaction.go @@ -245,6 +245,8 @@ func (tx *Transaction) enqueue() { default: // Enqueuing a transaction should never block. tx.tracer.breakdownMetrics.recordTransaction(tx.TransactionData) + + // TODO(axw) use an atomic operation to increment. tx.tracer.statsMu.Lock() tx.tracer.stats.TransactionsDropped++ tx.tracer.statsMu.Unlock() diff --git a/transaction_test.go b/transaction_test.go index 386b78130..e929d6ff9 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -18,13 +18,18 @@ package apm_test import ( + "fmt" + "math/rand" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.elastic.co/apm" "go.elastic.co/apm/model" + "go.elastic.co/apm/transport" "go.elastic.co/apm/transport/transporttest" ) @@ -135,6 +140,33 @@ func TestTransactionEnsureParent(t *testing.T) { assert.Equal(t, model.SpanID(parentSpan), payloads.Transactions[0].ParentID) } +func BenchmarkTransaction(b *testing.B) { + tracer, err := apm.NewTracer("service", "") + require.NoError(b, err) + + tracer.Transport = transport.Discard + defer tracer.Close() + + names := []string{} + for i := 0; i < 100; i++ { + names = append(names, fmt.Sprintf("/some/route/%d", i)) + } + + var mu sync.Mutex + globalRand := rand.New(rand.NewSource(time.Now().UnixNano())) + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + mu.Lock() + rand := rand.New(rand.NewSource(globalRand.Int63())) + mu.Unlock() + for pb.Next() { + tx := tracer.StartTransaction(names[rand.Intn(len(names))], "type") + tx.End() + } + }) +} + type samplerFunc func(apm.TraceContext) bool func (f samplerFunc) Sample(t apm.TraceContext) bool {