# janet/janet/serialbridge.py
#
# Listing metadata: 171 lines, 4.3 KiB, Python.

import logging
import queue
import socketserver
import threading
from dataclasses import dataclass
from typing import Set, Tuple
import click
import prometheus_client
import serial
from janet import Channel, Device
# Module-level logger. The name is spelled out rather than taken from
# __name__, so it stays 'janet.serialbridge' even when this file is run
# directly as a script (where __name__ would be '__main__').
logger = logging.getLogger('janet.serialbridge')
@dataclass
class State:
    """Snapshot of the bridge's counters, as handed to listeners."""

    # Number of socket clients registered with the bridge at snapshot time.
    client_count: int
    # Running total of bytes written to the serial port.
    sent_bytes: int
    # Running total of bytes read from the serial port.
    recv_bytes: int
class Client(socketserver.BaseRequestHandler):
    """Handles a single socket client.

    Bytes received from the socket are forwarded to the server through
    ``self.server.put``. Bytes handed to :meth:`put` are queued and written
    to the socket by a dedicated sender thread, so a slow peer never blocks
    the caller of :meth:`put`.
    """

    def setup(self):
        """Create the outbound queue and register with the server."""
        # Bounded so a peer that stops reading cannot grow memory unchecked.
        self._tx = queue.Queue(100)
        self.server.register(self)

    def handle(self):
        """Pump data in both directions until the peer closes the socket."""
        sender = threading.Thread(target=self._send_loop)
        sender.start()
        try:
            while True:
                got = self.request.recv(16)
                if not got:
                    # Empty read: the peer closed its side of the connection.
                    break
                self.server.put(got)
        finally:
            self._stop_sender(sender)

    def _send_loop(self):
        """Write queued chunks to the socket until the stop sentinel arrives."""
        try:
            while True:
                got = self._tx.get()
                if not got:
                    break
                # sendall() retries until every byte is written; a plain
                # send() may accept only part of the buffer and drop the rest.
                self.request.sendall(got)
        except OSError as exc:
            # The peer went away mid-write; nothing more can be delivered.
            logger.info('Stopped sending to %s: %s', self.client_address, exc)

    def _stop_sender(self, sender):
        """Hand the sender its stop sentinel and wait for it to exit.

        A sender that already died on a socket error no longer drains the
        queue, so an unconditional blocking ``put`` could hang forever on a
        full queue. Retry with a timeout only while the thread is alive.
        """
        while sender.is_alive():
            try:
                self._tx.put(None, timeout=0.1)
            except queue.Full:
                continue
            break
        sender.join()

    def finish(self):
        """Remove this client from the server's fan-out set."""
        self.server.unregister(self)

    def put(self, data: bytes):
        """Queue ``data`` for delivery to this client without blocking.

        Raises:
            queue.Full: If 100 chunks are already waiting to be sent.
        """
        self._tx.put_nowait(data)
