Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[statsd] Disable statsd buffering by default #692

Merged
merged 3 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
)
from datadog.dogstatsd.route import get_default_route
from datadog.util.compat import is_p3k, text
from datadog.util.deprecation import deprecated
from datadog.util.format import normalize_tags
from datadog.version import __version__

Expand Down Expand Up @@ -89,7 +88,7 @@ def __init__(
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
disable_buffering=False, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
constant_tags=None, # type: Optional[List[str]]
use_ms=False, # type: bool
Expand Down Expand Up @@ -352,11 +351,11 @@ def _dedicated_telemetry_destination(self):
return bool(self.telemetry_socket_path or self.telemetry_host)

def __enter__(self):
self._reset_buffer()
self.open_buffer()
return self

def __exit__(self, exc_type, value, traceback):
self.flush()
self.close_buffer()

@staticmethod
def resolve_host(host, use_default_route):
Expand Down Expand Up @@ -436,10 +435,8 @@ def _get_udp_socket(cls, host, port):

return sock

@deprecated("Statsd module now uses buffering by default.")
def open_buffer(self, max_buffer_size=None):
"""
WARNING: Deprecated method - all operations are now buffered by default.
Open a buffer to send a batch of metrics.

To take advantage of automatic flushing, you should use the context manager instead
Expand All @@ -453,16 +450,16 @@ def open_buffer(self, max_buffer_size=None):

self._manual_buffer_lock.acquire()

# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_buffer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stuff is not thread-safe. I presume it never really was and I assume that's beyond the expected use-cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@truthbk Generally things should be thread-safe due to various locks on the socket/buffer/context ops but you may get unexpectedly-buffered metrics if some threads are opening/closing buffers while another thread is just writing metrics without a context manager or open_buffer. Overall though, there should be no data loss or exceptions. As for the setter here, the lock + GIL makes this effectively an atomic operation. We do have a test around at least part of this here. If I'm missing something though, I'll fix up the PR for sure.

I presume it never really was and I assume that's beyond the expected use-cases.

Somewhat. The way it was coded originally was not really thread safe but the changes made subsequently over this year have closed up most of the glaring exception issues. The buffering-by-default fixed the self._send = GIL reliance but we're backing that part out with this PR for the time being 😢 .


if max_buffer_size is not None:
log.warning("The parameter max_buffer_size is now deprecated and is not used anymore")

self._reset_buffer()

@deprecated("Statsd module now uses buffering by default.")
def close_buffer(self):
"""
WARNING: Deprecated method - all operations are now buffered by default.

Flush the buffer and switch back to single metric packets.

Note: This method must be called after a matching open_buffer()
Expand All @@ -471,6 +468,9 @@ def close_buffer(self):
try:
self.flush()
finally:
# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_server

self._manual_buffer_lock.release()

def _reset_buffer(self):
Expand Down
34 changes: 2 additions & 32 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,28 +159,8 @@ an instance of :class:`datadog.threadstats.ThreadStats`::

datadog.dogstatsd
=================
:mod:`datadog.dogstatsd` is a Python client for DogStatsd that automatically
buffers submitted metrics and submits them to the server asynchronously or at
maximum sizes for a packet that would prevent fragmentation.

.. note::

To ensure that all the metrics are sent to the server, either use the
context-managed instance of :class:`~datadog.dogstatsd.base.DogStatsd`
or when you are finished with the client perform a manual :code:`flush()`.
Otherwise the buffered data may get de-allocated before being sent.


.. warning::

:class:`~datadog.dogstatsd.base.DogStatsd` instances are not :code:`fork()`-safe
because the automatic buffer flushing occurs in a thread only on the
process that created the :class:`~datadog.dogstatsd.base.DogStatsd`
instance. Because of this, instances of those clients must not be copied
from a parent process to a child process. Instead, the parent process and
each child process must create their own instances of the client or the
buffering must be globally disabled by using the :code:`disable_buffering`
initialization flag.
:mod:`datadog.dogstatsd` is a Python client for DogStatsd that submits metrics
to the Agent.


Usage
Expand All @@ -192,7 +172,6 @@ Usage

client = DogStatsd()
client.increment("home.page.hits")
client.flush()


.. autoclass:: datadog.dogstatsd.base.DogStatsd
Expand All @@ -209,15 +188,6 @@ Usage
>>> initialize(statsd_host="localhost", statsd_port=8125)
>>> statsd.increment("home.page.hits")

.. warning::

Global :class:`~datadog.dogstatsd.base.DogStatsd` is not :code:`fork()`-safe
because the automatic buffer flushing occurs in a thread only on the
process that created the :class:`~datadog.dogstatsd.base.DogStatsd`
instance. Because of this, the parent process and each child process must
create their own instances of the client or the buffering must be globally
disabled by using the :code:`disable_buffering` initialization flag.


Get in Touch
============
Expand Down
38 changes: 27 additions & 11 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,17 @@ def test_batching(self):
)

