Skip to content

Commit

Permalink
ddtrace/tracer: clean up resources in tests (#1643)
Browse files Browse the repository at this point in the history
Tests have been failing to clean up after themselves and causing memory
usage issues during unit test runs.

One cause of this is failing to shut down a tracer after starting it,
which is solved simply by defering tracer.Close() for those tests.

Other tests use newConfig() to generate a tracer config, which they use
for various purposes. Unfortunately newConfig() actually creates a statsd
client for the new config, which needs to be Close()'d in order to
release its resources. It doesn't make much sense to need to Close() a
config, or have to Close() something *in* a config, so this problem
requires a more complicated solution. Instead, the config still
calculates the statsd address, but the actual instantiation of the statsd
client is moved into newTracer, where the worker routines and other
things that require cleanup are started.

This unfortunately requires several other pieces of code to keep a new
reference to the statsd client, rather than just to the config as they
did previously.
  • Loading branch information
knusbaum committed Dec 29, 2022
1 parent f7a009a commit cb93051
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 60 deletions.
8 changes: 4 additions & 4 deletions ddtrace/tracer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (t *tracer) reportRuntimeMetrics(interval time.Duration) {
runtime.ReadMemStats(&ms)
debug.ReadGCStats(&gc)

statsd := t.config.statsd
statsd := t.statsd
// CPU statistics
statsd.Gauge("runtime.go.num_cpu", float64(runtime.NumCPU()), nil, 1)
statsd.Gauge("runtime.go.num_goroutine", float64(runtime.NumGoroutine()), nil, 1)
Expand Down Expand Up @@ -99,9 +99,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) {
for {
select {
case <-ticker.C:
t.config.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1)
t.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1)
t.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1)
t.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1)
case <-t.stop:
return
}
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type testStatsdCall struct {

func withStatsdClient(s statsdClient) StartOption {
return func(c *config) {
c.statsd = s
c.statsdClient = s
}
}

Expand Down Expand Up @@ -246,6 +246,7 @@ func (tg *testStatsdClient) Wait(n int, d time.Duration) error {
func TestReportRuntimeMetrics(t *testing.T) {
var tg testStatsdClient
trc := newUnstartedTracer(withStatsdClient(&tg))
defer trc.statsd.Close()

trc.wg.Add(1)
go func() {
Expand Down
29 changes: 18 additions & 11 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ type config struct {
// combination of the environment variables DD_AGENT_HOST and DD_DOGSTATSD_PORT.
dogstatsdAddr string

// statsd is used for tracking metrics associated with the runtime and the tracer.
statsd statsdClient
// statsdClient is set when a user provides a custom statsd client for tracking metrics
// associated with the runtime and the tracer.
statsdClient statsdClient

// spanRules contains user-defined rules to determine the sampling rate to apply
// to trace spans.
Expand Down Expand Up @@ -295,7 +296,7 @@ func newConfig(opts ...StartOption) *config {
log.SetLevel(log.LevelDebug)
}
c.loadAgentFeatures()
if c.statsd == nil {
if c.statsdClient == nil {
// configure statsd client
addr := c.dogstatsdAddr
if addr == "" {
Expand All @@ -316,17 +317,23 @@ func newConfig(opts ...StartOption) *config {
// not a valid TCP address, leave it as it is (could be a socket connection)
}
c.dogstatsdAddr = addr
client, err := statsd.New(addr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(statsTags(c)))
if err != nil {
log.Warn("Runtime and health metrics disabled: %v", err)
c.statsd = &statsd.NoOpClient{}
} else {
c.statsd = client
}
}

return c
}

func newStatsdClient(c *config) (statsdClient, error) {
if c.statsdClient != nil {
return c.statsdClient, nil
}

client, err := statsd.New(c.dogstatsdAddr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(statsTags(c)))
if err != nil {
return &statsd.NoOpClient{}, err
}
return client, nil
}

// defaultHTTPClient returns the default http.Client to start the tracer with.
func defaultHTTPClient() *http.Client {
if _, err := os.Stat(defaultSocketAPM); err == nil {
Expand Down Expand Up @@ -476,7 +483,7 @@ func statsTags(c *config) []string {
// withNoopStats is used for testing to disable statsd client
func withNoopStats() StartOption {
return func(c *config) {
c.statsd = &statsd.NoOpClient{}
c.statsdClient = &statsd.NoOpClient{}
}
}

Expand Down
38 changes: 31 additions & 7 deletions ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func withTickChan(ch <-chan time.Time) StartOption {
// testStatsd asserts that the given statsd.Client can successfully send metrics
// to a UDP listener located at addr.
func testStatsd(t *testing.T, cfg *config, addr string) {
client := cfg.statsd
client, err := newStatsdClient(cfg)
require.NoError(t, err)
defer client.Close()
require.Equal(t, addr, cfg.dogstatsdAddr)
_, err := net.ResolveUDPAddr("udp", addr)
_, err = net.ResolveUDPAddr("udp", addr)
require.NoError(t, err)

client.Count("name", 1, []string{"tag"}, 1)
Expand All @@ -57,7 +59,9 @@ func TestStatsdUDPConnect(t *testing.T) {
cfg := newConfig()
addr := net.JoinHostPort(defaultHostname, "8111")

client := cfg.statsd
client, err := newStatsdClient(cfg)
require.NoError(t, err)
defer client.Close()
require.Equal(t, addr, cfg.dogstatsdAddr)
udpaddr, err := net.ResolveUDPAddr("udp", addr)
require.NoError(t, err)
Expand Down Expand Up @@ -118,8 +122,11 @@ func TestAutoDetectStatsd(t *testing.T) {
conn.SetDeadline(time.Now().Add(5 * time.Second))

cfg := newConfig()
statsd, err := newStatsdClient(cfg)
require.NoError(t, err)
defer statsd.Close()
require.Equal(t, cfg.dogstatsdAddr, "unix://"+addr)
cfg.statsd.Count("name", 1, []string{"tag"}, 1)
statsd.Count("name", 1, []string{"tag"}, 1)

buf := make([]byte, 17)
n, err := conn.Read(buf)
Expand Down Expand Up @@ -246,11 +253,14 @@ func TestTracerOptionsDefaults(t *testing.T) {
defer globalconfig.SetAnalyticsRate(math.NaN())
assert := assert.New(t)
assert.True(math.IsNaN(globalconfig.AnalyticsRate()))
newTracer(WithAnalyticsRate(0.5))
tracer := newTracer(WithAnalyticsRate(0.5))
defer tracer.Stop()
assert.Equal(0.5, globalconfig.AnalyticsRate())
newTracer(WithAnalytics(false))
tracer = newTracer(WithAnalytics(false))
defer tracer.Stop()
assert.True(math.IsNaN(globalconfig.AnalyticsRate()))
newTracer(WithAnalytics(true))
tracer = newTracer(WithAnalytics(true))
defer tracer.Stop()
assert.Equal(1., globalconfig.AnalyticsRate())
})

Expand All @@ -274,6 +284,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
t.Run("dogstatsd", func(t *testing.T) {
t.Run("default", func(t *testing.T) {
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "localhost:8125")
})
Expand All @@ -282,6 +293,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
os.Setenv("DD_AGENT_HOST", "my-host")
defer os.Unsetenv("DD_AGENT_HOST")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "my-host:8125")
})
Expand All @@ -290,6 +302,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
os.Setenv("DD_DOGSTATSD_PORT", "123")
defer os.Unsetenv("DD_DOGSTATSD_PORT")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "localhost:123")
})
Expand All @@ -300,6 +313,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
defer os.Unsetenv("DD_AGENT_HOST")
defer os.Unsetenv("DD_DOGSTATSD_PORT")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "my-host:123")
})
Expand All @@ -308,12 +322,14 @@ func TestTracerOptionsDefaults(t *testing.T) {
os.Setenv("DD_ENV", "testEnv")
defer os.Unsetenv("DD_ENV")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "testEnv", c.env)
})

t.Run("option", func(t *testing.T) {
tracer := newTracer(WithDogstatsdAddress("10.1.0.12:4002"))
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "10.1.0.12:4002")
})
Expand All @@ -323,6 +339,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
os.Setenv("DD_AGENT_HOST", "trace-agent")
defer os.Unsetenv("DD_AGENT_HOST")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "http://trace-agent:8126", c.agentURL)
})
Expand All @@ -331,6 +348,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
t.Run("env", func(t *testing.T) {
t.Setenv("DD_TRACE_AGENT_URL", "https://custom:1234")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "https://custom:1234", c.agentURL)
})
Expand All @@ -340,13 +358,15 @@ func TestTracerOptionsDefaults(t *testing.T) {
t.Setenv("DD_TRACE_AGENT_PORT", "3333")
t.Setenv("DD_TRACE_AGENT_URL", "https://custom:1234")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "https://custom:1234", c.agentURL)
})

