Michael Hope
cee600be66
All checks were successful
continuous-integration/drone/push Build is passing
207 lines
7 KiB
Python
207 lines
7 KiB
Python
"""Exports a BLE based UART as a socket."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Set, Sequence, Optional
|
|
|
|
import bleak
|
|
import prometheus_client
|
|
import click
|
|
import txdbus.error
|
|
import systemd_stopper
|
|
|
|
log = logging.getLogger('janet.bleasync')

# Characteristic UUIDs recognised as the receive (notify) side of the UART.
_RX_UUIDS = {
    '0000ffe1-0000-1000-8000-00805f9b34fb',
    '0000fff1-0000-1000-8000-00805f9b34fb',
}
# Characteristic UUIDs recognised as the transmit (write) side of the UART.
_TX_UUIDS = {
    '0000ffe1-0000-1000-8000-00805f9b34fb',
    '0000fff2-0000-1000-8000-00805f9b34fb',
}

# Upper bound, in bytes, on a single characteristic write.
_TX_CAP = 20
# Serialises connection attempts: bleak's connect() seems to lock up when
# several connect() calls run concurrently.
_LOCK = asyncio.Lock()
|
|
|
|
# Prometheus metrics. Each one carries a single `mac` label.

# In-progress gauge for an established Bluetooth connection.
_CONNECTED = prometheus_client.Gauge(
    'blebridge_connected',
    'Bridge is connected to the Bluetooth device',
    labelnames=('mac', ),
)

# In-progress gauge for socket clients.
_CLIENTS = prometheus_client.Gauge(
    'blebridge_client_count',
    'Number of connected socket clients',
    labelnames=('mac', ),
)

# Byte counter for the socket -> Bluetooth direction.
_SENT = prometheus_client.Counter(
    'blebridge_sent_bytes',
    'Number of bytes sent over Bluetooth',
    labelnames=('mac', ),
)

# Byte counter for the Bluetooth -> socket direction.
_RECEIVED = prometheus_client.Counter(
    'blebridge_received_bytes',
    'Number of bytes received over Bluetooth',
    labelnames=('mac', ),
)
|
|
|
|
|
|
class Bridge:
    """Bridges one BLE UART device to a TCP socket server.

    Bytes read from socket clients are queued and written to the device's
    TX characteristic in chunks of at most `_TX_CAP` bytes.  Notifications
    from the RX characteristic are written to every connected socket
    client.
    """

    # Seconds to wait between the end of one BLE session and the next
    # connection attempt.
    _RETRY_DELAY = 5

    def __init__(self, address: str, port: int):
        """Creates the bridge without connecting or listening yet.

        Args:
            address: Address handed to `bleak.BleakClient`; also used as
                the `mac` label value on all metrics.
            port: TCP port that run() listens on.
        """
        self._address = address
        self._port = port

        self._device = bleak.BleakClient(address)
        # Writers of the currently connected socket clients.
        self._writers: Set[asyncio.StreamWriter] = set()
        # Data waiting to be written to the device.  An empty chunk is a
        # wake-up marker only and is never transmitted.
        self._tx: asyncio.Queue[bytearray] = asyncio.Queue()
        # Set by stop() to request shutdown.
        self._stop = asyncio.Event()
        # Set by run() once it has finished.
        self._stopped = asyncio.Event()
        self._lock = asyncio.Lock()

    def _notify(self, unused_sender, data):
        """Handles data that arrives over Bluetooth.

        Counts the bytes and forwards `data` to every socket client.  A
        client whose write raises is dropped from the set of writers.
        """
        _RECEIVED.labels(self._address).inc(len(data))

        # Iterate over a copy as failing writers are removed in the loop.
        for writer in list(self._writers):
            try:
                writer.write(data)
            except Exception as ex:
                log.warning(
                    'Exception while writing data to the client, removing',
                    exc_info=ex)
                self._writers.discard(writer)

    async def _client(self, reader, writer):
        """Handles a single client socket connection.

        Queues everything the client sends for transmission over Bluetooth
        until the client closes the connection or the connection fails.
        """
        with _CLIENTS.labels(self._address).track_inprogress():
            self._writers.add(writer)
            try:
                while True:
                    got = await reader.read(100)
                    if not got:
                        log.info('client closed')
                        return
                    await self._tx.put(bytearray(got))
            except BrokenPipeError:
                log.info('broken pipe error, closing')
            except ConnectionError as ex:
                # Covers the remaining connection failures, such as a
                # reset by the peer.
                log.info('connection error, closing', exc_info=ex)
            finally:
                # _notify() may already have dropped this writer after a
                # failed write, so the removal must tolerate its absence.
                self._writers.discard(writer)
                writer.close()

    async def run(self):
        """Runs the main loop.

        Starts the socket server, then repeatedly runs a BLE session via
        ble() until stop() is requested.  BLE and DBus errors are logged
        and retried; any other exception is logged and re-raised.  On the
        way out the server is closed and the stopped event is set.
        """
        server = await asyncio.start_server(self._client, None, self._port)
        serving = asyncio.create_task(server.serve_forever())

        try:
            while not self._stop.is_set():
                try:
                    await self.ble()
                except bleak.exc.BleakError as ex:
                    log.info('Caught BLE exception, retrying', exc_info=ex)
                except txdbus.error.DBusException as ex:
                    log.info('Caught DBus exception, retrying', exc_info=ex)

                # Pause before reconnecting, but return straight away once
                # a stop is requested so that shutdown is not held up.
                try:
                    await asyncio.wait_for(self._stop.wait(),
                                           self._RETRY_DELAY)
                except asyncio.TimeoutError:
                    pass

        except Exception as ex:
            log.warning('Unhandled exception, closing', exc_info=ex)
            raise
        finally:
            serving.cancel()
            server.close()
            self._stopped.set()

    def _disconnected(self, unused_client) -> None:
        """Wakes ble() when the device reports a disconnect."""
        log.info('Bridge: notified of disconnect')
        # The empty chunk unblocks the queue read in ble(), which then
        # re-checks the connection state.
        self._tx.put_nowait(bytearray())

    async def ble(self):
        """Connects and handles the Bluetooth connection.

        Returns when a stop is requested, when the device is no longer
        connected, or when no matching TX/RX characteristic is found.
        Errors raised by the Bluetooth client propagate to the caller.
        """
        if self._stop.is_set():
            return

        # Only one bridge connects at a time.
        async with _LOCK:
            log.info('connecting to %s', self._address)
            await self._device.connect()

        with _CONNECTED.labels(self._address).track_inprogress():
            log.info('Bridge: connected to %s', self._address)
            try:
                self._device.set_disconnected_callback(self._disconnected)

                tx_uuid = None
                rx_uuid = None

                # Pick the characteristics to use; the last match wins.
                services = await self._device.get_services()
                for service in services:
                    for ch in service.characteristics:
                        if ch.uuid in _TX_UUIDS:
                            tx_uuid = ch.uuid
                        if ch.uuid in _RX_UUIDS:
                            rx_uuid = ch.uuid

                if not tx_uuid or not rx_uuid:
                    log.warning('tx_uuid or rx_uuid not found, giving up')
                    return

                await self._device.start_notify(rx_uuid, self._notify)
                while True:
                    if self._stop.is_set():
                        return
                    if not await self._device.is_connected():
                        return

                    tx = await self._tx.get()
                    if not tx:
                        # Wake-up marker from stop() or _disconnected();
                        # loop round to re-check the state.
                        continue
                    for i in range(0, len(tx), _TX_CAP):
                        await self._device.write_gatt_char(
                            tx_uuid, tx[i:i + _TX_CAP])
                    _SENT.labels(self._address).inc(len(tx))
            finally:
                if await self._device.is_connected():
                    await self._device.disconnect()
                log.info('Bridge: disconnected')

    async def stop(self):
        """Requests shutdown and waits until run() has finished."""
        self._stop.set()
        # Wake ble() in case it is blocked waiting for data to send.
        await self._tx.put(bytearray())
        await self._stopped.wait()
        log.info('Bridge: stopped')
|
|
|
|
|
|
@click.command()
@click.option('--mac',
              type=str,
              required=True,
              multiple=True,
              help='MAC address to connect to.')
@click.option('--port',
              type=int,
              default=9876,
              help='Port number to listen on.')
@click.option('--prometheus-port',
              type=int,
              default=7011,
              help='Port number to listen for monitoring on.')
def main(mac: Sequence[str], port: int, prometheus_port: Optional[int]):
    """Runs one bridge per MAC address until interrupted.

    Args:
        mac: Addresses of the devices to bridge.  The bridge for the i-th
            address listens on `port + i`.
        port: TCP port of the first bridge.
        prometheus_port: Port for the Prometheus HTTP server; the server
            is not started when this is falsy.
    """
    logging.basicConfig(
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        level=logging.INFO)

    if prometheus_port:
        prometheus_client.start_http_server(prometheus_port)

    bridges = [Bridge(address, port + i) for i, address in enumerate(mac)]

    loop = asyncio.get_event_loop()
    systemd_stopper.install(callback=loop.stop)

    # asyncio.wait() needs tasks or futures: bare coroutines are deprecated
    # since Python 3.8 and rejected from 3.11 on.
    try:
        loop.run_until_complete(
            asyncio.wait([loop.create_task(bridge.run())
                          for bridge in bridges]))
    except (KeyboardInterrupt, RuntimeError):
        # run_until_complete() raises RuntimeError when the loop is stopped
        # before the awaited future completes.
        pass

    # The bridge tasks are still pending on the loop at this point; ask each
    # bridge to stop and wait for all of them to finish.
    loop.run_until_complete(
        asyncio.wait([loop.create_task(bridge.stop())
                      for bridge in bridges]))


if __name__ == '__main__':
    main()
|