255 lines
7.5 KiB
Python
255 lines
7.5 KiB
Python
"""HTTP based control of Bravia TVs.
|
|
|
|
Tested with the 2012 KDL-40HX850.
|
|
"""
|
|
import dataclasses
|
|
import hashlib
|
|
import logging
|
|
import socket
|
|
import threading
|
|
import typing
|
|
import urllib.parse
|
|
import xml.etree.ElementTree
|
|
from typing import Any, Dict, Iterable, Optional, Sequence
|
|
from xml.etree.ElementTree import Element
|
|
|
|
import requests
|
|
|
|
from . import Channel, Device, ssdp
|
|
|
|
logger = logging.getLogger('janet.bravia')
|
|
|
|
_IRCC = """
|
|
<?xml version="1.0"?>
|
|
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
|
|
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
|
|
<s:Body>
|
|
<u:X_SendIRCC xmlns:u="urn:schemas-sony-com:service:IRCC:1">
|
|
<IRCCCode>{code}</IRCCCode>
|
|
</u:X_SendIRCC>
|
|
</s:Body>
|
|
</s:Envelope>
|
|
"""
|
|
|
|
|
|
@dataclasses.dataclass
class SystemInformation:
    """Basic facts the TV reports about itself.

    Instances are built by `_extract_xml` from the `getSystemInformation`
    response: every field is read from the text of the child element whose
    tag is the camelCase form of the field name (e.g. `model_name` comes
    from `<modelName>`).
    """

    # Published by Bridge.start() as the device name.
    name: str
    # Published by Bridge.start() as the device's `sw_version`.
    generation: str
    area: str
    language: str
    country: str
    # Published by Bridge.start() as the device model.
    model_name: str
|
|
|
|
|
|
@dataclasses.dataclass
class StatusItem:
    """One named entry of the TV status report.

    Built by `_status_map_factory`: `name` is the `name` attribute of the
    status node, the remaining fields come from the `field`/`value`
    attribute pairs of its children.  Any field the TV does not report is
    set to None by that factory.
    """

    name: str
    source: str
    # Optional details; absent unless the TV reports them.
    title: Optional[str] = None
    provider: Optional[str] = None
|
|
|
|
|
|
@dataclasses.dataclass
class Command:
    """A single remote-control command advertised by the TV.

    Built by `_attrib_factory`, which copies the XML attributes `name`,
    `type` and `value` of a command node into the fields of the same name.
    """

    # Key under which `_remote_commands_factory` indexes this command.
    name: str
    type: str
    value: str
|
|
|
|
|
|
def _attrib_factory(node: Element, schema: Any) -> Any:
    """Build a `schema` instance from the XML attributes of `node`.

    Each dataclass field of `schema` is looked up in `node.attrib` under the
    camelCase form of the field name.  A missing attribute raises KeyError.
    """
    kwargs = {
        spec.name: node.attrib[_camel_case(spec.name)]
        for spec in dataclasses.fields(schema)
    }
    return schema(**kwargs)
|
|
|
|
|
|
def _remote_commands_factory(nodes: Iterable[Element]) -> Dict[str, Command]:
    """Parse command nodes into a mapping of command name to `Command`.

    When two nodes carry the same name, the later one wins.
    """
    by_name = {}
    for node in nodes:
        command = _attrib_factory(node, Command)
        by_name[command.name] = command
    return by_name
|
|
|
|
|
|
def _status_map_factory(nodes: Iterable[Element]) -> Sequence[StatusItem]:
    """Parse status nodes into a list of `StatusItem`.

    For every node the `name` attribute is taken first, then each child
    contributes one `field` -> `value` pair.  Only keys that are fields of
    `StatusItem` are kept; fields with no reported value become None.
    """
    wanted = [spec.name for spec in dataclasses.fields(StatusItem)]

    parsed = []
    for status_node in nodes:
        reported = {'name': status_node.attrib['name']}
        reported.update(
            (child.attrib['field'], child.attrib['value'])
            for child in status_node)
        parsed.append(StatusItem(**{key: reported.get(key) for key in wanted}))

    return parsed
|
|
|
|
|
|
@dataclasses.dataclass
class Status:
    """Status of the TV.

    Note that `default_factory` is not a real default here: `_extract_xml`
    calls it with the list of matching XML nodes to build the field value.
    The factory requires that argument, so `Status()` without arguments is
    not supported.
    """

    # One StatusItem per `status` child of the response root, as produced by
    # `_status_map_factory` (a sequence, which Bridge.available() iterates
    # item by item).
    status: Sequence[StatusItem] = dataclasses.field(
        default_factory=_status_map_factory)
