import logging
import queue
import socketserver
import threading
from dataclasses import dataclass
from typing import Set, Tuple

import click
import prometheus_client
import serial

from janet import Channel, Device

logger = logging.getLogger('janet.serialbridge')


@dataclass
class State:
    """Snapshot of the bridge counters published to listeners."""

    # Number of currently registered socket clients.
    client_count: int
    # Total bytes written to the serial port so far.
    sent_bytes: int
    # Total bytes read from the serial port so far.
    recv_bytes: int


class Client(socketserver.BaseRequestHandler):
    """Handles a single socket client."""

    def setup(self):
        """Create the outbound queue and register with the owning server."""
        # Bounded so that a stalled client cannot buffer data without limit.
        self._tx = queue.Queue(100)
        self.server.register(self)

    def handle(self):
        """Pump data between the socket and the server until the peer closes.

        Bytes received from the socket are passed to ``self.server.put``.
        A helper thread drains ``self._tx`` and writes it to the socket.
        """

        def sender():
            # Once the socket fails we stop sending but keep draining the
            # queue until the sentinel arrives, so that neither the final
            # ``put(None)`` below nor any producer gets stuck on a full queue.
            usable = True
            while True:
                got = self._tx.get()
                if not got:
                    # ``None`` is the shutdown sentinel queued by handle().
                    break
                if not usable:
                    continue
                try:
                    # sendall: a plain send() may transmit only part of the
                    # buffer and silently lose the remainder.
                    self.request.sendall(got)
                except OSError:
                    logger.warning('Send to client %s failed; discarding '
                                   'further output',
                                   self.client_address, exc_info=True)
                    usable = False

        sender = threading.Thread(target=sender)
        sender.start()

        try:
            while True:
                got = self.request.recv(16)
                if not got:
                    # An empty read means the peer closed the connection.
                    break
                self.server.put(got)
        finally:
            # Always stop the sender thread, even if recv() raised.
            self._tx.put(None)
            sender.join()

    def finish(self):
        """Unregister from the owning server."""
        self.server.unregister(self)

    def put(self, data: bytes):
        """Queue ``data`` for delivery to this client without blocking.

        Raises:
            queue.Full: if the outbound queue is already full.
        """
        self._tx.put_nowait(data)


class Bridge(socketserver.ThreadingTCPServer):
    """Bridges between a BLE based serial adapter and client sockets."""

    def __init__(self, server_address: Tuple[str, int], port: str):
        """Open the serial port and start listening for socket clients.

        Args:
            server_address: ``(host, port)`` pair the TCP server binds to.
            port: Name of the serial port, passed to ``serial.Serial``.
        """
        super().__init__(server_address, Client)

        self._name = port
        # The 1 second timeout bounds how long a read blocks in _read().
        self._port = serial.Serial(port, 19200, timeout=1)
        # Guards self._clients, which handler threads and _read() both touch.
        self._lock = threading.RLock()
        self._clients: Set[Client] = set()
        # Data headed for the serial port; ``None`` is the stop sentinel.
        self._tx: 'queue.Queue[bytes]' = queue.Queue(100)
        self._connected = False
        self._sent_bytes = 0
        self._recv_bytes = 0
        self._listeners = Channel()

    def listen(self, listener) -> None:
        """Subscribe ``listener`` to the status updates sent by ``_emit``."""
        self._listeners.listen(listener)

    def register(self, client: Client):
        """Add ``client`` to the set that receives serial data."""
        with self._lock:
            self._clients.add(client)

    def unregister(self, client: Client):
        """Remove ``client`` from the set that receives serial data."""
        with self._lock:
            self._clients.remove(client)

    def _emit(self):
        """Publish the device description and current counters to listeners."""
        device = Device(name=f'serialbridge {self._name}',
                        identifiers=[f'serialbridge_{self._name}'],
                        kind='serialbridge',
                        available=True)
        status = State(
            sent_bytes=self._sent_bytes,
            recv_bytes=self._recv_bytes,
            client_count=len(self._clients),
        )
        self._listeners.put(device, status)

    def run(self):
        """Run the bridge until the serial read loop stops.

        Starts the TCP server and the serial writer on their own threads and
        runs the serial reader on the calling thread. When the reader exits
        (in practice by raising), the server and the writer are shut down.
        """
        server = threading.Thread(target=self.serve_forever)
        server.start()
        writer = threading.Thread(target=self._write)
        writer.start()

        try:
            self._read()
        finally:
            self.shutdown()
            server.join()
            # ``None`` tells _write() to return.
            self._tx.put(None)
            writer.join()

    def _write(self):
        """Drain the outbound queue into the serial port until the sentinel."""
        while True:
            tx = self._tx.get()
            if tx is None:
                return
            self._port.write(tx)
            self._sent_bytes += len(tx)
            self._emit()

    def _read(self):
        """Forward serial input to every registered client, forever."""
        while True:
            # Ask for at least one byte so the call blocks (up to the port
            # timeout) instead of spinning when nothing is buffered, and take
            # everything already buffered in a single read otherwise.
            available = max(1, self._port.in_waiting)
            rx = self._port.read(available)
            if not rx:
                # The read timed out with no data.
                continue
            with self._lock:
                for client in self._clients:
                    try:
                        client.put(rx)
                    except queue.Full:
                        # One stalled client must not stop the bridge for
                        # everybody else; drop this chunk for that client.
                        logger.warning(
                            'Dropping %d bytes for slow client %s',
                            len(rx), client.client_address)
            self._recv_bytes += len(rx)
            self._emit()

    def put(self, data: bytes):
        """Queue ``data`` to be written to the serial port.

        Blocks while the outbound queue is full.
        """
        self._tx.put(data)


# NOTE: no docstring on main() on purpose; click would show it as the
# command's --help text.
@click.command()
@click.option('--serial-port',
              type=str,
              required=True,
              help='pyserial port to connect to.')
@click.option('--port',
              type=int,
              default=7012,
              help='Port number to listen on.')
@click.option('--prometheus_port',
              type=int,
              default=7013,
              help='Port number to listen for monitoring on.')
def main(serial_port: str, port: int, prometheus_port: int):
    # Command line entry point: set up logging, start the Prometheus HTTP
    # endpoint, then run the bridge on the calling thread.
    logging.basicConfig(
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        level=logging.INFO)

    import janet.prometheus

    prometheus_client.start_http_server(prometheus_port)
    prom = janet.prometheus.Client()

    # An empty host string binds to all interfaces.
    addr = ('', port)
    bridge = Bridge(addr, serial_port)
    bridge.listen(prom.publish)
    bridge.run()


if __name__ == '__main__':
    main()