Skip to content

Commit

Permalink
Add option to use socket timeouts.
Browse files Browse the repository at this point in the history
settimeout(0) is equivalent to setblocking(0), so need only one call.

Neither UDP or nor unix datagram sockets block on connect, so handling
timeout in get_socket is not necessary.
  • Loading branch information
vickenty committed Jul 13, 2023
1 parent 42d7678 commit 72c5918
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def __init__(
max_buffer_len=0, # type: int
container_id=None, # type: Optional[Text]
origin_detection_enabled=True, # type: bool
socket_timeout=0, # type: Optional[float]
telemetry_socket_timeout=0, # type: Optional[float]
): # type: (...) -> None
"""
Initialize a DogStatsd object.
Expand Down Expand Up @@ -239,6 +241,17 @@ def __init__(
Default: True.
More on this: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp
:type origin_detection_enabled: boolean
:param socket_timeout: Set timeout for socket operations, in seconds. Optional.
If sets to zero, never wait if operation can not be completed immediately. If set to None, wait forever.
This option does not affect hostname resolution when using UDP.
:type socket_timeout: float
:param telemetry_socket_timeout: Set timeout for the telemetry socket operations. Optional.
Effective only if either telemetry_host or telemetry_socket_path are set.
If sets to zero, never wait if operation can not be completed immediately. If set to None, wait forever.
This option does not affect hostname resolution when using UDP.
:type socket_timeout: float
"""

self._socket_lock = Lock()
Expand Down Expand Up @@ -276,6 +289,7 @@ def __init__(

# Connection
self._max_payload_size = max_buffer_len
self.socket_timeout = socket_timeout
if socket_path is not None:
self.socket_path = socket_path # type: Optional[text]
self.host = None
Expand All @@ -294,6 +308,7 @@ def __init__(
self.telemetry_socket_path = telemetry_socket_path
self.telemetry_host = None
self.telemetry_port = None
self.telemetry_socket_timeout = telemetry_socket_timeout
if not telemetry_socket_path and telemetry_host:
self.telemetry_socket_path = None
self.telemetry_host = self.resolve_host(telemetry_host, use_default_route)
Expand Down Expand Up @@ -479,22 +494,25 @@ def get_socket(self, telemetry=False):
if self.telemetry_socket_path is not None:
self.telemetry_socket = self._get_uds_socket(
self.telemetry_socket_path,
self.telemetry_socket_timeout,
)
else:
self.telemetry_socket = self._get_udp_socket(
self.telemetry_host,
self.telemetry_port,
self.telemetry_socket_timeout,
)

return self.telemetry_socket

if not self.socket:
if self.socket_path is not None:
self.socket = self._get_uds_socket(self.socket_path)
self.socket = self._get_uds_socket(self.socket_path, self.socket_timeout)
else:
self.socket = self._get_udp_socket(
self.host,
self.port,
self.socket_timeout,
)

return self.socket
Expand All @@ -513,17 +531,17 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
pass

@classmethod
def _get_uds_socket(cls, socket_path):
def _get_uds_socket(cls, socket_path, timeout):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.setblocking(0)
sock.settimeout(timeout)
cls._ensure_min_send_buffer_size(sock)
sock.connect(socket_path)
return sock

@classmethod
def _get_udp_socket(cls, host, port):
def _get_udp_socket(cls, host, port, timeout):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(0)
sock.settimeout(timeout)
cls._ensure_min_send_buffer_size(sock)
sock.connect((host, port))

Expand Down

0 comments on commit 72c5918

Please sign in to comment.