|
|
|
|
|
|
@dataclasses.dataclass
class RemoteCommands:
    """The set of remote-control commands the TV accepts.

    `default_factory` doubles as the parser hook used by `_extract_xml`: it
    is called with the matching `command` nodes rather than with no
    arguments.
    """

    # Commands indexed by their name, as built by `_remote_commands_factory`.
    command: Dict[str, Command] = dataclasses.field(
        default_factory=_remote_commands_factory)
|
|
|
|
|
|
def _camel_case(snake: str) -> str:
|
|
"""Convert a snake_case name to camelCase."""
|
|
words = snake.split('_')
|
|
words = [words[0]] + [x.capitalize() for x in words[1:]]
|
|
return ''.join(words)
|
|
|
|
|
|
def _extract_xml(text: str, schema: Any) -> Any:
    """Parse `text` as XML and populate the dataclass `schema` from it.

    For each field of `schema`, the direct children of the root whose tag is
    the camelCase form of the field name are collected.  Then:

    * a field annotated as `str` takes the text of the first such child
      (IndexError if there is none);
    * a field declaring a `default_factory` gets the factory's result when
      called with the list of matching children;
    * any other field is skipped with a warning.
    """
    root = xml.etree.ElementTree.fromstring(text)

    kwargs = {}
    for spec in dataclasses.fields(schema):
        matches = root.findall(_camel_case(spec.name))
        if spec.type == str:
            kwargs[spec.name] = matches[0].text
            continue
        if spec.default_factory is not dataclasses.MISSING:
            # The factory is used as a parser for the matching nodes.
            kwargs[spec.name] = spec.default_factory(matches)
            continue
        logger.warning('Skipping unhandled field type %s', spec)

    return schema(**kwargs)
|
|
|
|
|
|
class Client:
    """Small HTTP client for a Bravia TV.

    GET requests go to the `/cers/api/` endpoints under `host`; remote
    control codes are POSTed as a SOAP envelope to `/IRCC`.  Every request
    carries the `X-CERS-DEVICE-ID` / `X-CERS-DEVICE-INFO` headers that
    identify this client to the TV.
    """

    # Seconds to wait on any request, so an unresponsive TV cannot block the
    # caller indefinitely.
    _TIMEOUT = 3

    def __init__(self, host: str):
        """Create a client.

        Args:
            host: Base URL of the TV including the scheme, without a
                trailing slash (Bridge passes f'{scheme}://{host}').
        """
        self.host = host
        self.path = host + '/cers/api/'
        # Derive the device id from this machine's FQDN so that it stays the
        # same for as long as the FQDN does.
        fqdn_digest = hashlib.sha1(socket.getfqdn().encode()).hexdigest()
        self.device_id = f'braviamqtt:{fqdn_digest}'
        self.device_info = 'bravia.py'

    def _auth_headers(self) -> Dict[str, str]:
        """Return the headers that identify this client to the TV."""
        return {
            'X-CERS-DEVICE-ID': self.device_id,
            'X-CERS-DEVICE-INFO': self.device_info,
        }

    def _get(self, path: str, params: Optional[Dict[str, str]] = None) -> str:
        """Perform a GET with authentication.

        Args:
            path: Endpoint name appended to the `/cers/api/` base path.
            params: Optional query parameters.

        Returns:
            The response body as text.

        Raises:
            Whatever `requests` raises on connection failure or timeout, and
            whatever `raise_for_status()` raises for an HTTP error status.
        """
        r = requests.get(self.path + path,
                         params=params,
                         headers=self._auth_headers(),
                         timeout=self._TIMEOUT)
        r.raise_for_status()
        return r.text

    def get_system_information(self) -> SystemInformation:
        """Fetch the TV's system information."""
        return _extract_xml(self._get('getSystemInformation'),
                            SystemInformation)

    def register(self) -> None:
        """Register with the TV.

        This is a no-op if the client has already registered.  Otherwise the TV
        will show a prompt asking to approve the new connection.
        """
        self._get('register',
                  params={
                      'name': 'braviamqtt',
                      'registrationType': 'initial',
                      'deviceId': self.device_id,
                  })

    def get_remote_command_list(self) -> RemoteCommands:
        """Fetch all of the supported remote commands."""
        return _extract_xml(self._get('getRemoteCommandList'), RemoteCommands)

    def get_status(self) -> Status:
        """Fetch the current high level status."""
        return _extract_xml(self._get('getStatus'), Status)

    def ircc(self, code: str) -> None:
        """Send a remote control command.

        Args:
            code: Value placed verbatim inside `<IRCCCode>`; presumably the
                `value` of a Command from get_remote_command_list() -- verify
                against the caller.  It is not XML-escaped.

        Raises:
            Whatever `requests` raises on connection failure or timeout, and
            whatever `raise_for_status()` raises for an HTTP error status.
        """
        headers = {
            'soapaction': 'urn:schemas-sony-com:service:IRCC:1#X_SendIRCC',
            **self._auth_headers(),
        }
        # strip() so the XML declaration is the first thing in the body.
        body = _IRCC.strip().format(code=code)
        # Same timeout as _get(): without one, requests waits forever.
        r = requests.post(self.host + '/IRCC',
                          data=body,
                          headers=headers,
                          timeout=self._TIMEOUT)
        r.raise_for_status()