class Bridge(socketserver.ThreadingTCPServer):
    """Bridges between a BLE based serial adapter and client sockets.

    Bytes read from the serial port are fanned out to every registered
    client. Bytes received from any client are queued and written to the
    serial port by a dedicated writer thread. After every transfer the
    current counters are published to the registered listeners.
    """

    def __init__(self,
                 server_address: Tuple[str, int],
                 port: str,
                 baudrate: int = 19200):
        """Bind the listening socket and open the serial port.

        Args:
            server_address: ``(host, port)`` to accept TCP clients on.
            port: Port name passed to ``serial.Serial``.
            baudrate: Serial line speed; defaults to 19200.
        """
        super().__init__(server_address, Client)
        self._name = port
        try:
            # One-second read timeout so the read loop wakes up regularly
            # even when the adapter is silent.
            self._port = serial.Serial(port, baudrate, timeout=1)
        except Exception:
            # The listening socket is already bound; do not leak it.
            self.server_close()
            raise
        self._lock = threading.RLock()
        self._clients: Set['Client'] = set()
        # Chunks waiting to be written to the serial port; ``None`` is the
        # writer thread's stop sentinel.
        self._tx: 'queue.Queue[bytes]' = queue.Queue(100)
        self._connected = False
        self._sent_bytes = 0
        self._recv_bytes = 0
        self._listeners = Channel()

    def listen(self, listener) -> None:
        """Subscribe ``listener`` to the status updates this bridge emits."""
        self._listeners.listen(listener)

    def register(self, client: 'Client'):
        """Add ``client`` to the set that receives serial data."""
        with self._lock:
            self._clients.add(client)

    def unregister(self, client: 'Client'):
        """Remove ``client`` from the set that receives serial data."""
        with self._lock:
            self._clients.remove(client)

    def _emit(self):
        """Publish the current counters to the listeners."""
        device = Device(name=f'serialbridge {self._name}',
                        identifiers=[f'serialbridge_{self._name}'],
                        kind='serialbridge',
                        available=True)
        status = State(
            sent_bytes=self._sent_bytes,
            recv_bytes=self._recv_bytes,
            client_count=len(self._clients),
        )
        self._listeners.put(device, status)

    def run(self):
        """Serve clients and pump serial data until the read loop fails.

        Starts the TCP accept loop and the serial writer on their own
        threads and runs the serial reader on the calling thread. When the
        reader exits (only via an exception), both helpers are stopped.
        """
        server = threading.Thread(target=self.serve_forever)
        server.start()
        writer = threading.Thread(target=self._write)
        writer.start()
        try:
            self._read()
        finally:
            self.shutdown()
            server.join()
            self._stop_writer(writer)

    def _stop_writer(self, writer):
        """Hand the writer its stop sentinel and wait for it to exit.

        If the writer already died (for example on a serial error) nobody
        drains the queue, so an unconditional blocking ``put`` could hang on
        a full queue. Retry with a timeout only while the thread is alive.
        """
        while writer.is_alive():
            try:
                self._tx.put(None, timeout=0.1)
            except queue.Full:
                continue
            break
        writer.join()

    def _write(self):
        """Write queued client data to the serial port until told to stop."""
        while True:
            tx = self._tx.get()
            if tx is None:
                return
            self._port.write(tx)
            self._sent_bytes += len(tx)
            self._emit()

    def _read(self):
        """Forward serial data to all clients; returns only by raising."""
        while True:
            # Ask for at least one byte so read() blocks (up to the port
            # timeout) while the line is idle, and for everything already
            # buffered otherwise. Asking for zero bytes would return
            # immediately and turn this loop into a busy spin.
            wanted = max(1, self._port.in_waiting)
            rx = self._port.read(wanted)
            if not rx:
                continue
            self._broadcast(rx)
            self._recv_bytes += len(rx)
            self._emit()

    def _broadcast(self, rx: bytes):
        """Queue ``rx`` on every registered client."""
        with self._lock:
            for client in self._clients:
                try:
                    client.put(rx)
                except queue.Full:
                    # One stalled client must not take the bridge down for
                    # everyone else; drop this chunk for that client only.
                    logger.warning('Dropping %d bytes for slow client %s',
                                   len(rx), client.client_address)

    def put(self, data: bytes):
        """Queue ``data`` for the serial port; blocks while the queue is full."""
        self._tx.put(data)
@click.command()
@click.option('--serial-port',
              type=str,
              required=True,
              help='pyserial port to connect to.')
@click.option('--port',
              type=int,
              default=7012,
              help='Port number to listen on.')
# The hyphenated spelling matches --serial-port; the original underscore
# spelling is kept as an alias so existing invocations keep working.
@click.option('--prometheus-port',
              '--prometheus_port',
              'prometheus_port',
              type=int,
              default=7013,
              help='Port number to listen for monitoring on.')
def main(serial_port: str, port: int, prometheus_port: int):
    """Bridge a serial port to TCP clients and export Prometheus metrics."""
    logging.basicConfig(
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        level=logging.INFO)

    # Imported here rather than at module level; presumably to keep the
    # dependency out of plain imports of this module -- TODO confirm.
    import janet.prometheus

    # Start the metrics HTTP endpoint before the bridge begins publishing.
    prometheus_client.start_http_server(prometheus_port)
    prom = janet.prometheus.Client()

    # An empty host makes the server listen on all interfaces.
    addr = ('', port)
    bridge = Bridge(addr, serial_port)
    bridge.listen(prom.publish)
    # Blocks until the serial read loop fails.
    bridge.run()
# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == '__main__':
    main()