
Improve reactor_tick_time metric #11724

Merged
merged 9 commits on Jan 17, 2022
1 change: 1 addition & 0 deletions changelog.d/11723.misc
@@ -0,0 +1 @@
Simplify calculation of prometheus metrics for garbage collection.
3 changes: 2 additions & 1 deletion synapse/app/_base.py
@@ -60,7 +60,7 @@
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics import register_threadpool
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.types import ISynapseReactor
@@ -159,6 +159,7 @@ def run() -> None:
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
install_gc_manager()
run_command()

# make sure that we run the reactor with the sentinel log context,
186 changes: 3 additions & 183 deletions synapse/metrics/__init__.py
@@ -13,7 +13,6 @@
# limitations under the License.

import functools
import gc
import itertools
import logging
import os
@@ -41,7 +40,6 @@
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeHistogramMetricFamily,
GaugeMetricFamily,
)
@@ -56,13 +54,13 @@
generate_latest,
start_http_server,
)
from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)

METRICS_PREFIX = "/_synapse/metrics"

running_on_pypy = platform.python_implementation() == "PyPy"
all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
@@ -369,121 +367,6 @@ def collect(self) -> Iterable[Metric]:

REGISTRY.register(CPUMetrics())

#
# Python GC metrics
#

gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
buckets=[
0.0025,
0.005,
0.01,
0.025,
0.05,
0.10,
0.25,
0.50,
1.00,
2.50,
5.00,
7.50,
15.00,
30.00,
45.00,
60.00,
],
)


class GCCounts:
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)

yield cm


if not running_on_pypy:
REGISTRY.register(GCCounts())


#
# PyPy GC / memory metrics
#


class PyPyGCStats:
def collect(self) -> Iterable[Metric]:

# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
# Unfortunately, fields are pretty-printed themselves (i.e. '4.5MB').
stats = gc.get_stats(memory_pressure=False) # type: ignore
# @s contains same fields as @stats, but as actual integers.
s = stats._s # type: ignore

# also note that field naming is completely braindead
# and only vaguely correlates with the pretty-printed table.
# >>>> gc.get_stats(False)
# Total memory consumed:
# GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory
# in arenas: 3.0MB # s.total_arena_memory
# rawmalloced: 1.7MB # s.total_rawmalloced_memory
# nursery: 4.0MB # s.nursery_size
# raw assembler used: 31.0kB # s.jit_backend_used
# -----------------------------
# Total: 8.8MB # stats.memory_used_sum
#
# Total memory allocated:
# GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory
# in arenas: 30.9MB # s.peak_arena_memory
# rawmalloced: 4.1MB # s.peak_rawmalloced_memory
# nursery: 4.0MB # s.nursery_size
# raw assembler allocated: 1.0MB # s.jit_backend_allocated
# -----------------------------
# Total: 39.7MB # stats.memory_allocated_sum
#
# Total time spent in GC: 0.073 # s.total_gc_time

pypy_gc_time = CounterMetricFamily(
"pypy_gc_time_seconds_total",
"Total time spent in PyPy GC",
labels=[],
)
pypy_gc_time.add_metric([], s.total_gc_time / 1000)
yield pypy_gc_time

pypy_mem = GaugeMetricFamily(
"pypy_memory_bytes",
"Memory tracked by PyPy allocator",
labels=["state", "class", "kind"],
)
# memory used by JIT assembler
pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
# memory used by GCed objects
pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
# totals
pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
yield pypy_mem


if running_on_pypy:
REGISTRY.register(PyPyGCStats())


#
# Twisted reactor metrics
@@ -494,11 +377,6 @@ def collect(self) -> Iterable[Metric]:
"Tick time of the Twisted reactor (sec)",
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
pending_calls_metric = Histogram(
"python_twisted_reactor_pending_calls",
"Pending calls",
buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
)

#
# Federation Metrics
@@ -612,35 +490,12 @@ def collect(self) -> Iterable[Metric]:

REGISTRY.register(ReactorLastSeenMetric())

# The minimum time in seconds between GCs for each generation, regardless of the current GC
# thresholds and counts.
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)

# The time (in seconds since the epoch) of the last time we did a GC for each generation.
_last_gc = [0.0, 0.0, 0.0]


F = TypeVar("F", bound=Callable[..., Any])


def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
@functools.wraps(func)
def f(*args: Any, **kwargs: Any) -> Any:
now = reactor.seconds()
num_pending = 0

# _newTimedCalls is one long list of *all* pending calls. The loop below
# is based on the implementation of reactor.runUntilCurrent.
for delayed_call in reactor._newTimedCalls:
if delayed_call.time > now:
break

if delayed_call.delayed_time > 0:
continue

num_pending += 1

num_pending += len(reactor.threadCallQueue)
start = time.time()
ret = func(*args, **kwargs)
end = time.time()
@@ -651,61 +506,24 @@ def f(*args: Any, **kwargs: Any) -> Any:
# I/O events, but that is harder to capture without rewriting half the
# reactor.
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)

# Update the time we last ticked, for the metric to test whether
# Synapse's reactor has frozen
global last_ticked
last_ticked = end

if running_on_pypy:
return ret

# Check if we need to do a manual GC (since it's been disabled), and do
# one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
# promote an object into gen 2, and we don't want to handle the same
# object multiple times.
threshold = gc.get_threshold()
counts = gc.get_count()
for i in (2, 1, 0):
# We check if we need to do one based on a straightforward
# comparison between the threshold and count. We also do an extra
# check to make sure that we don't do a GC too often.
if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
if i == 0:
logger.debug("Collecting gc %d", i)
else:
logger.info("Collecting gc %d", i)

start = time.time()
unreachable = gc.collect(i)
end = time.time()

_last_gc[i] = end

gc_time.labels(i).observe(end - start)
gc_unreachable.labels(i).set(unreachable)

return ret

return cast(F, f)


try:
# Ensure the reactor has all the attributes we expect
reactor.seconds # type: ignore
reactor.runUntilCurrent # type: ignore
reactor._newTimedCalls # type: ignore
reactor.threadCallQueue # type: ignore

# runUntilCurrent is called when we have pending calls. It is called once
# per iteration after fd polling.
reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore

# We manually run the GC each reactor tick so that we can get some metrics
# about time spent doing GC.
if not running_on_pypy:
gc.disable()
except AttributeError:
pass
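
With the pending-call counting and the inline GC pass gone, the timer wrapper that remains is small. The following is a sketch assembled from the retained lines above, not a verbatim copy of the resulting file; in particular, the metric name python_twisted_reactor_tick_time is an assumption, since only its help string and buckets appear in this excerpt.

import functools
import time
from typing import Any, Callable, TypeVar, cast

from prometheus_client import Histogram
from twisted.internet.base import ReactorBase

tick_time = Histogram(
    "python_twisted_reactor_tick_time",  # assumed name; not visible in this excerpt
    "Tick time of the Twisted reactor (sec)",
    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
last_ticked = time.time()

F = TypeVar("F", bound=Callable[..., Any])


def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
    @functools.wraps(func)
    def f(*args: Any, **kwargs: Any) -> Any:
        start = time.time()
        ret = func(*args, **kwargs)  # the wrapped reactor.runUntilCurrent
        end = time.time()

        # Wall-clock time spent running pending calls: a proxy for the time
        # between reactor polls.
        tick_time.observe(end - start)

        # Record when we last ticked, so a separate metric can detect a
        # frozen reactor.
        global last_ticked
        last_ticked = end

        return ret

    return cast(F, f)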

Expand All @@ -717,4 +535,6 @@ def f(*args: Any, **kwargs: Any) -> Any:
"LaterGauge",
"InFlightGauge",
"GaugeBucketCollector",
"MIN_TIME_BETWEEN_GCS",
"install_gc_manager",
]
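
The GC machinery deleted above is now imported from a new module, synapse/metrics/_gc.py, which this diff does not show. Below is a minimal sketch of what install_gc_manager presumably encapsulates, assuming it simply repackages the deleted logic into a timed task: the LoopingCall interval, the _maybe_gc helper name, and the simplified Histogram declaration are illustrative assumptions, and the GCCounts / PyPyGCStats collectors removed above would presumably be registered from the same module (omitted here for brevity).

import gc
import platform
import time

from prometheus_client import Gauge, Histogram
from twisted.internet import task

running_on_pypy = platform.python_implementation() == "PyPy"

# The minimum time in seconds between GCs for each generation, regardless of
# the current GC thresholds and counts.
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)

# The time (in seconds since the epoch) of the last GC for each generation.
_last_gc = [0.0, 0.0, 0.0]

gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
# Buckets omitted for brevity; the original declaration above lists them.
gc_time = Histogram("python_gc_time", "Time taken to GC (sec)", ["gen"])


def install_gc_manager() -> None:
    """Disable automatic GC and run collections from a timed task instead,
    so that GC pauses no longer inflate the reactor tick measurement."""
    if running_on_pypy:
        return

    gc.disable()

    def _maybe_gc() -> None:
        threshold = gc.get_threshold()
        counts = gc.get_count()
        now = time.time()

        # Go in reverse order: a gen-1 GC may promote objects into gen 2, and
        # we don't want to handle the same object twice.
        for i in (2, 1, 0):
            if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < now - _last_gc[i]:
                start = time.time()
                unreachable = gc.collect(i)
                end = time.time()

                _last_gc[i] = end
                gc_time.labels(i).observe(end - start)
                gc_unreachable.labels(i).set(unreachable)

    # Illustrative interval: check the counters roughly ten times a second.
    task.LoopingCall(_maybe_gc).start(0.1)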