Skip to content

Commit

Permalink
Stop pooling top-level objects
Browse files Browse the repository at this point in the history
Transaction, Span, and Error are no longer pooled.
Instead, new *Data types are introduced which are
pooled, and those are embedded within their related
top-level object types. We add a read-write mutex
to the Transaction and Span types to guard access
to those such that a concurrent StartSpan and
Transaction.End will not panic.

This change introduces an additional allocation for
each of StartTransaction, StartSpan, and NewError.
The time overhead (measured in nanoseconds) is not
enough to warrant living with the lack of safety.

It is now safe to call End() multiple times, and
to call StartSpan with an ended transaction or parent
span. When StartSpan is called with an ended
transaction, a dropped span will be created.
  • Loading branch information
axw committed Nov 19, 2018
1 parent c589eb1 commit a9fff64
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 185 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased](https://github.com/elastic/apm-agent-go/compare/v1.0.0...master)

- Stop pooling Transaction/Span/Error, introduce internal pooled objects (#319)

## [v1.0.0](https://github.com/elastic/apm-agent-go/releases/tag/v1.0.0)

- Implement v2 intake protocol (#180)
Expand Down
15 changes: 7 additions & 8 deletions docs/api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ transaction := apm.DefaultTracer.StartTransactionOptions("GET /", "request", opt
==== `func (*Transaction) End()`

End enqueues the transaction for sending to the Elastic APM server.
The Transaction must not be used after this.
The Transaction must not be modified after this.

The transaction's duration will be calculated as the amount of time
elapsed since the transaction was started until this call. To override
Expand Down Expand Up @@ -239,7 +239,7 @@ span, ctx := apm.StartSpan(ctx, "SELECT FROM foo", "db.mysql.query")
[[span-end]]
==== `func (*Span) End()`

End marks the span as complete; it must not be used after this.
End marks the span as complete. The Span must not be modified after this.

The span's duration will be calculated as the amount of time elapsed
since the span was started until this call. To override this behaviour,
Expand All @@ -250,8 +250,8 @@ the span's Duration field may be set before calling End.
==== `func (*Span) Dropped() bool`

Dropped indicates whether or not the span is dropped, meaning it will not be reported to
the APM server. Spans are dropped when the created via a nil or non-sampled transaction,
or one whose max spans limit has been reached.
the APM server. Spans are dropped when the created with a nil, ended, or non-sampled
transaction, or one whose max spans limit has been reached.

[float]
[[span-tracecontext]]
Expand Down Expand Up @@ -427,21 +427,20 @@ e.Send()
[[error-set-transaction]]
==== `func (*Error) SetTransaction(*Transaction)`

SetTransaction associates the error with the given transaction. The transaction's End method must
not yet have been called.
SetTransaction associates the error with the given transaction.

[float]
[[error-set-span]]
==== `func (*Error) SetSpan(*Span)`

SetSpan associates the error with the given span, and the span's transaction. When calling SetSpan,
it is not necessary to also call SetTransaction. The span's End method must not yet have been called.
it is not necessary to also call SetTransaction.

[float]
[[error-send]]
==== `func (*Error) Send()`

Send enqueues the error for sending to the Elastic APM server. The Error must not be used after this.
Send enqueues the error for sending to the Elastic APM server.

[float]
[[tracer-recovered]]
Expand Down
81 changes: 57 additions & 24 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,29 @@ func (t *Tracer) NewErrorLog(r ErrorLogRecord) *Error {

// newError returns a new Error associated with the Tracer.
func (t *Tracer) newError() *Error {
e, _ := t.errorPool.Get().(*Error)
e, _ := t.errorDataPool.Get().(*ErrorData)
if e == nil {
e = &Error{
e = &ErrorData{
tracer: t,
Context: Context{
captureBodyMask: CaptureBodyErrors,
},
}
}
e.Timestamp = time.Now()
return e
return &Error{ErrorData: e}
}

// Error describes an error occurring in the monitored service.
type Error struct {
// ErrorData holds the error data. This field is set to nil when
// the error's Send method is called.
*ErrorData
}

// ErrorData holds the details for an error, and is embedded inside Error.
// When the error is sent, its ErrorData field will be set to nil.
type ErrorData struct {
model model.Error
tracer *Tracer
stacktrace []stacktrace.Frame
Expand Down Expand Up @@ -163,42 +171,56 @@ type Error struct {

// SetTransaction sets TraceID, TransactionID, and ParentID to the transaction's IDs.
//
// This must be called before tx.End(). After SetTransaction returns, e may be sent
// and tx ended in either order.
// SetTransaction has no effect if called with an ended transaction.
func (e *Error) SetTransaction(tx *Transaction) {
e.TraceID = tx.traceContext.Trace
e.ParentID = tx.traceContext.Span
tx.mu.RLock()
if !tx.ended() {
e.setTransactionData(tx.TransactionData)
}
tx.mu.RUnlock()
}

func (e *Error) setTransactionData(td *TransactionData) {
e.TraceID = td.traceContext.Trace
e.ParentID = td.traceContext.Span
e.TransactionID = e.ParentID
}

// SetSpan sets TraceID, TransactionID, and ParentID to the span's IDs. If you call
// this, it is not necessary to call SetTransaction.
//
// This must be called before s.End(). After SetSpanreturns, e may be sent and e ended
// in either order.
// SetSpan has no effect if called with an ended span.
func (e *Error) SetSpan(s *Span) {
s.mu.RLock()
if !s.ended() {
e.setSpanData(s.SpanData)
}
s.mu.RUnlock()
}

func (e *Error) setSpanData(s *SpanData) {
e.TraceID = s.traceContext.Trace
e.ParentID = s.traceContext.Span
e.TransactionID = s.transactionID
}

func (e *Error) reset() {
*e = Error{
tracer: e.tracer,
stacktrace: e.stacktrace[:0],
modelStacktrace: e.modelStacktrace[:0],
Context: e.Context,
}
e.Context.reset()
e.tracer.errorPool.Put(e)
}

// Send enqueues the error for sending to the Elastic APM server.
// The Error must not be used after this.
//
// Send will set e.ErrorData to nil, so the error must not be
// modified after Send returns.
func (e *Error) Send() {
if e == nil {
if e == nil || e.sent() {
return
}
e.ErrorData.enqueue()
e.ErrorData = nil
}

func (e *Error) sent() bool {
return e.ErrorData == nil
}

func (e *ErrorData) enqueue() {
select {
case e.tracer.errors <- e:
default:
Expand All @@ -210,7 +232,18 @@ func (e *Error) Send() {
}
}

func (e *Error) setStacktrace() {
func (e *ErrorData) reset() {
*e = ErrorData{
tracer: e.tracer,
stacktrace: e.stacktrace[:0],
modelStacktrace: e.modelStacktrace[:0],
Context: e.Context,
}
e.Context.reset()
e.tracer.errorDataPool.Put(e)
}

func (e *ErrorData) setStacktrace() {
if len(e.stacktrace) == 0 {
return
}
Expand All @@ -219,7 +252,7 @@ func (e *Error) setStacktrace() {
e.model.Exception.Stacktrace = e.modelStacktrace
}

func (e *Error) setCulprit() {
func (e *ErrorData) setCulprit() {
if e.Culprit != "" {
e.model.Culprit = e.Culprit
} else if e.modelStacktrace != nil {
Expand Down
30 changes: 19 additions & 11 deletions gocontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,31 @@ func StartSpanOptions(ctx context.Context, name, spanType string, opts SpanOptio
// from err. The Error.Handled field will be set to true, and a stacktrace
// set either from err, or from the caller.
//
// If there is no span or transaction in the context, or the transaction
// is not being sampled, CaptureError returns nil. As a convenience, if
// the provided error is nil, then CaptureError will also return nil.
// If there is no span or transaction in the context, CaptureError returns
// nil. As a convenience, if the provided error is nil, then CaptureError
// will also return nil.
func CaptureError(ctx context.Context, err error) *Error {
if err == nil {
return nil
}
var e *Error
if span := SpanFromContext(ctx); span != nil {
e = span.tracer.NewError(err)
e.SetSpan(span)
} else if tx := TransactionFromContext(ctx); tx != nil && tx.Sampled() {
e = tx.tracer.NewError(err)
e.SetTransaction(tx)
} else {
return nil
span.mu.RLock()
if !span.ended() {
e = span.tracer.NewError(err)
e.setSpanData(span.SpanData)
}
span.mu.RUnlock()
} else if tx := TransactionFromContext(ctx); tx != nil {
tx.mu.RLock()
if !tx.ended() {
e = tx.tracer.NewError(err)
e.setTransactionData(tx.TransactionData)
}
tx.mu.RUnlock()
}
if e != nil {
e.Handled = true
}
e.Handled = true
return e
}
67 changes: 67 additions & 0 deletions gocontext_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package apm_test

import (
"context"
"sync"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"go.elastic.co/apm"
"go.elastic.co/apm/transport/transporttest"
)

func TestContextStartSpanTransactionEnded(t *testing.T) {
tracer, err := apm.NewTracer("tracer_testing", "")
assert.NoError(t, err)
defer tracer.Close()
tracer.Transport = transporttest.Discard

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
tx := tracer.StartTransaction("name", "type")
ctx := apm.ContextWithTransaction(context.Background(), tx)
tx.End()
apm.CaptureError(ctx, errors.New("boom")).Send()
span, _ := apm.StartSpan(ctx, "name", "type")
assert.True(t, span.Dropped())
span.End()
}
}()
}
tracer.Flush(nil)
wg.Wait()
}

func TestContextStartSpanSpanEnded(t *testing.T) {
tracer, err := apm.NewTracer("tracer_testing", "")
assert.NoError(t, err)
defer tracer.Close()
tracer.Transport = transporttest.Discard

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
tx := tracer.StartTransaction("name", "type")
ctx := apm.ContextWithTransaction(context.Background(), tx)
span1, ctx := apm.StartSpan(ctx, "name", "type")
span1.End()
apm.CaptureError(ctx, errors.New("boom")).Send()
span2, _ := apm.StartSpan(ctx, "name", "type")
assert.True(t, span2.Dropped())
span2.End()
tx.End()
}
}()
}
tracer.Flush(nil)
wg.Wait()
}
14 changes: 7 additions & 7 deletions modelwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type modelWriter struct {
}

// writeTransaction encodes tx as JSON to the buffer, and then resets tx.
func (w *modelWriter) writeTransaction(tx *Transaction) {
func (w *modelWriter) writeTransaction(tx *TransactionData) {
var modelTx model.Transaction
w.buildModelTransaction(&modelTx, tx)
w.json.RawString(`{"transaction":`)
Expand All @@ -39,7 +39,7 @@ func (w *modelWriter) writeTransaction(tx *Transaction) {
}

// writeSpan encodes s as JSON to the buffer, and then resets s.
func (w *modelWriter) writeSpan(s *Span) {
func (w *modelWriter) writeSpan(s *SpanData) {
var modelSpan model.Span
w.buildModelSpan(&modelSpan, s)
w.json.RawString(`{"span":`)
Expand All @@ -51,7 +51,7 @@ func (w *modelWriter) writeSpan(s *Span) {
}

// writeError encodes e as JSON to the buffer, and then resets e.
func (w *modelWriter) writeError(e *Error) {
func (w *modelWriter) writeError(e *ErrorData) {
w.buildModelError(e)
w.json.RawString(`{"error":`)
e.model.MarshalFastJSON(&w.json)
Expand All @@ -73,7 +73,7 @@ func (w *modelWriter) writeMetrics(m *Metrics) {
m.reset()
}

func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transaction) {
func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *TransactionData) {
out.ID = model.SpanID(tx.traceContext.Span)
out.TraceID = model.TraceID(tx.traceContext.Trace)
out.ParentID = model.SpanID(tx.parentSpan)
Expand All @@ -86,7 +86,7 @@ func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transact
out.SpanCount.Started = tx.spansCreated
out.SpanCount.Dropped = tx.spansDropped

if !tx.Sampled() {
if !tx.traceContext.Options.Recorded() {
out.Sampled = &notSampled
}

Expand All @@ -101,7 +101,7 @@ func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transact
}
}

func (w *modelWriter) buildModelSpan(out *model.Span, span *Span) {
func (w *modelWriter) buildModelSpan(out *model.Span, span *SpanData) {
w.modelStacktrace = w.modelStacktrace[:0]
out.ID = model.SpanID(span.traceContext.Span)
out.TraceID = model.TraceID(span.traceContext.Trace)
Expand All @@ -119,7 +119,7 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span) {
w.setStacktraceContext(out.Stacktrace)
}

func (w *modelWriter) buildModelError(e *Error) {
func (w *modelWriter) buildModelError(e *ErrorData) {
// TODO(axw) move the model type outside of Error
e.model.ID = model.TraceID(e.ID)
e.model.TraceID = model.TraceID(e.TraceID)
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req = &reqCopy

traceContext := tx.TraceContext()
if !tx.Sampled() {
if !traceContext.Options.Recorded() {
req.Header.Set(TraceparentHeader, FormatTraceparentHeader(traceContext))
return r.r.RoundTrip(req)
}
Expand Down
Loading

0 comments on commit a9fff64

Please sign in to comment.