janet/janet/bravia.py

255 lines
7.5 KiB
Python
Raw Permalink Normal View History

2021-02-14 17:31:28 +01:00
"""HTTP based control of Bravia TVs.
Tested with the 2012 KDL-40HX850.
"""
import dataclasses
import hashlib
import logging
2021-02-14 17:31:28 +01:00
import socket
import threading
2021-02-14 17:31:28 +01:00
import typing
import urllib.parse
2021-02-14 17:31:28 +01:00
import xml.etree.ElementTree
from typing import Any, Dict, Iterable, Optional, Sequence
2021-02-14 17:31:28 +01:00
from xml.etree.ElementTree import Element
import requests
from . import Channel, Device, ssdp
2021-02-14 17:31:28 +01:00
logger = logging.getLogger('janet.bravia')
_IRCC = """
<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:X_SendIRCC xmlns:u="urn:schemas-sony-com:service:IRCC:1">
<IRCCCode>{code}</IRCCCode>
</u:X_SendIRCC>
</s:Body>
</s:Envelope>
"""
@dataclasses.dataclass
class SystemInformation:
"""System information."""
name: str
generation: str
area: str
language: str
country: str
model_name: str
@dataclasses.dataclass
class StatusItem:
"""A single item in the status."""
name: str
source: str
title: Optional[str] = None
provider: Optional[str] = None
2021-02-14 17:31:28 +01:00
@dataclasses.dataclass
class Command:
"""A remote control command."""
name: str
type: str
value: str
def _attrib_factory(node: Element, schema: Any) -> Any:
"""Returns a new instance of `schema` using attributes from `node`."""
values = {}
for field in dataclasses.fields(schema):
values[field.name] = node.attrib[_camel_case(field.name)]
return schema(**values)
def _remote_commands_factory(nodes: Iterable[Element]) -> Dict[str, Command]:
commands = [_attrib_factory(n, Command) for n in nodes]
return {c.name: c for c in commands}
def _status_map_factory(nodes: Iterable[Element]) -> Sequence[StatusItem]:
2021-02-14 17:31:28 +01:00
items = []
for node in nodes:
values = {'name': node.attrib['name']}
for item in node:
values[item.attrib['field']] = item.attrib['value']
keep = {}
for f in dataclasses.fields(StatusItem):
keep[f.name] = values.get(f.name, None)
items.append(StatusItem(**keep))
return items
@dataclasses.dataclass
class Status:
"""Status of the TV."""
status: Dict[str, StatusItem] = dataclasses.field(
default_factory=_status_map_factory)
@dataclasses.dataclass
class RemoteCommands:
"""All remote commands accepted by the TV."""
command: Dict[str, Command] = dataclasses.field(
default_factory=_remote_commands_factory)
def _camel_case(snake: str) -> str:
"""Convert a snake_case name to camelCase."""
words = snake.split('_')
words = [words[0]] + [x.capitalize() for x in words[1:]]
return ''.join(words)
def _extract_xml(text: str, schema: Any) -> Any:
"""Extract an XML tree as a dataclass instance."""
root = xml.etree.ElementTree.fromstring(text)
values = {}
for field in dataclasses.fields(schema):
path = _camel_case(field.name)
nodes = root.findall(path)
if field.type == str:
values[field.name] = nodes[0].text
2021-02-26 16:46:54 +01:00
elif field.default_factory is not dataclasses.MISSING:
2021-02-14 17:31:28 +01:00
values[field.name] = field.default_factory(nodes)
else:
logger.warning('Skipping unhandled field type %s', field)
return schema(**values)
class Client:
def __init__(self, host: str):
self.host = host
self.path = host + '/cers/api/'
id = hashlib.sha1(socket.getfqdn().encode()).hexdigest()
self.device_id = f'braviamqtt:{id}'
self.device_info = 'bravia.py'
def _get(self, path: str, params: Optional[Dict[str, str]] = None) -> str:
"""Perform a GET with authentication."""
headers = {
'X-CERS-DEVICE-ID': self.device_id,
'X-CERS-DEVICE-INFO': self.device_info,
}
2021-02-21 13:20:20 +01:00
r = requests.get(self.path + path,
params=params,
headers=headers,
timeout=3)
2021-02-14 17:31:28 +01:00
r.raise_for_status()
return r.text
def get_system_information(self) -> SystemInformation:
return _extract_xml(self._get('getSystemInformation'),
SystemInformation)
def register(self) -> None:
"""Register with the TV.
2021-02-26 16:46:54 +01:00
This is a no-op if the client has already registered. Otherwise the TV
will show a prompt asking to approve the new connection.
2021-02-14 17:31:28 +01:00
"""
self._get('register',
params={
'name': 'braviamqtt',
'registrationType': 'initial',
'deviceId': self.device_id,
})
def get_remote_command_list(self) -> RemoteCommands:
"""Fetch all of the supported remote commands."""
return _extract_xml(self._get('getRemoteCommandList'), RemoteCommands)
def get_status(self) -> Status:
"""Fetch the current high level status."""
return _extract_xml(self._get('getStatus'), Status)
def ircc(self, code: str) -> None:
"""Send a remote control command."""
path = '/IRCC'
headers = {
'soapaction': 'urn:schemas-sony-com:service:IRCC:1#X_SendIRCC',
'X-CERS-DEVICE-ID': self.device_id,
'X-CERS-DEVICE-INFO': self.device_info,
}
body = _IRCC.strip().format(code=code)
r = requests.post(self.host + path, data=body, headers=headers)
r.raise_for_status()
class Bridge:
def __init__(self, location: str, listeners: Channel):
self._listeners = listeners
self._location = location
url = urllib.parse.urlparse(location)
host = url.netloc.split(':')[0]
self._bravia = Client(f'{url.scheme}://{host}')
2021-02-16 21:23:41 +01:00
self._device = None
2021-02-14 17:31:28 +01:00
def start(self):
info = self._bravia.get_system_information()
self._device = Device(
identifiers=[f'bravia_{self._location}'],
name=info.name,
model=info.model_name,
sw_version=info.generation,
manufacturer='SONY',
kind='bravia',
available=True,
)
self._listeners.put(self._device, self._device)
self._listeners.put(self._device, info)
self._bravia.register()
commands = self._bravia.get_remote_command_list()
self._listeners.put(self._device, commands)
def available(self):
status = self._bravia.get_status()
for item in status.status:
self._listeners.put(self._device, item)
def stop(self):
2021-02-16 21:23:41 +01:00
if self._device:
self._device.available = False
self._listeners.put(self._device, self._device)
2021-02-14 17:31:28 +01:00
class Discoverer:
def __init__(self):
self._listeners = Channel()
self._devices = {}
self._lock = threading.Lock()
def ssdp(self, device: Device, entity: Any) -> None:
2021-02-14 17:31:28 +01:00
if not isinstance(entity, ssdp.Service):
return
entity = typing.cast(ssdp.Service, entity)
if entity.st != 'urn:schemas-sony-com:service:IRCC:1':
return
if entity.available:
with self._lock:
if entity.usn not in self._devices:
dev = Bridge(entity.location, self._listeners)
self._devices[entity.usn] = dev
dev.start()
self._devices[entity.usn].available()
else:
with self._lock:
dev = self._devices.get(entity.usn, None)
if dev is not None:
dev.stop()
del self._devices[entity.usn]
def listen(self, listener) -> None:
self._listeners.listen(listener)