janet/janet/ssdp.py

210 lines
6.3 KiB
Python

import dataclasses
import datetime
import http.client
import io
import logging
import socket
import struct
import threading
import time
from typing import Dict, Tuple
import prometheus_client
from . import Channel, Device, prometheus
# Module-wide logger; every message from this file is emitted under the
# 'janet.ssdp' logger name.
logger = logging.getLogger('janet.ssdp')
# Text of the SSDP search request that is multicast to _GROUP. It asks for
# the Sony IRCC service type (the ST header) and lets responders delay
# their reply by up to 3 seconds (the MX header). The literal is written
# with bare '\n' line endings; they are converted to CRLF before the
# request is put on the wire.
# NOTE(review): as written, the literal ends directly after the User-Agent
# line. An HTTP-style request is normally terminated by an empty line -
# confirm whether a trailing blank line was lost from this literal.
_MSEARCH = """M-SEARCH * HTTP/1.1
Host: 239.255.255.250:1900
Man: "ssdp:discover"
ST: urn:schemas-sony-com:service:IRCC:1
MX: 3
User-Agent: Linux/5.8.0-40-generic UPnP/1.0 GSSDP/1.2.3
"""
# (address, port) of the SSDP multicast group: the destination of search
# requests and the group joined to receive announcements.
_GROUP = ('239.255.255.250', 1900)
@dataclasses.dataclass
class Service:
    """A single SSDP service advertisement, tracked per (usn, st) pair."""

    # Unique service name, taken from the USN header.
    usn: str
    # Service type, taken from the ST header of a search response or the
    # NT header of a notification.
    st: str
    # Location of the service, taken from the LOCATION header.
    location: str
    # Server identifier from the SERVER header; empty when none was sent.
    server: str
    # Last time the service was seen (naive datetime.datetime.now()).
    seen: datetime.datetime
    # True while the service is believed present; set to False just before
    # listeners are told that it said goodbye or expired.
    available: bool
