janet/janet/prometheus.py

184 lines
5.6 KiB
Python
Raw Permalink Normal View History

2021-02-13 14:54:45 +01:00
import abc
import dataclasses
import datetime
2021-02-13 14:54:45 +01:00
import enum
import logging
from typing import Any, Optional, Sequence
2021-02-13 14:54:45 +01:00
import pint
import prometheus_client
2021-02-26 16:46:54 +01:00
from . import Device, Publisher
2021-02-14 17:31:28 +01:00
# Unit registry used to name the units in _UNITS below; obtained from pint's
# application-level registry accessor rather than constructed here.
_ureg = pint.get_application_registry()
2021-02-21 13:39:09 +01:00
# Module-level logger; used below to warn about unrecognised units and
# unsupported field types.
logger = logging.getLogger('janet.prometheus')
2021-02-13 14:54:45 +01:00
# Type alias; not referenced anywhere in the visible part of this module.
Instance = Any
# Label names shared by every single-valued metric defined below
# (MultiGauge declares its own tuple with an extra 'index' label).
_LABELS = ('identifier', )
2021-02-13 14:54:45 +01:00
# Maps a pint unit to the suffix appended to the name of a metric carrying
# that unit (see _to_units). Each unit appears exactly once.
_UNITS = {
    _ureg.amp: 'amps',
    _ureg.volt: 'volts',
    _ureg.watt: 'watts',
    _ureg.hour: 'hours',
    _ureg.degC: 'deg_c',
    _ureg.watt_hour: 'wh',
    _ureg.second: 'seconds',
    _ureg.hertz: 'hz',
}
def _to_units(value) -> Optional[str]:
    """Return the metric-name suffix describing the units of ``value``.

    A pint quantity is looked up in ``_UNITS`` by comparing its units against
    each known unit; a quantity with units missing from the table logs a
    warning and yields None. A ``datetime.datetime`` yields ``'timestamp'``.
    Any other value yields None.
    """
    if isinstance(value, pint.Quantity):
        suffix = next(
            (label for unit, label in _UNITS.items() if unit == value.units),
            None,
        )
        if suffix is None:
            logger.warning(f'Unrecognised units {value.units}')
        return suffix
    if isinstance(value, datetime.datetime):
        return 'timestamp'
    return None
def _to_float(value) -> float:
    """Convert ``value`` into the number handed to a Prometheus metric.

    A pint quantity contributes its magnitude (``.m``) and a
    ``datetime.datetime`` its ``timestamp()``. Quantities and plain numbers
    are rounded to six decimal places; timestamps are returned unrounded.
    """
    if isinstance(value, pint.Quantity):
        number = value.m
    elif isinstance(value, datetime.datetime):
        return value.timestamp()
    else:
        number = value
    return round(number, 6)
2021-02-13 14:54:45 +01:00
class Metric(abc.ABC):
    """Base class for the Prometheus metric wrappers in this module.

    ``Client.publish`` calls ``publish(identifier, value)`` on every metric
    it holds, so that method is declared abstract here: a subclass that
    forgets to implement it fails at construction time instead of at the
    first publish.
    """

    @abc.abstractmethod
    def publish(self, identifier, value) -> None:
        """Record ``value`` for the series labelled with ``identifier``."""
class Gauge(Metric):
    """Single-valued numeric metric backed by a Prometheus gauge."""

    def __init__(self, name, field):
        # ``field`` is accepted but not used by this class.
        self.metric = prometheus_client.Gauge(
            name,
            'No description',
            labelnames=_LABELS,
        )

    def publish(self, identifier, value) -> None:
        """Set the gauge for ``identifier``; a None value is ignored."""
        if value is None:
            return
        series = self.metric.labels(identifier)
        series.set(_to_float(value))
2021-02-14 17:31:28 +01:00
class Bytes(Metric):
    """Running total of payload sizes, exported as ``<name>_bytes``.

    NOTE(review): the total is kept in a Prometheus gauge that is only ever
    incremented; a counter might fit better, but that would change the
    exported metric - confirm before changing.
    """

    def __init__(self, name, field):
        # ``field`` is accepted but not used by this class.
        self.metric = prometheus_client.Gauge(
            f'{name}_bytes',
            'No description',
            labelnames=_LABELS,
        )

    def publish(self, identifier, value) -> None:
        """Add ``len(value)`` to the total kept for ``identifier``."""
        series = self.metric.labels(identifier)
        series.inc(len(value))
2021-02-13 14:54:45 +01:00
class MultiGauge(Metric):
    """Gauge for a sequence of values, one series per element position.

    Each element is published under an extra ``index`` label holding its
    position in the sequence as a string.
    """

    def __init__(self, name, field, value):
        # The unit suffix is derived from the first element of the sample
        # value; an empty sample leaves the name unchanged.
        units = _to_units(value[0]) if value else None
        if units:
            name = f'{name}_{units}'

        self.metric = prometheus_client.Gauge(
            name,
            'No description',
            labelnames=('identifier', 'index'),
        )

    def publish(self, identifier, values) -> None:
        """Set one gauge per element of ``values`` for ``identifier``."""
        for position, item in enumerate(values):
            series = self.metric.labels(identifier, str(position))
            series.set(_to_float(item))
