janet/janet/bravia.py

255 lines
7.5 KiB
Python

"""HTTP based control of Bravia TVs.
Tested with the 2012 KDL-40HX850.
"""
import dataclasses
import hashlib
import logging
import socket
import threading
import typing
import urllib.parse
import xml.etree.ElementTree
from typing import Any, Dict, Iterable, Optional, Sequence
from xml.etree.ElementTree import Element
import requests
from . import Channel, Device, ssdp
logger = logging.getLogger('janet.bravia')
_IRCC = """
<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:X_SendIRCC xmlns:u="urn:schemas-sony-com:service:IRCC:1">
<IRCCCode>{code}</IRCCCode>
</u:X_SendIRCC>
</s:Body>
</s:Envelope>
"""
@dataclasses.dataclass
class SystemInformation:
"""System information."""
name: str
generation: str
area: str
language: str
country: str
model_name: str
@dataclasses.dataclass
class StatusItem:
"""A single item in the status."""
name: str
source: str
title: Optional[str] = None
provider: Optional[str] = None
@dataclasses.dataclass
class Command:
"""A remote control command."""
name: str
type: str
value: str
def _attrib_factory(node: Element, schema: Any) -> Any:
"""Returns a new instance of `schema` using attributes from `node`."""
values = {}
for field in dataclasses.fields(schema):
values[field.name] = node.attrib[_camel_case(field.name)]
return schema(**values)
def _remote_commands_factory(nodes: Iterable[Element]) -> Dict[str, Command]:
commands = [_attrib_factory(n, Command) for n in nodes]
return {c.name: c for c in commands}
def _status_map_factory(nodes: Iterable[Element]) -> Sequence[StatusItem]:
items = []
for node in nodes:
values = {'name': node.attrib['name']}
for item in node:
values[item.attrib['field']] = item.attrib['value']
keep = {}
for f in dataclasses.fields(StatusItem):
keep[f.name] = values.get(f.name, None)
items.append(StatusItem(**keep))
return items
@dataclasses.dataclass
class Status:
"""Status of the TV."""
status: Dict[str, StatusItem] = dataclasses.field(
default_factory=_status_map_factory)
@dataclasses.dataclass
class RemoteCommands:
"""All remote commands accepted by the TV."""
command: Dict[str, Command] = dataclasses.field(
default_factory=_remote_commands_factory)
def _camel_case(snake: str) -> str:
"""Convert a snake_case name to camelCase."""
words = snake.split('_')
words = [words[0]] + [x.capitalize() for x in words[1:]]
return ''.join(words)
def _extract_xml(text: str, schema: Any) -> Any:
"""Extract an XML tree as a dataclass instance."""
root = xml.etree.ElementTree.fromstring(text)
values = {}
for field in dataclasses.fields(schema):
path = _camel_case(field.name)
nodes = root.findall(path)
if field.type == str:
values[field.name] = nodes[0].text
elif field.default_factory is not dataclasses.MISSING:
values[field.name] = field.default_factory(nodes)
else:
logger.warning('Skipping unhandled field type %s', field)
return schema(**values)
class Client:
def __init__(self, host: str):
self.host = host
self.path = host + '/cers/api/'
id = hashlib.sha1(socket.getfqdn().encode()).hexdigest()
self.device_id = f'braviamqtt:{id}'
self.device_info = 'bravia.py'
def _get(self, path: str, params: Optional[Dict[str, str]] = None) -> str:
"""Perform a GET with authentication."""
headers = {
'X-CERS-DEVICE-ID': self.device_id,
'X-CERS-DEVICE-INFO': self.device_info,
}
r = requests.get(self.path + path,
params=params,
headers=headers,
timeout=3)
r.raise_for_status()
return r.text
def get_system_information(self) -> SystemInformation:
return _extract_xml(self._get('getSystemInformation'),
SystemInformation)
def register(self) -> None:
"""Register with the TV.
This is a no-op if the client has already registered. Otherwise the TV
will show a prompt asking to approve the new connection.
"""
self._get('register',
params={
'name': 'braviamqtt',
'registrationType': 'initial',
'deviceId': self.device_id,
})
def get_remote_command_list(self) -> RemoteCommands:
"""Fetch all of the supported remote commands."""
return _extract_xml(self._get('getRemoteCommandList'), RemoteCommands)
def get_status(self) -> Status:
"""Fetch the current high level status."""
return _extract_xml(self._get('getStatus'), Status)
def ircc(self, code: str) -> None:
"""Send a remote control command."""
path = '/IRCC'
headers = {
'soapaction': 'urn:schemas-sony-com:service:IRCC:1#X_SendIRCC',
'X-CERS-DEVICE-ID': self.device_id,
'X-CERS-DEVICE-INFO': self.device_info,
}
body = _IRCC.strip().format(code=code)
r = requests.post(self.host + path, data=body, headers=headers)
r.raise_for_status()
class Bridge:
def __init__(self, location: str, listeners: Channel):
self._listeners = listeners
self._location = location
url = urllib.parse.urlparse(location)
host = url.netloc.split(':')[0]
self._bravia = Client(f'{url.scheme}://{host}')
self._device = None
def start(self):
info = self._bravia.get_system_information()
self._device = Device(
identifiers=[f'bravia_{self._location}'],
name=info.name,
model=info.model_name,
sw_version=info.generation,
manufacturer='SONY',
kind='bravia',
available=True,
)
self._listeners.put(self._device, self._device)
self._listeners.put(self._device, info)
self._bravia.register()
commands = self._bravia.get_remote_command_list()
self._listeners.put(self._device, commands)
def available(self):
status = self._bravia.get_status()
for item in status.status:
self._listeners.put(self._device, item)
def stop(self):
if self._device:
self._device.available = False
self._listeners.put(self._device, self._device)
class Discoverer:
def __init__(self):
self._listeners = Channel()
self._devices = {}
self._lock = threading.Lock()
def ssdp(self, device: Device, entity: Any) -> None:
if not isinstance(entity, ssdp.Service):
return
entity = typing.cast(ssdp.Service, entity)
if entity.st != 'urn:schemas-sony-com:service:IRCC:1':
return
if entity.available:
with self._lock:
if entity.usn not in self._devices:
dev = Bridge(entity.location, self._listeners)
self._devices[entity.usn] = dev
dev.start()
self._devices[entity.usn].available()
else:
with self._lock:
dev = self._devices.get(entity.usn, None)
if dev is not None:
dev.stop()
del self._devices[entity.usn]
def listen(self, listener) -> None:
self._listeners.listen(listener)