171 lines
4.3 KiB
Python
171 lines
4.3 KiB
Python
import logging
|
|
import queue
|
|
import socketserver
|
|
import threading
|
|
from dataclasses import dataclass
|
|
from typing import Set, Tuple
|
|
|
|
import click
|
|
import prometheus_client
|
|
import serial
|
|
|
|
from janet import Channel, Device
|
|
|
|
# Module-level logger, registered under the fixed name 'janet.serialbridge'.
logger = logging.getLogger('janet.serialbridge')
|
|
|
|
|
|
@dataclass
class State:
    """Counters describing the bridge at one point in time.

    Field order is part of the constructor signature and must be kept.
    """

    # Number of clients in the bridge's client set.
    client_count: int
    # Running total of bytes written to the serial port.
    sent_bytes: int
    # Running total of bytes read from the serial port.
    recv_bytes: int
|
|
|
|
|
|
class Client(socketserver.BaseRequestHandler):
    """Handles a single socket client.

    Bytes received from the socket are handed to the server through
    ``server.put``; bytes queued with :meth:`put` are written back to the
    socket by a dedicated sender thread.
    """

    # Same logger object as the module-level one (loggers are looked up by
    # name); held on the class so the handler does not rely on a module
    # global being present.
    _logger = logging.getLogger('janet.serialbridge')

    def setup(self):
        """Create the outbound queue and register with the server."""
        # Bounded so that a stalled client cannot accumulate unbounded data.
        self._tx = queue.Queue(100)
        self.server.register(self)

    def handle(self):
        """Pump data in both directions until the client disconnects."""

        def sender():
            # After a failed send the socket is unusable, but the queue must
            # still be consumed: otherwise it fills up and the blocking
            # sentinel put() in the ``finally`` below could hang forever.
            broken = False
            while True:
                got = self._tx.get()
                if not got:
                    # ``None`` (or any empty chunk) is the stop sentinel.
                    break
                if broken:
                    continue
                try:
                    # sendall(): a plain send() may transmit only a prefix
                    # of the buffer and silently lose the remainder.
                    self.request.sendall(got)
                except OSError as err:
                    self._logger.warning(
                        'Sending to client failed, discarding further '
                        'output: %s', err)
                    broken = True

        worker = threading.Thread(target=sender)
        worker.start()
        try:
            while True:
                # Read at most 16 bytes at a time from the client.
                got = self.request.recv(16)
                if not got:
                    # Empty read: the peer closed its end of the connection.
                    break
                self.server.put(got)
        finally:
            # Always stop and reap the sender thread, even if recv() raised.
            self._tx.put(None)
            worker.join()

    def finish(self):
        """Unregister from the server once handling has ended."""
        self.server.unregister(self)

    def put(self, data: bytes):
        """Queue ``data`` for delivery to this client without blocking.

        If the outbound queue is full the chunk is dropped and a warning is
        logged, instead of raising ``queue.Full`` into the caller: one slow
        client must not be able to abort the code that fans data out to all
        clients.
        """
        try:
            self._tx.put_nowait(data)
        except queue.Full:
            self._logger.warning(
                'Client send queue is full, dropping %d bytes', len(data))
