Skip to content

Commit

Permalink
Improve forking support (#804)
Browse files Browse the repository at this point in the history
* Add forking helpers

* Add disable_background_sender method.

* Add stop method.

* Add opt-out

* Simplify locking

Instead of separate locks for buffering and sender modes
configuration, use the same lock.

* Move locks out of _start/_stop_sender_thread methods

* Don't start flush thread if we are about to fork

* Add per-instance opt-out from tracking

* Document global fork hooks

* Remove warning when stopping already stopped thread

Now that disable_buffering and pre_fork may both try to stop the flush thread
at the same time, attempt to stop an already stopped thread does not
indicate a bug in the client code.

* Remove some redundant locking

Single field assignement is atomic according to Python FAQ, so we
don't need to protect reads and writes to self._queue as such. The
only place where we need a lock is when stopping the sender, to ensure
that the Stop marker is the last thing to be placed in there.

https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe

* Add type annotation for mypy

* Silence type error on os.register_at_fork

os.register_at_fork is not available on all python versions, notably
python 2.7. While we check for the availability explicitly, mypy
cannot see through that and complains.

* Fix formatting

* Fix docstring

* Update docs to reference stop() rather than wait_for_pending().

* Update docstring for enable_background_sender

The method is now thread safe, and can be used with forking applications.

* Fix docstring for enable_background_sender

Re-format parameter descriptions so they are rendered correctly in the
html docs.

* Document new environment variables.
  • Loading branch information
vickenty authored Jan 10, 2024
1 parent c80f986 commit a51acab
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 46 deletions.
227 changes: 184 additions & 43 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
import time
from threading import Lock, RLock
import weakref

try:
import queue
Expand Down Expand Up @@ -93,6 +94,35 @@
]
) + "\n"

Stop = object()

SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None)
TRACK_INSTANCES = not os.environ.get("DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING", None)

_instances = weakref.WeakSet() # type: weakref.WeakSet


def pre_fork():
"""Prepare all client instances for a process fork.
If SUPPORTS_FORKING is true, this will be called automatically before os.fork().
"""
for c in _instances:
c.pre_fork()


def post_fork():
"""Restore all client instances after a fork.
If SUPPORTS_FORKING is true, this will be called automatically after os.fork().
"""
for c in _instances:
c.post_fork()


if SUPPORTS_FORKING:
os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore


# pylint: disable=useless-object-inheritance,too-many-instance-attributes
# pylint: disable=too-many-arguments,too-many-locals
Expand Down Expand Up @@ -125,6 +155,7 @@ def __init__(
disable_background_sender=True, # type: bool
sender_queue_size=0, # type: int
sender_queue_timeout=0, # type: Optional[float]
track_instance=True, # type: bool
): # type: (...) -> None
"""
Initialize a DogStatsd object.
Expand Down Expand Up @@ -172,6 +203,13 @@ def __init__(
for origin detection.
:type DD_ORIGIN_DETECTION_ENABLED: boolean
:envvar DD_DOGSTATSD_DISABLE_FORK_SUPPORT: Don't install global fork hooks with os.register_at_fork.
Global fork hooks then need to be called manually before and after calling os.fork.
:type DD_DOGSTATSD_DISABLE_FORK_SUPPORT: boolean
:envvar DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: Don't register instances of this class with global fork hooks.
:type DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: boolean
:param host: the host of the DogStatsd server.
:type host: string
Expand Down Expand Up @@ -268,7 +306,7 @@ def __init__(
:param disable_background_sender: Use a background thread to communicate with the dogstatsd server. Optional.
When enabled, a background thread will be used to send metric payloads to the Agent.
Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
Applications should call stop() before exiting to make sure all pending payloads are sent.
Default: True.
:type disable_background_sender: boolean
Expand All @@ -283,6 +321,11 @@ def __init__(
If set to zero drop the packet immediately if the queue is full.
Default: 0 (no wait)
:type sender_queue_timeout: float
:param track_instance: Keep track of this instance and automatically handle cleanup when os.fork() is called,
if supported.
Default: True.
:type track_instance: boolean
"""

