210 lines
6.3 KiB
Python
210 lines
6.3 KiB
Python
import dataclasses
|
|
import datetime
|
|
import http.client
|
|
import io
|
|
import logging
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import time
|
|
from typing import Dict, Tuple
|
|
|
|
import prometheus_client
|
|
|
|
from . import Channel, Device, prometheus
|
|
|
|
logger = logging.getLogger('janet.ssdp')
|
|
|
|
_MSEARCH = """M-SEARCH * HTTP/1.1
|
|
Host: 239.255.255.250:1900
|
|
Man: "ssdp:discover"
|
|
ST: urn:schemas-sony-com:service:IRCC:1
|
|
MX: 3
|
|
User-Agent: Linux/5.8.0-40-generic UPnP/1.0 GSSDP/1.2.3
|
|
|
|
"""
|
|
|
|
_GROUP = ('239.255.255.250', 1900)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Service:
|
|
# Unique service name.
|
|
usn: str
|
|
# Service type.
|
|
st: str
|
|
# Location of the service.
|
|
location: str
|
|
# Server identifer, if any.
|
|
server: str
|
|
|
|
# Last time the service was seen.
|
|
seen: datetime.datetime
|
|
|
|
available: bool
|
|
|
|
|
|
class Discoverer:
|
|
def __init__(self):
|
|
self._services: Dict[Tuple[str, str], Tuple[Device, Service]] = {}
|
|
self._lock = threading.Lock()
|
|
self._listeners = Channel()
|
|
|
|
def listen(self, callback):
|
|
self._listeners.listen(callback)
|
|
|
|
def _searcher(self, sock: socket.socket):
|
|
while True:
|
|
sock.sendto(_MSEARCH.replace('\n', '\r\n').encode(), _GROUP)
|
|
time.sleep(10 * 60)
|
|
|
|
def _reaper(self):
|
|
while True:
|
|
time.sleep(71)
|
|
now = datetime.datetime.now()
|
|
expired = []
|
|
|
|
with self._lock:
|
|
for key, (device, service) in self._services.items():
|
|
age = now - service.seen
|
|
if age.total_seconds() >= 20 * 60:
|
|
logger.warning(f'expiring {service} as its {age}s old')
|
|
expired.append(key)
|
|
service.available = False
|
|
self._listeners.put(device, service)
|
|
|
|
for key in expired:
|
|
del self._services[key]
|
|
|
|
def _listener(self, sock: socket.socket):
|
|
while True:
|
|
method, headers = self._recv(sock)
|
|
|
|
if method == 'HTTP/1.1':
|
|
self._response(headers)
|
|
elif method == 'NOTIFY':
|
|
self._notify(headers)
|
|
elif method == 'M-SEARCH':
|
|
pass
|
|
else:
|
|
logger.info(f'Dropping unhandled method {method}')
|
|
pass
|
|
|
|
def run(self):
|
|
threading.Thread(target=self._reaper).start()
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
|
|
socket.IPPROTO_UDP)
|
|
|
|
threading.Thread(target=self._searcher, args=(sock, )).start()
|
|
threading.Thread(target=self._listener, args=(sock, )).start()
|
|
|
|
mcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
|
|
socket.IPPROTO_UDP)
|
|
mcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
|
|
mcast.bind(('', 1900))
|
|
group = socket.inet_aton(_GROUP[0])
|
|
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
|
|
mcast.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
|
|
|
|
self._listener(mcast)
|
|
|
|
def _response(self, headers):
|
|
logger.debug(f'search response {headers.items()}')
|
|
usn = headers.get('USN', None)
|
|
nt = headers.get('ST', None)
|
|
location = headers.get('LOCATION', None)
|
|
server = headers.get('SERVER', '')
|
|
self._alive(usn, nt, location, server)
|
|
|
|
def _notify(self, headers: http.client.HTTPMessage):
|
|
logger.debug(f'notify {headers.items()}')
|
|
usn = headers.get('USN', '')
|
|
if not usn:
|
|
logger.warning('Message has no USN, dropping')
|
|
return
|
|
nts = headers.get('NTS', '')
|
|
if not nts:
|
|
logger.warning('Message has no NTS, dropping')
|
|
return
|
|
nt = headers.get('NT', '')
|
|
|
|
if nts == 'ssdp:byebye':
|
|
self._byebye(usn, nt)
|
|
elif nts == 'ssdp:alive':
|
|
location = headers.get('LOCATION', '')
|
|
server = headers.get('SERVER', '')
|
|
self._alive(usn, nt, location, server)
|
|
else:
|
|
logger.warning(f'Unrecognised NTS {nts}, dropping')
|
|
|
|
def _byebye(self, usn: str, nt: str):
|
|
with self._lock:
|
|
key = (usn, nt)
|
|
|
|
if key in self._services:
|
|
device, service = self._services[key]
|
|
logger.warning(f'byebye {service}')
|
|
service.available = False
|
|
self._listeners.put(device, service)
|
|
|
|
del self._services[key]
|
|
|
|
def _alive(self, usn: str, st: str, location: str, server: str = ''):
|
|
if not st or not location:
|
|
logger.warning('Message is missing ST/NT or LOCATION, dropping')
|
|
return
|
|
|
|
with self._lock:
|
|
key = (usn, st)
|
|
if key not in self._services:
|
|
device = Device(identifiers=[f'ssdp_{usn}'],
|
|
name=st,
|
|
manufacturer=server,
|
|
kind='ssdp',
|
|
available=True)
|
|
service = Service(usn=usn,
|
|
st=st,
|
|
location=location,
|
|
server=server,
|
|
seen=datetime.datetime.now(),
|
|
available=True)
|
|
self._services[key] = (device, service)
|
|
logger.warning(f'discovered {service}')
|
|
else:
|
|
device, service = self._services[key]
|
|
|
|
service.server = service.server or server
|
|
service.seen = datetime.datetime.now()
|
|
self._listeners.put(device, service)
|
|
|
|
def _recv(self, sock) -> Tuple[str, http.client.HTTPMessage]:
|
|
got = sock.recv(2048)
|
|
msg = io.BytesIO(got)
|
|
top = msg.readline().decode().rstrip().split()
|
|
headers = http.client.parse_headers(msg)
|
|
if len(top) != 3:
|
|
logger.warning(f'Invalid HTTP first line {top}')
|
|
return '', headers
|
|
method, _, _ = top
|
|
return method, headers
|
|
|
|
|
|
def main():
|
|
prometheus_client.start_http_server(9876)
|
|
|
|
ssdp = Discoverer()
|
|
prom = prometheus.Client()
|
|
ssdp._listeners.listen(prom.publish)
|
|
|
|
def log(service):
|
|
print(f'Notify {service}')
|
|
|
|
ssdp._listeners.listen(log)
|
|
ssdp.run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|