|
|
|
|
|
|
class Bridge(socketserver.ThreadingTCPServer):
    """Bridges between a BLE based serial adapter and client sockets.

    Bytes read from the serial port are fanned out to every registered
    client; bytes submitted through :meth:`put` are written to the serial
    port by a dedicated writer thread.
    """

    def __init__(self, server_address: Tuple[str, int], port: str):
        """Create the TCP server and open the serial port.

        Args:
            server_address: ``(host, port)`` pair the TCP server is created
                with.
            port: Name of the serial port, passed to ``serial.Serial``.
        """
        super().__init__(server_address, Client)
        self._name = port
        # 19200 baud; the 1 second timeout bounds each blocking read.
        self._port = serial.Serial(port, 19200, timeout=1)

        # Guards ``_clients``.
        self._lock = threading.RLock()
        self._clients: 'Set[Client]' = set()
        # Outbound (socket -> serial) queue; ``None`` is the stop sentinel.
        self._tx: 'queue.Queue[bytes]' = queue.Queue(100)

        self._connected = False
        self._sent_bytes = 0
        self._recv_bytes = 0

        self._listeners = Channel()

    def listen(self, listener) -> None:
        """Subscribe ``listener`` to the status updates sent by `_emit`."""
        self._listeners.listen(listener)

    def register(self, client: 'Client'):
        """Add ``client`` to the set that receives serial data."""
        with self._lock:
            self._clients.add(client)

    def unregister(self, client: 'Client'):
        """Remove ``client``; raises ``KeyError`` if it is not registered."""
        with self._lock:
            self._clients.remove(client)

    def _emit(self):
        """Publish the current counters to the listener channel."""
        device = Device(name=f'serialbridge {self._name}',
                        identifiers=[f'serialbridge_{self._name}'],
                        kind='serialbridge',
                        available=True)

        status = State(
            sent_bytes=self._sent_bytes,
            recv_bytes=self._recv_bytes,
            client_count=len(self._clients),
        )
        self._listeners.put(device, status)

    def run(self):
        """Serve clients and pump serial data until reading fails.

        The TCP accept loop and the serial writer run in their own threads,
        while the serial read loop runs in the calling thread. When the read
        loop exits with an exception, the server is shut down and both
        helper threads are stopped and joined before it propagates.
        """
        server = threading.Thread(target=self.serve_forever)
        server.start()
        writer = threading.Thread(target=self._write)
        writer.start()

        try:
            self._read()
        finally:
            self.shutdown()
            server.join()
            # ``None`` tells the writer loop to return.
            self._tx.put(None)
            writer.join()

    def _write(self):
        """Write queued chunks to the serial port until the sentinel."""
        while True:
            tx = self._tx.get()
            if tx is None:
                return

            self._port.write(tx)
            self._sent_bytes += len(tx)
            self._emit()

    def _read(self):
        """Read from the serial port forever and fan data out to clients."""
        while True:
            # Request everything already buffered, but never fewer than one
            # byte: read(0) returns immediately and would turn this loop
            # into a busy spin, whereas read(1) blocks for up to the port
            # timeout until data arrives.
            wanted = max(1, self._port.in_waiting)
            rx = self._port.read(wanted)
            if not rx:
                # Timed out without data; poll again.
                continue
            with self._lock:
                for client in self._clients:
                    client.put(rx)
            self._recv_bytes += len(rx)
            self._emit()

    def put(self, data: bytes):
        """Queue ``data`` for the serial port; blocks while the queue is full."""
        self._tx.put(data)
|
|
|
|
|
|
@click.command()
@click.option('--serial-port',
              type=str,
              required=True,
              help='pyserial port to connect to.')
@click.option('--port',
              type=int,
              default=7012,
              help='Port number to listen on.')
# '--prometheus-port' matches the dash style of '--serial-port'; the original
# underscore spelling is kept as an alias so existing invocations still work.
# click derives the parameter name from the first long option given, so the
# function still receives `prometheus_port`.
@click.option('--prometheus-port',
              '--prometheus_port',
              type=int,
              default=7013,
              help='Port number to listen for monitoring on.')
def main(serial_port: str, port: int, prometheus_port: int):
    """Bridge a serial port to TCP clients and export metrics."""
    logging.basicConfig(
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        level=logging.INFO)

    # Imported here rather than at module level, as in the original.
    import janet.prometheus

    prometheus_client.start_http_server(prometheus_port)
    prom = janet.prometheus.Client()

    # An empty host makes the server listen on all interfaces.
    addr = ('', port)
    bridge = Bridge(addr, serial_port)
    bridge.listen(prom.publish)

    # Blocks in the serial read loop until it fails.
    bridge.run()
|
|
|
|
|
|
# Script entry point: run the click command only when executed directly,
# not when the module is imported.
if __name__ == '__main__':
    main()
|