def test_flush(self):
self.statsd.increment('page.views')
self.assertIsNone(self.recv(no_wait=True))
self.statsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', self.recv(2))
dogstatsd = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket

dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))

def test_flush_interval(self):
dogstatsd = DogStatsd(flush_interval=1, telemetry_min_flush_interval=0)
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket

Expand Down Expand Up @@ -940,6 +944,7 @@ def test_disable_buffering(self):

def test_flush_disable(self):
dogstatsd = DogStatsd(
disable_buffering=False,
flush_interval=0,
telemetry_min_flush_interval=0
)
Expand All @@ -955,6 +960,7 @@ def test_flush_disable(self):
time.sleep(0.3)
self.assertIsNone(fake_socket.recv(no_wait=True))

@unittest.skip("Buffering has been disabled again so the deprecation is not valid")
@patch("warnings.warn")
def test_manual_buffer_ops_deprecation(self, mock_warn):
self.assertFalse(mock_warn.called)
Expand Down Expand Up @@ -1108,7 +1114,7 @@ def test_telemetry(self):
self.assertEqual(0, self.statsd.packets_dropped)

def test_telemetry_flush_interval(self):
dogstatsd = DogStatsd()
dogstatsd = DogStatsd(disable_buffering=False)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket

Expand Down Expand Up @@ -1173,7 +1179,7 @@ def test_telemetry_flush_interval_alternate_destination(self):
self.assertTrue(time1 < dogstatsd._last_flush_time)

def test_telemetry_flush_interval_batch(self):
dogstatsd = DogStatsd()
dogstatsd = DogStatsd(disable_buffering=False)

fake_socket = FakeSocket()
dogstatsd.socket = fake_socket
Expand Down Expand Up @@ -1279,7 +1285,7 @@ def test_context_manager(self):
def test_batched_buffer_autoflush(self):
fake_socket = FakeSocket()
bytes_sent = 0
with DogStatsd(telemetry_min_flush_interval=0) as dogstatsd:
with DogStatsd(telemetry_min_flush_interval=0, disable_buffering=False) as dogstatsd:
dogstatsd.socket = fake_socket

self.assertEqual(dogstatsd._max_payload_size, UDP_OPTIMAL_PAYLOAD_LENGTH)
Expand Down Expand Up @@ -1453,7 +1459,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
)

def test_default_max_udp_packet_size(self):
dogstatsd = DogStatsd(flush_interval=10000, disable_telemetry=True)
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=10000, disable_telemetry=True)
dogstatsd.socket = FakeSocket()

for _ in range(10000):
Expand All @@ -1469,7 +1475,12 @@ def test_default_max_udp_packet_size(self):
payload = dogstatsd.socket.recv()

def test_default_max_uds_packet_size(self):
dogstatsd = DogStatsd(socket_path="fake", flush_interval=10000, disable_telemetry=True)
dogstatsd = DogStatsd(
disable_buffering=False,
socket_path="fake",
flush_interval=10000,
disable_telemetry=True,
)
dogstatsd.socket = FakeSocket()

for _ in range(10000):
Expand All @@ -1485,7 +1496,12 @@ def test_default_max_uds_packet_size(self):
payload = dogstatsd.socket.recv()

def test_custom_max_packet_size(self):
dogstatsd = DogStatsd(max_buffer_len=4000, flush_interval=10000, disable_telemetry=True)
dogstatsd = DogStatsd(
disable_buffering=False,
max_buffer_len=4000,
flush_interval=10000,
disable_telemetry=True,
)
dogstatsd.socket = FakeSocket()

for _ in range(10000):
Expand Down