Skip to content

Commit

Permalink
add buffered_metrics object type (#853)
Browse files Browse the repository at this point in the history
* add buffered_metrics object type

* update metric_types to include histogram, distribution, timing

* Run tests on any branch
  • Loading branch information
andrewqian2001datadog authored Sep 10, 2024
1 parent 7b721e3 commit c171911
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- master
pull_request:
branches:
- master
- '*' # TODO: Revert when merged to master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }}
Expand Down
65 changes: 65 additions & 0 deletions datadog/dogstatsd/buffered_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import random
from datadog.dogstatsd.metric_types import MetricType


class BufferedMetric(object):
def __init__(self, name, value, tags, metric_type, max_metric_samples=0, specified_rate=1.0):
self.name = name
self.tags = tags
self.metric_type = metric_type
self.max_metric_samples = max_metric_samples
self.specified_rate = specified_rate
self.data = [value]
self.stored_metric_samples = 1
self.total_metric_samples = 1

def aggregate(self, value):
self.data.append(value)
self.stored_metric_samples += 1
self.total_metric_samples += 1

def maybe_add_metric(self, value):
if self.max_metric_samples > 0:
if self.stored_metric_samples >= self.max_metric_samples:
i = random.randint(0, self.total_metric_samples - 1)
if i < self.max_metric_samples:
self.data[i] = value
else:
self.data.append(value)
self.stored_metric_samples += 1
self.total_metric_samples += 1
else:
self.aggregate(value)

def skip_metric(self):
self.total_metric_samples += 1

def flush(self):
total_metric_samples = self.total_metric_samples
if self.specified_rate != 1.0:
rate = self.specified_rate
else:
rate = self.stored_metric_samples / total_metric_samples

return {
'name': self.name,
'tags': self.tags,
'metric_type': self.metric_type,
'rate': rate,
'values': self.data[:]
}


class HistogramMetric(BufferedMetric):
def __init__(self, name, value, tags, max_metric_samples=0, rate=1.0):
super(HistogramMetric, self).__init__(name, value, tags, MetricType.HISTOGRAM, max_metric_samples, rate)


class DistributionMetric(BufferedMetric):
def __init__(self, name, value, tags, max_metric_samples=0, rate=1.0):
super(DistributionMetric, self).__init__(name, value, tags, MetricType.DISTRIBUTION, max_metric_samples, rate)


class TimingMetric(BufferedMetric):
def __init__(self, name, value, tags, max_metric_samples=0, rate=1.0):
super(TimingMetric, self).__init__(name, value, tags, MetricType.TIMING, max_metric_samples, rate)
3 changes: 3 additions & 0 deletions datadog/dogstatsd/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ class MetricType:
COUNT = "c"
GAUGE = "g"
SET = "s"
HISTOGRAM = "h"
DISTRIBUTION = "d"
TIMING = "ms"
104 changes: 104 additions & 0 deletions tests/unit/dogstatsd/test_buffered_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import unittest
from datadog.dogstatsd.buffered_metrics import HistogramMetric, DistributionMetric, TimingMetric
from datadog.dogstatsd.metric_types import MetricType

class TestBufferedMetric(unittest.TestCase):

def test_new_histogram_metric(self):
s = HistogramMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
self.assertEqual(s.data, [1.0])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.specified_rate, 1.0)
self.assertEqual(s.metric_type, MetricType.HISTOGRAM)

def test_histogram_metric_aggregate(self):
s = HistogramMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
s.aggregate(123.45)
self.assertEqual(s.data, [1.0, 123.45])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.specified_rate, 1.0)
self.assertEqual(s.metric_type, MetricType.HISTOGRAM)

def test_flush_histogram_metric_aggregate(self):
s = HistogramMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.HISTOGRAM)
self.assertEqual(m['values'], [1.0])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

s.aggregate(21)
s.aggregate(123.45)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.HISTOGRAM)
self.assertEqual(m['values'], [1.0, 21.0, 123.45])
self.assertEqual(m['name'], "test")
self.assertEqual(m['rate'], 1.0)
self.assertEqual(m['tags'], "tag1,tag2")

def test_new_distribution_metric(self):
s = DistributionMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
self.assertEqual(s.data, [1.0])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.DISTRIBUTION)

def test_distribution_metric_aggregate(self):
s = DistributionMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
s.aggregate(123.45)
self.assertEqual(s.data, [1.0, 123.45])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.DISTRIBUTION)

def test_flush_distribution_metric_aggregate(self):
s = DistributionMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.DISTRIBUTION)
self.assertEqual(m['values'], [1.0])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

s.aggregate(21)
s.aggregate(123.45)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.DISTRIBUTION)
self.assertEqual(m['values'], [1.0, 21.0, 123.45])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

def test_new_timing_metric(self):
s = TimingMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
self.assertEqual(s.data, [1.0])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.TIMING)

def test_timing_metric_aggregate(self):
s = TimingMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
s.aggregate(123.45)
self.assertEqual(s.data, [1.0, 123.45])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.TIMING)

def test_flush_timing_metric_aggregate(self):
s = TimingMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.TIMING)
self.assertEqual(m['values'], [1.0])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

s.aggregate(21)
s.aggregate(123.45)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.TIMING)
self.assertEqual(m['values'], [1.0, 21.0, 123.45])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

if __name__ == '__main__':
unittest.main()

0 comments on commit c171911

Please sign in to comment.