t.Run("code-override", func(t *testing.T) {
t.Setenv("DD_TRACE_AGENT_URL", "https://custom:1234")
tracer := newTracer(WithAgentAddr("testhost:3333"))
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "http://testhost:3333", c.agentURL)
})
Expand All @@ -358,13 +378,15 @@ func TestTracerOptionsDefaults(t *testing.T) {
assert := assert.New(t)
env := "production"
tracer := newTracer(WithEnv(env))
defer tracer.Stop()
c := tracer.config
assert.Equal(env, c.env)
})

t.Run("trace_enabled", func(t *testing.T) {
t.Run("default", func(t *testing.T) {
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.True(t, c.enabled)
})
Expand All @@ -373,6 +395,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
os.Setenv("DD_TRACE_ENABLED", "false")
defer os.Unsetenv("DD_TRACE_ENABLED")
tracer := newTracer()
defer tracer.Stop()
c := tracer.config
assert.False(t, c.enabled)
})
Expand All @@ -387,6 +410,7 @@ func TestTracerOptionsDefaults(t *testing.T) {
WithDebugMode(true),
WithEnv("testEnv"),
)
defer tracer.Stop()
c := tracer.config
assert.Equal(float64(0.5), c.sampler.(RateSampler).Rate())
assert.Equal("http://ddagent.consul.local:58126", c.agentURL)
Expand Down
1 change: 1 addition & 0 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ func TestRulesSamplerConcurrency(t *testing.T) {
NameRule("notweb.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
defer tracer.Stop()
span := func(wg *sync.WaitGroup) {
defer wg.Done()
tracer.StartSpan("db.query", ServiceName("postgres.db")).Finish()
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestSpanFinish(t *testing.T) {
assert := assert.New(t)
wait := time.Millisecond * 2
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("pylons.request", "pylons", "/")

// the finish should set finished and the duration
Expand Down Expand Up @@ -364,13 +365,15 @@ func TestTraceManualKeepAndManualDrop(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s/local", scenario.tag), func(t *testing.T) {
tracer := newTracer()
defer tracer.Stop()
span := tracer.newRootSpan("root span", "my service", "my resource")
span.SetTag(scenario.tag, true)
assert.Equal(t, scenario.keep, shouldKeep(span))
})

t.Run(fmt.Sprintf("%s/non-local", scenario.tag), func(t *testing.T) {
tracer := newTracer()
defer tracer.Stop()
spanCtx := &spanContext{traceID: 42, spanID: 42}
spanCtx.setSamplingPriority(scenario.p, samplernames.RemoteRate)
span := tracer.StartSpan("non-local root span", ChildOf(spanCtx)).(*span)
Expand All @@ -396,6 +399,7 @@ func TestSpanSetDatadogTags(t *testing.T) {
func TestSpanStart(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("pylons.request", "pylons", "/")

// a new span sets the Start after the initialization
Expand All @@ -405,6 +409,7 @@ func TestSpanStart(t *testing.T) {
func TestSpanString(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("pylons.request", "pylons", "/")
// don't bother checking the contents, just make sure it works.
assert.NotEqual("", span.String())
Expand Down Expand Up @@ -463,6 +468,7 @@ func TestSpanSetMetric(t *testing.T) {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("http.request", "mux.router", "/")
tt(assert, span)
})
Expand All @@ -472,6 +478,7 @@ func TestSpanSetMetric(t *testing.T) {
func TestSpanError(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("pylons.request", "pylons", "/")

// check the error is set in the default meta
Expand Down Expand Up @@ -499,6 +506,7 @@ func TestSpanError(t *testing.T) {
func TestSpanError_Typed(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("pylons.request", "pylons", "/")

// check the error is set in the default meta
Expand All @@ -513,6 +521,7 @@ func TestSpanError_Typed(t *testing.T) {
func TestSpanErrorNil(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()
span := tracer.newRootSpan("pylons.request", "pylons", "/")

// don't set the error if it's nil
Expand Down Expand Up @@ -574,6 +583,7 @@ func TestSpanModifyWhileFlushing(t *testing.T) {
func TestSpanSamplingPriority(t *testing.T) {
assert := assert.New(t)
tracer := newTracer(withTransport(newDefaultTransport()))
defer tracer.Stop()

span := tracer.newRootSpan("my.name", "my.service", "my.resource")
_, ok := span.Metrics[keySamplingPriority]
Expand Down
13 changes: 7 additions & 6 deletions ddtrace/tracer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ type concentrator struct {
// stopped reports whether the concentrator is stopped (when non-zero)
stopped uint32

wg sync.WaitGroup // waits for any active goroutines
bucketSize int64 // the size of a bucket in nanoseconds
stop chan struct{} // closing this channel triggers shutdown
cfg *config // tracer startup configuration
wg sync.WaitGroup // waits for any active goroutines
bucketSize int64 // the size of a bucket in nanoseconds
stop chan struct{} // closing this channel triggers shutdown
cfg *config // tracer startup configuration
statsdClient statsdClient // statsd client for sending metrics.
}

// newConcentrator creates a new concentrator using the given tracer
Expand Down Expand Up @@ -113,10 +114,10 @@ func (c *concentrator) runFlusher(tick <-chan time.Time) {

// statsd returns any tracer configured statsd client, or a no-op.
func (c *concentrator) statsd() statsdClient {
if c.cfg.statsd == nil {
if c.statsdClient == nil {
return &statsd.NoOpClient{}
}
return c.cfg.statsd
return c.statsdClient
}

// runIngester runs the loop which accepts incoming data on the concentrator's In
Expand Down
Loading

0 comments on commit cb93051

Please sign in to comment.