self._socket_lock = Lock()
Expand Down Expand Up @@ -387,10 +430,8 @@ def __init__(

self._reset_buffer()

# This lock is used for all cases where buffering functionality is
# being toggled (by `open_buffer()`, `close_buffer()`, or
# `self._disable_buffering` calls).
self._buffering_toggle_lock = RLock()
# This lock is used for all cases where client configuration is being changed: buffering, sender mode.
self._config_lock = RLock()

# If buffering is disabled, we bypass the buffer function.
self._send = self._send_to_buffer
Expand All @@ -399,17 +440,27 @@ def __init__(
self._send = self._send_to_server
log.debug("Statsd buffering is disabled")

# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
self._forking = False

# Start the flush thread if buffering is enabled and the interval is above
# a reasonable range. This both prevents thrashing and allow us to use "0.0"
# as a value for disabling the automatic flush timer as well.
self._flush_interval = flush_interval
self._flush_thread_stop = threading.Event()
self._flush_thread = None
self._start_flush_thread(self._flush_interval)

self._queue = None
self._sender_thread = None
self._sender_enabled = False

if not disable_background_sender:
self.enable_background_sender(sender_queue_size, sender_queue_timeout)

if TRACK_INSTANCES and track_instance:
_instances.add(self)

@property
def socket_path(self):
return self._socket_path
Expand All @@ -430,39 +481,43 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
Use a background thread to communicate with the dogstatsd server.
When enabled, a background thread will be used to send metric payloads to the Agent.
Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
This method is not thread safe and should not be called concurrently with other methods on the current object.
Normally, this should be called shortly after process initialization (for example from a post-fork hook in a
forking server).
Applications should call stop() before exiting to make sure all pending payloads are sent.
:param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional
How may packets to queue before blocking or dropping the packet if the packet queue is already full.
Default: 0 (unlimited).
:type sender_queue_size: integer
Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications
arrange to call pre_fork() and post_fork() module functions around calls to os.fork().
:param sender_queue_timeout: Set timeout for packet queue operations, in seconds. Optional.
How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
If set to None, wait forever.
If set to zero drop the packet immediately if the queue is full.
Default: 0 (no wait)
:type sender_queue_timeout: float
:param sender_queue_size: Set the maximum number of packets to queue for the sender.
How many packets to queue before blocking or dropping the packet if the packet queue is already full.
Default: 0 (unlimited).
:type sender_queue_size: integer, optional
:param sender_queue_timeout: Set timeout for packet queue operations, in seconds.
How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
If set to None, wait forever. If set to zero drop the packet immediately if the queue is full.
Default: 0 (no wait).
:type sender_queue_timeout: float, optional
"""

# Avoid a race on _queue with the background buffer flush thread that reads _queue.
with self._buffer_lock:
if self._queue is not None:
return

self._queue = queue.Queue(sender_queue_size)
self._start_sender_thread()
with self._config_lock:
self._sender_enabled = True
self._sender_queue_size = sender_queue_size
if sender_queue_timeout is None:
self._queue_blocking = True
self._queue_timeout = None
else:
self._queue_blocking = sender_queue_timeout > 0
self._queue_timeout = max(0, sender_queue_timeout)

self._start_sender_thread()

def disable_background_sender(self):
"""Disable background sender mode.
This call will block until all previously queued payloads are sent.
"""
with self._config_lock:
self._sender_enabled = False
self._stop_sender_thread()

def disable_telemetry(self):
self._telemetry = False

Expand All @@ -475,6 +530,12 @@ def _start_flush_thread(self, flush_interval):
log.debug("Statsd periodic buffer flush is disabled")
return

if self._forking:
return

if self._flush_thread is not None:
return

def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
Expand All @@ -496,7 +557,6 @@ def _flush_thread_loop(self, flush_interval):
# Note: Invocations of this method should be thread-safe
def _stop_flush_thread(self):
if not self._flush_thread:
log.warning("No statsd flush thread to stop")
return

try:
Expand Down Expand Up @@ -525,12 +585,12 @@ def __exit__(self, exc_type, value, traceback):

@property
def disable_buffering(self):
with self._buffering_toggle_lock:
with self._config_lock:
return self._disable_buffering

@disable_buffering.setter
def disable_buffering(self, is_disabled):
with self._buffering_toggle_lock:
with self._config_lock:
# If the toggle didn't change anything, this method is a noop
if self._disable_buffering == is_disabled:
return
Expand Down Expand Up @@ -672,7 +732,7 @@ def open_buffer(self, max_buffer_size=None):
Note: This method must be called before close_buffer() matching invocation.
"""

self._buffering_toggle_lock.acquire()
self._config_lock.acquire()

# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_buffer
Expand All @@ -696,7 +756,7 @@ def close_buffer(self):
if self._disable_buffering:
self._send = self._send_to_server

self._buffering_toggle_lock.release()
self._config_lock.release()

def _reset_buffer(self):
with self._buffer_lock:
Expand Down Expand Up @@ -986,13 +1046,17 @@ def _is_telemetry_flush_time(self):
self._last_flush_time + self._telemetry_flush_interval < time.time()

def _send_to_server(self, packet):
# Skip the lock if the queue is None. There is no race with enable_background_sender.
if self._queue is not None:
try:
self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout)
except queue.Full:
self.packets_dropped_queue += 1
self.bytes_dropped_queue += 1
return
# Prevent a race with disable_background_sender.
with self._buffer_lock:
if self._queue is not None:
try:
self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout)
except queue.Full:
self.packets_dropped_queue += 1
self.bytes_dropped_queue += 1
return

