From dbef5ded1585bcdcf84bbe09f3908210f9775ef6 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 19 Aug 2024 17:16:34 +0000 Subject: [PATCH] feat(libp2p): expose libp2p bandwidth metrics (#12402) This exposes bandwidth metrics via async callback to avoid allocating/reporting metrics on any hot-paths. I'm using open telemetry as we've already setup a bridge for F3 and opencensus is deprecated in favor of open telemetry (so we're going to slowly move over anyways). --- CHANGELOG.md | 1 + chain/lf3/f3.go | 17 ----------------- go.mod | 2 +- metrics/otel_bridge.go | 21 +++++++++++++++++++++ node/modules/lp2p/host.go | 6 ------ node/modules/lp2p/metrics.go | 30 ++++++++++++++++++++++++++++++ node/modules/lp2p/transport.go | 34 ++++++++++++++++++++++++++++++++-- 7 files changed, 85 insertions(+), 26 deletions(-) create mode 100644 metrics/otel_bridge.go create mode 100644 node/modules/lp2p/metrics.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 19cd54b8653..799cf0d6483 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ - Some APIs have changed which may impact users consuming Lotus Gateway code as a library. - The default value for the `Events.FilterTTL` config option has been reduced from 24h to 1h. This means that filters will expire on a Lotus node after 1 hour of not being accessed by the client. - feat(f3): F3 has been updated with many performance improvements and additional metrics. +- feat(libp2p): expose libp2p bandwidth metrics via Prometheus. # 1.28.2 / 2024-08-15 diff --git a/chain/lf3/f3.go b/chain/lf3/f3.go index 2514320ea7b..eba612e77b1 100644 --- a/chain/lf3/f3.go +++ b/chain/lf3/f3.go @@ -10,9 +10,6 @@ import ( logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/sdk/metric" "go.uber.org/fx" "golang.org/x/xerrors" @@ -53,20 +50,6 @@ type F3Params struct { var log = logging.Logger("f3") -func init() { - // Set up otel to prometheus reporting so that F3 metrics are reported via lotus - // prometheus metrics. This bridge eventually gets picked up by opencensus - // exporter as HTTP handler. This by default registers an otel collector against - // the global prometheus registry. In the future, we should clean up metrics in - // Lotus and move it all to use otel. For now, bridge away. - if bridge, err := prometheus.New(); err != nil { - log.Errorf("could not create the otel prometheus exporter: %v", err) - } else { - provider := metric.NewMeterProvider(metric.WithReader(bridge)) - otel.SetMeterProvider(provider) - } -} - func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) { ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3")) diff --git a/go.mod b/go.mod index 4e81565aa27..9e29ce2b3a3 100644 --- a/go.mod +++ b/go.mod @@ -147,6 +147,7 @@ require ( go.opentelemetry.io/otel/bridge/opencensus v1.28.0 go.opentelemetry.io/otel/exporters/jaeger v1.14.0 go.opentelemetry.io/otel/exporters/prometheus v0.50.0 + go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 go.uber.org/atomic v1.11.0 @@ -323,7 +324,6 @@ require ( github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect github.com/zondax/hid v0.9.2 // indirect github.com/zondax/ledger-go v0.14.3 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/mock v0.4.0 // indirect diff --git a/metrics/otel_bridge.go b/metrics/otel_bridge.go new file mode 100644 index 00000000000..76572f5afb2 --- /dev/null +++ b/metrics/otel_bridge.go @@ -0,0 +1,21 @@ +package metrics + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" +) + +func init() { + // Set up otel to prometheus reporting so that F3 metrics are reported via lotus + // prometheus metrics. This bridge eventually gets picked up by opencensus + // exporter as HTTP handler. This by default registers an otel collector against + // the global prometheus registry. In the future, we should clean up metrics in + // Lotus and move it all to use otel. For now, bridge away. + if bridge, err := prometheus.New(); err != nil { + log.Errorf("could not create the otel prometheus exporter: %v", err) + } else { + provider := metric.NewMeterProvider(metric.WithReader(bridge)) + otel.SetMeterProvider(provider) + } +} diff --git a/node/modules/lp2p/host.go b/node/modules/lp2p/host.go index baea4cf0656..fd723fbb2c4 100644 --- a/node/modules/lp2p/host.go +++ b/node/modules/lp2p/host.go @@ -62,12 +62,6 @@ func Host(mctx helpers.MetricsCtx, buildVersion build.BuildVersion, lc fx.Lifecy return nil, err } - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return h.Close() - }, - }) - return h, nil } diff --git a/node/modules/lp2p/metrics.go b/node/modules/lp2p/metrics.go new file mode 100644 index 00000000000..1d0d6473b52 --- /dev/null +++ b/node/modules/lp2p/metrics.go @@ -0,0 +1,30 @@ +package lp2p + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var otelmeter = otel.Meter("libp2p") + +var attrIdentity = attribute.Key("identity") +var attrProtocolID = attribute.Key("protocol") +var attrDirectionInbound = attribute.String("direction", "inbound") +var attrDirectionOutbound = attribute.String("direction", "outbound") + +var otelmetrics = struct { + bandwidth metric.Int64ObservableCounter +}{ + bandwidth: must(otelmeter.Int64ObservableCounter("lotus_libp2p_bandwidth", + metric.WithDescription("Libp2p stream bandwidth."), + metric.WithUnit("By"), + )), +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/node/modules/lp2p/transport.go b/node/modules/lp2p/transport.go index 536f612b1aa..6a95937d284 100644 --- a/node/modules/lp2p/transport.go +++ b/node/modules/lp2p/transport.go @@ -1,11 +1,16 @@ package lp2p import ( + "context" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/peer" noise "github.com/libp2p/go-libp2p/p2p/security/noise" tls "github.com/libp2p/go-libp2p/p2p/security/tls" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) var DefaultTransports = simpleOpt(libp2p.DefaultTransports) @@ -31,8 +36,33 @@ func Security(enabled, preferTLS bool) interface{} { } } -func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) { +func BandwidthCounter(lc fx.Lifecycle, id peer.ID) (opts Libp2pOpts, reporter metrics.Reporter, err error) { reporter = metrics.NewBandwidthCounter() opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) - return opts, reporter + + // Register it with open telemetry. We report by-callback instead of implementing a custom + // bandwidth counter to avoid allocating every time we read/write to a stream (and to stay + // out of the hot path). + // + // Identity is required to ensure this observer observes with unique attributes. + identityAttr := attrIdentity.String(id.String()) + registration, err := otelmeter.RegisterCallback(func(ctx context.Context, obs metric.Observer) error { + for p, bw := range reporter.GetBandwidthByProtocol() { + if p == "" { + p = "" + } + protoAttr := attrProtocolID.String(string(p)) + obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalOut, + metric.WithAttributes(identityAttr, protoAttr, attrDirectionOutbound)) + obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalIn, + metric.WithAttributes(identityAttr, protoAttr, attrDirectionInbound)) + } + return nil + }, otelmetrics.bandwidth) + if err != nil { + return Libp2pOpts{}, nil, err + } + lc.Append(fx.StopHook(registration.Unregister)) + + return opts, reporter, nil }