Compare commits

...

36 commits
v0 ... master

Author SHA1 Message Date
Michael Hope cee600be66 blebridge: made more robust, and added systemd stop support
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-26 12:59:52 +00:00
Michael Hope 97f7adb535 janet: switch the device class message to debug
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-21 14:35:56 +01:00
Michael Hope c27b33683d janet: add hours as a unit 2021-03-21 13:42:25 +01:00
Michael Hope 345f329078 janet: add support for seconds and Hz 2021-03-21 11:10:03 +01:00
Michael Hope e4e61a54e2 blebridge: move to async and add AT-09 support 2021-03-21 11:09:20 +01:00
Michael Hope 091cae0d0b janet: switch to drone.io
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-07 19:39:42 +01:00
Michael Hope 047be59a62 janet: switch MQTT to use one state topic with a value template. 2021-02-28 14:49:09 +01:00
Michael Hope f8c21315a3 janet: add units of measurement 2021-02-27 10:31:51 +01:00
Michael Hope cacc14c83a janet: mqtt: revert back to the previous config path 2021-02-26 19:19:27 +01:00
Michael Hope b4ef3f5c4b janet: put the type in the discovery path 2021-02-26 19:13:06 +01:00
Michael Hope 0c2cea0413 mqtt: use the identifier as the UID if valid 2021-02-26 19:08:37 +01:00
Michael Hope c64387339c janet: add the remaining optional device fields. 2021-02-26 19:01:18 +01:00
Michael Hope df102b13fc janet: fixed the dependency format 2021-02-26 17:06:24 +01:00
Michael Hope b413cb8e22 janet: add a drone config for flake8 and mypy 2021-02-26 17:02:28 +01:00
Michael Hope 7c0ba8bec2 janet: moved to requirements.txt 2021-02-26 17:00:10 +01:00
Michael Hope f0153bcb8e janet: begin splitting heyo out 2021-02-26 16:46:54 +01:00
Michael Hope ceb019ce7d janet: tidy up, address mypy, remove obsolete heyo files. 2021-02-25 16:31:18 +01:00
Michael Hope aed02d07f2 janet: mqtt: fix the handling of bool 2021-02-25 12:39:25 +01:00
Michael Hope 6026dd3d59 janet: log the exception info 2021-02-25 12:11:38 +01:00
Michael Hope 8067a21b8f janet: suppress listener exceptions 2021-02-25 12:10:07 +01:00
Michael Hope 6a0afe1cbe janet: make the prometheus port configurable 2021-02-25 11:46:47 +01:00
Michael Hope c00c250134 janet: add more units to the prometheus client 2021-02-25 11:46:23 +01:00
Michael Hope ddf3294434 janet: tidy up the MQTT client 2021-02-25 11:46:02 +01:00
Michael Hope 07d93c0255 janet: added a simple serial bridge 2021-02-25 11:45:39 +01:00
Michael Hope efd5957d7e janet: add a cache directory for building 2021-02-21 16:19:00 +01:00
Michael Hope 3067ac69fc janet: reduce the logging in bravia 2021-02-21 13:40:15 +01:00
Michael Hope 5a87cd915b janet: make janet.app a click CLI 2021-02-21 13:39:50 +01:00
Michael Hope fb11c628b4 janet: add support for timestamps 2021-02-21 13:39:09 +01:00
Michael Hope 96406f6d56 janet: add pint everywhere and use for setting units 2021-02-21 13:21:19 +01:00
Michael Hope 65bfa617fa bravia: add a request timeout 2021-02-21 13:20:20 +01:00
Michael Hope c5637df6e2 janet: added the main entry point 2021-02-21 13:19:34 +01:00
Michael Hope 312aa4c98a janet: updated .gitignore 2021-02-21 13:19:10 +01:00
Michael Hope 77ca6dd4f6 janet: add a Makefile to check the build 2021-02-20 17:15:27 +01:00
Michael Hope 11d1a6f1ed janet: fix up setup.py 2021-02-16 21:58:42 +01:00
Michael Hope 5b812e6253 janet: add a setup file 2021-02-16 21:34:38 +01:00
Michael Hope fd62669ad2 janet: add a setup file 2021-02-16 21:33:22 +01:00
17 changed files with 631 additions and 378 deletions

21
.drone.yml Normal file
View file

@ -0,0 +1,21 @@
# Touch
kind: pipeline
name: default
platform:
os: linux
arch: arm64
steps:
- name: fetch
image: python
commands:
- git fetch --tags
- apt-get update
- apt-get install -y python3-pytest
- name: build
image: python
commands:
- pip3 install -r requirements.txt
- python3 setup.py bdist

7
.gitignore vendored
View file

