"""Publish the fields of device entity dataclasses as Prometheus metrics."""

import abc
import dataclasses
import datetime
import enum
import logging
from typing import Any, Optional, Sequence

import pint
import prometheus_client

from . import Device, Publisher

_ureg = pint.get_application_registry()

logger = logging.getLogger('janet.prometheus')

Instance = Any

# Label names shared by every single-valued metric.
_LABELS = ('identifier', )

# Suffix appended to a metric name for each recognised pint unit.
_UNITS = {
    _ureg.amp: 'amps',
    _ureg.volt: 'volts',
    _ureg.watt: 'watts',
    _ureg.hour: 'hours',
    _ureg.degC: 'deg_c',
    _ureg.watt_hour: 'wh',
    _ureg.second: 'seconds',
    _ureg.hertz: 'hz',
}


def _to_units(value) -> Optional[str]:
    """Return the metric-name suffix describing the units of *value*.

    Quantities map through ``_UNITS``; an unrecognised unit is logged and
    yields ``None``.  Datetimes yield ``'timestamp'``.  Anything else has no
    suffix and yields ``None``.
    """
    if isinstance(value, pint.Quantity):
        for unit, klass in _UNITS.items():
            if unit == value.units:
                return klass
        logger.warning(f'Unrecognised units {value.units}')
    elif isinstance(value, datetime.datetime):
        return 'timestamp'
    return None


def _to_float(value) -> float:
    """Convert *value* into the number handed to Prometheus.

    Quantities contribute their magnitude and plain numbers are used as-is,
    both rounded to six decimal places.  Datetimes become POSIX timestamps.
    """
    if isinstance(value, pint.Quantity):
        return round(value.m, 6)
    elif isinstance(value, datetime.datetime):
        return value.timestamp()
    return round(value, 6)


class Metric(abc.ABC):
    """Interface of the per-field wrappers around a Prometheus metric."""

    @abc.abstractmethod
    def publish(self, identifier, value) -> None:
        """Record *value* on the series labelled with *identifier*."""