self._xmit_packet_with_telemetry(packet + '\n')

Expand Down Expand Up @@ -1231,27 +1295,104 @@ def _set_container_id(self, container_id, origin_detection_enabled):
self._container_id = None

def _start_sender_thread(self):
if not self._sender_enabled or self._forking:
return

if self._queue is not None:
return

self._queue = queue.Queue(self._sender_queue_size)

log.debug("Starting background sender thread")
self._sender_thread = threading.Thread(
name="{}_sender_thread".format(self.__class__.__name__),
target=self._sender_main_loop,
args=(self._queue,)
)
self._sender_thread.daemon = True
self._sender_thread.start()

def _sender_main_loop(self):
def _stop_sender_thread(self):
# Lock ensures that nothing gets added to the queue after we disable it.
with self._buffer_lock:
if not self._queue:
return
self._queue.put(Stop)
self._queue = None

self._sender_thread.join()
self._sender_thread = None

def _sender_main_loop(self, queue):
while True:
self._xmit_packet_with_telemetry(self._queue.get())
self._queue.task_done()
item = queue.get()
if item is Stop:
queue.task_done()
return
self._xmit_packet_with_telemetry(item)
queue.task_done()

def wait_for_pending(self):
"""
Flush the buffer and wait for all queued payloads to be written to the server.
"""

self.flush()
if self._queue is not None:
self._queue.join()

# Avoid race with disable_background_sender. We don't need a
# lock, just copy the value so it doesn't change between the
# check and join later.
queue = self._queue

if queue is not None:
queue.join()

def pre_fork(self):
"""Prepare client for a process fork.
Flush any pending payloads, stop all background threads and
close the connection. Once the function returns.
The client should not be used from this point until
post_fork() is called.
"""
log.debug("[%d] pre_fork for %s", os.getpid(), self)

self._forking = True

with self._config_lock:
self._stop_flush_thread()
self._stop_sender_thread()
self.close_socket()

def post_fork(self):
"""Restore the client state after a fork."""

log.debug("[%d] post_fork for %s", os.getpid(), self)

with self._socket_lock:
if self.socket or self.telemetry_socket:
log.warning("Open socket detected after fork. Call pre_fork() before os.fork().")
self.close_socket()

self._forking = False

with self._config_lock:
self._start_flush_thread(self._flush_interval)
self._start_sender_thread()

def stop(self):
"""Stop the client.
Disable buffering, background sender and flush any pending payloads to the server.
Client remains usable after this method, but sending metrics may block if socket_timeout is enabled.
"""

self.disable_background_sender()
self.disable_buffering = True
self.flush()
self.close_socket()


statsd = DogStatsd()
9 changes: 9 additions & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ Usage
>>> initialize(statsd_host="localhost", statsd_port=8125)
>>> statsd.increment("home.page.hits")

.. data:: datadog.dogstatsd.base.SUPPORTS_FORKING

Indicates whether the Python runtime supports os.register_at_fork(). When
true, buffering and background sender can be safely used in applications
that use os.fork().

.. autofunction:: datadog.dogstatsd.base.pre_fork
.. autofunction:: datadog.dogstatsd.base.post_fork


Get in Touch
============
Expand Down
Loading

0 comments on commit a51acab

Please sign in to comment.