Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add other metric types and weight (part 2) #1949

Merged
merged 7 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 140 additions & 7 deletions dart/lib/src/metrics/metric.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:math';

import 'package:meta/meta.dart';

import '../../sentry.dart';
Expand Down Expand Up @@ -26,8 +28,31 @@ abstract class Metric {
required this.tags,
});

factory Metric.fromType({
required final MetricType type,
required final String key,
required final num value,
required final SentryMeasurementUnit unit,
required final Map<String, String> tags,
}) {
switch (type) {
case MetricType.counter:
return CounterMetric._(value: value, key: key, unit: unit, tags: tags);
case MetricType.gauge:
return GaugeMetric._(value: value, key: key, unit: unit, tags: tags);
case MetricType.set:
return SetMetric._(value: value, key: key, unit: unit, tags: tags);
case MetricType.distribution:
return DistributionMetric._(
value: value, key: key, unit: unit, tags: tags);
}
}

/// Add a value to the metric.
add(double value);
add(num value);

/// Return the weight of the current metric.
int getWeight();

/// Serialize the value into a list of Objects to be converted into a String.
Iterable<Object> _serializeValue();
Expand Down Expand Up @@ -100,31 +125,139 @@ abstract class Metric {
input.replaceAll(forbiddenUnitCharsRegex, '_');
}

@internal

