Skip to content

Commit

Permalink
working but without omit-instances
Browse files: browse the repository at this point in the history
  • Loading branch information
jelly committed Jul 5, 2024
1 parent b811e4c commit e303bd2
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 75 deletions.
2 changes: 1 addition & 1 deletion pkg/metrics/metrics.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ const HISTORY_METRICS = [
{ name: "disk.all.total_bytes", derive: "rate" },

// network utilization
{ name: "network.interface.total.bytes", derive: "rate", "omit-instances": ["lo"] },
{ name: "network.interface.total.bytes", derive: "rate" },
];

function debug() {
Expand Down
143 changes: 76 additions & 67 deletions src/cockpit/channels/pcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import sys
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, NamedTuple, Sequence
from typing import TYPE_CHECKING, Any, DefaultDict, Iterable, List, Mapping, NamedTuple, Sequence

from ..channel import AsyncChannel, ChannelError
from ..jsonutil import JsonObject, JsonValue, get_int, get_objv, get_str, get_strv
Expand All @@ -37,7 +37,8 @@
logger = logging.getLogger(__name__)


Sample = Mapping[str, float | list[float] | None]
Sample = Mapping[str, float | List[float] | None]
Instances = DefaultDict[str, list[str]]


class PcpMetricInfo(dict[str, JsonValue]):
Expand Down Expand Up @@ -69,12 +70,14 @@ def try_import_pcp() -> None:

class ArchiveInfo:
metric_descriptions: List[MetricInfo]
instance_descriptions: Instances

def __init__(self, context: 'pmapi.pmContext', start: float, path: str) -> None:
self.context = context
self.start = start
self.path = path
self.metric_descriptions = []
self.instance_descriptions = defaultdict(list)

def sort_key(self) -> float:
return self.start
Expand Down Expand Up @@ -141,7 +144,7 @@ def get_context_and_name(source: str) -> 'tuple[str, str]':
context_type = c_api.PM_CONTEXT_ARCHIVE
elif source == 'pcp-archive':
hostname = platform.node()
archive_dir = f'{pmapi.pmContext.pmGetConfig("PCP_LOG_DIR")}/pmlogger/{hostname}'
archive_dir = pmapi.pmContext.pmGetConfig("PCP_LOG_DIR")
name = f'{archive_dir}/pmlogger/{hostname}'
context_type = c_api.PM_CONTEXT_ARCHIVE
elif source == 'direct':
Expand Down Expand Up @@ -189,20 +192,18 @@ def verify_archives(self, archives):
...

@staticmethod
def convert_metric_description(context: 'pmapi.pmContext', metric: JsonObject):
def convert_metric_description(context: 'pmapi.pmContext', metric: JsonObject) -> MetricInfo:
name = get_str(metric, 'name', '')
if name == '':
raise ChannelError('protocol-error',
message='invalid "metrics" option was specified (no name for metric)')
units = get_str(metric, 'units', '')
derive = get_str(metric, 'derive', '')
print("metrics", metric)

try:
pm_ids = context.pmLookupName(name)
except pmapi.pmErr as exc:
if exc.errno() == c_api.PM_ERR_NAME:
print('err', exc)
raise ChannelError('not-found', message=f'no such metric: {name}') from None
else:
raise ChannelError('internal-error', message=str(exc)) from None
Expand All @@ -220,7 +221,6 @@ def convert_metric_description(context: 'pmapi.pmContext', metric: JsonObject):
if units:
try:
[units_buf, factor] = context.pmParseUnitsStr(units)
print(units_buf, factor)
except pmapi.pmErr as exc:
if exc.errno() == c_api.PM_ERR_NAME:
raise ChannelError('not-found', message=f'no such metric: {name}') from None
Expand Down Expand Up @@ -263,7 +263,7 @@ def semantic_val(sem_id: int):
elif sem_id == c_api.PM_SEM_DISCRETE:
return "discrete"

def send_meta(self, archive) -> None:
def send_meta(self, archive: ArchiveInfo) -> None:
# C build_meta in cockpitpcpmetrics.c
metrics = []
for metric_desc in archive.metric_descriptions:
Expand All @@ -276,9 +276,14 @@ def send_meta(self, archive) -> None:
desc['derive'] = metric_desc.derive

# Instances
# if metric_desc.indom != c_api.PM_INDOM_NULL:
# pass
# # TODO: instances
if metric_desc.desc.indom != c_api.PM_INDOM_NULL:
instances = archive.instance_descriptions.get(metric_desc.id)
if instances is not None:
insts = []
for x in instances:
insts.append(x)
# HACK Set is ordered, but calling `list(set)` is not
desc['instances'] = insts

# Units
if metric_desc.factor == 1.0:
Expand Down Expand Up @@ -320,7 +325,7 @@ def sample(self, archive, total_fetched):
# HACK: This is some pcp weirdness where it only accepts a PCP type list and not a Python list
# PMIDS <pcp.pmapi.c_uint_Array_1 object at 0x7ab92eaddb50>
results = context.pmFetch(pmids)
fetched.append(self.parse_fetched_results(context, results, descs))
fetched.append(self.parse_fetched_results(archive, context, results, descs))
total_fetched += 1

self.send_updates(archive, fetched)
Expand All @@ -337,7 +342,7 @@ def sample(self, archive, total_fetched):

return total_fetched

def parse_fetched_results(self, context: 'pmapi.pmContext', results: Any, descs: Any) -> Sample:
def parse_fetched_results(self, archive, context: 'pmapi.pmContext', results: Any, descs: Any) -> Sample:
metrics = list(self.metrics)
samples: dict[str, float | list[float]] = {}

Expand All @@ -352,97 +357,88 @@ def parse_fetched_results(self, context: 'pmapi.pmContext', results: Any, descs:
pass # continue?
# TODO: don't pass descs, look up via archive.metrics_descriptions?
elif descs[i].indom == c_api.PM_INDOM_NULL: # Single instance
values = self.build_sample(context, results, descs, i, 0)
values = self.build_sample(archive, context, results, descs, i, 0)
else: # Multi instance
vals = []
print("CONTENT", dir(valueset))
print("bonk", valueset.numval, valueset.contents.vlist, descs)
for j in range(numval - 1):
vals.append(self.build_sample(context, results, descs, i, j))
# values = vals
for j in range(numval):
vals.append(self.build_sample(archive, context, results, descs, i, j))
values = vals
# raise NotImplementedError('multi value handling, see C code')

samples[metrics[i].name] = values
# values: dict[str, float] | float = defaultdict()
# instances: list[str] | None = None
# value_count = results.contents.get_numval(i)
#
# if value_count > 1:
# _, instances = context.pmGetInDom(indom=descs[i].contents.indom)
#
# content_type = descs[i].contents.type
# print(value_count, instances, content_type)
# for j in range(value_count):
# atom = context.pmExtractValue(results.contents.get_valfmt(i),
# results.contents.get_vlist(i, j),
# content_type,
# content_type)
#
# if value_count > 1:
# assert isinstance(instances, list)
# assert isinstance(values, dict)
# values[instances[j]] = atom.dref(content_type)
# else:
# # TODO does float() need to be here?
# values = float(atom.dref(content_type))
#
# samples[metrics[i].name] = values

return samples

def build_sample(self, context, results, descs, metric, instance):
def build_sample(self, archive, context, results, descs, metric: int, instance: int):

Check notice — Code scanning / CodeQL: "Explicit returns mixed with implicit (fall through) returns" (Note)

Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.
try:
desc = descs[metric]
except IndexError:
logging.debug("no description found for metric=%s", metric)
return

instanced = desc.indom != c_api.PM_INDOM_NULL
pmid = results.contents.get_pmid(metric)

logger.debug("build_sample pmid=%d, metric=%d, instance=%d", pmid, metric, instance)
# Unsupported type
content_type = desc.type
# TODO: PM_TYPE_AGGREGATE_FULL? or PM_TYPE_STRING?
if content_type == c_api.PM_TYPE_AGGREGATE or content_type == c_api.PM_TYPE_EVENT:
return

valueset = results.contents.get_vset(metric)
if valueset.numval <= instance:
return
# TODO: This check seems to be there for multi value types as the C code passes `j` along.
# But is it really needed?
# if (result->vset[metric]->numval <= instance)
# return;
valueset = results.contents.get_vset(metric)
if valueset.numval <= instance:
return

valfmt = results.contents.get_valfmt(metric)
print("contexts length", valueset.contents.vlist)
value = results.contents.get_vlist(metric, instance)
print("build_sample", valfmt, value, content_type)

# HACK: we return a description of the instances but this requires a handle on results
if instanced:
pmid = results.contents.get_pmid(metric)
instance_desc = context.pmNameInDom(desc, value.inst)
# HACK HACK HACK!
if instance_desc not in archive.instance_descriptions[pmid]:
archive.instance_descriptions[pmid].append(instance_desc)

sample_value = None
if content_type == c_api.PM_TYPE_64:
atom = context.pmExtractValue(valfmt,
value,
c_api.PM_TYPE_64,
c_api.PM_TYPE_64)
sample_value = (atom.ll << 16) >> 16
try:
atom = context.pmExtractValue(valfmt,
value,
c_api.PM_TYPE_64,
c_api.PM_TYPE_64)
sample_value = (atom.ll << 16) >> 16
except Exception as exc:
logger.exception("Unable to extract PCP TYPE_64 value %s", exc)
elif content_type == c_api.PM_TYPE_U64:
atom = context.pmExtractValue(valfmt,
value,
c_api.PM_TYPE_64,
c_api.PM_TYPE_64)
sample_value = (atom.ull << 16) >> 16
try:
atom = context.pmExtractValue(valfmt,
value,
c_api.PM_TYPE_U64,
c_api.PM_TYPE_U64)
sample_value = (atom.ull << 16) >> 16
except Exception as exc:
logger.exception("Unable to extract PCP TYPE_U64 value %s", exc)
else:
try:
atom = context.pmExtractValue(valfmt,
value,
content_type,
c_api.PM_TYPE_DOUBLE)
sample_value = atom.d
except Exception as exc:
print("BORK", exc)

sample_value = atom.d
# print(atom.dref(content_type))
logger.exception("Unable to extract PCP value %s", exc)

# TODO: handle the case where requested units are != pcp given units
# and scale them using pmConvScale
return sample_value

# TODO: copied from internalmetrics
def calculate_sample_rate(self, value: float, old_value: float | None) -> float | bool:
if old_value is not None and self.last_timestamp:
return (value - old_value) / (self.next_timestamp - self.last_timestamp)
Expand All @@ -453,7 +449,6 @@ def send_updates(self, archive, samples: Sequence[Sample]) -> None:
# data: List[List[Union[float, List[Optional[Union[float, bool]]]]]] = []
data: list[list[float | list[float]]] = []
last_samples = self.last_samples or {}
print(samples, self.metrics)

for sample in samples:
assert isinstance(sample['timestamp'], float)
Expand All @@ -467,8 +462,21 @@ def send_updates(self, archive, samples: Sequence[Sample]) -> None:

# COMPLETE WACK
if isinstance(value, list):
# TODO Multi value instances?
pass
if metricinfo.derive == 'rate':
tmp = []
for index, val in enumerate(value):
old_val = None
if old_value is not None:
try:
old_val = old_value[index]
except IndexError:
pass
tmp.append(self.calculate_sample_rate(val, old_val))
sampled_values.append(tmp)
else:
sampled_values.append(value)
# We are multi instances lets go
# TODO Multi value instances!!! Return!
elif isinstance(value, Mapping):
# If the old value wasn't an equivalent a mapping, we need a meta
if not isinstance(old_value, Mapping) or value.keys() != old_value.keys():
Expand All @@ -481,13 +489,14 @@ def send_updates(self, archive, samples: Sequence[Sample]) -> None:
else:
sampled_values.append(tuple(value.values()))
else:
assert isinstance(value, float)
# assert isinstance(value, float)

# If the old value was a mapping, we need a meta
if isinstance(old_value, Mapping):
self.need_meta = True
old_value = None

print(metricinfo, old_value)
if metricinfo.derive == 'rate':
sampled_values.append(self.calculate_sample_rate(value, old_value))
else:
Expand Down
Loading

0 comments on commit e303bd2

Please sign in to comment.