janet/janet/prometheus.py

184 lines
5.6 KiB
Python

import abc
import dataclasses
import datetime
import enum
import logging
from typing import Any, Optional, Sequence
import pint
import prometheus_client
from . import Device, Publisher
_ureg = pint.get_application_registry()
logger = logging.getLogger('janet.prometheus')
Instance = Any
_LABELS = ('identifier', )
_UNITS = {
_ureg.amp: 'amps',
_ureg.volt: 'volts',
_ureg.watt: 'watts',
_ureg.hour: 'hours',
_ureg.degC: 'deg_c',
_ureg.watt_hour: 'wh',
_ureg.second: 'seconds',
_ureg.hertz: 'hz',
_ureg.hour: 'hours',
}
def _to_units(value) -> Optional[str]:
if isinstance(value, pint.Quantity):
for unit, klass in _UNITS.items():
if unit == value.units:
return klass
logger.warning(f'Unrecognised units {value.units}')
elif isinstance(value, datetime.datetime):
return 'timestamp'
return None
def _to_float(value) -> float:
if isinstance(value, pint.Quantity):
return round(value.m, 6)
elif isinstance(value, datetime.datetime):
return value.timestamp()
return round(value, 6)
class Metric(abc.ABC):
pass
class Gauge(Metric):
def __init__(self, name, field):
self.metric = prometheus_client.Gauge(name,
'No description',
labelnames=_LABELS)
def publish(self, identifier, value) -> None:
if value is not None:
self.metric.labels(identifier).set(_to_float(value))
class Bytes(Metric):
def __init__(self, name, field):
self.metric = prometheus_client.Gauge(f'{name}_bytes',
'No description',
labelnames=_LABELS)
def publish(self, identifier, value) -> None:
self.metric.labels(identifier).inc(len(value))
class MultiGauge(Metric):
def __init__(self, name, field, value):
if value:
units = _to_units(value[0])
if units:
name = f'{name}_{units}'
self.metric = prometheus_client.Gauge(name,
'No description',
labelnames=(
'identifier',
'index',
))
def publish(self, identifier, values) -> None:
for i, value in enumerate(values):
self.metric.labels(identifier, str(i)).set(_to_float(value))
class Quantity(Metric):
def __init__(self, name, field, value: pint.Quantity):
units = _to_units(value)
if units:
name = f'{name}_{units}'
self.metric = prometheus_client.Gauge(name,
'No description',
labelnames=_LABELS)
def publish(self, identifier, value) -> None:
if value is not None:
self.metric.labels(identifier).set(_to_float(value))
class Info(Metric):
def __init__(self, name, field):
self.metric = prometheus_client.Info(name,
'No description',
labelnames=_LABELS)
def publish(self, identifier, value) -> None:
if value:
self.metric.labels(identifier).info({'value': value})
class Enum(Metric):
def __init__(self, name, field):
states = [x.name.lower() for x in field.type]
self.metric = prometheus_client.Enum(name,
'No description',
labelnames=_LABELS,
states=states)
def publish(self, identifier, value: enum.Enum) -> None:
self.metric.labels(identifier).state(value.name.lower())
def _issubclass(typ, cls) -> bool:
try:
return issubclass(typ, cls)
except TypeError:
return False
def metric_factory(name: str, field: dataclasses.Field,
value: Any) -> Optional[Metric]:
if isinstance(value, pint.Quantity):
return Quantity(name, field, value)
elif field.type in (int, float, bool, Optional[int], Optional[float],
Optional[bool]):
return Gauge(name, field)
elif field.type in (Sequence[float], Sequence[int]):
return MultiGauge(name, field, value)
elif field.type in (bytes, ):
return Bytes(name, field)
elif field.type in (str, Optional[str]):
return Info(name, field)
elif field.type in (Device, Optional[Device]):
return None
elif _issubclass(field.type, enum.IntEnum):
return Enum(name, field)
else:
logger.warning(f'Dropping {name} due to unsupported type {field.type}')
return None
class Client(Publisher):
def __init__(self):
self._metrics = {}
def publish(self, device: Device, entity: Any, unused_setter=None) -> None:
cls = entity.__class__
kind = device.kind or cls.__module__
name = f'{kind}_{cls.__name__}'.lower().replace('.', '_')
identifier = device.identifiers[0]
for field in dataclasses.fields(entity):
fqn = (name, field.name)
value = getattr(entity, field.name)
if value is None:
continue
if fqn not in self._metrics:
self._metrics[fqn] = metric_factory(f'{name}_{field.name}',
field, value)
metric = self._metrics.get(fqn)
if metric:
metric.publish(identifier, value)