diff --git a/distributed/config.py b/distributed/config.py
index b6e5f3d0d3..659f147545 100644
--- a/distributed/config.py
+++ b/distributed/config.py
@@ -97,6 +97,7 @@ def _initialize_logging_old_style(config: dict[Any, Any]) -> None:
     loggers: dict[str, str | int] = {  # default values
         "distributed": "info",
         "distributed.client": "warning",
+        "distributed.gc": "warning",
         "bokeh": "error",
         "tornado": "critical",
         "tornado.application": "error",
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index a7ec037e27..26b88322d0 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -3,6 +3,7 @@ distributed:
   # logging:
   #   distributed: info
   #   distributed.client: warning
+  #   distributed.gc: warning
   #   bokeh: error
   #   # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr
   #   tornado: critical
diff --git a/distributed/utils_perf.py b/distributed/gc.py
similarity index 90%
rename from distributed/utils_perf.py
rename to distributed/gc.py
index ad406d6135..5a02411ae3 100644
--- a/distributed/utils_perf.py
+++ b/distributed/gc.py
@@ -11,8 +11,10 @@
 from dask.utils import format_bytes
 
 from distributed.metrics import thread_time
+from distributed.utils import RateLimiterFilter
 
 logger = _logger = logging.getLogger(__name__)
+logger.addFilter(RateLimiterFilter("full garbage collections took", rate="60s"))
 
 
 class ThrottledGC:
@@ -24,7 +26,7 @@ class ThrottledGC:
     collect() does nothing when repeated calls are so costly and so frequent
     that the thread would spend more than max_in_gc_frac doing GC.
 
-    warn_if_longer is a duration in seconds (10s by default) that can be used
+    warn_if_longer is a duration in seconds (1s by default) that can be used
     to log a warning level message whenever an actual call to gc.collect()
     lasts too long.
     """
@@ -160,8 +162,8 @@ class GCDiagnosis:
 
     N_SAMPLES = 30
 
-    def __init__(self, warn_over_frac=0.1, info_over_rss_win=10 * 1e6):
-        self._warn_over_frac = warn_over_frac
+    def __init__(self, info_over_frac=0.1, info_over_rss_win=10 * 1e6):
+        self._info_over_frac = info_over_frac
         self._info_over_rss_win = info_over_rss_win
         self._enabled = False
         self._fractional_timer = None
@@ -206,22 +208,25 @@ def _gc_callback(self, phase, info):
         assert phase == "stop"
         self._fractional_timer.stop_timing()
         frac = self._fractional_timer.running_fraction
-        if frac is not None and frac >= self._warn_over_frac:
-            logger.warning(
+        if frac is not None:
+            level = logging.INFO if frac >= self._info_over_frac else logging.DEBUG
+            logger.log(
+                level,
                 "full garbage collections took %d%% CPU time "
                 "recently (threshold: %d%%)",
                 100 * frac,
-                100 * self._warn_over_frac,
+                100 * self._info_over_frac,
             )
         rss_saved = self._gc_rss_before - rss
-        if rss_saved >= self._info_over_rss_win:
-            logger.info(
-                "full garbage collection released %s "
-                "from %d reference cycles (threshold: %s)",
-                format_bytes(rss_saved),
-                info["collected"],
-                format_bytes(self._info_over_rss_win),
-            )
+        level = logging.INFO if rss_saved >= self._info_over_rss_win else logging.DEBUG
+        logger.log(
+            level,
+            "full garbage collection released %s "
+            "from %d reference cycles (threshold: %s)",
+            format_bytes(rss_saved),
+            info["collected"],
+            format_bytes(self._info_over_rss_win),
+        )
         if info["uncollectable"] > 0:
             # This should ideally never happen on Python 3, but who knows?
             logger.warning(
diff --git a/distributed/http/scheduler/prometheus/core.py b/distributed/http/scheduler/prometheus/core.py
index 9240c98d7c..173c0c6200 100644
--- a/distributed/http/scheduler/prometheus/core.py
+++ b/distributed/http/scheduler/prometheus/core.py
@@ -8,12 +8,12 @@
 from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
 
 from distributed.core import Status
+from distributed.gc import gc_collect_duration
 from distributed.http.prometheus import PrometheusCollector
 from distributed.http.scheduler.prometheus.semaphore import SemaphoreMetricCollector
 from distributed.http.scheduler.prometheus.stealing import WorkStealingMetricCollector
 from distributed.http.utils import RequestHandler
 from distributed.scheduler import ALL_TASK_STATES, Scheduler
-from distributed.utils_perf import gc_collect_duration
 
 
 class SchedulerMetricCollector(PrometheusCollector):
diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py
index 4f0c2cb100..b6d7172c72 100644
--- a/distributed/http/worker/prometheus/core.py
+++ b/distributed/http/worker/prometheus/core.py
@@ -8,9 +8,9 @@
 import prometheus_client
 from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily, Metric
 
+from distributed.gc import gc_collect_duration
 from distributed.http.prometheus import PrometheusCollector
 from distributed.http.utils import RequestHandler
-from distributed.utils_perf import gc_collect_duration
 from distributed.worker import Worker
 
 logger = logging.getLogger("distributed.prometheus.worker")
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 4f31ed44aa..7dc30b8893 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -101,6 +101,7 @@
 from distributed.diagnostics.memory_sampler import MemorySamplerExtension
 from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
 from distributed.event import EventExtension
+from distributed.gc import disable_gc_diagnosis, enable_gc_diagnosis
 from distributed.http import get_handlers
 from distributed.metrics import time
 from distributed.multi_lock import MultiLockExtension
@@ -136,7 +137,6 @@
     scatter_to_workers,
     unpack_remotedata,
 )
-from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
 from distributed.variable import VariableExtension
 from distributed.worker import _normalize_task
 
diff --git a/distributed/tests/test_utils_perf.py b/distributed/tests/test_gc.py
similarity index 91%
rename from distributed/tests/test_utils_perf.py
rename to distributed/tests/test_gc.py
index 2e66141b2a..3d8b4b7264 100644
--- a/distributed/tests/test_utils_perf.py
+++ b/distributed/tests/test_gc.py
@@ -8,8 +8,8 @@
 
 import pytest
 
+from distributed.gc import FractionalTimer, GCDiagnosis, disable_gc_diagnosis
 from distributed.metrics import thread_time
-from distributed.utils_perf import FractionalTimer, GCDiagnosis, disable_gc_diagnosis
 from distributed.utils_test import captured_logger, run_for
 
 
@@ -78,7 +78,7 @@ def enable_gc_diagnosis_and_log(diag, level="INFO"):
     if gc.callbacks:
         print("Unexpected gc.callbacks", gc.callbacks)
 
-    with captured_logger("distributed.utils_perf", level=level, propagate=False) as sio:
+    with captured_logger("distributed.gc", level=level, propagate=False) as sio:
         gc.disable()
         gc.collect()  # drain any leftover from previous tests
         diag.enable()
@@ -89,17 +89,18 @@ def enable_gc_diagnosis_and_log(diag, level="INFO"):
             gc.enable()
 
 
-@pytest.mark.slow
+# @pytest.mark.slow
 def test_gc_diagnosis_cpu_time():
-    diag = GCDiagnosis(warn_over_frac=0.75)
+    diag = GCDiagnosis(info_over_frac=0.75)
     diag.N_SAMPLES = 3  # shorten tests
 
-    with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
+    with enable_gc_diagnosis_and_log(diag, level="INFO") as sio:
         # Spend some CPU time doing only full GCs
         for _ in range(diag.N_SAMPLES):
             gc.collect()
         assert not sio.getvalue()
         gc.collect()
+        gc.collect()
         lines = sio.getvalue().splitlines()
         assert len(lines) == 1
         # Between 80% and 100%
@@ -108,7 +109,7 @@ def test_gc_diagnosis_cpu_time():
             lines[0],
         )
 
-    with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
+    with enable_gc_diagnosis_and_log(diag, level="INFO") as sio:
         # Spend half the CPU time doing full GCs
         for _ in range(diag.N_SAMPLES + 1):
             t1 = thread_time()
diff --git a/distributed/worker.py b/distributed/worker.py
index ac4231a2a6..5778d60dac 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -83,6 +83,7 @@
 from distributed.diagnostics.plugin import WorkerPlugin, _get_plugin_name
 from distributed.diskutils import WorkSpace
 from distributed.exceptions import Reschedule
+from distributed.gc import disable_gc_diagnosis, enable_gc_diagnosis
 from distributed.http import get_handlers
 from distributed.metrics import context_meter, thread_time, time
 from distributed.node import ServerNode
@@ -114,7 +115,6 @@
     wait_for,
 )
 from distributed.utils_comm import gather_from_workers, pack_data, retry_operation
-from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
 from distributed.versions import get_versions
 from distributed.worker_memory import (
     DeprecatedMemoryManagerAttribute,
diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py
index 7484263a3d..ac2f3c8188 100644
--- a/distributed/worker_memory.py
+++ b/distributed/worker_memory.py
@@ -40,10 +40,10 @@
 from distributed import system
 from distributed.compatibility import WINDOWS, PeriodicCallback
 from distributed.core import Status
+from distributed.gc import ThrottledGC
 from distributed.metrics import context_meter, monotonic
 from distributed.spill import ManualEvictProto, SpillBuffer
 from distributed.utils import RateLimiterFilter, has_arg, log_errors
-from distributed.utils_perf import ThrottledGC
 
 if TYPE_CHECKING:
     # TODO import from typing (requires Python >=3.10)
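Reviewer note (not part of the patch): after this change the GC diagnostics land on the `distributed.gc` logger, whose default level becomes `warning`, while `GCDiagnosis` emits its summaries at INFO or DEBUG and the "full garbage collections took ..." message is additionally rate-limited to once per 60 seconds by the `RateLimiterFilter` added in `distributed/gc.py`. A minimal sketch of how a user could still surface the demoted messages; only the `distributed.gc` logger name and its `warning` default come from this diff, the rest is plain stdlib `logging` usage and is an assumption, not part of the change:

```python
import logging

import distributed  # noqa: F401  # importing distributed applies its default logging setup

# Assumed user-side override: "distributed.gc" defaults to WARNING (see
# distributed/config.py above), so the INFO/DEBUG summaries emitted by
# GCDiagnosis would otherwise be dropped.  Lowering the logger level
# re-enables them; the RateLimiterFilter attached in distributed/gc.py still
# caps the "full garbage collections took ..." message at one per 60 seconds.
logging.getLogger("distributed.gc").setLevel(logging.DEBUG)
```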