janet/janet/blebridge.py
Michael Hope cee600be66
All checks were successful
continuous-integration/drone/push Build is passing
blebridge: made more robust, and added systemd stop support
2021-03-26 12:59:52 +00:00

207 lines
7 KiB
Python

"""Exports a BLE based UART as a socket."""
import asyncio
import logging
from typing import Set, Sequence, Optional
import bleak
import prometheus_client
import click
import txdbus.error
import systemd_stopper
# Logger shared by everything in this module.
log = logging.getLogger('janet.bleasync')

# Characteristic UUIDs recognised as the UART endpoints: a characteristic in
# _RX_UUIDS is subscribed to for incoming data, one in _TX_UUIDS is written to.
_RX_UUIDS = {
    '0000ffe1-0000-1000-8000-00805f9b34fb',
    '0000fff1-0000-1000-8000-00805f9b34fb',
}
_TX_UUIDS = {
    '0000ffe1-0000-1000-8000-00805f9b34fb',
    '0000fff2-0000-1000-8000-00805f9b34fb',
}

# Upper bound, in bytes, on a single characteristic write.
_TX_CAP = 20

# Serialises connection attempts: bleak.connect seems to lock up on
# concurrent connect() calls.
_LOCK = asyncio.Lock()

# Prometheus metrics.  Each one carries a single 'mac' label.
_CONNECTED = prometheus_client.Gauge(
    'blebridge_connected',
    'Bridge is connected to the Bluetooth device',
    labelnames=('mac', ),
)
_CLIENTS = prometheus_client.Gauge(
    'blebridge_client_count',
    'Number of connected socket clients',
    labelnames=('mac', ),
)
_SENT = prometheus_client.Counter(
    'blebridge_sent_bytes',
    'Number of bytes sent over Bluetooth',
    labelnames=('mac', ),
)
_RECEIVED = prometheus_client.Counter(
    'blebridge_received_bytes',
    'Number of bytes received over Bluetooth',
    labelnames=('mac', ),
)
class Bridge:
    """Bridges one BLE UART device to a TCP server socket.

    Bytes read from any connected socket client are queued and written to
    the device's TX characteristic.  Notifications from the device's RX
    characteristic are written to every connected socket client.
    """

    def __init__(self, address: str, port: int):
        """Creates the bridge.  Nothing is opened until run() is awaited.

        Args:
            address: Address handed to bleak.BleakClient; also used as the
                label value on the metrics.
            port: TCP port that run() listens on.
        """
        self._address = address
        self._port = port
        self._device = bleak.BleakClient(address)
        # Writers of the socket clients that are currently connected.
        self._writers: Set[asyncio.StreamWriter] = set()
        # Data waiting to be written to the device.  An empty item carries
        # no data and only serves to wake up the loop in ble().
        self._tx: asyncio.Queue[bytearray] = asyncio.Queue()
        # Set by stop() to request shutdown.
        self._stop = asyncio.Event()
        # Set by run() once it has finished.
        self._stopped = asyncio.Event()
        # NOTE(review): not used by any method of this class; kept so the
        # attribute set stays unchanged.
        self._lock = asyncio.Lock()

    def _notify(self, unused_sender, data):
        """Handles data that arrives over Bluetooth.

        Forwards `data` to every connected socket client.  A client whose
        writer raises is logged and dropped from the set of clients.
        """
        _RECEIVED.labels(self._address).inc(len(data))
        # Iterate over a copy as failing writers are dropped on the way.
        for writer in list(self._writers):
            try:
                writer.write(data)
            except Exception as ex:
                log.warning(
                    'Exception while writing data to the client, removing',
                    exc_info=ex)
                self._writers.discard(writer)

    async def _client(self, reader, writer):
        """Handles a single client socket connection.

        Queues everything the client sends for transmission to the device
        until the client closes the connection.
        """
        with _CLIENTS.labels(self._address).track_inprogress():
            self._writers.add(writer)
            try:
                while True:
                    got = await reader.read(100)
                    if not got:
                        log.info('client closed')
                        return
                    await self._tx.put(bytearray(got))
            except BrokenPipeError:
                log.info('broken pipe error, closing')
            finally:
                # _notify() may already have dropped this writer after a
                # failed write, so use discard(): remove() would raise
                # KeyError here and skip the close() below.
                self._writers.discard(writer)
                writer.close()

    async def run(self):
        """Runs the main loop.

        Starts the TCP server, then keeps (re)connecting to the device via
        ble() until stop() is called.  BLE and DBus errors are logged and
        retried after a short delay; any other exception is logged and
        re-raised.  The server is closed and the stopped event is set on
        the way out in every case.
        """
        server = await asyncio.start_server(self._client, None, self._port)
        # Keep a reference to the task for as long as run() is active.
        serving = asyncio.create_task(server.serve_forever())
        try:
            while not self._stop.is_set():
                try:
                    await self.ble()
                except bleak.exc.BleakError as ex:
                    log.info('Caught BLE exception, retrying', exc_info=ex)
                except txdbus.error.DBusException as ex:
                    log.info('Caught DBus exception, retrying', exc_info=ex)
                if not self._stop.is_set():
                    # Back off for 5 s before reconnecting, but wake up
                    # immediately if stop() is called in the meantime.
                    try:
                        await asyncio.wait_for(self._stop.wait(), 5)
                    except asyncio.TimeoutError:
                        pass
        except Exception as ex:
            log.warning('Unhandled exception, closing', exc_info=ex)
            raise
        finally:
            server.close()
            self._stopped.set()

    def _disconnected(self, unused_client) -> None:
        """Called by the BLE client when the device disconnects."""
        log.info('Bridge: notified of disconnect')
        # Wake up ble(), which may be blocked waiting for data to send.
        self._tx.put_nowait(bytearray())

    async def ble(self):
        """Connects and handles the Bluetooth connection.

        Returns when a stop has been requested, when the device is no
        longer connected, or when no TX/RX characteristic could be found.
        Exceptions raised by the BLE client propagate to the caller.
        """
        if self._stop.is_set():
            return
        # Only one bridge may be inside connect() at any time.
        async with _LOCK:
            log.info('connecting to %s', self._address)
            await self._device.connect()
        with _CONNECTED.labels(self._address).track_inprogress():
            log.info('Bridge: connected to %s', self._address)
            try:
                self._device.set_disconnected_callback(self._disconnected)
                # Look for known UART characteristics.  If several match,
                # the last one seen wins.
                tx_uuid = None
                rx_uuid = None
                services = await self._device.get_services()
                for service in services:
                    for ch in service.characteristics:
                        if ch.uuid in _TX_UUIDS:
                            tx_uuid = ch.uuid
                        if ch.uuid in _RX_UUIDS:
                            rx_uuid = ch.uuid
                if not tx_uuid or not rx_uuid:
                    log.warning('tx_uuid or rx_uuid not found, giving up')
                    return
                await self._device.start_notify(rx_uuid, self._notify)
                while True:
                    if self._stop.is_set():
                        return
                    if not await self._device.is_connected():
                        return
                    tx = await self._tx.get()
                    if not tx:
                        # Wake-up item: go back and re-check the state.
                        continue
                    # Split the data into writes of at most _TX_CAP bytes.
                    for i in range(0, len(tx), _TX_CAP):
                        await self._device.write_gatt_char(
                            tx_uuid, tx[i:i + _TX_CAP])
                    _SENT.labels(self._address).inc(len(tx))
            finally:
                if await self._device.is_connected():
                    await self._device.disconnect()
                log.info('Bridge: disconnected')

    async def stop(self):
        """Requests shutdown and waits until run() has finished."""
        self._stop.set()
        # Wake up ble() if it is blocked on the queue.  An empty bytearray
        # is the same wake-up item that _disconnected() uses.
        await self._tx.put(bytearray())
        await self._stopped.wait()
        log.info('Bridge: stopped')
