Skip to content

Commit

Permalink
Add operating system cgroup stats to node-stats telemetry (#1663)
Browse files Browse the repository at this point in the history
* Enable collection of OS cgroup stats by default
* Add the node-stats-include-cgroup to the documentation
* Update telemetry tests
  • Loading branch information
inqueue committed Mar 9, 2023
1 parent 074d059 commit 47b514f
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/telemetry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ The node-stats telemetry device regularly calls the `cluster node-stats API <htt
* JVM buffer pool stats (key ``jvm.buffer_pools`` in the node-stats API)
* JVM gc stats (key ``jvm.gc`` in the node-stats API)
* OS mem stats (key ``os.mem`` in the node-stats API)
* OS cgroup stats (key ``os.cgroup`` in the node-stats API)
* JVM mem stats (key ``jvm.mem`` in the node-stats API)
* Circuit breaker stats (key ``breakers`` in the node-stats API)
* Network-related stats (key ``transport`` in the node-stats API)
Expand All @@ -132,6 +133,7 @@ Supported telemetry parameters:
* ``node-stats-include-breakers`` (default: ``true``): A boolean indicating whether circuit breaker stats should be included.
* ``node-stats-include-gc`` (default: ``true``): A boolean indicating whether JVM gc stats should be included.
* ``node-stats-include-mem`` (default: ``true``): A boolean indicating whether both JVM heap and OS mem stats should be included.
* ``node-stats-include-cgroup`` (default: ``true``): A boolean indicating whether operating system cgroup stats should be included. The cgroup memory stats are omitted since Elasticsearch emits them as string values; use the ``os_mem_*`` fields instead.
* ``node-stats-include-network`` (default: ``true``): A boolean indicating whether network-related stats should be included.
* ``node-stats-include-process`` (default: ``true``): A boolean indicating whether process cpu stats should be included.
* ``node-stats-include-indexing-pressure`` (default: ``true``): A boolean indicating whether indexing pressure stats should be included.
Expand Down
13 changes: 13 additions & 0 deletions esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,8 @@ def on_benchmark_stop(self):

class NodeStatsRecorder:
def __init__(self, telemetry_params, cluster_name, client, metrics_store):
self.logger = logging.getLogger(__name__)
self.logger.info("node stats recorder")
self.sample_interval = telemetry_params.get("node-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
Expand Down Expand Up @@ -816,6 +818,7 @@ def __init__(self, telemetry_params, cluster_name, client, metrics_store):
self.include_network = telemetry_params.get("node-stats-include-network", True)
self.include_process = telemetry_params.get("node-stats-include-process", True)
self.include_mem_stats = telemetry_params.get("node-stats-include-mem", True)
self.include_cgroup_stats = telemetry_params.get("node-stats-include-cgroup", True)
self.include_gc_stats = telemetry_params.get("node-stats-include-gc", True)
self.include_indexing_pressure = telemetry_params.get("node-stats-include-indexing-pressure", True)
self.client = client
Expand Down Expand Up @@ -845,6 +848,8 @@ def record(self):
if self.include_mem_stats:
collected_node_stats.update(self.jvm_mem_stats(node_name, node_stats))
collected_node_stats.update(self.os_mem_stats(node_name, node_stats))
if self.include_cgroup_stats:
collected_node_stats.update(self.os_cgroup_stats(node_name, node_stats))
if self.include_gc_stats:
collected_node_stats.update(self.jvm_gc_stats(node_name, node_stats))
if self.include_network:
Expand Down Expand Up @@ -907,6 +912,14 @@ def jvm_mem_stats(self, node_name, node_stats):
def os_mem_stats(self, node_name, node_stats):
    """Flatten the ``os.mem`` section of one node's stats.

    :param node_name: Name of the node; not used by this method.
    :param node_stats: Node-stats entry for a single node. Must contain ``["os"]["mem"]``.
    :return: The result of ``flatten_stats_fields`` for that section, using the prefix ``os_mem``.
    :raises KeyError: If ``os`` or ``os.mem`` is missing from ``node_stats``.
    """
    flatten = self.flatten_stats_fields
    mem_section = node_stats["os"]["mem"]
    return flatten(prefix="os_mem", stats=mem_section)

def os_cgroup_stats(self, node_name, node_stats):
    """Flatten the ``os.cgroup`` section of one node's stats, if there is one.

    :param node_name: Name of the node; not used by this method.
    :param node_stats: Node-stats entry for a single node.
    :return: The result of ``flatten_stats_fields`` for ``node_stats["os"]["cgroup"]`` using the
             prefix ``os_cgroup``, or an empty dict when a ``KeyError`` is raised while obtaining it.
    """
    try:
        return self.flatten_stats_fields(prefix="os_cgroup", stats=node_stats["os"]["cgroup"])
    except KeyError:
        # A missing cgroup section is not treated as an error: log it and record nothing.
        self.logger.debug("Node cgroup stats requested with none present.")
        return {}

def jvm_gc_stats(self, node_name, node_stats):
    """Flatten the ``jvm.gc`` section of one node's stats.

    :param node_name: Name of the node; not used by this method.
    :param node_stats: Node-stats entry for a single node. Must contain ``["jvm"]["gc"]``.
    :return: The result of ``flatten_stats_fields`` for that section, using the prefix ``jvm_gc``.
    :raises KeyError: If ``jvm`` or ``jvm.gc`` is missing from ``node_stats``.
    """
    flatten = self.flatten_stats_fields
    gc_section = node_stats["jvm"]["gc"]
    return flatten(prefix="jvm_gc", stats=gc_section)

Expand Down
249 changes: 248 additions & 1 deletion tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,12 @@ class TestNodeStatsRecorder:
"jvm_gc_collectors_young_collection_time_in_millis": 309,
"jvm_gc_collectors_old_collection_count": 2,
"jvm_gc_collectors_old_collection_time_in_millis": 229,
"os_cgroup_cpuacct_usage_nanos": 1394207523870751,
"os_cgroup_cpu_cfs_period_micros": 100000,
"os_cgroup_cpu_cfs_quota_micros": 793162,
"os_cgroup_cpu_stat_number_of_elapsed_periods": 41092415,
"os_cgroup_cpu_stat_number_of_times_throttled": 41890,
"os_cgroup_cpu_stat_time_throttled_nanos": 29380593023188,
"os_mem_total_in_bytes": 62277025792,
"os_mem_free_in_bytes": 4934840320,
"os_mem_used_in_bytes": 57342185472,
Expand Down Expand Up @@ -2397,7 +2403,7 @@ def test_stores_all_nodes_stats(self, metrics_store_put_doc):
client = Client(nodes=SubClient(stats=node_stats_response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"node-stats-include-indices": True}
telemetry_params = {"node-stats-include-indices": True, "node-stats-include-cgroup": True}
recorder = telemetry.NodeStatsRecorder(telemetry_params, cluster_name="remote", client=client, metrics_store=metrics_store)
recorder.record()

Expand Down Expand Up @@ -2476,6 +2482,12 @@ def test_stores_all_nodes_stats(self, metrics_store_put_doc):
"jvm_gc_collectors_young_collection_time_in_millis": 309,
"jvm_gc_collectors_old_collection_count": 2,
"jvm_gc_collectors_old_collection_time_in_millis": 229,
"os_cgroup_cpuacct_usage_nanos": 1394207523870751,
"os_cgroup_cpu_cfs_period_micros": 100000,
"os_cgroup_cpu_cfs_quota_micros": 793162,
"os_cgroup_cpu_stat_number_of_elapsed_periods": 41092415,
"os_cgroup_cpu_stat_number_of_times_throttled": 41890,
"os_cgroup_cpu_stat_time_throttled_nanos": 29380593023188,
"os_mem_total_in_bytes": 62277025792,
"os_mem_free_in_bytes": 4934840320,
"os_mem_used_in_bytes": 57342185472,
Expand Down Expand Up @@ -2789,6 +2801,12 @@ def test_stores_selected_indices_metrics_from_nodes_stats(self, metrics_store_pu
"jvm_gc_collectors_young_collection_time_in_millis": 309,
"jvm_gc_collectors_old_collection_count": 2,
"jvm_gc_collectors_old_collection_time_in_millis": 229,
"os_cgroup_cpuacct_usage_nanos": 1394207523870751,
"os_cgroup_cpu_cfs_period_micros": 100000,
"os_cgroup_cpu_cfs_quota_micros": 793162,
"os_cgroup_cpu_stat_number_of_elapsed_periods": 41092415,
"os_cgroup_cpu_stat_number_of_times_throttled": 41890,
"os_cgroup_cpu_stat_time_throttled_nanos": 29380593023188,
"os_mem_total_in_bytes": 62277025792,
"os_mem_free_in_bytes": 4934840320,
"os_mem_used_in_bytes": 57342185472,
Expand Down Expand Up @@ -2833,6 +2851,235 @@ def test_exception_when_include_indices_metrics_not_valid(self):
):
telemetry.NodeStatsRecorder(telemetry_params, cluster_name="remote", client=client, metrics_store=metrics_store)

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_logs_debug_on_missing_cgroup_stats(self, metrics_store_put_doc):
# Checks that recording node stats whose "os" section has no "cgroup" entry
# results in exactly one debug log call with the expected message.
# EsMetricsStore.put_doc is patched by the decorator; the mock itself is not inspected here.
#
# Node-stats response containing a single node named "rally0".
node_stats_response = {
"cluster_name": "elasticsearch",
"nodes": {
"Zbl_e8EyRXmiR47gbHgPfg": {
"timestamp": 1524379617017,
"name": "rally0",
"transport_address": "127.0.0.1:9300",
"host": "127.0.0.1",
"ip": "127.0.0.1:9300",
"roles": [
"master",
"data",
"ingest",
],
"indices": {
"docs": {
"count": 76892364,
"deleted": 324530,
},
"store": {
"size_in_bytes": 983409834,
},
"indexing": {
"is_throttled": False,
"throttle_time_in_millis": 0,
},
"search": {
"open_contexts": 0,
"query_total": 0,
"query_time_in_millis": 0,
},
"merges": {
"current": 0,
"current_docs": 0,
"current_size_in_bytes": 0,
},
"refresh": {
"total": 747,
"total_time_in_millis": 277382,
"listeners": 0,
},
"query_cache": {
"memory_size_in_bytes": 0,
"total_count": 0,
"hit_count": 0,
"miss_count": 0,
"cache_size": 0,
"cache_count": 0,
"evictions": 0,
},
"fielddata": {
"memory_size_in_bytes": 6936,
"evictions": 17,
},
"completion": {
"size_in_bytes": 0,
},
"segments": {
"count": 0,
"memory_in_bytes": 0,
"max_unsafe_auto_id_timestamp": -9223372036854775808,
"file_sizes": {},
},
"translog": {
"operations": 0,
"size_in_bytes": 0,
"uncommitted_operations": 0,
"uncommitted_size_in_bytes": 0,
},
"request_cache": {
"memory_size_in_bytes": 0,
"evictions": 0,
"hit_count": 0,
"miss_count": 0,
},
"recovery": {
"current_as_source": 0,
"current_as_target": 0,
"throttle_time_in_millis": 0,
},
},
"jvm": {
"buffer_pools": {
"mapped": {
"count": 7,
"used_in_bytes": 3120,
"total_capacity_in_bytes": 9999,
},
"direct": {
"count": 6,
"used_in_bytes": 73868,
"total_capacity_in_bytes": 73867,
},
},
"classes": {
"current_loaded_count": 9992,
"total_loaded_count": 9992,
"total_unloaded_count": 0,
},
"mem": {
"heap_used_in_bytes": 119073552,
"heap_used_percent": 19,
"heap_committed_in_bytes": 626393088,
"heap_max_in_bytes": 626393088,
"non_heap_used_in_bytes": 110250424,
"non_heap_committed_in_bytes": 118108160,
"pools": {
"young": {
"used_in_bytes": 66378576,
"max_in_bytes": 139591680,
"peak_used_in_bytes": 139591680,
"peak_max_in_bytes": 139591680,
},
"survivor": {
"used_in_bytes": 358496,
"max_in_bytes": 17432576,
"peak_used_in_bytes": 17432576,
"peak_max_in_bytes": 17432576,
},
"old": {
"used_in_bytes": 52336480,
"max_in_bytes": 469368832,
"peak_used_in_bytes": 52336480,
"peak_max_in_bytes": 469368832,
},
},
},
"gc": {
"collectors": {
"young": {
"collection_count": 3,
"collection_time_in_millis": 309,
},
"old": {
"collection_count": 2,
"collection_time_in_millis": 229,
},
}
},
},
"process": {
"timestamp": 1526045135857,
"open_file_descriptors": 312,
"max_file_descriptors": 1048576,
"cpu": {
"percent": 10,
"total_in_millis": 56520,
},
"mem": {
"total_virtual_in_bytes": 2472173568,
},
},
# This "os" section has no "cgroup" key; that is the condition the test exercises.
"os": {
"timestamp": 1655950949872,
"cpu": {"percent": 3, "load_average": {"1m": 3.38, "5m": 3.79, "15m": 3.84}},
"mem": {
"total_in_bytes": 62277025792,
"free_in_bytes": 4934840320,
"used_in_bytes": 57342185472,
"free_percent": 8,
"used_percent": 92,
},
"swap": {"total_in_bytes": 0, "free_in_bytes": 0, "used_in_bytes": 0},
},
"thread_pool": {
"generic": {
"threads": 4,
"queue": 0,
"active": 0,
"rejected": 0,
"largest": 4,
"completed": 8,
},
},
"transport": {
"server_open": 12,
"rx_count": 77,
"rx_size_in_bytes": 98723498,
"tx_count": 88,
"tx_size_in_bytes": 23879803,
},
"breakers": {
"parent": {
"limit_size_in_bytes": 726571417,
"limit_size": "692.9mb",
"estimated_size_in_bytes": 0,
"estimated_size": "0b",
"overhead": 1.0,
"tripped": 0,
}
},
"indexing_pressure": {
"memory": {
"current": {
"combined_coordinating_and_primary_in_bytes": 0,
"coordinating_in_bytes": 0,
"primary_in_bytes": 0,
"replica_in_bytes": 0,
"all_in_bytes": 0,
},
"total": {
"combined_coordinating_and_primary_in_bytes": 0,
"coordinating_in_bytes": 0,
"primary_in_bytes": 0,
"replica_in_bytes": 0,
"all_in_bytes": 0,
"coordinating_rejections": 0,
"primary_rejections": 0,
"replica_rejections": 0,
},
}
},
}
},
}

# Client stub whose nodes sub-client is built from the response above.
client = Client(nodes=SubClient(stats=node_stats_response))
cfg = create_config()
# Logger looked up by the name "esrally.telemetry"; its debug method is patched below.
logger = logging.getLogger("esrally.telemetry")
metrics_store = metrics.EsMetricsStore(cfg)
# Explicitly request cgroup stats even though the response carries none.
telemetry_params = {"node-stats-include-cgroup": True}
recorder = telemetry.NodeStatsRecorder(telemetry_params, cluster_name="remote", client=client, metrics_store=metrics_store)

# Replace logger.debug for the duration of record() so the call can be asserted on.
with mock.patch.object(logger, "debug") as mocked_debug:
recorder.record()
mocked_debug.assert_called_once_with("Node cgroup stats requested with none present.")


class TestTransformStats:
def test_negative_sample_interval_forbidden(self):
Expand Down

0 comments on commit 47b514f

Please sign in to comment.