2021-02-13 14:54:45 +01:00
class Quantity(Metric):
    """Gauge for a pint quantity, named with a suffix for its units."""

    def __init__(self, name, field, value: pint.Quantity):
        # ``value`` is only a sample used to pick the unit suffix; when the
        # units are not recognised the name is used as given.
        suffix = _to_units(value)
        metric_name = f'{name}_{suffix}' if suffix else name

        self.metric = prometheus_client.Gauge(
            metric_name,
            'No description',
            labelnames=_LABELS,
        )

    def publish(self, identifier, value) -> None:
        """Set the gauge for ``identifier``; a None value is ignored."""
        if value is None:
            return
        series = self.metric.labels(identifier)
        series.set(_to_float(value))
2021-02-13 14:54:45 +01:00
class Info(Metric):
    """Textual metric backed by a Prometheus info metric."""

    def __init__(self, name, field):
        # ``field`` is accepted but not used by this class.
        self.metric = prometheus_client.Info(
            name,
            'No description',
            labelnames=_LABELS,
        )

    def publish(self, identifier, value) -> None:
        """Publish ``value`` under the ``'value'`` key; falsy values are skipped."""
        if not value:
            return
        self.metric.labels(identifier).info({'value': value})
2021-02-13 14:54:45 +01:00
class Enum(Metric):
    """State metric backed by a Prometheus enum.

    The permitted states are the lower-cased member names obtained by
    iterating ``field.type``.
    """

    def __init__(self, name, field):
        state_names = [member.name.lower() for member in field.type]
        self.metric = prometheus_client.Enum(
            name,
            'No description',
            labelnames=_LABELS,
            states=state_names,
        )

    def publish(self, identifier, value: enum.Enum) -> None:
        """Switch the series for ``identifier`` to the state named by ``value``."""
        state = value.name.lower()
        self.metric.labels(identifier).state(state)
2021-02-13 14:54:45 +01:00
2021-02-14 17:31:28 +01:00
def _issubclass(typ, cls) -> bool:
try:
return issubclass(typ, cls)
except TypeError:
return False
def metric_factory(name: str, field: dataclasses.Field,
                   value: Any) -> Optional[Metric]:
    """Build the metric wrapper suited to a dataclass field.

    Args:
        name: Name to give the metric.
        field: Dataclass field whose declared ``type`` selects the wrapper.
        value: Sample value of the field; used to detect pint quantities and
            passed to the wrappers that take a sample.

    Returns:
        A ``Metric`` instance, or None when the field is a ``Device``
        reference or has an unsupported type (the latter logs a warning).
    """
    # Quantities are recognised from the value itself, before the declared
    # type is consulted.
    if isinstance(value, pint.Quantity):
        return Quantity(name, field, value)

    if field.type in (int, float, bool, Optional[int], Optional[float],
                      Optional[bool]):
        return Gauge(name, field)

    if field.type in (Sequence[float], Sequence[int]):
        return MultiGauge(name, field, value)

    if field.type in (bytes, ):
        return Bytes(name, field)

    if field.type in (str, Optional[str]):
        return Info(name, field)

    # Device references are dropped without a warning.
    if field.type in (Device, Optional[Device]):
        return None

    if _issubclass(field.type, enum.IntEnum):
        return Enum(name, field)

    logger.warning(f'Dropping {name} due to unsupported type {field.type}')
    return None
2021-02-26 16:46:54 +01:00
class Client(Publisher):
    """Publisher that exposes dataclass entities as Prometheus metrics."""

    def __init__(self):
        # (metric name prefix, field name) -> Metric, or None for a field
        # that metric_factory declined. The None is cached too, so a
        # declined field is not offered to the factory again.
        self._metrics = {}

    def publish(self, device: Device, entity: Any, unused_setter=None) -> None:
        """Publish every non-None field of ``entity`` for ``device``.

        Metric names are built from ``device.kind`` (falling back to the
        entity class's module), the entity class name and the field name,
        lower-cased with dots replaced by underscores. The first entry of
        ``device.identifiers`` labels each series. Metrics are created on
        first use from the first non-None value seen for the field.
        """
        entity_cls = entity.__class__
        kind = device.kind or entity_cls.__module__
        prefix = f'{kind}_{entity_cls.__name__}'.lower().replace('.', '_')
        identifier = device.identifiers[0]

        for field in dataclasses.fields(entity):
            key = (prefix, field.name)
            value = getattr(entity, field.name)
            if value is None:
                continue

            if key in self._metrics:
                metric = self._metrics[key]
            else:
                metric = metric_factory(f'{prefix}_{field.name}', field, value)
                self._metrics[key] = metric

            if metric:
                metric.publish(identifier, value)