/// Metric [MetricType.counter] that tracks a value that can only be incremented.
@internal
class CounterMetric extends Metric {
double value;
num value;

CounterMetric({
CounterMetric._({
required this.value,
required super.key,
required super.unit,
required super.tags,
}) : super(type: MetricType.counter);

@override
add(double value) => this.value += value;
add(num value) => this.value += value;

@override
Iterable<Object> _serializeValue() => [value];

@override
int getWeight() => 1;
}

/// Metric [MetricType.gauge] that tracks a value that can go up and down.
@internal
class GaugeMetric extends Metric {
num _last;
num _minimum;
num _maximum;
num _sum;
int _count;

GaugeMetric._({
required num value,
required super.key,
required super.unit,
required super.tags,
}) : _last = value,
_minimum = value,
_maximum = value,
_sum = value,
_count = 1,
super(type: MetricType.gauge);

@override
add(num value) {
_last = value;
_minimum = min(_minimum, value);
_maximum = max(_maximum, value);
_sum += value;
_count++;
}

@override
Iterable<Object> _serializeValue() =>
[_last, _minimum, _maximum, _sum, _count];

@override
int getWeight() => 5;

@visibleForTesting
num get last => _last;
@visibleForTesting
num get minimum => _minimum;
@visibleForTesting
num get maximum => _maximum;
@visibleForTesting
num get sum => _sum;
@visibleForTesting
int get count => _count;
}

/// Metric [MetricType.set] that tracks a set of values on which you can perform
/// aggregations such as count_unique.
@internal
class SetMetric extends Metric {
final Set<int> _values = {};

SetMetric._(
{required num value,
required super.key,
required super.unit,
required super.tags})
: super(type: MetricType.set) {
add(value);
}

@override
add(num value) => _values.add(value.toInt());
denrase marked this conversation as resolved.
Show resolved Hide resolved

@override
Iterable<Object> _serializeValue() => _values;

@override
int getWeight() => _values.length;

@visibleForTesting
Set<num> get values => _values;
}

/// Metric [MetricType.distribution] that tracks a list of values.
@internal
class DistributionMetric extends Metric {
final List<num> _values = [];

DistributionMetric._(
{required num value,
required super.key,
required super.unit,
required super.tags})
: super(type: MetricType.distribution) {
add(value);
}

@override
add(num value) => _values.add(value);

@override
Iterable<Object> _serializeValue() => _values;

@override
int getWeight() => _values.length;

@visibleForTesting
List<num> get values => _values;
}

/// The metric type and its associated statsd encoded value.
@internal
enum MetricType {
counter('c');
counter('c'),
gauge('g'),
distribution('d'),
set('s');

final String statsdType;

Expand Down
120 changes: 74 additions & 46 deletions dart/lib/src/metrics/metrics_aggregator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@ import 'metric.dart';
/// Class that aggregates all metrics into time buckets and sends them.
@internal
class MetricsAggregator {
static final _defaultFlushShiftMs =
(Random().nextDouble() * (_rollupInSeconds * 1000)).toInt();
static const _defaultFlushInterval = Duration(seconds: 5);
static const _defaultMaxWeight = 100000;
static const int _rollupInSeconds = 10;

final Duration _flushInterval;
final int _flushShiftMs;
final SentryOptions _options;
final Hub _hub;
final int _maxWeight;
int _totalWeight = 0;
bool _isClosed = false;
@visibleForTesting
Completer<void>? flushCompleter;
Completer<void>? _flushCompleter;
Timer? _flushTimer;

/// The key for this map is the timestamp of the bucket, rounded down to the
/// nearest RollupInSeconds. So it aggregates all the metrics over a certain
Expand All @@ -29,20 +36,22 @@ class MetricsAggregator {
MetricsAggregator({
required SentryOptions options,
Hub? hub,
@visibleForTesting Duration flushInterval = const Duration(seconds: 5),
@visibleForTesting Duration? flushInterval,
@visibleForTesting int? flushShiftMs,
@visibleForTesting int? maxWeight,
}) : _options = options,
_hub = hub ?? HubAdapter(),
_flushInterval = flushInterval,
_flushShiftMs = flushShiftMs ??
(Random().nextDouble() * (_rollupInSeconds * 1000)).toInt();
_flushInterval = flushInterval ?? _defaultFlushInterval,
_flushShiftMs = flushShiftMs ?? _defaultFlushShiftMs,
_maxWeight = maxWeight ?? _defaultMaxWeight;

/// Creates or update an existing Counter metric with [value].
/// The metric to update is identified using [key], [unit] and [tags].
/// The [timestamp] represents when the metric was emitted.
void increment(
void emit(
MetricType metricType,
String key,
double value,
num value,
SentryMeasurementUnit unit,
Map<String, String> tags,
) {
Expand All @@ -52,8 +61,12 @@ class MetricsAggregator {

final bucketKey = _getBucketKey(_options.clock());
final bucket = _buckets.putIfAbsent(bucketKey, () => {});
final metric =
CounterMetric(value: value, key: key, unit: unit, tags: tags);
final metric = Metric.fromType(
type: metricType, key: key, value: value, unit: unit, tags: tags);

final oldWeight = bucket[metric.getCompositeKey()]?.getWeight() ?? 0;
final addedWeight = metric.getWeight();
_totalWeight += addedWeight - oldWeight;

// Update the existing metric in the bucket.
// If absent, add the newly created metric to the bucket.
Expand All @@ -67,56 +80,67 @@ class MetricsAggregator {
_scheduleFlush();
}

Future<void> _scheduleFlush() async {
if (!_isClosed &&
_buckets.isNotEmpty &&
flushCompleter?.isCompleted != false) {
flushCompleter = Completer();

await flushCompleter?.future
.timeout(_flushInterval, onTimeout: _flushMetrics);
void _scheduleFlush() {
if (!_isClosed && _buckets.isNotEmpty) {
if (_isOverWeight()) {
_flushTimer?.cancel();
_flush(false);
return;
}
if (_flushTimer?.isActive != true) {
_flushCompleter = Completer();
_flushTimer = Timer(_flushInterval, () => _flush(false));
}
}
}

/// Flush the metrics, then schedule next flush again.
void _flushMetrics() async {
await _flush();
bool _isOverWeight() => _totalWeight >= _maxWeight;

flushCompleter?.complete(null);
flushCompleter = null;
await _scheduleFlush();
int getBucketWeight(final Map<String, Metric> bucket) {
int weight = 0;
for (final metric in bucket.values) {
weight += metric.getWeight();
}
return weight;
}

/// Flush and sends metrics.
Future<void> _flush() async {
final flushableBucketKeys = _getFlushableBucketKeys();
if (flushableBucketKeys.isEmpty) {
_options.logger(SentryLevel.debug, 'Metrics: nothing to flush');
return;
/// Flush the metrics, then schedule next flush again.
void _flush(bool force) async {
if (!force && _isOverWeight()) {
_options.logger(SentryLevel.info,
"Metrics: total weight exceeded, flushing all buckets");
force = true;
}

final Map<int, Iterable<Metric>> bucketsToFlush = {};
int numMetrics = 0;

for (int flushableBucketKey in flushableBucketKeys) {
final bucket = _buckets.remove(flushableBucketKey);
if (bucket != null) {
numMetrics += bucket.length;
bucketsToFlush[flushableBucketKey] = bucket.values;
final flushableBucketKeys = _getFlushableBucketKeys(force);
if (flushableBucketKeys.isEmpty) {
_options.logger(SentryLevel.debug, 'Metrics: nothing to flush');
} else {
final Map<int, Iterable<Metric>> bucketsToFlush = {};

for (final flushableBucketKey in flushableBucketKeys) {
final bucket = _buckets.remove(flushableBucketKey);
if (bucket != null && bucket.isNotEmpty) {
_totalWeight -= getBucketWeight(bucket);
bucketsToFlush[flushableBucketKey] = bucket.values;
}
}
await _hub.captureMetrics(bucketsToFlush);
}

if (numMetrics == 0) {
_options.logger(SentryLevel.debug, 'Metrics: only empty buckets found');
return;
}

_options.logger(SentryLevel.debug, 'Metrics: capture $numMetrics metrics');
await _hub.captureMetrics(bucketsToFlush);
// Notify flush completed and reschedule flushing
_flushTimer?.cancel();
_flushTimer = null;
flushCompleter?.complete(null);
_flushCompleter = null;
_scheduleFlush();
}

/// Return a list of bucket keys to flush.
List<int> _getFlushableBucketKeys() {
List<int> _getFlushableBucketKeys(bool force) {
if (force) {
return buckets.keys.toList();
}
// Flushable buckets are all buckets with timestamp lower than the current
// one (so now - rollupInSeconds), minus a random duration (flushShiftMs).
final maxTimestampToFlush = _options.clock().subtract(Duration(
Expand All @@ -140,7 +164,11 @@ class MetricsAggregator {
@visibleForTesting
SplayTreeMap<int, Map<String, Metric>> get buckets => _buckets;

@visibleForTesting
Completer<void>? get flushCompleter => _flushCompleter;

void close() {
_flush(true);
_isClosed = true;
}
}
Loading
Loading