|
|
|
|
|
|
class Bridge:
    """Connects one TV, found at `location`, to a listener channel.

    Everything learned from the TV (device record, system information,
    remote commands, status items) is pushed to the channel with `put`,
    keyed by the device record.
    """

    def __init__(self, location: str, listeners: Channel):
        """Prepare a client for the TV; no network traffic happens here.

        Args:
            location: URL at which the TV was discovered.
            listeners: Channel that receives everything this bridge publishes.
        """
        self._listeners = listeners
        self._location = location
        parsed = urllib.parse.urlparse(location)
        # Only the part of netloc before the first ':' is kept, so any port
        # in `location` is dropped and the scheme's default port is used.
        hostname, _, _ = parsed.netloc.partition(':')
        self._bravia = Client(f'{parsed.scheme}://{hostname}')
        # Set by start(); None until then.
        self._device = None

    def start(self):
        """Query the TV, announce it, register, and publish its commands."""
        tv = self._bravia
        info = tv.get_system_information()

        device = Device(
            identifiers=[f'bravia_{self._location}'],
            name=info.name,
            model=info.model_name,
            sw_version=info.generation,
            manufacturer='SONY',
            kind='bravia',
            available=True,
        )
        self._device = device
        self._listeners.put(device, device)
        self._listeners.put(device, info)

        # Registration comes after the announcement; the command list is
        # fetched only once registration has not raised.
        tv.register()
        self._listeners.put(self._device, tv.get_remote_command_list())

    def available(self):
        """Fetch the current status and publish each item separately."""
        for entry in self._bravia.get_status().status:
            self._listeners.put(self._device, entry)

    def stop(self):
        """Mark the device unavailable and publish that, if it was started."""
        if not self._device:
            return
        self._device.available = False
        self._listeners.put(self._device, self._device)
|
|
|
|
|
|
class Discoverer:
    """Creates and removes `Bridge` objects in response to SSDP events."""

    def __init__(self):
        # Channel shared by every bridge this discoverer creates.
        self._listeners = Channel()
        # Active bridges keyed by the SSDP USN of the service.
        self._devices = {}
        # Guards `_devices`; held while a bridge is started or stopped.
        self._lock = threading.Lock()

    def ssdp(self, device: Device, entity: Any) -> None:
        """Handle one SSDP notification.

        Anything that is not an `ssdp.Service` for the Sony IRCC service type
        is ignored.  An available service gets a bridge (created and started
        on first sight) and a status refresh; an unavailable one has its
        bridge stopped and forgotten.

        Args:
            device: Unused.
            entity: The published object; only `ssdp.Service` is handled.
        """
        if not isinstance(entity, ssdp.Service):
            return
        service = typing.cast(ssdp.Service, entity)
        if service.st != 'urn:schemas-sony-com:service:IRCC:1':
            return

        handler = self._on_alive if service.available else self._on_gone
        with self._lock:
            handler(service)

    def _on_alive(self, service) -> None:
        """Ensure a started bridge exists for `service`, then refresh it."""
        if service.usn not in self._devices:
            bridge = Bridge(service.location, self._listeners)
            # Stored before start(), so it stays registered even if start()
            # raises.
            self._devices[service.usn] = bridge
            bridge.start()
        self._devices[service.usn].available()

    def _on_gone(self, service) -> None:
        """Stop and drop the bridge for `service`, if there is one."""
        bridge = self._devices.get(service.usn, None)
        if bridge is None:
            return
        bridge.stop()
        del self._devices[service.usn]

    def listen(self, listener) -> None:
        """Subscribe `listener` to everything the bridges publish."""
        self._listeners.listen(listener)
|