Skip to content

Commit

Permalink
feat(libp2p): expose libp2p bandwidth metrics (filecoin-project#12402)
Browse files Browse the repository at this point in the history
This exposes bandwidth metrics via async callback to avoid
allocating/reporting metrics on any hot-paths. I'm using open telemetry
as we've already setup a bridge for F3 and opencensus is deprecated in
favor of open telemetry (so we're going to slowly move over anyways).
  • Loading branch information
Stebalien committed Aug 19, 2024
1 parent 0225c91 commit dbef5de
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
- Some APIs have changed which may impact users consuming Lotus Gateway code as a library.
- The default value for the `Events.FilterTTL` config option has been reduced from 24h to 1h. This means that filters will expire on a Lotus node after 1 hour of not being accessed by the client.
- feat(f3): F3 has been updated with many performance improvements and additional metrics.
- feat(libp2p): expose libp2p bandwidth metrics via Prometheus.

# 1.28.2 / 2024-08-15

Expand Down
17 changes: 0 additions & 17 deletions chain/lf3/f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import (
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"go.uber.org/fx"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -53,20 +50,6 @@ type F3Params struct {

var log = logging.Logger("f3")

func init() {
// Set up otel to prometheus reporting so that F3 metrics are reported via lotus
// prometheus metrics. This bridge eventually gets picked up by opencensus
// exporter as HTTP handler. This by default registers an otel collector against
// the global prometheus registry. In the future, we should clean up metrics in
// Lotus and move it all to use otel. For now, bridge away.
if bridge, err := prometheus.New(); err != nil {
log.Errorf("could not create the otel prometheus exporter: %v", err)
} else {
provider := metric.NewMeterProvider(metric.WithReader(bridge))
otel.SetMeterProvider(provider)
}
}

func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {

ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ require (
go.opentelemetry.io/otel/bridge/opencensus v1.28.0
go.opentelemetry.io/otel/exporters/jaeger v1.14.0
go.opentelemetry.io/otel/exporters/prometheus v0.50.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.uber.org/atomic v1.11.0
Expand Down Expand Up @@ -323,7 +324,6 @@ require (
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/zondax/hid v0.9.2 // indirect
github.com/zondax/ledger-go v0.14.3 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/mock v0.4.0 // indirect
Expand Down
21 changes: 21 additions & 0 deletions metrics/otel_bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package metrics

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
)

func init() {
// Set up otel to prometheus reporting so that F3 metrics are reported via lotus
// prometheus metrics. This bridge eventually gets picked up by opencensus
// exporter as HTTP handler. This by default registers an otel collector against
// the global prometheus registry. In the future, we should clean up metrics in
// Lotus and move it all to use otel. For now, bridge away.
if bridge, err := prometheus.New(); err != nil {
log.Errorf("could not create the otel prometheus exporter: %v", err)
} else {
provider := metric.NewMeterProvider(metric.WithReader(bridge))
otel.SetMeterProvider(provider)
}
}
6 changes: 0 additions & 6 deletions node/modules/lp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ func Host(mctx helpers.MetricsCtx, buildVersion build.BuildVersion, lc fx.Lifecy
return nil, err
}

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return h.Close()
},
})

return h, nil
}

Expand Down
30 changes: 30 additions & 0 deletions node/modules/lp2p/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package lp2p

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var otelmeter = otel.Meter("libp2p")

var attrIdentity = attribute.Key("identity")
var attrProtocolID = attribute.Key("protocol")
var attrDirectionInbound = attribute.String("direction", "inbound")
var attrDirectionOutbound = attribute.String("direction", "outbound")

var otelmetrics = struct {
bandwidth metric.Int64ObservableCounter
}{
bandwidth: must(otelmeter.Int64ObservableCounter("lotus_libp2p_bandwidth",
metric.WithDescription("Libp2p stream bandwidth."),
metric.WithUnit("By"),
)),
}

func must[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}
34 changes: 32 additions & 2 deletions node/modules/lp2p/transport.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package lp2p

import (
"context"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var DefaultTransports = simpleOpt(libp2p.DefaultTransports)
Expand All @@ -31,8 +36,33 @@ func Security(enabled, preferTLS bool) interface{} {
}
}

func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) {
func BandwidthCounter(lc fx.Lifecycle, id peer.ID) (opts Libp2pOpts, reporter metrics.Reporter, err error) {
reporter = metrics.NewBandwidthCounter()
opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter))
return opts, reporter

// Register it with open telemetry. We report by-callback instead of implementing a custom
// bandwidth counter to avoid allocating every time we read/write to a stream (and to stay
// out of the hot path).
//
// Identity is required to ensure this observer observes with unique attributes.
identityAttr := attrIdentity.String(id.String())
registration, err := otelmeter.RegisterCallback(func(ctx context.Context, obs metric.Observer) error {
for p, bw := range reporter.GetBandwidthByProtocol() {
if p == "" {
p = "<unknown>"
}
protoAttr := attrProtocolID.String(string(p))
obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalOut,
metric.WithAttributes(identityAttr, protoAttr, attrDirectionOutbound))
obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalIn,
metric.WithAttributes(identityAttr, protoAttr, attrDirectionInbound))
}
return nil
}, otelmetrics.bandwidth)
if err != nil {
return Libp2pOpts{}, nil, err
}
lc.Append(fx.StopHook(registration.Unregister))

return opts, reporter, nil
}

0 comments on commit dbef5de

Please sign in to comment.