diff --git a/thriftbp/client_pool.go b/thriftbp/client_pool.go
index e25fa9aeb..d4dcd47bb 100644
--- a/thriftbp/client_pool.go
+++ b/thriftbp/client_pool.go
@@ -24,6 +24,8 @@ import (
 
 // DefaultPoolGaugeInterval is the fallback value to be used when
 // ClientPoolConfig.PoolGaugeInterval <= 0.
+//
+// Deprecated: Prometheus gauges are auto scraped.
 const DefaultPoolGaugeInterval = time.Second * 10
 
 // PoolError is returned by ClientPool.TClient.Call when it fails to get a
@@ -144,6 +146,8 @@ type ClientPoolConfig struct {
 
 	// Any tags that should be applied to metrics logged by the ClientPool.
 	// This includes the optional pool stats.
+	//
+	// Deprecated: We no longer emit any statsd metrics so this has no effect.
 	MetricsTags metricsbp.Tags `yaml:"metricsTags"`
 
 	// DefaultRetryOptions is the list of retry.Options to apply as the defaults
@@ -168,6 +172,9 @@ type ClientPoolConfig struct {
 	//
 	// The reporting goroutine is cancelled when the global metrics client
 	// context is Done.
+	//
+	// Deprecated: The statsd metrics are deprecated and the prometheus metrics
+	// are always reported.
 	ReportPoolStats bool `yaml:"reportPoolStats"`
 
 	// PoolGaugeInterval indicates how often we should update the active
@@ -175,6 +182,8 @@ type ClientPoolConfig struct {
 	//
 	// When PoolGaugeInterval <= 0 and ReportPoolStats is true,
 	// DefaultPoolGaugeInterval will be used instead.
+	//
+	// Deprecated: Not used any more. Prometheus gauges are auto scraped.
 	PoolGaugeInterval time.Duration `yaml:"poolGaugeInterval"`
 
 	// Suppress some of the errors returned by the server before sending them to
@@ -396,7 +405,6 @@ func newClientPool(
 		"thrift_pool":   cfg.ServiceSlug,
 	}).Set(float64(cfg.MaxConnections))
 	tConfig := cfg.ToTConfiguration()
-	tags := cfg.MetricsTags.AsStatsdTags()
 	jitter := DefaultMaxConnectionAgeJitter
 	if cfg.MaxConnectionAgeJitter != nil {
 		jitter = *cfg.MaxConnectionAgeJitter
@@ -405,7 +413,6 @@ func newClientPool(
 		return newClient(
 			tConfig,
 			cfg.ServiceSlug,
-			cfg.MetricsTags,
 			cfg.MaxConnectionAge,
 			jitter,
 			genAddr,
@@ -440,33 +447,24 @@ func newClientPool(
 			err,
 		))
 	}
 
-	if cfg.ReportPoolStats {
-		go reportPoolStats(
-			metricsbp.M.Ctx(),
-			cfg.ServiceSlug,
-			pool,
-			cfg.PoolGaugeInterval,
-			tags,
-		)
-		if err := prometheusbpint.GlobalRegistry.Register(&clientPoolGaugeExporter{
-			slug: cfg.ServiceSlug,
-			pool: pool,
-		}); err != nil {
-			// Register should never fail because clientPoolGaugeExporter.Describe is
-			// a no-op, but just in case.
-
-			var batch errorsbp.Batch
-			batch.Add(err)
-			if err := pool.Close(); err != nil {
-				batch.AddPrefix("close pool", err)
-			}
-			return nil, fmt.Errorf(
-				"thriftbp: error registering prometheus exporter for client pool %q: %w",
-				cfg.ServiceSlug,
-				batch.Compile(),
-			)
+	if err := prometheusbpint.GlobalRegistry.Register(&clientPoolGaugeExporter{
+		slug: cfg.ServiceSlug,
+		pool: pool,
+	}); err != nil {
+		// Register should never fail because clientPoolGaugeExporter.Describe is
+		// a no-op, but just in case.
+
+		var batch errorsbp.Batch
+		batch.Add(err)
+		if err := pool.Close(); err != nil {
+			batch.AddPrefix("close pool", err)
 		}
+		return nil, fmt.Errorf(
+			"thriftbp: error registering prometheus exporter for client pool %q: %w",
+			cfg.ServiceSlug,
+			batch.Compile(),
+		)
 	}
 
 	// create the base clientPool, this is not ready for use.
@@ -489,6 +487,8 @@ func newClientPool(
 	clientPoolExhaustedCounter.With(labels)
 	clientPoolClosedConnectionsCounter.With(labels)
 	clientPoolReleaseErrorCounter.With(labels)
+	legacyClientPoolExhaustedCounter.With(labels)
+	legacyClientPoolReleaseErrorCounter.With(labels)
 
 	return pooledClient, nil
 }
@@ -496,7 +496,6 @@ func newClientPool(
 func newClient(
 	cfg *thrift.TConfiguration,
 	slug string,
-	tags metricsbp.Tags,
 	maxConnectionAge time.Duration,
 	maxConnectionAgeJitter float64,
 	genAddr AddressGenerator,
@@ -517,27 +516,7 @@ func newClient(
 			protoFactory.GetProtocol(transport),
 			protoFactory.GetProtocol(transport),
 		), transport, nil
-	}, maxConnectionAge, maxConnectionAgeJitter, slug, tags)
-}
-
-func reportPoolStats(ctx context.Context, slug string, pool clientpool.Pool, tickerDuration time.Duration, tags []string) {
-	activeGauge := metricsbp.M.RuntimeGauge(slug + ".pool-active-connections").With(tags...)
-	allocatedGauge := metricsbp.M.RuntimeGauge(slug + ".pool-allocated-clients").With(tags...)
-
-	if tickerDuration <= 0 {
-		tickerDuration = DefaultPoolGaugeInterval
-	}
-	ticker := time.NewTicker(tickerDuration)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			activeGauge.Set(float64(pool.NumActiveClients()))
-			allocatedGauge.Set(float64(pool.NumAllocated()))
-		}
-	}
+	}, maxConnectionAge, maxConnectionAgeJitter, slug)
 }
 
 type clientPool struct {
@@ -615,6 +594,10 @@ func (p *clientPool) getClient() (_ Client, err error) {
 			clientNameLabel: p.slug,
 			"thrift_pool":   p.slug,
 		}).Inc()
+		legacyClientPoolExhaustedCounter.With(prometheus.Labels{
+			clientNameLabel: p.slug,
+			"thrift_pool":   p.slug,
+		}).Inc()
 	}
 	log.Errorw(
 		"Failed to get client from pool",
@@ -637,6 +620,10 @@ func (p *clientPool) releaseClient(c Client) {
 			clientNameLabel: p.slug,
 			"thrift_pool":   p.slug,
 		}).Inc()
+		legacyClientPoolReleaseErrorCounter.With(prometheus.Labels{
+			clientNameLabel: p.slug,
+			"thrift_pool":   p.slug,
+		}).Inc()
 	}
 }
 
diff --git a/thriftbp/prometheus.go b/thriftbp/prometheus.go
index 9a385374c..2b67faf42 100644
--- a/thriftbp/prometheus.go
+++ b/thriftbp/prometheus.go
@@ -167,10 +167,18 @@ var (
 		methodLabel,
 	}
 
-	panicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
+	// TODO: Remove after the next release (v0.9.12)
+	legacyPanicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
 		Namespace: promNamespace,
 		Subsystem: subsystemServer,
 		Name:      "panic_recover_total",
+		Help:      "Deprecated: Use thriftbp_server_recovered_panics_total instead",
+	}, panicRecoverLabels)
+
+	panicRecoverCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
+		Namespace: promNamespace,
+		Subsystem: subsystemServer,
+		Name:      "recovered_panics_total",
 		Help:      "The number of panics recovered from thrift server handlers",
 	}, panicRecoverLabels)
 )
@@ -181,25 +189,35 @@ var (
 		"thrift_pool",
 	}
 
-	clientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
+	// TODO: Remove after the next release (v0.9.12)
+	legacyClientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
 		Namespace: promNamespace,
 		Subsystem: subsystemClientPool,
 		Name:      "exhausted_total",
-		Help:      "The number of pool exhaustion for a thrift client pool",
+		Help:      "Deprecated: Use thriftbp_client_pool_exhaustions_total instead",
+	}, clientPoolLabels)
+
+	clientPoolExhaustedCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
+		Name: "thriftbp_client_pool_exhaustions_total",
+		Help: "The number of pool exhaustions for a thrift client pool",
 	}, clientPoolLabels)
 
 	clientPoolClosedConnectionsCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
-		Namespace: promNamespace,
-		Subsystem: subsystemClientPool,
-		Name:      "closed_connections_total",
-		Help:      "The number of times we closed the client after used it from the pool",
+		Name: "thriftbp_client_pool_closed_connections_total",
+		Help: "The number of times we closed the client after using it from the pool",
 	}, clientPoolLabels)
 
-	clientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
+	// TODO: Remove after the next release (v0.9.12)
+	legacyClientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
 		Namespace: promNamespace,
 		Subsystem: subsystemClientPool,
 		Name:      "release_error_total",
-		Help:      "The number of times we failed to release a client back to the pool",
+		Help:      "Deprecated: Use thriftbp_client_pool_release_errors_total instead",
+	}, clientPoolLabels)
+
+	clientPoolReleaseErrorCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
+		Name: "thriftbp_client_pool_release_errors_total",
+		Help: "The number of times we failed to release a client back to the pool",
 	}, clientPoolLabels)
 
 	clientPoolGetsCounter = promauto.With(prometheusbpint.GlobalRegistry).NewCounterVec(prometheus.CounterOpts{
diff --git a/thriftbp/server.go b/thriftbp/server.go
index 1b46a1b06..854ee3198 100644
--- a/thriftbp/server.go
+++ b/thriftbp/server.go
@@ -11,11 +11,6 @@ import (
 	//lint:ignore SA1019 This library is internal only, not actually deprecated
 	"github.com/reddit/baseplate.go/internalv2compat"
 	"github.com/reddit/baseplate.go/log"
-	"github.com/reddit/baseplate.go/metricsbp"
-)
-
-const (
-	meterNameThriftSocketErrorCounter = "thrift.socket.timeout"
 )
 
 // ServerConfig is the arg struct for both NewServer and NewBaseplateServer.
@@ -58,6 +53,8 @@ type ServerConfig struct {
 	//
 	// Report the number of clients connected to the server as a runtime gauge
 	// with metric name of 'thrift.connections'
+	//
+	// Deprecated: This feature is removed.
 	ReportConnectionCount bool
 
 	// Optional, used only by NewServer.
@@ -108,10 +105,6 @@ func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) {
 		transport = cfg.Socket
 	}
 
-	if cfg.ReportConnectionCount {
-		transport = &CountedTServerTransport{transport}
-	}
-
 	server := thrift.NewTSimpleServer4(
 		thrift.WrapProcessor(cfg.Processor, cfg.Middlewares...),
 		transport,
@@ -160,10 +153,8 @@ func NewBaseplateServer(
 }
 
 func suppressTimeoutLogger(logger thrift.Logger) thrift.Logger {
-	c := metricsbp.M.Counter(meterNameThriftSocketErrorCounter)
 	return func(msg string) {
 		if strings.Contains(msg, "i/o timeout") {
-			c.Add(1)
 			return
 		}
 
diff --git a/thriftbp/server_middlewares.go b/thriftbp/server_middlewares.go
index b2833268f..4f5c3cb61 100644
--- a/thriftbp/server_middlewares.go
+++ b/thriftbp/server_middlewares.go
@@ -15,9 +15,7 @@ import (
 	"github.com/reddit/baseplate.go/internal/gen-go/reddit/baseplate"
 	"github.com/reddit/baseplate.go/iobp"
 	"github.com/reddit/baseplate.go/log"
-	"github.com/reddit/baseplate.go/metricsbp"
 	"github.com/reddit/baseplate.go/prometheusbp"
-	"github.com/reddit/baseplate.go/randbp"
 	"github.com/reddit/baseplate.go/tracing"
 	"github.com/reddit/baseplate.go/transport"
 )
@@ -281,59 +279,44 @@ func AbandonCanceledRequests(name string, next thrift.TProcessorFunction) thrift
 // - payload.size.myEndpoint.request
 //
 // - payload.size.myEndpoint.response
-func ReportPayloadSizeMetrics(rate float64) thrift.ProcessorMiddleware {
+func ReportPayloadSizeMetrics(_ float64) thrift.ProcessorMiddleware {
 	return func(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction {
 		return thrift.WrappedTProcessorFunction{
 			Wrapped: func(ctx context.Context, seqID int32, in, out thrift.TProtocol) (bool, thrift.TException) {
-				if rate > 0 {
-					// Only report for THeader requests
-					if ht, ok := in.Transport().(*thrift.THeaderTransport); ok {
-						protoID := ht.Protocol()
-						cfg := &thrift.TConfiguration{
-							THeaderProtocolID: &protoID,
-						}
-						var itrans, otrans countingTransport
-						transport := thrift.NewTHeaderTransportConf(&itrans, cfg)
-						iproto := thrift.NewTHeaderProtocolConf(transport, cfg)
-						in = &thrift.TDuplicateToProtocol{
-							Delegate:    in,
-							DuplicateTo: iproto,
-						}
-						transport = thrift.NewTHeaderTransportConf(&otrans, cfg)
-						oproto := thrift.NewTHeaderProtocolConf(transport, cfg)
-						out = &thrift.TDuplicateToProtocol{
-							Delegate:    out,
-							DuplicateTo: oproto,
-						}
-
-						defer func() {
-							iproto.Flush(ctx)
-							oproto.Flush(ctx)
-							isize := float64(itrans.Size())
-							osize := float64(otrans.Size())
-
-							proto := "header-" + tHeaderProtocol2String(protoID)
-							labels := prometheus.Labels{
-								methodLabel: name,
-								protoLabel:  proto,
-							}
-							payloadSizeRequestBytes.With(labels).Observe(isize)
-							payloadSizeResponseBytes.With(labels).Observe(osize)
-
-							if randbp.ShouldSampleWithRate(rate) {
-								metricsbp.M.HistogramWithRate(metricsbp.RateArgs{
-									Name:             "payload.size." + name + ".request",
-									Rate:             1,
-									AlreadySampledAt: metricsbp.Float64Ptr(rate),
-								}).With("proto", proto).Observe(isize)
-								metricsbp.M.HistogramWithRate(metricsbp.RateArgs{
-									Name:             "payload.size." + name + ".response",
-									Rate:             1,
-									AlreadySampledAt: metricsbp.Float64Ptr(rate),
-								}).With("proto", proto).Observe(osize)
-							}
-						}()
+				// Only report for THeader requests
+				if ht, ok := in.Transport().(*thrift.THeaderTransport); ok {
+					protoID := ht.Protocol()
+					cfg := &thrift.TConfiguration{
+						THeaderProtocolID: &protoID,
+					}
+					var itrans, otrans countingTransport
+					transport := thrift.NewTHeaderTransportConf(&itrans, cfg)
+					iproto := thrift.NewTHeaderProtocolConf(transport, cfg)
+					in = &thrift.TDuplicateToProtocol{
+						Delegate:    in,
+						DuplicateTo: iproto,
+					}
+					transport = thrift.NewTHeaderTransportConf(&otrans, cfg)
+					oproto := thrift.NewTHeaderProtocolConf(transport, cfg)
+					out = &thrift.TDuplicateToProtocol{
+						Delegate:    out,
+						DuplicateTo: oproto,
 					}
+
+					defer func() {
+						iproto.Flush(ctx)
+						oproto.Flush(ctx)
+						isize := float64(itrans.Size())
+						osize := float64(otrans.Size())
+
+						proto := "header-" + tHeaderProtocol2String(protoID)
+						labels := prometheus.Labels{
+							methodLabel: name,
+							protoLabel:  proto,
+						}
+						payloadSizeRequestBytes.With(labels).Observe(isize)
+						payloadSizeResponseBytes.With(labels).Observe(osize)
+					}()
 				}
 
 				return next.Process(ctx, seqID, in, out)
@@ -387,6 +370,10 @@ func RecoverPanik(name string, next thrift.TProcessorFunction) thrift.TProcessor
 	counter := panicRecoverCounter.With(prometheus.Labels{
 		methodLabel: name,
 	})
+	// TODO: Remove after next release (v0.9.12)
+	legacyCounter := legacyPanicRecoverCounter.With(prometheus.Labels{
+		methodLabel: name,
+	})
 	return thrift.WrappedTProcessorFunction{
 		Wrapped: func(ctx context.Context, seqId int32, in, out thrift.TProtocol) (ok bool, err thrift.TException) {
 			defer func() {
@@ -402,10 +389,8 @@ func RecoverPanik(name string, next thrift.TProcessorFunction) thrift.TProcessor
 						"err", rErr,
 						"endpoint", name,
 					)
-					metricsbp.M.Counter("panic.recover").With(
-						"name", name,
-					).Add(1)
 					counter.Inc()
+					legacyCounter.Inc()
 
 					// changed named return values to show that the request failed and
 					// return the panic value error.
diff --git a/thriftbp/server_transport.go b/thriftbp/server_transport.go
index a45554826..ccf17d859 100644
--- a/thriftbp/server_transport.go
+++ b/thriftbp/server_transport.go
@@ -13,6 +13,8 @@ const meterNameTransportConnCounter = "thrift.connections"
 
 // CountedTServerTransport is a wrapper around thrift.TServerTransport that
 // emits a gauge of the number of client connections.
+//
+// Deprecated: This was only used for statsd metrics, which are no longer emitted.
 type CountedTServerTransport struct {
 	thrift.TServerTransport
 }
diff --git a/thriftbp/ttl_client.go b/thriftbp/ttl_client.go
index 0981a5add..86989ee7b 100644
--- a/thriftbp/ttl_client.go
+++ b/thriftbp/ttl_client.go
@@ -8,7 +8,6 @@ import (
 	"github.com/go-kit/kit/metrics"
 	"github.com/prometheus/client_golang/prometheus"
 
-	"github.com/reddit/baseplate.go/metricsbp"
 	"github.com/reddit/baseplate.go/prometheusbp"
 	"github.com/reddit/baseplate.go/randbp"
 )
@@ -106,7 +105,6 @@ func (c *ttlClient) refresh() {
 		// We cannot replace this connection in the background,
 		// leave client and transport be,
 		// this connection will be replaced by the pool upon next use.
-		c.replaceCounter.With("success", metricsbp.BoolString(false)).Add(1)
 		ttlClientReplaceCounter.With(prometheus.Labels{
 			clientNameLabel: c.slug,
 			successLabel:    prometheusbp.BoolString(false),
@@ -132,7 +130,6 @@ func (c *ttlClient) refresh() {
 		state.transport.Close()
 	}
 	state.transport = transport
-	c.replaceCounter.With("success", metricsbp.BoolString(true)).Add(1)
 	ttlClientReplaceCounter.With(prometheus.Labels{
 		clientNameLabel: c.slug,
 		successLabel:    prometheusbp.BoolString(true),
@@ -140,7 +137,7 @@ func (c *ttlClient) refresh() {
 }
 
 // newTTLClient creates a ttlClient with a thrift TTransport and ttl+jitter.
-func newTTLClient(generator ttlClientGenerator, ttl time.Duration, jitter float64, slug string, tags metricsbp.Tags) (*ttlClient, error) {
+func newTTLClient(generator ttlClientGenerator, ttl time.Duration, jitter float64, slug string) (*ttlClient, error) {
 	client, transport, err := generator()
 	if err != nil {
 		return nil, err
@@ -153,9 +150,7 @@ func newTTLClient(generator ttlClientGenerator, ttl time.Duration, jitter float6
 	c := &ttlClient{
 		generator: generator,
 		ttl:       duration,
-
-		replaceCounter: metricsbp.M.Counter(slug + ".connection-housekeeping").With(tags.AsStatsdTags()...),
-		slug:           slug,
+		slug:      slug,
 
 		state: make(chan *ttlClientState, 1),
 	}
diff --git a/thriftbp/ttl_client_test.go b/thriftbp/ttl_client_test.go
index 8b5903400..9ea5f51b7 100644
--- a/thriftbp/ttl_client_test.go
+++ b/thriftbp/ttl_client_test.go
@@ -32,7 +32,7 @@ func TestTTLClient(t *testing.T) {
 	ttl := time.Millisecond
 	jitter := 0.1
 
-	client, err := newTTLClient(firstSuccessGenerator(transport), ttl, jitter, "", nil)
+	client, err := newTTLClient(firstSuccessGenerator(transport), ttl, jitter, "")
 	if err != nil {
 		t.Fatalf("newTTLClient returned error: %v", err)
 	}
@@ -45,7 +45,7 @@ func TestTTLClient(t *testing.T) {
 		t.Error("Expected IsOpen call after sleep to return false, got true.")
 	}
 
-	client, err = newTTLClient(firstSuccessGenerator(transport), ttl, -jitter, "", nil)
+	client, err = newTTLClient(firstSuccessGenerator(transport), ttl, -jitter, "")
 	if err != nil {
 		t.Fatalf("newTTLClient returned error: %v", err)
 	}
@@ -63,7 +63,7 @@ func TestTTLClientNegativeTTL(t *testing.T) {
 	transport := thrift.NewTMemoryBuffer()
 	ttl := time.Millisecond
 
-	client, err := newTTLClient(firstSuccessGenerator(transport), -ttl, 0.1, "", nil)
+	client, err := newTTLClient(firstSuccessGenerator(transport), -ttl, 0.1, "")
 	if err != nil {
 		t.Fatalf("newTTLClient returned error: %v", err)
 	}
@@ -160,7 +160,7 @@ func TestTTLClientRefresh(t *testing.T) {
 	)
 	g := alwaysSuccessGenerator{transport: &transport}
 
-	client, err := newTTLClient(g.generator(), ttl, jitter, "", nil)
+	client, err := newTTLClient(g.generator(), ttl, jitter, "")
 	if err != nil {
 		t.Fatalf("newTTLClient returned error: %v", err)
 	}
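Note for reviewers: the standalone sketch below is not part of this patch; all package, metric, and label names in it are hypothetical stand-ins. It only illustrates the migration pattern the change above relies on: register the counter under both the deprecated name and the new fully spelled-out name, increment both on the hot path, and drop the legacy one after the deprecation window (here, after v0.9.12).

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Old metric name, kept for one more release so existing dashboards keep working.
	legacyExhaustedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "example",
		Subsystem: "client_pool",
		Name:      "exhausted_total",
		Help:      "Deprecated: Use example_client_pool_exhaustions_total instead",
	}, []string{"client_name"})

	// New metric name, written out in full instead of Namespace/Subsystem/Name pieces.
	exhaustedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "example_client_pool_exhaustions_total",
		Help: "The number of pool exhaustions for a client pool",
	}, []string{"client_name"})
)

// recordExhaustion increments both the new and the legacy counter so the two
// time series stay in sync during the deprecation window.
func recordExhaustion(clientName string) {
	labels := prometheus.Labels{"client_name": clientName}
	exhaustedCounter.With(labels).Inc()
	legacyExhaustedCounter.With(labels).Inc()
}

func main() {
	recordExhaustion("example-service")
}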