janet/janet/ssdp.py

210 lines
6.3 KiB
Python
Raw Permalink Normal View History

import dataclasses
2021-02-13 14:54:45 +01:00
import datetime
import http.client
import io
import logging
2021-02-13 14:54:45 +01:00
import socket
import struct
import threading
import time
from typing import Dict, Tuple
2021-02-13 14:54:45 +01:00
import prometheus_client
from . import Channel, Device, prometheus
2021-02-13 14:54:45 +01:00
logger = logging.getLogger('janet.ssdp')
2021-02-13 14:54:45 +01:00
_MSEARCH = """M-SEARCH * HTTP/1.1
Host: 239.255.255.250:1900
Man: "ssdp:discover"
ST: urn:schemas-sony-com:service:IRCC:1
MX: 3
User-Agent: Linux/5.8.0-40-generic UPnP/1.0 GSSDP/1.2.3
"""
_GROUP = ('239.255.255.250', 1900)
@dataclasses.dataclass
class Service:
# Unique service name.
usn: str
# Service type.
st: str
# Location of the service.
location: str
# Server identifer, if any.
server: str
# Last time the service was seen.
seen: datetime.datetime
available: bool
class Discoverer:
def __init__(self):
self._services: Dict[Tuple[str, str], Tuple[Device, Service]] = {}
2021-02-13 14:54:45 +01:00
self._lock = threading.Lock()
self._listeners = Channel()
2021-02-14 17:31:28 +01:00
def listen(self, callback):
self._listeners.listen(callback)
2021-02-13 14:54:45 +01:00
def _searcher(self, sock: socket.socket):
while True:
sock.sendto(_MSEARCH.replace('\n', '\r\n').encode(), _GROUP)
time.sleep(10 * 60)
def _reaper(self):
while True:
time.sleep(71)
now = datetime.datetime.now()
expired = []
with self._lock:
2021-02-14 17:31:28 +01:00
for key, (device, service) in self._services.items():
2021-02-13 14:54:45 +01:00
age = now - service.seen
if age.total_seconds() >= 20 * 60:
logger.warning(f'expiring {service} as its {age}s old')
2021-02-13 14:54:45 +01:00
expired.append(key)
service.available = False
2021-02-14 17:31:28 +01:00
self._listeners.put(device, service)
2021-02-13 14:54:45 +01:00
for key in expired:
del self._services[key]
def _listener(self, sock: socket.socket):
while True:
method, headers = self._recv(sock)
if method == 'HTTP/1.1':
self._response(headers)
elif method == 'NOTIFY':
self._notify(headers)
2021-02-14 17:31:28 +01:00
elif method == 'M-SEARCH':
pass
2021-02-13 14:54:45 +01:00
else:
logger.info(f'Dropping unhandled method {method}')
2021-02-13 14:54:45 +01:00
pass
def run(self):
threading.Thread(target=self._reaper).start()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
threading.Thread(target=self._searcher, args=(sock, )).start()
threading.Thread(target=self._listener, args=(sock, )).start()
mcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
mcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
mcast.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
mcast.bind(('', 1900))
group = socket.inet_aton(_GROUP[0])
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
mcast.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self._listener(mcast)
def _response(self, headers):
logger.debug(f'search response {headers.items()}')
2021-02-13 14:54:45 +01:00
usn = headers.get('USN', None)
nt = headers.get('ST', None)
location = headers.get('LOCATION', None)
server = headers.get('SERVER', '')
self._alive(usn, nt, location, server)
def _notify(self, headers: http.client.HTTPMessage):
logger.debug(f'notify {headers.items()}')
usn = headers.get('USN', '')
2021-02-13 14:54:45 +01:00
if not usn:
2021-02-26 16:46:54 +01:00
logger.warning('Message has no USN, dropping')
2021-02-13 14:54:45 +01:00
return
nts = headers.get('NTS', '')
2021-02-13 14:54:45 +01:00
if not nts:
2021-02-26 16:46:54 +01:00
logger.warning('Message has no NTS, dropping')
2021-02-13 14:54:45 +01:00
return
nt = headers.get('NT', '')
2021-02-13 14:54:45 +01:00
if nts == 'ssdp:byebye':
self._byebye(usn, nt)
elif nts == 'ssdp:alive':
location = headers.get('LOCATION', '')
2021-02-13 14:54:45 +01:00
server = headers.get('SERVER', '')
self._alive(usn, nt, location, server)
else:
logger.warning(f'Unrecognised NTS {nts}, dropping')
2021-02-13 14:54:45 +01:00
def _byebye(self, usn: str, nt: str):
with self._lock:
key = (usn, nt)
2021-02-14 17:31:28 +01:00
if key in self._services:
device, service = self._services[key]
logger.warning(f'byebye {service}')
2021-02-13 14:54:45 +01:00
service.available = False
2021-02-14 17:31:28 +01:00
self._listeners.put(device, service)
2021-02-13 14:54:45 +01:00
del self._services[key]
def _alive(self, usn: str, st: str, location: str, server: str = ''):
if not st or not location:
logger.warning('Message is missing ST/NT or LOCATION, dropping')
2021-02-13 14:54:45 +01:00
return
with self._lock:
key = (usn, st)
2021-02-14 17:31:28 +01:00
if key not in self._services:
2021-02-13 14:54:45 +01:00
device = Device(identifiers=[f'ssdp_{usn}'],
name=st,
manufacturer=server,
kind='ssdp',
available=True)
2021-02-14 17:31:28 +01:00
service = Service(usn=usn,
2021-02-13 14:54:45 +01:00
st=st,
location=location,
server=server,
seen=datetime.datetime.now(),
available=True)
2021-02-14 17:31:28 +01:00
self._services[key] = (device, service)
logger.warning(f'discovered {service}')
2021-02-14 17:31:28 +01:00
else:
device, service = self._services[key]
2021-02-13 14:54:45 +01:00
service.server = service.server or server
service.seen = datetime.datetime.now()
2021-02-14 17:31:28 +01:00
self._listeners.put(device, service)
2021-02-13 14:54:45 +01:00
def _recv(self, sock) -> Tuple[str, http.client.HTTPMessage]:
got = sock.recv(2048)
msg = io.BytesIO(got)
top = msg.readline().decode().rstrip().split()
headers = http.client.parse_headers(msg)
2021-02-13 14:54:45 +01:00
if len(top) != 3:
logger.warning(f'Invalid HTTP first line {top}')
return '', headers
2021-02-13 14:54:45 +01:00
method, _, _ = top
return method, headers
def main():
prometheus_client.start_http_server(9876)
ssdp = Discoverer()
prom = prometheus.Client()
ssdp._listeners.listen(prom.publish)
def log(service):
print(f'Notify {service}')
ssdp._listeners.listen(log)
ssdp.run()
if __name__ == '__main__':
main()