Skip to content

Commit

Permalink
Reduce noise from GC-related logging (#8804)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Jul 30, 2024
1 parent 40fcd65 commit c44ad22
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 25 deletions.
1 change: 1 addition & 0 deletions distributed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def _initialize_logging_old_style(config: dict[Any, Any]) -> None:
loggers: dict[str, str | int] = { # default values
"distributed": "info",
"distributed.client": "warning",
"distributed.gc": "warning",
"bokeh": "error",
"tornado": "critical",
"tornado.application": "error",
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ distributed:
# logging:
# distributed: info
# distributed.client: warning
# distributed.gc: warning
# bokeh: error
# # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr
# tornado: critical
Expand Down
33 changes: 19 additions & 14 deletions distributed/utils_perf.py → distributed/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from dask.utils import format_bytes

from distributed.metrics import thread_time
from distributed.utils import RateLimiterFilter

logger = _logger = logging.getLogger(__name__)
logger.addFilter(RateLimiterFilter("full garbage collections took", rate="60s"))


class ThrottledGC:
Expand All @@ -24,7 +26,7 @@ class ThrottledGC:
collect() does nothing when repeated calls are so costly and so frequent
that the thread would spend more than max_in_gc_frac doing GC.
warn_if_longer is a duration in seconds (10s by default) that can be used
warn_if_longer is a duration in seconds (1s by default) that can be used
to log a warning level message whenever an actual call to gc.collect()
lasts too long.
"""
Expand Down Expand Up @@ -160,8 +162,8 @@ class GCDiagnosis:

N_SAMPLES = 30

def __init__(self, warn_over_frac=0.1, info_over_rss_win=10 * 1e6):
self._warn_over_frac = warn_over_frac
def __init__(self, info_over_frac=0.1, info_over_rss_win=10 * 1e6):
self._info_over_frac = info_over_frac
self._info_over_rss_win = info_over_rss_win
self._enabled = False
self._fractional_timer = None
Expand Down Expand Up @@ -206,22 +208,25 @@ def _gc_callback(self, phase, info):
assert phase == "stop"
self._fractional_timer.stop_timing()
frac = self._fractional_timer.running_fraction
if frac is not None and frac >= self._warn_over_frac:
logger.warning(
if frac is not None:
level = logging.INFO if frac >= self._info_over_frac else logging.DEBUG
logger.log(
level,
"full garbage collections took %d%% CPU time "
"recently (threshold: %d%%)",
100 * frac,
100 * self._warn_over_frac,
100 * self._info_over_frac,
)
rss_saved = self._gc_rss_before - rss
if rss_saved >= self._info_over_rss_win:
logger.info(
"full garbage collection released %s "
"from %d reference cycles (threshold: %s)",
format_bytes(rss_saved),
info["collected"],
format_bytes(self._info_over_rss_win),
)
level = logging.INFO if rss_saved >= self._info_over_rss_win else logging.DEBUG
logger.log(
level,
"full garbage collection released %s "
"from %d reference cycles (threshold: %s)",
format_bytes(rss_saved),
info["collected"],
format_bytes(self._info_over_rss_win),
)
if info["uncollectable"] > 0:
# This should ideally never happen on Python 3, but who knows?
logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion distributed/http/scheduler/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

from distributed.core import Status
from distributed.gc import gc_collect_duration
from distributed.http.prometheus import PrometheusCollector
from distributed.http.scheduler.prometheus.semaphore import SemaphoreMetricCollector
from distributed.http.scheduler.prometheus.stealing import WorkStealingMetricCollector
from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES, Scheduler
from distributed.utils_perf import gc_collect_duration


class SchedulerMetricCollector(PrometheusCollector):
Expand Down
2 changes: 1 addition & 1 deletion distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import prometheus_client
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily, Metric

from distributed.gc import gc_collect_duration
from distributed.http.prometheus import PrometheusCollector
from distributed.http.utils import RequestHandler
from distributed.utils_perf import gc_collect_duration
from distributed.worker import Worker

logger = logging.getLogger("distributed.prometheus.worker")
Expand Down
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
from distributed.diagnostics.memory_sampler import MemorySamplerExtension
from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
from distributed.event import EventExtension
from distributed.gc import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.http import get_handlers
from distributed.metrics import time
from distributed.multi_lock import MultiLockExtension
Expand Down Expand Up @@ -136,7 +137,6 @@
scatter_to_workers,
unpack_remotedata,
)
from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.variable import VariableExtension
from distributed.worker import _normalize_task

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

import pytest

from distributed.gc import FractionalTimer, GCDiagnosis, disable_gc_diagnosis
from distributed.metrics import thread_time
from distributed.utils_perf import FractionalTimer, GCDiagnosis, disable_gc_diagnosis
from distributed.utils_test import captured_logger, run_for


Expand Down Expand Up @@ -78,7 +78,7 @@ def enable_gc_diagnosis_and_log(diag, level="INFO"):
if gc.callbacks:
print("Unexpected gc.callbacks", gc.callbacks)

with captured_logger("distributed.utils_perf", level=level, propagate=False) as sio:
with captured_logger("distributed.gc", level=level, propagate=False) as sio:
gc.disable()
gc.collect() # drain any leftover from previous tests
diag.enable()
Expand All @@ -89,17 +89,18 @@ def enable_gc_diagnosis_and_log(diag, level="INFO"):
gc.enable()


@pytest.mark.slow
# @pytest.mark.slow
def test_gc_diagnosis_cpu_time():
diag = GCDiagnosis(warn_over_frac=0.75)
diag = GCDiagnosis(info_over_frac=0.75)
diag.N_SAMPLES = 3 # shorten tests

with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
with enable_gc_diagnosis_and_log(diag, level="INFO") as sio:
# Spend some CPU time doing only full GCs
for _ in range(diag.N_SAMPLES):
gc.collect()
assert not sio.getvalue()
gc.collect()
gc.collect()
lines = sio.getvalue().splitlines()
assert len(lines) == 1
# Between 80% and 100%
Expand All @@ -108,7 +109,7 @@ def test_gc_diagnosis_cpu_time():
lines[0],
)

with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
with enable_gc_diagnosis_and_log(diag, level="INFO") as sio:
# Spend half the CPU time doing full GCs
for _ in range(diag.N_SAMPLES + 1):
t1 = thread_time()
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
from distributed.diagnostics.plugin import WorkerPlugin, _get_plugin_name
from distributed.diskutils import WorkSpace
from distributed.exceptions import Reschedule
from distributed.gc import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.http import get_handlers
from distributed.metrics import context_meter, thread_time, time
from distributed.node import ServerNode
Expand Down Expand Up @@ -114,7 +115,6 @@
wait_for,
)
from distributed.utils_comm import gather_from_workers, pack_data, retry_operation
from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.versions import get_versions
from distributed.worker_memory import (
DeprecatedMemoryManagerAttribute,
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
from distributed import system
from distributed.compatibility import WINDOWS, PeriodicCallback
from distributed.core import Status
from distributed.gc import ThrottledGC
from distributed.metrics import context_meter, monotonic
from distributed.spill import ManualEvictProto, SpillBuffer
from distributed.utils import RateLimiterFilter, has_arg, log_errors
from distributed.utils_perf import ThrottledGC

if TYPE_CHECKING:
# TODO import from typing (requires Python >=3.10)
Expand Down

0 comments on commit c44ad22

Please sign in to comment.