184 lines
5.6 KiB
Python
184 lines
5.6 KiB
Python
import abc
|
|
import dataclasses
|
|
import datetime
|
|
import enum
|
|
import logging
|
|
from typing import Any, Optional, Sequence
|
|
|
|
import pint
|
|
import prometheus_client
|
|
|
|
from . import Device, Publisher
|
|
|
|
# Unit registry obtained from pint; the unit table in this module is keyed
# on units taken from it.
_ureg = pint.get_application_registry()

# Module logger, used for unrecognised-unit and unsupported-type warnings.
logger = logging.getLogger('janet.prometheus')

# Loose alias kept for annotations.
Instance = Any

# Label names attached to every single-valued metric.
_LABELS = ('identifier',)
|
|
|
|
# Maps a pint unit to the suffix appended to a Prometheus metric name.
# Each unit is listed exactly once: Python silently collapses a repeated key
# in a dict literal, so a duplicate entry is dead code that can mask typos.
_UNITS = {
    _ureg.amp: 'amps',
    _ureg.volt: 'volts',
    _ureg.watt: 'watts',
    _ureg.hour: 'hours',
    _ureg.degC: 'deg_c',
    _ureg.watt_hour: 'wh',
    _ureg.second: 'seconds',
    _ureg.hertz: 'hz',
}
|
|
|
|
|
|
def _to_units(value) -> Optional[str]:
    """Return the metric-name suffix that describes *value*'s units.

    * ``pint.Quantity``: the suffix registered in ``_UNITS`` for its units;
      when no entry matches, a warning is logged and ``None`` is returned.
    * ``datetime.datetime``: the literal ``'timestamp'``.
    * anything else: ``None``.
    """
    if not isinstance(value, pint.Quantity):
        return 'timestamp' if isinstance(value, datetime.datetime) else None

    # Compare with ``==`` against each known unit rather than hashing into
    # the table, mirroring how the table has always been consulted.
    suffix = next(
        (label for known, label in _UNITS.items() if known == value.units),
        None,
    )
    if suffix is None:
        logger.warning(f'Unrecognised units {value.units}')
    return suffix
|
|
|
|
|
|
def _to_float(value) -> float:
    """Convert *value* into the number handed to Prometheus.

    A ``datetime.datetime`` becomes its ``timestamp()`` (not rounded).  A
    ``pint.Quantity`` contributes its magnitude; any other value is used
    as is.  Both of the latter are rounded to six decimal places.
    """
    if isinstance(value, datetime.datetime):
        return value.timestamp()
    number = value.m if isinstance(value, pint.Quantity) else value
    return round(number, 6)
|
|
|
|
|
|
class Metric(abc.ABC):
    """Common interface for the Prometheus metric wrappers in this module.

    Every concrete wrapper exposes ``publish``.  Declaring it abstract here
    makes a subclass that forgets to implement it fail at instantiation
    rather than later, when a value is first published.
    """

    @abc.abstractmethod
    def publish(self, identifier, value) -> None:
        """Record *value* against the series labelled *identifier*."""