@click.command()
@click.option('--mac',
              type=str,
              required=True,
              multiple=True,
              help='MAC address to connect to.')
@click.option('--port',
              type=int,
              default=9876,
              help='Port number to listen on.')
@click.option('--prometheus-port',
              type=int,
              default=7011,
              help='Port number to listen for monitoring on.')
def main(mac: Sequence[str], port: int, prometheus_port: Optional[int]):
    """Runs one bridge per MAC address until interrupted, then stops them.

    Args:
        mac: Addresses of the devices to bridge.
        port: TCP port for the first bridge; each further bridge listens on
            the next port number.
        prometheus_port: Port for the metrics HTTP server.  A false value
            (such as 0) disables the server.
    """
    logging.basicConfig(
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        level=logging.INFO)
    if prometheus_port:
        prometheus_client.start_http_server(prometheus_port)

    bridges = [
        Bridge(address, port + offset) for offset, address in enumerate(mac)
    ]

    loop = asyncio.get_event_loop()
    systemd_stopper.install(callback=loop.stop)

    # asyncio.wait() must be given tasks: passing bare coroutines is
    # deprecated since Python 3.8 and rejected from 3.11 onwards.
    runners = [loop.create_task(bridge.run()) for bridge in bridges]
    try:
        loop.run_until_complete(asyncio.wait(runners))
    except (KeyboardInterrupt, RuntimeError):
        # run_until_complete() raises RuntimeError when the loop is stopped
        # (the callback installed above) before the bridges have finished.
        pass

    # The run() tasks are still pending on the loop; running it again lets
    # them react to the stop request and finish.
    stoppers = [loop.create_task(bridge.stop()) for bridge in bridges]
    loop.run_until_complete(asyncio.wait(stoppers))


if __name__ == '__main__':
    main()