@ -13,3 +13,10 @@ third_party
doc
.mpy_cache
.mypy_cache
**/*.egg-info/
experiments/
old/
*~
.eggs/
dist/
.cache/

24
Makefile Normal file
View file

@ -0,0 +1,24 @@
ESPHOMES = $(wildcard etc/*.yaml)
ESPHOMES_BIN = $(ESPHOMES:%.yaml=%.elf)
PLATFORMIO_BUILD_CACHE_DIR = $(PWD)/.cache
export PLATFORMIO_BUILD_CACHE_DIR
build:
pip3 install -r requirements.txt
flake8 janet
-mypy --ignore-missing-imports janet
esphomes: $(ESPHOMES_BIN)
etc/%.elf: etc/%.yaml
pip3 install esphome
cd $(<D) && esphome $(<F) compile
cp $(<D)/$*/.pioenvs/$*/firmware.elf $@
DESTDIR ?= dist
install:
-cp etc/*.elf $(DESTDIR)
python3 setup.py bdist -d $(DESTDIR)
.PHONY: build

View file

@ -6,8 +6,8 @@ After=network.target
Type=simple
User=root
Group=root
ExecStartPre=etcdctl set {{domain_path}}/metrics/%H-blebridge "{\"host\":\"%H.{{domain}}\",\"port\":7009}"
ExecStart=/usr/local/blebridge/bin/python -m janet.heyo.blebridge --mac=02:11:23:34:8b:14 --port=7010 --prometheus_port=7009 --adapter=hci1
ExecStartPre=etcdctl set /skydns/local/juju/metrics/%H-blebridge "{\"host\":\"%H.{{domain}}\",\"port\":7009}"
ExecStart=/usr/local/janet/bin/blebridge --mac=02:11:23:34:8b:14 --port=7010 --prometheus_port=7009 --adapter=hci1
Restart=always
RestartSec=10

View file

@ -1,6 +1,7 @@
from dataclasses import dataclass
from typing import Callable, Optional, Sequence, Any
import abc
import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional, Sequence
logger = logging.getLogger('janet')
@ -23,15 +24,28 @@ class Channel:
def put(self, source: Device, entity: Any) -> None:
remove = set()
for l in self._listeners:
for listener in self._listeners:
try:
if l(source, entity):
remove.add(l)
if listener(source, entity):
remove.add(listener)
except IOError as ex:
logger.warning(f'Listener raised {ex}, removing', exc_info=ex)
remove.add(listener)
except Exception as ex:
logger.info(f'Listener raised {ex}, removing')
remove.add(l)
raise
logger.error(f'Listener raised fatal exception {ex}, removing',
exc_info=ex)
remove.add(listener)
self._listeners ^= remove
def listen(self, listener: Callable[[Device, Any], None]) -> None:
self._listeners.add(listener)
class Publisher(abc.ABC):
@abc.abstractmethod
def publish(self,
device: Device,
entity: Any,
setter: Optional[Callable] = None):
pass

View file

@ -1,51 +1,23 @@
import time
import threading
import time
from typing import Optional
from . import Device
from . import prometheus
from . import mqtt
from . import ssdp
from . import bravia
import janet.heyo.protocol
import janet.heyo.schema
import click
import prometheus_client
import serial
from . import bravia, mqtt, prometheus, ssdp
def _heyo(clients):
host = 'socket://192.168.1.4:9876'
port = serial.serial_for_url(host)
device = Device(identifiers=[f'heyo_{host}'],
kind='heyo',
name='heyo',
available=True)
def poker():
while True:
port.write(janet.heyo.protocol.framer(3, b'\x00\x00\x00\x3e'))
time.sleep(10)
threading.Thread(target=poker, daemon=True).start()
for frame in janet.heyo.protocol.deframer(port):
for client in clients:
client(device, frame)
msg = janet.heyo.schema.decode(frame.payload)
if msg:
for client in clients:
client(device, msg)
def main():
prometheus_client.start_http_server(9874)
@click.command()
@click.option('--mqtt-host', type=str, help='MQTT host name', required=True)
@click.option('--prometheus-port', type=int, help='Prometheus metrics port')
def main(mqtt_host: str, prometheus_port: Optional[int]):
if prometheus_port:
prometheus_client.start_http_server(prometheus_port)
prom = prometheus.Client()
mq = mqtt.Client('192.168.1.4')
mq = mqtt.Client(mqtt_host)
clients = (prom.publish, mq.publish)
threading.Thread(target=_heyo, args=(clients, )).start()
s = ssdp.Discoverer()
for client in clients:
s.listen(client)

206
janet/blebridge.py Normal file
View file

@ -0,0 +1,206 @@
"""Exports a BLE based UART as a socket."""
import asyncio
import logging
from typing import Set, Sequence, Optional
import bleak
import prometheus_client
import click
import txdbus.error
import systemd_stopper
log = logging.getLogger('janet.bleasync')
# Lists the UUIDs used for the receive and TX characteristics.
_RX_UUIDS = set(('0000ffe1-0000-1000-8000-00805f9b34fb',
'0000fff1-0000-1000-8000-00805f9b34fb'))
_TX_UUIDS = set(('0000ffe1-0000-1000-8000-00805f9b34fb',
'0000fff2-0000-1000-8000-00805f9b34fb'))
# Maximum number of bytes per write.
_TX_CAP = 20
# bleak.connect seems to lock up on concurrent connect() calls.
_LOCK = asyncio.Lock()
_CONNECTED = prometheus_client.Gauge(
'blebridge_connected',
'Bridge is connected to the Bluetooth device',
labelnames=('mac', ))
_CLIENTS = prometheus_client.Gauge('blebridge_client_count',
'Number of connected socket clients',
labelnames=('mac', ))
_SENT = prometheus_client.Counter('blebridge_sent_bytes',
'Number of bytes sent over Bluetooth',
labelnames=('mac', ))
_RECEIVED = prometheus_client.Counter(
'blebridge_received_bytes',
'Number of bytes received over Bluetooth',
labelnames=('mac', ))
class Bridge:
def __init__(self, address: str, port: int):
self._address = address
self._port = port
self._device = bleak.BleakClient(address)
self._writers: Set[asyncio.StreamWriter] = set()
self._tx: asyncio.Queue[bytearray] = asyncio.Queue()
self._stop = asyncio.Event()
self._stopped = asyncio.Event()
self._lock = asyncio.Lock()
def _notify(self, unused_sender, data):
"""Handles data that arrives over Bluetooth."""
_RECEIVED.labels(self._address).inc(len(data))
for writer in list(self._writers):
try:
writer.write(data)
except Exception as ex:
log.warning(
'Exception while writing data to the client, removing',
exc_info=ex)
self._writers.remove(writer)
async def _client(self, reader, writer):
"""Handles a single client socket connection."""
with _CLIENTS.labels(self._address).track_inprogress():
self._writers.add(writer)
try:
while True:
got = await reader.read(100)
if not got:
log.info('client closed')
return
await self._tx.put(bytearray(got))
except BrokenPipeError as ex:
log.info('broken pipe error, closing')
finally:
self._writers.remove(writer)
writer.close()
async def run(self):
"""Runs the main loop."""
server = await asyncio.start_server(self._client, None, self._port)
s = asyncio.create_task(server.serve_forever())
try:
while not self._stop.is_set():
try:
await self.ble()
except bleak.exc.BleakError as ex:
log.info('Caught BLE exception, retrying', exc_info=ex)
except txdbus.error.DBusException as ex:
log.info('Caught DBus exception, retrying', exc_info=ex)
if not self._stop.is_set():
await asyncio.sleep(5)
except Exception as ex:
log.warning('Unhandled exception, closing', exc_info=ex)
raise
finally:
server.close()
self._stopped.set()
def _disconnected(self, unused_client) -> None:
log.info('Bridge: notified of disconnect')
self._tx.put_nowait(bytearray())
async def ble(self):
"""Connects and handles the Bluetooth connection."""
if self._stop.is_set():
return
async with _LOCK:
log.info(f'connecting to {self._address}')
await self._device.connect()
with _CONNECTED.labels(self._address).track_inprogress():
log.info(f'Bridge: connected to {self._address}')
try:
self._device.set_disconnected_callback(self._disconnected)
tx_uuid = None
rx_uuid = None
services = await self._device.get_services()
for service in services:
for ch in service.characteristics:
if ch.uuid in _TX_UUIDS:
tx_uuid = ch.uuid
if ch.uuid in _RX_UUIDS:
rx_uuid = ch.uuid
if not tx_uuid or not rx_uuid:
log.warning('tx_uuid or rx_uuid not found, giving up')
return
await self._device.start_notify(rx_uuid, self._notify)
while True:
if self._stop.is_set():
return
if not await self._device.is_connected():
return
tx = await self._tx.get()
if not tx:
continue
for i in range(0, len(tx), _TX_CAP):
await self._device.write_gatt_char(
tx_uuid, tx[i:i + _TX_CAP])
_SENT.labels(self._address).inc(len(tx))
finally:
if await self._device.is_connected():
await self._device.disconnect()
log.info('Bridge: disconnected')
async def stop(self):
self._stop.set()
await self._tx.put(None)
await self._stopped.wait()
log.info('Bridge: stopped')
@click.command()
@click.option('--mac',
type=str,
required=True,
multiple=True,
help='MAC address to connect to.')
@click.option('--port',
type=int,
default=9876,
help='Port number to listen on.')
@click.option('--prometheus-port',
type=int,
default=7011,
help='Port number to listen for monitoring on.')
def main(mac: Sequence[str], port: int, prometheus_port: Optional[int]):
logging.basicConfig(
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
level=logging.INFO)
if prometheus_port:
prometheus_client.start_http_server(prometheus_port)
bridges = [Bridge(mac, port + i) for i, mac in enumerate(mac)]
loop = asyncio.get_event_loop()
systemd_stopper.install(callback=loop.stop)
try:
loop.run_until_complete(
asyncio.wait([bridge.run() for bridge in bridges]))
except (KeyboardInterrupt, RuntimeError):
pass
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([bridge.stop()
for bridge in bridges]))
if __name__ == '__main__':
main()

View file

@ -4,21 +4,19 @@ Tested with the 2012 KDL-40HX850.
"""
import dataclasses
import hashlib
import socket
import typing
import xml.etree.ElementTree
import logging
import socket
import threading
import typing
import urllib.parse
import xml.etree.ElementTree
from typing import Any, Dict, Iterable, Optional, Sequence
from xml.etree.ElementTree import Element
import threading
import urllib.parse
from . import Channel
from . import Device
from . import ssdp
import requests
from . import Channel, Device, ssdp
logger = logging.getLogger('janet.bravia')
_IRCC = """
@ -118,10 +116,9 @@ def _extract_xml(text: str, schema: Any) -> Any:
for field in dataclasses.fields(schema):
path = _camel_case(field.name)
nodes = root.findall(path)
logger.info(field)
if field.type == str:
values[field.name] = nodes[0].text
elif not field.default_factory is dataclasses.MISSING:
elif field.default_factory is not dataclasses.MISSING:
values[field.name] = field.default_factory(nodes)
else:
logger.warning('Skipping unhandled field type %s', field)
@ -143,9 +140,11 @@ class Client:
'X-CERS-DEVICE-ID': self.device_id,
'X-CERS-DEVICE-INFO': self.device_info,
}
r = requests.get(self.path + path, params=params, headers=headers)
r = requests.get(self.path + path,
params=params,
headers=headers,
timeout=3)
r.raise_for_status()
logger.info('resp %s', r.text)
return r.text
def get_system_information(self) -> SystemInformation:
@ -155,7 +154,8 @@ class Client:
def register(self) -> None:
"""Register with the TV.
This is a no-op if the client has already registered. Otherwise the TV will show a prompt asking to approve the new connection.
This is a no-op if the client has already registered. Otherwise the TV
will show a prompt asking to approve the new connection.
"""
self._get('register',
params={
@ -191,7 +191,6 @@ class Bridge:
self._location = location
url = urllib.parse.urlparse(location)
host = url.netloc.split(':')[0]
logging.info('url %s', url)
self._bravia = Client(f'{url.scheme}://{host}')
self._device = None
@ -252,4 +251,4 @@ class Discoverer:
del self._devices[entity.usn]
def listen(self, listener) -> None:
self._listeners.listen(listener)
self._listeners.listen(listener)

View file

View file

@ -1,97 +0,0 @@
import threading
import time
from dataclasses import dataclass
from typing import Iterator, Optional
import logging
import struct
from janet import Device
import serial
import libscrc
import prometheus_client
from . import schema
logger = logging.getLogger('heyo.blebridge')
logging.basicConfig(
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
level=logging.INFO)
_SOF = 0xD2
@dataclass
class Frame:
kind: int
payload: bytes
def framer(kind: int, payload: bytes) -> bytes:
"""Encodes the request into a frame.
Note that the protocol is not symmetrical, so a frame generated by `framer` can't be decoded by `deframer`.
"""
header = bytearray((_SOF, kind))
crc = libscrc.modbus(header + payload)
return header + payload + struct.pack('<H', crc)
def deframer(port: serial.Serial) -> Iterator[Frame]:
def reader(port: serial.Serial) -> Iterator[int]:
while True:
for ch in port.read():
yield ch
src = reader(port)
while True:
while next(src) != _SOF:
pass
kind = next(src)
length = next(src)
payload = bytearray()
while len(payload) != length:
payload.append(next(src))
crch, crcl = next(src), next(src)
if libscrc.modbus(
bytes((_SOF, kind, length)) + payload +
bytes((crch, crcl))) != 0:
logger.error('CRC mismatch, dropping frame')
continue
yield Frame(kind, bytes(payload))
def main():
import janet.prometheus
host = 'socket://192.168.1.4:9876'
port = serial.serial_for_url(host)
device = Device(identifiers=[f'heyo_{host}'], kind='heyo', name='heyo')
prometheus_client.start_http_server(9874)
prom = janet.prometheus.Client()
def poker():
while True:
port.write(framer(3, b'\x00\x00\x00\x3e'))
time.sleep(1)
threading.Thread(target=poker, daemon=True).start()
for frame in deframer(port):
frame.device = device
prom.publish(frame)
msg = schema.decode(frame.payload)
if msg:
msg.device = device
prom.publish(msg)
# print(schema.decode(frame.payload))
if __name__ == '__main__':
main()

View file

@ -1,75 +0,0 @@
from dataclasses import dataclass
from typing import Sequence, Optional
import enum
import struct
from janet import Device
V = float
A = float
W = float
class Length(enum.IntEnum):
STATUS = 124
@dataclass
class Status:
"""High level status of the BMS."""
# Voltage of each cell.
cells: Sequence[V]
# Total voltage of the batter.
sum: V
# Discharge or charge current.
current: A
# Overall charge capacity as a ratio.
capacity: float
# Voltage of the most charged cell.
maximum: V
# Voltage of the least charged cell.
minimum: V
# Average voltage across all cells.
average: V
# Difference between the most and least charged cell.
difference: V
# Absolute power being supplied or charging with.
power: W
charge_enabled: bool
discharge_enabled: bool
others: Sequence[int]
def decode(payload: bytes) -> Optional[Status]:
if len(payload) != Length.STATUS:
return None
mV = 1e-3
fields = [
struct.unpack_from('>H', payload, i)[0]
for i in range(0, len(payload), 2)
]
cells = fields[:32]
try:
cells = cells[:cells.index(0)]
except ValueError:
pass
return Status(
cells=tuple(cell * mV for cell in cells),
sum=fields[40] / 10,
current=(fields[41] - 30000) / 10,
capacity=fields[42] / 1000,
maximum=fields[43] * mV,
minimum=fields[44] * mV,
# balancing? 47.
# fault? 52
charge_enabled=fields[53],
discharge_enabled=fields[54],
average=fields[55] * mV,
difference=fields[56] * mV,
power=fields[57],
others=fields[32:],
)

View file

@ -1,21 +1,95 @@
import dataclasses
import datetime
import enum
import hashlib
import json
import dataclasses
import logging
from typing import Any, Sequence, Optional
import re
import threading
from typing import Any, Dict, Optional, Union
import paho.mqtt.client as mqtt
import pint
from . import Device
from . import Device, Publisher
logger = logging.getLogger('janet.mqtt')
_ureg = pint.get_application_registry()
_DEVICE_CLASSES = {
_ureg.amp: 'current',
_ureg.volt: 'voltage',
_ureg.watt: 'power',
_ureg.degC: 'temperature',
_ureg.watt_hour: 'energy',
}
_UNITS = {
_ureg.amp: 'A',
_ureg.volt: 'V',
_ureg.watt: 'W',
_ureg.degC: '°C',
_ureg.watt_hour: 'Wh',
_ureg.second: 's',
_ureg.hertz: 'Hz',
_ureg.hour: 'h',
}
def _to_state(value: Any) -> Union[float, str, None]:
if isinstance(value, pint.Quantity):
return round(value.m, 6)
elif isinstance(value, datetime.datetime):
return value.isoformat()
elif isinstance(value, enum.IntEnum):
return value.name
elif isinstance(value, bool):
return 'ON' if value else 'OFF'
elif isinstance(value, str):
return value
elif isinstance(value, int):
return value
elif isinstance(value, float):
return round(value, 6)
else:
return None
def _to_device_class(value) -> Optional[str]:
if isinstance(value, pint.Quantity):
for unit, klass in _DEVICE_CLASSES.items():
if unit == value.units:
return klass
logger.debug(f'No device class for units {value.units}')
return None
elif isinstance(value, datetime.datetime):
return 'timestamp'
else:
return None
def _to_units(value) -> Optional[str]:
if isinstance(value, pint.Quantity):
for unit, klass in _UNITS.items():
if unit == value.units:
return klass
logger.info(f'Unrecognised units {value.units}')
return None
elif isinstance(value, datetime.datetime):
return 's'
else:
return None
def _uid(entity, device: Device) -> str:
cls = entity.__class__
kind = device.kind if device else None
kind = kind or cls.__module__
uid = hashlib.sha1(str(device.identifiers).encode()).hexdigest()[:13]
id0 = device.identifiers[0] if len(device.identifiers) >= 1 else None
if id0 and re.match(r'^\w+$', id0):
uid = id0
else:
uid = hashlib.sha1(str(device.identifiers).encode()).hexdigest()[:13]
if entity == device:
return f'{kind}_{uid}'
@ -23,56 +97,104 @@ def _uid(entity, device: Device) -> str:
return f'{kind}_{uid}/{cls.__name__}'.lower().replace('.', '_')
class Client:
def _device(device: Device) -> dict:
dev = {
'identifiers': device.identifiers,
'name': device.name,
}
if device.manufacturer:
dev['manufacturer'] = device.manufacturer
if device.model:
dev['model'] = device.model
if device.sw_version:
dev['sw_version'] = device.sw_version
return dev
@dataclasses.dataclass
class Setter:
field: dataclasses.Field
callback: Any
class Client(Publisher):
def __init__(self, host: str, port: int = 1883):
self._lock = threading.Lock()
self._setters: Dict[str, Setter] = {}
self._client = mqtt.Client()
self._client.on_message = self._on_message
self._client.connect_async(host, port, 60)
self._client.loop_start()
def publish(self, device: Device, entity: Any):
uid = _uid(entity, device)
def _on_message(self, unused_client, unused_userdata,
message: mqtt.MQTTMessage):
with self._lock:
setter = self._setters.get(message.topic, None)
if setter is not None:
setter.callback(setter.field, message.payload)
self._client.publish(
f'test/{uid}/state',
json.dumps(dataclasses.asdict(entity), default=str))
def publish(self, device: Device, entity: Any, setter=None):
uid = _uid(entity, device)
base = f'janet/{uid}'
state_topic = f'{base}/state'
states = {
name: _to_state(value)
for name, value in dataclasses.asdict(entity).items()
if _to_state(value) is not None
}
self._client.publish(state_topic, json.dumps(states))
available = getattr(device, 'available', None)
if available is not None:
self._client.publish(f'test/{uid}/power',
self._client.publish(f'{base}/power',
'ON' if available else 'OFF',
retain=True)
dev = {
'identifiers': device.identifiers,
'name': device.name,
}
if device.manufacturer:
dev['manufacturer'] = device.manufacturer
for field in dataclasses.fields(entity):
value = getattr(entity, field.name)
if value is None:
continue
state = _to_state(value)
if state is None:
logger.warning(
f'Ignoring field {field.name} type {field.type}')
continue
topic = f'{base}/{field.name}'
if field.type in (bool, Optional[bool]):
topic = f'test/{uid}/{field.name}'
self._client.publish(topic, 'ON' if value else 'OFF')
self._client.publish(
f'homeassistant/binary_sensor/{uid}_{field.name}/config',
json.dumps({
'name': uid,
'state_topic': topic,
'unique_id': f'{uid}_{field.name}',
'device': dev,
}))
elif field.type in (int, float):
self._client.publish(f'test/{uid}/{field.name}',
round(value, 3))
elif field.type in (str, ):
self._client.publish(f'test/{uid}/{field.name}', value)
elif field.type in (Sequence[float], Sequence[int], bytes):
pass
elif field.type in (Optional[Device], Device):
pass
config_topic = (
f'homeassistant/binary_sensor/{uid}_{field.name}/config')
else:
logger.info(f'Ignoring {field.type}')
config_topic = (
f'homeassistant/sensor/{uid}_{field.name}/config')
dev = _device(device)
kind = entity.__class__.__name__
name = field.name.replace('_', ' ')
config = {
'name': f'{kind} {name}',
'state_topic': state_topic,
'value_template': f'{{{{value_json.{field.name}}}}}',
'unique_id': f'{uid}_{field.name}',
'device': dev,
'expire_after': 600,
}
device_class = _to_device_class(value)
if device_class:
config['device_class'] = device_class
units = _to_units(value)
if units:
config['unit_of_measurement'] = units
if setter:
command = f'{topic}/set'
with self._lock:
if command not in self._setters:
self._client.subscribe(command)
self._setters[command] = Setter(field, setter)
self._client.publish(config_topic, json.dumps(config))

View file

@ -1,20 +1,53 @@
import abc
import dataclasses
import datetime
import enum
import logging
from typing import Dict, Any, Sequence, Optional
from typing import Any, Optional, Sequence
import prometheus_client
import pint
import prometheus_client
from . import Device
from . import Device, Publisher
logger = logging.getLogger('janet.mqtt')
_ureg = pint.get_application_registry()
logger = logging.getLogger('janet.prometheus')
Instance = Any
_LABELS = ('identifier', )
_UNITS = {
_ureg.amp: 'amps',
_ureg.volt: 'volts',
_ureg.watt: 'watts',
_ureg.hour: 'hours',
_ureg.degC: 'deg_c',
_ureg.watt_hour: 'wh',
_ureg.second: 'seconds',
_ureg.hertz: 'hz',
_ureg.hour: 'hours',
}
def _to_units(value) -> Optional[str]:
if isinstance(value, pint.Quantity):
for unit, klass in _UNITS.items():
if unit == value.units:
return klass
logger.warning(f'Unrecognised units {value.units}')
elif isinstance(value, datetime.datetime):
return 'timestamp'
return None
def _to_float(value) -> float:
if isinstance(value, pint.Quantity):
return round(value.m, 6)
elif isinstance(value, datetime.datetime):
return value.timestamp()
return round(value, 6)
class Metric(abc.ABC):
pass
@ -28,7 +61,7 @@ class Gauge(Metric):
def publish(self, identifier, value) -> None:
if value is not None:
self.metric.labels(identifier).set(round(value, 6))
self.metric.labels(identifier).set(_to_float(value))
class Bytes(Metric):
@ -42,7 +75,11 @@ class Bytes(Metric):
class MultiGauge(Metric):
def __init__(self, name, field):
def __init__(self, name, field, value):
if value:
units = _to_units(value[0])
if units:
name = f'{name}_{units}'
self.metric = prometheus_client.Gauge(name,
'No description',
labelnames=(
@ -52,18 +89,21 @@ class MultiGauge(Metric):
def publish(self, identifier, values) -> None:
for i, value in enumerate(values):
self.metric.labels(identifier, str(i)).set(round(value, 6))
self.metric.labels(identifier, str(i)).set(_to_float(value))
class Quantity(Metric):
def __init__(self, name, field):
def __init__(self, name, field, value: pint.Quantity):
units = _to_units(value)
if units:
name = f'{name}_{units}'
self.metric = prometheus_client.Gauge(name,
'No description',
labelnames=_LABELS)
def publish(self, identifier, value) -> None:
if value is not None:
self.metric.labels(identifier).set(value.m)
self.metric.labels(identifier).set(_to_float(value))
class Info(Metric):
@ -82,10 +122,11 @@ class Enum(Metric):
states = [x.name.lower() for x in field.type]
self.metric = prometheus_client.Enum(name,
'No description',
labelnames=_LABELS,
states=states)
def publish(self, value: enum.Enum) -> None:
self.metric.state(value.name.lower())
def publish(self, identifier, value: enum.Enum) -> None:
self.metric.labels(identifier).state(value.name.lower())
def _issubclass(typ, cls) -> bool:
@ -95,14 +136,15 @@ def _issubclass(typ, cls) -> bool:
return False
def metric_factory(name: str, field: dataclasses.Field) -> Optional[Metric]:
if isinstance(field.type, pint.Unit):
return Quantity(name, field)
def metric_factory(name: str, field: dataclasses.Field,
value: Any) -> Optional[Metric]:
if isinstance(value, pint.Quantity):
return Quantity(name, field, value)
elif field.type in (int, float, bool, Optional[int], Optional[float],
Optional[bool]):
return Gauge(name, field)
elif field.type in (Sequence[float], Sequence[int]):
return MultiGauge(name, field)
return MultiGauge(name, field, value)
elif field.type in (bytes, ):
return Bytes(name, field)
elif field.type in (str, Optional[str]):
@ -116,11 +158,11 @@ def metric_factory(name: str, field: dataclasses.Field) -> Optional[Metric]:
return None
class Client:
class Client(Publisher):
def __init__(self):
self._metrics = {}
def publish(self, device: Device, entity: Any):
def publish(self, device: Device, entity: Any, unused_setter=None) -> None:
cls = entity.__class__
kind = device.kind or cls.__module__
name = f'{kind}_{cls.__name__}'.lower().replace('.', '_')
@ -128,10 +170,14 @@ class Client:
for field in dataclasses.fields(entity):
fqn = (name, field.name)
value = getattr(entity, field.name)
if value is None:
continue
if fqn not in self._metrics:
self._metrics[fqn] = metric_factory(f'{name}_{field.name}',
field)
field, value)
metric = self._metrics.get(fqn)
if metric:
metric.publish(identifier, getattr(entity, field.name))
metric.publish(identifier, value)

View file

@ -1,19 +1,17 @@
from dataclasses import dataclass
import threading
import queue
import time
import asyncio
import socketserver
import logging
from typing import Iterator, Optional, Tuple, Set
import queue
import socketserver
import threading
from dataclasses import dataclass
from typing import Set, Tuple
import bleak
import click
import prometheus_client
import serial
from janet import Device, Channel
from janet import Channel, Device
logger = logging.getLogger('janet.heyo.blebridge')
logger = logging.getLogger('janet.serialbridge')
@dataclass
@ -58,18 +56,11 @@ class Client(socketserver.BaseRequestHandler):
class Bridge(socketserver.ThreadingTCPServer):
"""Bridges between a BLE based serial adapter and client sockets."""
def __init__(self,
server_address: Tuple[str, int],
mac: str,
adapter: Optional[str] = None,
write_uuid: str = '0000fff2-0000-1000-8000-00805f9b34fb',
read_uuid: str = '0000fff1-0000-1000-8000-00805f9b34fb'):
def __init__(self, server_address: Tuple[str, int], port: str):
super().__init__(server_address, Client)
self._mac = mac
self._adapter = adapter
self._write_uuid = write_uuid
self._read_uuid = read_uuid
self._name = port
self._port = serial.Serial(port, 19200, timeout=1)
self._lock = threading.RLock()
self._clients: Set[Client] = set()
@ -93,10 +84,10 @@ class Bridge(socketserver.ThreadingTCPServer):
self._clients.remove(client)
def _emit(self):
device = Device(name=f'blebridge {self._mac}',
identifiers=[f'blebridge_{self._mac}'],
available=self._connected,
kind='blebridge')
device = Device(name=f'serialbridge {self._name}',
identifiers=[f'serialbridge_{self._name}'],
kind='serialbridge',
available=True)
status = State(
sent_bytes=self._sent_bytes,
@ -108,73 +99,57 @@ class Bridge(socketserver.ThreadingTCPServer):
def run(self):
server = threading.Thread(target=self.serve_forever)
server.start()
writer = threading.Thread(target=self._write)
writer.start()
try:
self.ble_run()
self._read()
finally:
self.shutdown()
server.join()
self._tx.put(None)
writer.join()
def ble_run(self):
self._emit()
def _write(self):
while True:
tx = self._tx.get()
if tx is None:
return
def recv(n, data: bytes):
self._port.write(tx)
self._sent_bytes += len(tx)
self._emit()
def _read(self):
while True:
available = min(1, self._port.in_waiting)
rx = self._port.read(available)
if not rx:
continue
with self._lock:
for client in self._clients:
client.put(data)
self._recv_bytes += len(data)
self._emit()
async def run(address: str):
async with bleak.BleakClient(address,
adapter=self._adapter) as client:
await client.start_notify(self._read_uuid, recv)
try:
self._connected = True
self._emit()
while True:
await asyncio.sleep(0.2)
try:
tx = self._tx.get_nowait()
await client.write_gatt_char(
self._write_uuid, bytearray(tx))
self._sent_bytes += len(tx)
self._emit()
except queue.Empty:
pass
finally:
await client.stop_notify(self._read_uuid)
self._connected = False
self._emit()
while True:
try:
asyncio.run(run(self._mac))
except (bleak.BleakError, AttributeError) as ex:
logger.warning(f'Got an error "{ex}", retrying')
time.sleep(5)
client.put(rx)
self._recv_bytes += len(rx)
self._emit()
def put(self, data: bytes):
self._tx.put(data)
@click.command()
@click.option('--mac',
@click.option('--serial-port',
type=str,
required=True,
help='MAC address to connect to.')
@click.option('--adapter',
type=str,
default='hci0',
help='Adapter to cnnect via.')
help='pyserial port to connect to.')
@click.option('--port',
type=int,
default=7010,
default=7012,
help='Port number to listen on.')
@click.option('--prometheus_port',
type=int,
default=7011,
default=7013,
help='Port number to listen for monitoring on.')
def main(mac: str, adapter: str, port: int, prometheus_port: int):
def main(serial_port: str, port: int, prometheus_port: int):
logging.basicConfig(
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
level=logging.INFO)
@ -185,7 +160,7 @@ def main(mac: str, adapter: str, port: int, prometheus_port: int):
prom = janet.prometheus.Client()
addr = ('', port)
bridge = Bridge(addr, mac, adapter)
bridge = Bridge(addr, serial_port)
bridge.listen(prom.publish)
bridge.run()

View file

@ -1,19 +1,17 @@
import datetime
import threading
import logging
import io
import http
import http.client
import dataclasses
import datetime
import http.client
import io
import logging
import socket
import time
from typing import Dict, List, Tuple
import struct
import threading
import time
from typing import Dict, Tuple
import prometheus_client
from . import Channel, Device
from . import prometheus
from . import Channel, Device, prometheus
logger = logging.getLogger('janet.ssdp')
@ -124,11 +122,11 @@ class Discoverer:
logger.debug(f'notify {headers.items()}')
usn = headers.get('USN', '')
if not usn:
logger.warning(f'Message has no USN, dropping')
logger.warning('Message has no USN, dropping')
return
nts = headers.get('NTS', '')
if not nts:
logger.warning(f'Message has no NTS, dropping')
logger.warning('Message has no NTS, dropping')
return
nt = headers.get('NT', '')
@ -208,4 +206,4 @@ def main():
if __name__ == '__main__':
main()
main()

9
requirements.txt Normal file
View file

@ -0,0 +1,9 @@
requests>=2.25.0
pyserial>=3.4
prometheus-client>=0.8.0
Pint>=0.16.1
paho-mqtt>=1.5.1
https://github.com/hex-in/libscrc/archive/v1.6.tar.gz#egg=libscrc
click>=7.1.2
bleak>=0.10.0
systemd-stopper>=0.1.0

32
setup.py Normal file
View file

@ -0,0 +1,32 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import find_packages, setup
setup(
name='janet',
version_format='{tag}.dev{commitcount}+{gitsha}',
setup_requires=['setuptools-git-version'],
url='https://juju.nz/src/michaelh/janet',
author='Michael Hope',
author_email='michaelh@juju.nz',
description='Janet home automation',
zip_safe=True,
packages=find_packages(),
entry_points='''
[console_scripts]
blebridge=janet.blebridge:main
janet=janet.app:main
''',
)