class Gauge(Metric):
    """Gauge holding the latest numeric value of a field."""

    def __init__(self, name, field):
        self.metric = prometheus_client.Gauge(name,
                                              'No description',
                                              labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Set the gauge to *value*; ``None`` is ignored."""
        if value is not None:
            self.metric.labels(identifier).set(_to_float(value))


class Bytes(Metric):
    """Gauge named ``<name>_bytes`` accumulating the length of each value."""

    def __init__(self, name, field):
        self.metric = prometheus_client.Gauge(f'{name}_bytes',
                                              'No description',
                                              labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Increase the gauge by ``len(value)``; ``None`` is ignored."""
        # Ignore None like Gauge and Quantity do, rather than failing in len().
        if value is not None:
            self.metric.labels(identifier).inc(len(value))


class MultiGauge(Metric):
    """Gauge with an extra ``index`` label, one series per sequence element."""

    def __init__(self, name, field, value):
        # The units suffix is derived from the first element of the sample
        # value; an empty sample leaves the name without a suffix.
        if value:
            units = _to_units(value[0])
            if units:
                name = f'{name}_{units}'
        self.metric = prometheus_client.Gauge(name,
                                              'No description',
                                              labelnames=(
                                                  'identifier',
                                                  'index',
                                              ))

    def publish(self, identifier, values) -> None:
        """Set one series per element of *values*; ``None`` is ignored."""
        if values is None:
            return
        for i, value in enumerate(values):
            self.metric.labels(identifier, str(i)).set(_to_float(value))


class Quantity(Metric):
    """Gauge for a pint quantity, named with a units suffix when known."""

    def __init__(self, name, field, value: pint.Quantity):
        units = _to_units(value)
        if units:
            name = f'{name}_{units}'
        self.metric = prometheus_client.Gauge(name,
                                              'No description',
                                              labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Set the gauge to the magnitude of *value*; ``None`` is ignored."""
        if value is not None:
            self.metric.labels(identifier).set(_to_float(value))


class Info(Metric):
    """Info metric exposing a field under the ``value`` key."""

    def __init__(self, name, field):
        self.metric = prometheus_client.Info(name,
                                             'No description',
                                             labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Publish *value*; falsy values (``None``, ``''``) are ignored."""
        if value:
            self.metric.labels(identifier).info({'value': value})


class Enum(Metric):
    """Enum metric whose states are the lower-cased member names."""

    def __init__(self, name, field):
        states = [x.name.lower() for x in field.type]
        self.metric = prometheus_client.Enum(name,
                                             'No description',
                                             labelnames=_LABELS,
                                             states=states)

    def publish(self, identifier, value: enum.Enum) -> None:
        """Switch to the state named after *value*; ``None`` is ignored."""
        # Ignore None like Gauge and Quantity do, rather than failing on .name.
        if value is not None:
            self.metric.labels(identifier).state(value.name.lower())


def _issubclass(typ, cls) -> bool:
    """Return ``issubclass(typ, cls)``, or ``False`` if *typ* is not a class.

    Field annotations such as ``Optional[int]`` are not classes and make
    ``issubclass`` raise ``TypeError``.
    """
    try:
        return issubclass(typ, cls)
    except TypeError:
        return False


def metric_factory(name: str, field: dataclasses.Field,
                   value: Any) -> Optional[Metric]:
    """Build the metric wrapper suited to a dataclass field.

    Args:
        name: Prometheus metric name, before any units suffix.
        field: The dataclass field; its declared type selects the wrapper.
        value: A sample value.  A pint quantity takes precedence over the
            declared type, and the sample also supplies the units suffix.

    Returns:
        The wrapper, or ``None`` when the field is not exported.  ``Device``
        fields are skipped silently; other unsupported types are logged.
    """
    if isinstance(value, pint.Quantity):
        return Quantity(name, field, value)
    elif field.type in (int, float, bool, Optional[int], Optional[float],
                        Optional[bool]):
        return Gauge(name, field)
    elif field.type in (Sequence[float], Sequence[int]):
        return MultiGauge(name, field, value)
    elif field.type in (bytes, ):
        return Bytes(name, field)
    elif field.type in (str, Optional[str]):
        return Info(name, field)
    elif field.type in (Device, Optional[Device]):
        return None
    elif _issubclass(field.type, enum.IntEnum):
        return Enum(name, field)
    else:
        logger.warning(f'Dropping {name} due to unsupported type {field.type}')
        return None


class Client(Publisher):
    """Publisher that exports entity fields as Prometheus metrics."""

    def __init__(self):
        # Maps (entity metric prefix, field name) to its Metric, or to None
        # for a field that metric_factory declined to export.
        self._metrics = {}

    def publish(self,
                device: Device,
                entity: Any,
                unused_setter=None) -> None:
        """Publish every non-``None`` field of the dataclass *entity*.

        Metric names are ``<kind>_<class>_<field>``, lower-cased with dots
        replaced by underscores.  ``kind`` is ``device.kind`` when set and the
        module of the entity's class otherwise.  Series are labelled with the
        first identifier of *device*.
        """
        cls = entity.__class__
        kind = device.kind or cls.__module__
        name = f'{kind}_{cls.__name__}'.lower().replace('.', '_')
        identifier = device.identifiers[0]
        for field in dataclasses.fields(entity):
            value = getattr(entity, field.name)
            if value is None:
                continue
            fqn = (name, field.name)
            # Metrics are created lazily from the first non-None value seen.
            # A None result is cached too, so an unsupported field is only
            # reported once.
            if fqn not in self._metrics:
                self._metrics[fqn] = metric_factory(f'{name}_{field.name}',
                                                    field, value)
            metric = self._metrics.get(fqn)
            if metric:
                metric.publish(identifier, value)