Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add buffered_metrics object type #853

Merged
Merged
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- master
pull_request:
branches:
- master
- '*' # TODO: Revert when merged to master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }}
Expand Down
65 changes: 65 additions & 0 deletions datadog/dogstatsd/buffered_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import random
from datadog.dogstatsd.metric_types import MetricType


class BufferedMetric(object):
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, name, value, tags, metric_type, max_metric_samples=0, specified_rate=1.0):
self.name = name
self.tags = tags
self.metric_type = metric_type
self.max_metric_samples = max_metric_samples
self.specified_rate = specified_rate
self.data = [value]
self.stored_metric_samples = 1
self.total_metric_samples = 1

def aggregate(self, value):
self.data.append(value)
self.stored_metric_samples += 1
self.total_metric_samples += 1

def maybe_add_metric(self, value):
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved
if self.max_metric_samples > 0:
if self.stored_metric_samples >= self.max_metric_samples:
i = random.randint(0, self.total_metric_samples - 1)
if i < self.max_metric_samples:
self.data[i] = value
else:
self.data.append(value)
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved
self.stored_metric_samples += 1
self.total_metric_samples += 1
else:
self.aggregate(value)

def skip_metric(self):
self.total_metric_samples += 1

def flush(self):
total_metric_samples = self.total_metric_samples
if self.specified_rate != 1.0:
rate = self.specified_rate
else:
rate = self.stored_metric_samples / total_metric_samples

return {
'name': self.name,
'tags': self.tags,
'metric_type': self.metric_type,
'rate': rate,
'values': self.data[:]
}


class HistogramMetric(BufferedMetric):
def __init__(self, name, value, tags, max_metric_samples=0, rate=1.0):
super(HistogramMetric, self).__init__(name, value, tags, MetricType.HISTOGRAM, max_metric_samples, rate)


class DistributionMetric(BufferedMetric):
def __init__(self, name, value, tags, max_metric_samples=0, rate=1.0):
super(DistributionMetric, self).__init__(name, value, tags, MetricType.DISTRIBUTION, max_metric_samples, rate)


class TimingMetric(BufferedMetric):
def __init__(self, name, value, tags, max_metric_samples=0, rate=1.0):
super(TimingMetric, self).__init__(name, value, tags, MetricType.TIMING, max_metric_samples, rate)
3 changes: 3 additions & 0 deletions datadog/dogstatsd/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ class MetricType:
COUNT = "c"
GAUGE = "g"
SET = "s"
HISTOGRAM = "h"
DISTRIBUTION = "d"
TIMING = "ms"
104 changes: 104 additions & 0 deletions tests/unit/dogstatsd/test_buffered_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import unittest
from datadog.dogstatsd.buffered_metrics import HistogramMetric, DistributionMetric, TimingMetric
from datadog.dogstatsd.metric_types import MetricType

class TestBufferedMetric(unittest.TestCase):

def test_new_histogram_metric(self):
s = HistogramMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
self.assertEqual(s.data, [1.0])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.specified_rate, 1.0)
self.assertEqual(s.metric_type, MetricType.HISTOGRAM)

def test_histogram_metric_aggregate(self):
s = HistogramMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
s.aggregate(123.45)
self.assertEqual(s.data, [1.0, 123.45])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.specified_rate, 1.0)
self.assertEqual(s.metric_type, MetricType.HISTOGRAM)

def test_flush_histogram_metric_aggregate(self):
s = HistogramMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.HISTOGRAM)
self.assertEqual(m['values'], [1.0])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

s.aggregate(21)
s.aggregate(123.45)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.HISTOGRAM)
self.assertEqual(m['values'], [1.0, 21.0, 123.45])
self.assertEqual(m['name'], "test")
self.assertEqual(m['rate'], 1.0)
self.assertEqual(m['tags'], "tag1,tag2")

def test_new_distribution_metric(self):
s = DistributionMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
self.assertEqual(s.data, [1.0])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.DISTRIBUTION)

def test_distribution_metric_aggregate(self):
s = DistributionMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
s.aggregate(123.45)
self.assertEqual(s.data, [1.0, 123.45])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.DISTRIBUTION)

def test_flush_distribution_metric_aggregate(self):
s = DistributionMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.DISTRIBUTION)
self.assertEqual(m['values'], [1.0])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

s.aggregate(21)
s.aggregate(123.45)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.DISTRIBUTION)
self.assertEqual(m['values'], [1.0, 21.0, 123.45])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

def test_new_timing_metric(self):
s = TimingMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
self.assertEqual(s.data, [1.0])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.TIMING)

def test_timing_metric_aggregate(self):
s = TimingMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
s.aggregate(123.45)
self.assertEqual(s.data, [1.0, 123.45])
self.assertEqual(s.name, "test")
self.assertEqual(s.tags, "tag1,tag2")
self.assertEqual(s.metric_type, MetricType.TIMING)

def test_flush_timing_metric_aggregate(self):
s = TimingMetric(name="test", value=1.0, tags="tag1,tag2", max_metric_samples=0, rate=1.0)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.TIMING)
self.assertEqual(m['values'], [1.0])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

s.aggregate(21)
s.aggregate(123.45)
m = s.flush()
self.assertEqual(m['metric_type'], MetricType.TIMING)
self.assertEqual(m['values'], [1.0, 21.0, 123.45])
self.assertEqual(m['name'], "test")
self.assertEqual(m['tags'], "tag1,tag2")

if __name__ == '__main__':
unittest.main()
Loading