class Discoverer:
    """Discover SSDP services and report their arrival and departure.

    The discoverer multicasts a periodic M-SEARCH, listens both for the
    unicast replies to that search and for multicast NOTIFY announcements,
    and keeps a table of every service it currently believes to be alive.
    Each change (discovered, refreshed, byebye, expired) is published to the
    registered listeners as a ``(device, service)`` pair.
    """

    # Seconds between two M-SEARCH requests.
    _SEARCH_INTERVAL = 10 * 60
    # Seconds between two sweeps for expired services.
    _REAP_INTERVAL = 71
    # A service that has not been seen for this many seconds is expired.
    _MAX_AGE = 20 * 60
    # Largest datagram that is read from a socket in one go.
    _MAX_DATAGRAM = 2048

    def __init__(self):
        # (usn, st/nt) -> (device, service) for every service believed alive.
        self._services: Dict[Tuple[str, str], Tuple[Device, Service]] = {}
        # Guards _services; notifications are also sent while it is held.
        self._lock = threading.Lock()
        self._listeners = Channel()

    def listen(self, callback):
        """Register ``callback`` to be told about every service change."""
        self._listeners.listen(callback)

    @staticmethod
    def _encode_request(template: str) -> bytes:
        """Turn a request written with any line endings into wire format.

        Every line is terminated with CRLF and the request always ends with
        exactly one empty line, which is what marks the end of the headers.
        """
        lines = template.strip().splitlines()
        return ('\r\n'.join(lines) + '\r\n\r\n').encode()

    @staticmethod
    def _membership_request(group: str) -> bytes:
        """Build the 8-byte ``ip_mreq`` joining ``group`` on any interface.

        Standard sizes ('=') are used on purpose: with native sizes an 'L'
        is 8 bytes plus padding on LP64 platforms, which yields a 16-byte
        structure instead of two 4-byte addresses.
        """
        return struct.pack('=4sI', socket.inet_aton(group), socket.INADDR_ANY)

    def _searcher(self, sock: socket.socket):
        """Multicast an M-SEARCH on ``sock`` every search interval, forever."""
        payload = self._encode_request(_MSEARCH)
        while True:
            try:
                sock.sendto(payload, _GROUP)
            except OSError as err:
                # A transient failure (e.g. the network is not up yet) must
                # not end searching for good; try again next interval.
                logger.warning('M-SEARCH could not be sent: %s', err)
            time.sleep(self._SEARCH_INTERVAL)

    def _reaper(self):
        """Periodically expire services that have not been seen recently."""
        while True:
            time.sleep(self._REAP_INTERVAL)
            now = datetime.datetime.now()
            with self._lock:
                # Iterate over a snapshot so entries can be removed in place.
                for key, (device, service) in list(self._services.items()):
                    age = now - service.seen
                    if age.total_seconds() < self._MAX_AGE:
                        continue
                    logger.warning('expiring %s as it is %s old', service, age)
                    service.available = False
                    del self._services[key]
                    self._listeners.put(device, service)

    def _listener(self, sock: socket.socket):
        """Receive messages from ``sock`` forever and dispatch them."""
        while True:
            method, headers = self._recv(sock)
            if method == 'HTTP/1.1':
                # Status line of a reply to our own M-SEARCH.
                self._response(headers)
            elif method == 'NOTIFY':
                self._notify(headers)
            elif method == 'M-SEARCH':
                # Somebody else searching; nothing to do.
                pass
            else:
                logger.info('Dropping unhandled method %s', method)

    def run(self):
        """Start the background threads and listen for multicast traffic.

        This call does not return: the calling thread becomes the listener
        for the multicast group. The helper threads are daemons so that they
        cannot keep the process alive once the calling thread has ended.
        """
        threading.Thread(target=self._reaper, daemon=True).start()

        # Unicast socket: searches go out of it and replies come back to it.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                             socket.IPPROTO_UDP)
        threading.Thread(target=self._searcher, args=(sock, ),
                         daemon=True).start()
        threading.Thread(target=self._listener, args=(sock, ),
                         daemon=True).start()

        # Multicast socket: receives the NOTIFY announcements of the group.
        mcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                              socket.IPPROTO_UDP)
        mcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # NOTE(review): this TTL only affects datagrams sent from `mcast`,
        # which is never used for sending - confirm whether it was meant
        # for the search socket instead.
        mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        mcast.bind(('', _GROUP[1]))
        mcast.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
                         self._membership_request(_GROUP[0]))
        self._listener(mcast)

    def _response(self, headers):
        """Handle the reply to an M-SEARCH; the service type is in ST."""
        logger.debug('search response %s', headers.items())
        usn = headers.get('USN', '')
        if not usn:
            # Same rule as for notifications: without a USN there is no key
            # under which the service could be tracked.
            logger.warning('Message has no USN, dropping')
            return
        self._alive(usn, headers.get('ST', ''), headers.get('LOCATION', ''),
                    headers.get('SERVER', ''))

    def _notify(self, headers: http.client.HTTPMessage):
        """Handle a NOTIFY announcement; the service type is in NT."""
        logger.debug('notify %s', headers.items())
        usn = headers.get('USN', '')
        if not usn:
            logger.warning('Message has no USN, dropping')
            return
        nts = headers.get('NTS', '')
        if not nts:
            logger.warning('Message has no NTS, dropping')
            return
        nt = headers.get('NT', '')
        if nts == 'ssdp:byebye':
            self._byebye(usn, nt)
        elif nts == 'ssdp:alive':
            self._alive(usn, nt, headers.get('LOCATION', ''),
                        headers.get('SERVER', ''))
        else:
            logger.warning('Unrecognised NTS %s, dropping', nts)

    def _byebye(self, usn: str, nt: str):
        """Forget a service that announced its departure, if it is known."""
        with self._lock:
            entry = self._services.pop((usn, nt), None)
            if entry is None:
                return
            device, service = entry
            logger.warning('byebye %s', service)
            service.available = False
            self._listeners.put(device, service)

    def _alive(self, usn: str, st: str, location: str, server: str = ''):
        """Record that a service was seen and publish it to the listeners.

        A service seen for the first time is added to the table; a known one
        only has its ``seen`` time refreshed (and its server name filled in
        if it was still empty).
        """
        if not st or not location:
            logger.warning('Message is missing ST/NT or LOCATION, dropping')
            return
        now = datetime.datetime.now()
        with self._lock:
            key = (usn, st)
            entry = self._services.get(key)
            if entry is None:
                device = Device(identifiers=[f'ssdp_{usn}'],
                                name=st,
                                manufacturer=server,
                                kind='ssdp',
                                available=True)
                service = Service(usn=usn,
                                  st=st,
                                  location=location,
                                  server=server,
                                  seen=now,
                                  available=True)
                self._services[key] = (device, service)
                logger.warning('discovered %s', service)
            else:
                device, service = entry
                # NOTE(review): the stored location is not refreshed here -
                # confirm that is intended for services that move.
                service.server = service.server or server
                service.seen = now
            self._listeners.put(device, service)

    def _recv(self, sock) -> Tuple[str, http.client.HTTPMessage]:
        """Read one datagram and split it into first token and headers.

        Returns ``(method, headers)`` where ``method`` is the first word of
        the start line ('NOTIFY', 'M-SEARCH', or 'HTTP/1.1' for a reply).
        ``method`` is '' when the message cannot be understood. Errors from
        the socket itself still propagate.
        """
        msg = io.BytesIO(sock.recv(self._MAX_DATAGRAM))
        # Anybody on the network can send to us, so bad bytes must be
        # tolerated instead of raising out of the listener thread.
        top = msg.readline().decode(errors='replace').split()
        try:
            headers = http.client.parse_headers(msg)
        except http.client.HTTPException as err:
            # Raised e.g. when the datagram carries too many header lines.
            logger.warning('Unparseable headers (%s), dropping', err)
            return '', http.client.HTTPMessage()
        if len(top) != 3:
            logger.warning('Invalid HTTP first line %s', top)
            return '', headers
        return top[0], headers
def main():
    """Run the discoverer as a stand-alone program.

    Serves Prometheus metrics over HTTP on port 9876, publishes every
    service change to the Prometheus client, echoes it to stdout, and then
    blocks in ``Discoverer.run()``.
    """
    prometheus_client.start_http_server(9876)
    ssdp = Discoverer()
    prom = prometheus.Client()
    # Register through the public API instead of the private channel.
    ssdp.listen(prom.publish)

    def log(*event):
        # The discoverer publishes (device, service). Accept any arity and
        # print the last item, so this works whether the channel hands the
        # values over separately or as one object.
        # NOTE(review): confirm how Channel passes arguments to callbacks.
        print(f'Notify {event[-1]}')

    ssdp.listen(log)
    ssdp.run()


if __name__ == '__main__':
    main()