Skip to content

Commit

Permalink
optimise breakdownMetrics.recordTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Jun 18, 2019
1 parent 4ebd9f6 commit 419c38b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 25 deletions.
83 changes: 58 additions & 25 deletions breakdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package apm
import (
"fmt"
"sync"
"sync/atomic"
"time"

"go.elastic.co/apm/internal/wildcard"
Expand Down Expand Up @@ -66,8 +67,8 @@ type spanTimingsKey struct {
// has occurred (within the context of a transaction group), along with
// the sum of the span durations.
type spanTiming struct {
count int
duration time.Duration
duration int64
count uintptr
}

// spanTimingsMap records span timings for a transaction group.
Expand All @@ -78,7 +79,7 @@ func (m spanTimingsMap) add(spanType, spanSubtype string, d time.Duration) {
k := spanTimingsKey{spanType: spanType, spanSubtype: spanSubtype}
timing := m[k]
timing.count++
timing.duration += d
timing.duration += int64(d)
m[k] = timing
}

Expand All @@ -99,8 +100,8 @@ type breakdownMetricsKey struct {

func newBreakdownMetrics() *breakdownMetrics {
return &breakdownMetrics{
active: make(map[breakdownMetricsKey]breakdownTiming),
inactive: make(map[breakdownMetricsKey]breakdownTiming),
active: make(map[breakdownMetricsKey]*breakdownTiming),
inactive: make(map[breakdownMetricsKey]*breakdownTiming),
}
}

Expand All @@ -114,9 +115,11 @@ func newBreakdownMetrics() *breakdownMetrics {
type breakdownMetrics struct {
flags breakdownMetricsFlags

// mu serializes updates to active, and swapping of active/inactive.
mu sync.RWMutex
active, inactive map[breakdownMetricsKey]breakdownTiming
// mu serializes access to active/inactive, but not concurrent updates to active.
mu sync.RWMutex
// writeMu serializes updates to active.
writeMu sync.RWMutex
active, inactive map[breakdownMetricsKey]*breakdownTiming
}

// breakdownTiming holds breakdown metrics.
Expand All @@ -127,7 +130,7 @@ type breakdownTiming struct {
// breakdownCount records the number of transactions for which we
// have calculated breakdown metrics. If breakdown metrics are
// enabled, this will be equal to transaction.count.
breakdownCount int
breakdownCount uintptr

// span holds the "span.self_time" metric values.
span spanTiming
Expand All @@ -139,8 +142,10 @@ type breakdownTiming struct {
// completely recorded, and false if any metrics were not
// recorded due to the limit being reached.
func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool {
m.mu.Lock()
defer m.mu.Unlock()
// TODO(axw) consider using the HdrHistogram WriterReaderPhaser approach
// to avoid contention with the metrics gatherer.
m.mu.RLock()
defer m.mu.RUnlock()

k := breakdownMetricsKey{
transactionType: td.Type,
Expand All @@ -150,17 +155,17 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool {

var breakdownCount int
var transactionSpanTiming spanTiming
var transactionDuration = spanTiming{count: 1, duration: td.Duration}
var transactionDuration = spanTiming{count: 1, duration: int64(td.Duration)}
if td.breakdownMetricsEnabled {
breakdownCount = 1
endTime := td.timestamp.Add(td.Duration)
transactionSelfTime := td.Duration - td.childrenTimer.finalDuration(endTime)
transactionSpanTiming = spanTiming{count: 1, duration: transactionSelfTime}
transactionSpanTiming = spanTiming{count: 1, duration: int64(transactionSelfTime)}
}

if !m.record(k, breakdownTiming{
transaction: transactionDuration,
breakdownCount: breakdownCount,
breakdownCount: uintptr(breakdownCount),
span: transactionSpanTiming,
}) {
// We couldn't record the transaction's metricset, so we won't
Expand All @@ -177,16 +182,34 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool {
}

func (m *breakdownMetrics) record(k breakdownMetricsKey, bt breakdownTiming) bool {
m.writeMu.RLock()
timing, ok := m.active[k]
if !ok && len(m.active) >= breakdownMetricsLimit {
m.writeMu.RUnlock()
return false
}
timing.transaction.count += bt.transaction.count
timing.transaction.duration += bt.transaction.duration
timing.breakdownCount += bt.breakdownCount
timing.span.count += bt.span.count
timing.span.duration += bt.span.duration
m.active[k] = timing
m.writeMu.RUnlock()

if !ok {
m.writeMu.Lock()
timing, ok = m.active[k]
if !ok {
if len(m.active) >= breakdownMetricsLimit {
m.writeMu.Unlock()
return false
}
m.active[k] = &bt
m.writeMu.Unlock()
return true
}
m.writeMu.Unlock()
}

atomic.AddUintptr(&timing.transaction.count, bt.transaction.count)
atomic.AddInt64(&timing.transaction.duration, bt.transaction.duration)
atomic.AddUintptr(&timing.span.count, bt.span.count)
atomic.AddInt64(&timing.span.duration, bt.span.duration)
atomic.AddUintptr(&timing.breakdownCount, bt.breakdownCount)
return true
}

Expand All @@ -207,9 +230,15 @@ func (m *breakdownMetrics) gather(out *Metrics) {
TransactionType: k.transactionType,
TransactionName: k.transactionName,
Samples: map[string]model.Metric{
transactionDurationCountMetricName: {Value: float64(d.transaction.count)},
transactionDurationSumMetricName: {Value: d.transaction.duration.Seconds()},
transactionBreakdownCountMetricName: {Value: float64(d.breakdownCount)},
transactionDurationCountMetricName: {
Value: float64(d.transaction.count),
},
transactionDurationSumMetricName: {
Value: time.Duration(d.transaction.duration).Seconds(),
},
transactionBreakdownCountMetricName: {
Value: float64(d.breakdownCount),
},
},
})
}
Expand All @@ -220,8 +249,12 @@ func (m *breakdownMetrics) gather(out *Metrics) {
SpanType: k.spanType,
SpanSubtype: k.spanSubtype,
Samples: map[string]model.Metric{
spanSelfTimeCountMetricName: {Value: float64(d.span.count)},
spanSelfTimeSumMetricName: {Value: d.span.duration.Seconds()},
spanSelfTimeCountMetricName: {
Value: float64(d.span.count),
},
spanSelfTimeSumMetricName: {
Value: time.Duration(d.span.duration).Seconds(),
},
},
})
}
Expand Down
2 changes: 2 additions & 0 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ func (tx *Transaction) enqueue() {
default:
// Enqueuing a transaction should never block.
tx.tracer.breakdownMetrics.recordTransaction(tx.TransactionData)

// TODO(axw) use an atomic operation to increment.
tx.tracer.statsMu.Lock()
tx.tracer.stats.TransactionsDropped++
tx.tracer.statsMu.Unlock()
Expand Down
32 changes: 32 additions & 0 deletions transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package apm_test

import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.elastic.co/apm"
"go.elastic.co/apm/model"
"go.elastic.co/apm/transport"
"go.elastic.co/apm/transport/transporttest"
)

Expand Down Expand Up @@ -135,6 +140,33 @@ func TestTransactionEnsureParent(t *testing.T) {
assert.Equal(t, model.SpanID(parentSpan), payloads.Transactions[0].ParentID)
}

// BenchmarkTransaction measures starting and immediately ending
// transactions from the parallel goroutines spawned by b.RunParallel.
// Each transaction is named with one of 100 pre-built route names,
// chosen at random, and the tracer's Transport is set to
// transport.Discard.
func BenchmarkTransaction(b *testing.B) {
	tracer, err := apm.NewTracer("service", "")
	require.NoError(b, err)

	tracer.Transport = transport.Discard
	defer tracer.Close()

	// Build the pool of transaction names up front so that formatting
	// them is not part of the timed region.
	const routeCount = 100
	routes := make([]string, routeCount)
	for i := range routes {
		routes[i] = fmt.Sprintf("/some/route/%d", i)
	}

	// seedSource hands out one seed per worker goroutine; it is shared
	// between the workers, so every draw from it is guarded by seedMu.
	var seedMu sync.Mutex
	seedSource := rand.New(rand.NewSource(time.Now().UnixNano()))
	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		seedMu.Lock()
		seed := seedSource.Int63()
		seedMu.Unlock()

		// Each worker gets its own generator, so picking a name inside
		// the loop below needs no locking.
		workerRand := rand.New(rand.NewSource(seed))
		for pb.Next() {
			name := routes[workerRand.Intn(len(routes))]
			tracer.StartTransaction(name, "type").End()
		}
	})
}

type samplerFunc func(apm.TraceContext) bool

func (f samplerFunc) Sample(t apm.TraceContext) bool {
Expand Down

0 comments on commit 419c38b

Please sign in to comment.