import dataclasses import datetime import http.client import io import logging import socket import struct import threading import time from typing import Dict, Tuple import prometheus_client from . import Channel, Device, prometheus logger = logging.getLogger('janet.ssdp') _MSEARCH = """M-SEARCH * HTTP/1.1 Host: 239.255.255.250:1900 Man: "ssdp:discover" ST: urn:schemas-sony-com:service:IRCC:1 MX: 3 User-Agent: Linux/5.8.0-40-generic UPnP/1.0 GSSDP/1.2.3 """ _GROUP = ('239.255.255.250', 1900) @dataclasses.dataclass class Service: # Unique service name. usn: str # Service type. st: str # Location of the service. location: str # Server identifer, if any. server: str # Last time the service was seen. seen: datetime.datetime available: bool class Discoverer: def __init__(self): self._services: Dict[Tuple[str, str], Tuple[Device, Service]] = {} self._lock = threading.Lock() self._listeners = Channel() def listen(self, callback): self._listeners.listen(callback) def _searcher(self, sock: socket.socket): while True: sock.sendto(_MSEARCH.replace('\n', '\r\n').encode(), _GROUP) time.sleep(10 * 60) def _reaper(self): while True: time.sleep(71) now = datetime.datetime.now() expired = [] with self._lock: for key, (device, service) in self._services.items(): age = now - service.seen if age.total_seconds() >= 20 * 60: logger.warning(f'expiring {service} as its {age}s old') expired.append(key) service.available = False self._listeners.put(device, service) for key in expired: del self._services[key] def _listener(self, sock: socket.socket): while True: method, headers = self._recv(sock) if method == 'HTTP/1.1': self._response(headers) elif method == 'NOTIFY': self._notify(headers) elif method == 'M-SEARCH': pass else: logger.info(f'Dropping unhandled method {method}') pass def run(self): threading.Thread(target=self._reaper).start() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) threading.Thread(target=self._searcher, args=(sock, )).start() threading.Thread(target=self._listener, args=(sock, )).start() mcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) mcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) mcast.bind(('', 1900)) group = socket.inet_aton(_GROUP[0]) mreq = struct.pack('4sL', group, socket.INADDR_ANY) mcast.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) self._listener(mcast) def _response(self, headers): logger.debug(f'search response {headers.items()}') usn = headers.get('USN', None) nt = headers.get('ST', None) location = headers.get('LOCATION', None) server = headers.get('SERVER', '') self._alive(usn, nt, location, server) def _notify(self, headers: http.client.HTTPMessage): logger.debug(f'notify {headers.items()}') usn = headers.get('USN', '') if not usn: logger.warning('Message has no USN, dropping') return nts = headers.get('NTS', '') if not nts: logger.warning('Message has no NTS, dropping') return nt = headers.get('NT', '') if nts == 'ssdp:byebye': self._byebye(usn, nt) elif nts == 'ssdp:alive': location = headers.get('LOCATION', '') server = headers.get('SERVER', '') self._alive(usn, nt, location, server) else: logger.warning(f'Unrecognised NTS {nts}, dropping') def _byebye(self, usn: str, nt: str): with self._lock: key = (usn, nt) if key in self._services: device, service = self._services[key] logger.warning(f'byebye {service}') service.available = False self._listeners.put(device, service) del self._services[key] def _alive(self, usn: str, st: str, location: str, server: str = ''): if not st or not location: logger.warning('Message is missing ST/NT or LOCATION, dropping') return with self._lock: key = (usn, st) if key not in self._services: device = Device(identifiers=[f'ssdp_{usn}'], name=st, manufacturer=server, kind='ssdp', available=True) service = Service(usn=usn, st=st, location=location, server=server, seen=datetime.datetime.now(), available=True) self._services[key] = (device, service) logger.warning(f'discovered {service}') else: device, service = self._services[key] service.server = service.server or server service.seen = datetime.datetime.now() self._listeners.put(device, service) def _recv(self, sock) -> Tuple[str, http.client.HTTPMessage]: got = sock.recv(2048) msg = io.BytesIO(got) top = msg.readline().decode().rstrip().split() headers = http.client.parse_headers(msg) if len(top) != 3: logger.warning(f'Invalid HTTP first line {top}') return '', headers method, _, _ = top return method, headers def main(): prometheus_client.start_http_server(9876) ssdp = Discoverer() prom = prometheus.Client() ssdp._listeners.listen(prom.publish) def log(service): print(f'Notify {service}') ssdp._listeners.listen(log) ssdp.run() if __name__ == '__main__': main()