"""Exports a BLE based UART as a socket.""" import asyncio import logging from typing import Set, Sequence, Optional import bleak import prometheus_client import click import txdbus.error import systemd_stopper log = logging.getLogger('janet.bleasync') # Lists the UUIDs used for the receive and TX characteristics. _RX_UUIDS = set(('0000ffe1-0000-1000-8000-00805f9b34fb', '0000fff1-0000-1000-8000-00805f9b34fb')) _TX_UUIDS = set(('0000ffe1-0000-1000-8000-00805f9b34fb', '0000fff2-0000-1000-8000-00805f9b34fb')) # Maximum number of bytes per write. _TX_CAP = 20 # bleak.connect seems to lock up on concurrent connect() calls. _LOCK = asyncio.Lock() _CONNECTED = prometheus_client.Gauge( 'blebridge_connected', 'Bridge is connected to the Bluetooth device', labelnames=('mac', )) _CLIENTS = prometheus_client.Gauge('blebridge_client_count', 'Number of connected socket clients', labelnames=('mac', )) _SENT = prometheus_client.Counter('blebridge_sent_bytes', 'Number of bytes sent over Bluetooth', labelnames=('mac', )) _RECEIVED = prometheus_client.Counter( 'blebridge_received_bytes', 'Number of bytes received over Bluetooth', labelnames=('mac', )) class Bridge: def __init__(self, address: str, port: int): self._address = address self._port = port self._device = bleak.BleakClient(address) self._writers: Set[asyncio.StreamWriter] = set() self._tx: asyncio.Queue[bytearray] = asyncio.Queue() self._stop = asyncio.Event() self._stopped = asyncio.Event() self._lock = asyncio.Lock() def _notify(self, unused_sender, data): """Handles data that arrives over Bluetooth.""" _RECEIVED.labels(self._address).inc(len(data)) for writer in list(self._writers): try: writer.write(data) except Exception as ex: log.warning( 'Exception while writing data to the client, removing', exc_info=ex) self._writers.remove(writer) async def _client(self, reader, writer): """Handles a single client socket connection.""" with _CLIENTS.labels(self._address).track_inprogress(): self._writers.add(writer) try: while True: got = await reader.read(100) if not got: log.info('client closed') return await self._tx.put(bytearray(got)) except BrokenPipeError as ex: log.info('broken pipe error, closing') finally: self._writers.remove(writer) writer.close() async def run(self): """Runs the main loop.""" server = await asyncio.start_server(self._client, None, self._port) s = asyncio.create_task(server.serve_forever()) try: while not self._stop.is_set(): try: await self.ble() except bleak.exc.BleakError as ex: log.info('Caught BLE exception, retrying', exc_info=ex) except txdbus.error.DBusException as ex: log.info('Caught DBus exception, retrying', exc_info=ex) if not self._stop.is_set(): await asyncio.sleep(5) except Exception as ex: log.warning('Unhandled exception, closing', exc_info=ex) raise finally: server.close() self._stopped.set() def _disconnected(self, unused_client) -> None: log.info('Bridge: notified of disconnect') self._tx.put_nowait(bytearray()) async def ble(self): """Connects and handles the Bluetooth connection.""" if self._stop.is_set(): return async with _LOCK: log.info(f'connecting to {self._address}') await self._device.connect() with _CONNECTED.labels(self._address).track_inprogress(): log.info(f'Bridge: connected to {self._address}') try: self._device.set_disconnected_callback(self._disconnected) tx_uuid = None rx_uuid = None services = await self._device.get_services() for service in services: for ch in service.characteristics: if ch.uuid in _TX_UUIDS: tx_uuid = ch.uuid if ch.uuid in _RX_UUIDS: rx_uuid = ch.uuid if not tx_uuid or not rx_uuid: log.warning('tx_uuid or rx_uuid not found, giving up') return await self._device.start_notify(rx_uuid, self._notify) while True: if self._stop.is_set(): return if not await self._device.is_connected(): return tx = await self._tx.get() if not tx: continue for i in range(0, len(tx), _TX_CAP): await self._device.write_gatt_char( tx_uuid, tx[i:i + _TX_CAP]) _SENT.labels(self._address).inc(len(tx)) finally: if await self._device.is_connected(): await self._device.disconnect() log.info('Bridge: disconnected') async def stop(self): self._stop.set() await self._tx.put(None) await self._stopped.wait() log.info('Bridge: stopped') @click.command() @click.option('--mac', type=str, required=True, multiple=True, help='MAC address to connect to.') @click.option('--port', type=int, default=9876, help='Port number to listen on.') @click.option('--prometheus-port', type=int, default=7011, help='Port number to listen for monitoring on.') def main(mac: Sequence[str], port: int, prometheus_port: Optional[int]): logging.basicConfig( format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s', level=logging.INFO) if prometheus_port: prometheus_client.start_http_server(prometheus_port) bridges = [Bridge(mac, port + i) for i, mac in enumerate(mac)] loop = asyncio.get_event_loop() systemd_stopper.install(callback=loop.stop) try: loop.run_until_complete( asyncio.wait([bridge.run() for bridge in bridges])) except (KeyboardInterrupt, RuntimeError): pass loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([bridge.stop() for bridge in bridges])) if __name__ == '__main__': main()