From 6147c81cd047f0237c9bfa553fb61ce74bfa37aa Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy <126021+ash2k@users.noreply.github.com> Date: Wed, 4 Sep 2024 01:13:28 +1000 Subject: [PATCH] stats/opentelemetry: Optimize slice allocations (#7525) --- stats/opentelemetry/client_metrics.go | 25 +++++++++++++++++-------- stats/opentelemetry/opentelemetry.go | 13 ++++++------- stats/opentelemetry/server_metrics.go | 14 +++++++++----- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index d5b9aa46b1ee..4af7f933c8ba 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -128,9 +128,13 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { - s := status.Convert(err) - callLatency := float64(time.Since(startTime)) / float64(time.Second) - h.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code())))) + callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP + attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( + otelattribute.String("grpc.method", ci.method), + otelattribute.String("grpc.target", ci.target), + otelattribute.String("grpc.status", canonicalString(status.Code(err))), + )) + h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) } // TagConn exists to satisfy stats.Handler. @@ -188,7 +192,11 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta return } - h.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target))) + attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( + otelattribute.String("grpc.method", ci.method), + otelattribute.String("grpc.target", ci.target), + )) + h.clientMetrics.attemptStarted.Add(ctx, 1, attrs) case *stats.OutPayload: atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) case *stats.InPayload: @@ -244,10 +252,11 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, } } - clientAttributeOption := otelmetric.WithAttributes(attributes...) - h.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption) - h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption) - h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption) + // Allocate vararg slice once. + opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))} + h.clientMetrics.attemptDuration.Record(ctx, latency, opts...) + h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...) + h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...) } const ( diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index b0d0cde7b891..cc5ad387fb4c 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -283,7 +283,7 @@ func optionFromLabels(labelKeys []string, optionalLabelKeys []string, optionalLa } } } - return otelmetric.WithAttributes(attributes...) + return otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...)) } // registryMetrics implements MetricsRecorder for the client and server stats @@ -330,41 +330,40 @@ func (rm *registryMetrics) registerMetrics(metrics *estats.Metrics, meter otelme func (rm *registryMetrics) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) { desc := handle.Descriptor() - ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) - if ic, ok := rm.intCounts[desc]; ok { + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) ic.Add(context.TODO(), incr, ao) } } func (rm *registryMetrics) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) { desc := handle.Descriptor() - ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) if fc, ok := rm.floatCounts[desc]; ok { + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) fc.Add(context.TODO(), incr, ao) } } func (rm *registryMetrics) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) { desc := handle.Descriptor() - ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) if ih, ok := rm.intHistos[desc]; ok { + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) ih.Record(context.TODO(), incr, ao) } } func (rm *registryMetrics) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) { desc := handle.Descriptor() - ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) if fh, ok := rm.floatHistos[desc]; ok { + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) fh.Record(context.TODO(), incr, ao) } } func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) { desc := handle.Descriptor() - ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) if ig, ok := rm.intGauges[desc]; ok { + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) ig.Record(context.TODO(), incr, ao) } } diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index 528fced8b5c7..75c7c43b69a4 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -227,7 +227,10 @@ func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStat } ai.pluginOptionLabels = labels } - h.serverMetrics.callStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ai.method))) + attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( + otelattribute.String("grpc.method", ai.method), + )) + h.serverMetrics.callStarted.Add(ctx, 1, attrs) case *stats.OutPayload: atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) case *stats.InPayload: @@ -253,10 +256,11 @@ func (h *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, attributes = append(attributes, otelattribute.String(k, v)) } - serverAttributeOption := otelmetric.WithAttributes(attributes...) - h.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption) - h.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), serverAttributeOption) - h.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), serverAttributeOption) + // Allocate vararg slice once. + opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))} + h.serverMetrics.callDuration.Record(ctx, latency, opts...) + h.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...) + h.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...) } const (