Skip to content

Commit

Permalink
stats/opentelemetry: Optimize slice allocations (#7525)
Browse files Browse the repository at this point in the history
  • Loading branch information
ash2k committed Sep 3, 2024
1 parent cd05c9e commit 6147c81
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 20 deletions.
25 changes: 17 additions & 8 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}

func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
s := status.Convert(err)
callLatency := float64(time.Since(startTime)) / float64(time.Second)
h.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -188,7 +192,11 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta
return
}

h.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
))
h.clientMetrics.attemptStarted.Add(ctx, 1, attrs)
case *stats.OutPayload:
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
Expand Down Expand Up @@ -244,10 +252,11 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
}
}

clientAttributeOption := otelmetric.WithAttributes(attributes...)
h.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption)
h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption)
// Allocate vararg slice once.
opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))}
h.clientMetrics.attemptDuration.Record(ctx, latency, opts...)
h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...)
h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...)
}

const (
Expand Down
13 changes: 6 additions & 7 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func optionFromLabels(labelKeys []string, optionalLabelKeys []string, optionalLa
}
}
}
return otelmetric.WithAttributes(attributes...)
return otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))
}

// registryMetrics implements MetricsRecorder for the client and server stats
Expand Down Expand Up @@ -330,41 +330,40 @@ func (rm *registryMetrics) registerMetrics(metrics *estats.Metrics, meter otelme

func (rm *registryMetrics) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {
desc := handle.Descriptor()
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)

if ic, ok := rm.intCounts[desc]; ok {
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
ic.Add(context.TODO(), incr, ao)
}
}

func (rm *registryMetrics) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
desc := handle.Descriptor()
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
if fc, ok := rm.floatCounts[desc]; ok {
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
fc.Add(context.TODO(), incr, ao)
}
}

func (rm *registryMetrics) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {
desc := handle.Descriptor()
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
if ih, ok := rm.intHistos[desc]; ok {
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
ih.Record(context.TODO(), incr, ao)
}
}

func (rm *registryMetrics) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {
desc := handle.Descriptor()
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
if fh, ok := rm.floatHistos[desc]; ok {
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
fh.Record(context.TODO(), incr, ao)
}
}

func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {
desc := handle.Descriptor()
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
if ig, ok := rm.intGauges[desc]; ok {
ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...)
ig.Record(context.TODO(), incr, ao)
}
}
Expand Down
14 changes: 9 additions & 5 deletions stats/opentelemetry/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStat
}
ai.pluginOptionLabels = labels
}
h.serverMetrics.callStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ai.method)))
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ai.method),
))
h.serverMetrics.callStarted.Add(ctx, 1, attrs)
case *stats.OutPayload:
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
Expand All @@ -253,10 +256,11 @@ func (h *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
attributes = append(attributes, otelattribute.String(k, v))
}

serverAttributeOption := otelmetric.WithAttributes(attributes...)
h.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
h.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), serverAttributeOption)
h.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), serverAttributeOption)
// Allocate vararg slice once.
opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))}
h.serverMetrics.callDuration.Record(ctx, latency, opts...)
h.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...)
h.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...)
}

const (
Expand Down

0 comments on commit 6147c81

Please sign in to comment.