|
|
|
|
|
|
class Gauge(Metric):
    """Single-valued Prometheus gauge carrying the ``_LABELS`` label set."""

    def __init__(self, name, field):
        """Create the underlying gauge called *name*.

        *field* is accepted but not used by this wrapper.
        """
        self.metric = prometheus_client.Gauge(
            name, 'No description', labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Set the series for *identifier* to *value*; ``None`` is ignored."""
        if value is None:
            return
        series = self.metric.labels(identifier)
        series.set(_to_float(value))
|
|
|
|
|
|
class Bytes(Metric):
    """Gauge named ``<name>_bytes`` that accumulates published payload sizes."""

    def __init__(self, name, field):
        """Create the underlying gauge; *field* is accepted but not used."""
        metric_name = f'{name}_bytes'
        self.metric = prometheus_client.Gauge(
            metric_name, 'No description', labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Increase the series for *identifier* by ``len(value)``."""
        series = self.metric.labels(identifier)
        series.inc(len(value))
|
|
|
|
|
|
class MultiGauge(Metric):
    """Gauge for a sequence of values, one series per element index."""

    def __init__(self, name, field, value):
        """Create the gauge, suffixing *name* with the first element's units.

        The suffix is only added when *value* is non-empty and
        ``_to_units`` recognises its first element.  *field* is accepted
        but not used.
        """
        suffix = _to_units(value[0]) if value else None
        if suffix:
            name = f'{name}_{suffix}'
        label_names = ('identifier', 'index')
        self.metric = prometheus_client.Gauge(
            name, 'No description', labelnames=label_names)

    def publish(self, identifier, values) -> None:
        """Set one series per element, labelled with its position as text."""
        for position, reading in enumerate(values):
            series = self.metric.labels(identifier, str(position))
            series.set(_to_float(reading))
|
|
|
|
|
|
class Quantity(Metric):
    """Gauge for a ``pint.Quantity`` whose name carries a unit suffix."""

    def __init__(self, name, field, value: pint.Quantity):
        """Create the gauge, naming it after the units of *value*.

        When ``_to_units`` yields no suffix the plain *name* is used.
        *field* is accepted but not used.
        """
        suffix = _to_units(value)
        metric_name = f'{name}_{suffix}' if suffix else name
        self.metric = prometheus_client.Gauge(
            metric_name, 'No description', labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Set the series for *identifier* to *value*; ``None`` is ignored."""
        if value is None:
            return
        series = self.metric.labels(identifier)
        series.set(_to_float(value))
|
|
|
|
|
|
class Info(Metric):
    """Prometheus ``Info`` metric exposing a value under the key ``value``."""

    def __init__(self, name, field):
        """Create the underlying info metric; *field* is accepted but not used."""
        self.metric = prometheus_client.Info(
            name, 'No description', labelnames=_LABELS)

    def publish(self, identifier, value) -> None:
        """Publish *value* for *identifier*; falsy values are skipped."""
        if not value:
            return
        series = self.metric.labels(identifier)
        series.info({'value': value})
|
|
|
|
|
|
class Enum(Metric):
    """Prometheus ``Enum`` metric whose states are lower-cased member names."""

    def __init__(self, name, field):
        """Create the metric with one state per member of ``field.type``."""
        state_names = [member.name.lower() for member in field.type]
        self.metric = prometheus_client.Enum(
            name,
            'No description',
            labelnames=_LABELS,
            states=state_names,
        )

    def publish(self, identifier, value: enum.Enum) -> None:
        """Switch the series for *identifier* to the state named by *value*."""
        series = self.metric.labels(identifier)
        series.state(value.name.lower())
|
|
|
|
|
|
def _issubclass(typ, cls) -> bool:
    """``issubclass`` that answers ``False`` instead of raising ``TypeError``.

    Useful when *typ* may be something that is not a class, which makes the
    builtin raise.
    """
    try:
        verdict = issubclass(typ, cls)
    except TypeError:
        verdict = False
    return verdict
|
|
|
|
|
|
def metric_factory(name: str, field: dataclasses.Field,
                   value: Any) -> Optional[Metric]:
    """Build the metric wrapper suited to a dataclass field, if any.

    The runtime *value* is checked first: a ``pint.Quantity`` always yields
    a ``Quantity`` metric.  Otherwise the declared ``field.type`` decides:

    * int / float / bool (plain or ``Optional``) -> ``Gauge``
    * ``Sequence[float]`` / ``Sequence[int]``    -> ``MultiGauge``
    * ``bytes``                                  -> ``Bytes``
    * ``str`` (plain or ``Optional``)            -> ``Info``
    * ``Device`` (plain or ``Optional``)         -> no metric
    * subclasses of ``enum.IntEnum``             -> ``Enum``

    Any other type is dropped with a warning and ``None`` is returned.
    """
    if isinstance(value, pint.Quantity):
        return Quantity(name, field, value)

    declared = field.type

    if declared in (int, float, bool,
                    Optional[int], Optional[float], Optional[bool]):
        return Gauge(name, field)

    if declared in (Sequence[float], Sequence[int]):
        return MultiGauge(name, field, value)

    if declared in (bytes,):
        return Bytes(name, field)

    if declared in (str, Optional[str]):
        return Info(name, field)

    if declared in (Device, Optional[Device]):
        # Device-typed fields produce no metric and no warning.
        return None

    if _issubclass(declared, enum.IntEnum):
        return Enum(name, field)

    logger.warning(f'Dropping {name} due to unsupported type {field.type}')
    return None
|
|
|
|
|
|
class Client(Publisher):
    """Publisher that exports dataclass entities as Prometheus metrics."""

    def __init__(self):
        # Cache of metric wrappers keyed by (metric name prefix, field name).
        # A cached ``None`` marks a field for which no metric is produced.
        self._metrics = {}

    def publish(self, device: Device, entity: Any, unused_setter=None) -> None:
        """Publish every non-``None`` field of *entity* for *device*.

        Metric names are ``<kind>_<class name>_<field name>``, lower-cased
        and with dots replaced by underscores, where ``<kind>`` is
        ``device.kind`` or, when that is falsy, the module of the entity's
        class.  Series are labelled with the first entry of
        ``device.identifiers``.  Wrappers are created on first use through
        ``metric_factory`` and reused afterwards.
        """
        entity_cls = entity.__class__
        kind = device.kind or entity_cls.__module__
        prefix = f'{kind}_{entity_cls.__name__}'.lower().replace('.', '_')
        identifier = device.identifiers[0]

        for spec in dataclasses.fields(entity):
            cache_key = (prefix, spec.name)
            reading = getattr(entity, spec.name)
            if reading is None:
                continue

            if cache_key not in self._metrics:
                metric_name = f'{prefix}_{spec.name}'
                self._metrics[cache_key] = metric_factory(
                    metric_name, spec, reading)

            exporter = self._metrics.get(cache_key)
            if not exporter:
                continue
            exporter.publish(identifier, reading)
|