468 lines
19 KiB
Python
468 lines
19 KiB
Python
"""
REST client for Leshan demo server
##################################

Copyright (c) 2023 Nordic Semiconductor ASA

SPDX-License-Identifier: Apache-2.0

"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import binascii
|
|
import time
|
|
from datetime import datetime
|
|
from contextlib import contextmanager
|
|
import requests
|
|
|
|
class Leshan:
    """Client for the REST API of the Leshan demo server.

    All HTTP traffic (except the event stream) goes through one
    ``requests.Session``. Responses are parsed from JSON and, for read-like
    operations, decoded into nested Python dictionaries keyed by
    object / instance / resource IDs.
    """

    def __init__(self, url: str):
        """
        Initialize Leshan client and check if server is available.

        Parameters:
        - url: Base URL of the REST API; request paths are appended to it as-is.

        Raises:
        - RuntimeError: If the server does not respond, or the security
          client listing is not a list.
        """
        self.api_url = url
        self.timeout = 10
        # Content format sent in the 'format' query parameter (e.g. 'TLV').
        self.format = "SENML_CBOR"
        self._s = requests.Session()
        try:
            resp = self.get('/security/clients')
        except requests.exceptions.ConnectionError as exc:
            raise RuntimeError('Leshan not responding') from exc
        if not isinstance(resp, list):
            raise RuntimeError('Did not receive list of endpoints')

    @staticmethod
    def handle_response(resp: requests.models.Response):
        """
        Handle the response received from the server.

        Parameters:
        - resp: The response object received from the server.

        Returns:
        - The parsed JSON body, or None when the body is empty.

        Raises:
        - RuntimeError: If the HTTP status code is outside the 2xx range.
        """
        if not 200 <= resp.status_code < 300:
            raise RuntimeError(f'Error {resp.status_code}: {resp.text}')
        if resp.text:
            return json.loads(resp.text)
        return None

    def _default_query(self) -> str:
        """Build the default '?timeout=..&format=..' query string.

        The format parameter is left out when ``self.format`` is None, which
        matches what ``get()`` does.
        """
        query = f'?timeout={self.timeout}'
        if self.format is not None:
            query += f'&format={self.format}'
        return query

    def get(self, path: str):
        """Send HTTP GET query with typical parameters"""
        params = {'timeout': self.timeout}
        if self.format is not None:
            params['format'] = self.format
        resp = self._s.get(f'{self.api_url}{path}', params=params, timeout=self.timeout)
        return Leshan.handle_response(resp)

    def put_raw(self, path: str, data: str | dict | None = None, headers: dict | None = None, params: dict | None = None):
        """Send HTTP PUT query without any default parameters"""
        resp = self._s.put(f'{self.api_url}{path}', data=data, headers=headers, params=params, timeout=self.timeout)
        return Leshan.handle_response(resp)

    def put(self, path: str, data: str | dict, uri_options: str = ''):
        """
        Send HTTP PUT query with typical parameters.

        Parameters:
        - path: Request path relative to the API URL.
        - data: Request body; a dict is serialized to JSON first.
        - uri_options: Extra query options appended verbatim (e.g. '&replace=true').
        """
        if isinstance(data, dict):
            data = json.dumps(data)
        return self.put_raw(f'{path}{self._default_query()}{uri_options}', data=data, headers={'content-type': 'application/json'})

    def post(self, path: str, data: str | dict | None = None):
        """
        Send HTTP POST query.

        When a body is given (even an empty string) it is sent as JSON together
        with the default query parameters; without a body the path is posted as-is.
        """
        if isinstance(data, dict):
            data = json.dumps(data)
        if data is not None:
            headers = {'content-type': 'application/json'}
            uri_options = self._default_query()
        else:
            headers = None
            uri_options = ''
        resp = self._s.post(f'{self.api_url}{path}' + uri_options, data=data, headers=headers, timeout=self.timeout)
        return Leshan.handle_response(resp)

    def delete_raw(self, path: str):
        """Send HTTP DELETE query"""
        resp = self._s.delete(f'{self.api_url}{path}', timeout=self.timeout)
        return Leshan.handle_response(resp)

    def delete(self, endpoint: str, path: str):
        """Send LwM2M DELETE command"""
        return self.delete_raw(f'/clients/{endpoint}/{path}')

    def execute(self, endpoint: str, path: str):
        """Send LwM2M EXECUTE command"""
        return self.post(f'/clients/{endpoint}/{path}')

    def write(self, endpoint: str, path: str, value: bool | int | float | str | bytes | datetime):
        """
        Send LwM2M WRITE command to a single resource or resource instance.

        A three-level path (object/instance/resource) is written as a single
        resource; any other depth is written as a resource instance. Leading
        and trailing slashes in the path are ignored.
        """
        # Strip slashes so '/1/0/1' is not mistaken for a four-level path and
        # the request URL does not contain '//'.
        path = path.strip('/')
        parts = path.split('/')
        kind = 'singleResource' if len(parts) == 3 else 'resourceInstance'
        return self.put(f'/clients/{endpoint}/{path}', self._define_resource(parts[-1], value, kind))

    def write_attributes(self, endpoint: str, path: str, attributes: dict):
        """Send LwM2M Write-Attributes to given path
        example:
            leshan.write_attributes(endpoint, '1/2/3', {'pmin': 10, 'pmax': 40})
        """
        return self.put_raw(f'/clients/{endpoint}/{path}/attributes', params=attributes)

    def remove_attributes(self, endpoint: str, path: str, attributes: list):
        """Remove attributes from given path by sending their names without values
        example:
            leshan.remove_attributes(endpoint, '1/2/3', ['pmin', 'pmax'])
        """
        attrs = '&'.join(attributes)
        return self.put_raw(f'/clients/{endpoint}/{path}/attributes?' + attrs)

    def update_obj_instance(self, endpoint: str, path: str, resources: dict):
        """Update object instance (PUT with replace=false)"""
        data = self._define_obj_inst(path, resources)
        return self.put(f'/clients/{endpoint}/{path}', data, uri_options='&replace=false')

    def replace_obj_instance(self, endpoint: str, path: str, resources: dict):
        """Replace object instance (PUT with replace=true)"""
        data = self._define_obj_inst(path, resources)
        return self.put(f'/clients/{endpoint}/{path}', data, uri_options='&replace=true')

    def create_obj_instance(self, endpoint: str, path: str, resources: dict):
        """Send LwM2M CREATE command"""
        data = self._define_obj_inst(path, resources)
        path = '/'.join(path.split('/')[:-1])  # Create call should not have instance ID in path
        return self.post(f'/clients/{endpoint}/{path}', data)

    @classmethod
    def _type_to_string(cls, value):
        """
        Convert a Python value to its corresponding Leshan representation.

        Parameters:
        - value: The value to be converted.

        Returns:
        - str: The Leshan type name; anything unrecognized maps to 'string'.
        """
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            return 'boolean'
        if isinstance(value, int):
            return 'integer'
        if isinstance(value, float):
            return 'float'
        if isinstance(value, datetime):
            return 'time'
        if isinstance(value, bytes):
            return 'opaque'
        return 'string'

    @classmethod
    def _convert_type(cls, value):
        """Wrapper for special types that are not understood by Json.

        datetime becomes an integer POSIX timestamp and bytes become a hex
        string; every other value is returned unchanged.
        """
        if isinstance(value, datetime):
            return int(value.timestamp())
        if isinstance(value, bytes):
            return binascii.b2a_hex(value).decode()
        return value

    @classmethod
    def _define_obj_inst(cls, path: str, resources: dict):
        """
        Define an object instance for Leshan.

        Parameters:
        - path: Path whose last element is the numeric instance ID.
        - resources: Mapping of resource ID to value; a dict value describes
          a multi-instance resource.
        """
        return {
            "kind": "instance",
            "id": int(path.split('/')[-1]),  # ID is last element of path
            "resources": [
                cls._define_resource(
                    key, value, 'multiResource' if isinstance(value, dict) else 'singleResource'
                )
                for key, value in resources.items()
            ],
        }

    @classmethod
    def _define_resource(cls, rid, value, kind='singleResource'):
        """
        Define a resource for Leshan.

        Parameters:
        - rid: Resource (or resource instance) ID.
        - value: Plain value, or for 'multiResource' a non-empty dict of
          resource instance ID to value.
        - kind: 'singleResource', 'resourceInstance' or 'multiResource'.

        Raises:
        - RuntimeError: If kind is not one of the supported kinds.
        - IndexError: If kind is 'multiResource' and value is empty, because
          the type is taken from the first instance.
        """
        if kind in ('singleResource', 'resourceInstance'):
            return {
                "id": rid,
                "kind": kind,
                "value": cls._convert_type(value),
                "type": cls._type_to_string(value)
            }
        if kind == 'multiResource':
            return {
                "id": rid,
                "kind": kind,
                # Convert each instance like a single value, so bytes and
                # datetime instances are JSON-serializable too.
                "values": {riid: cls._convert_type(item) for riid, item in value.items()},
                "type": cls._type_to_string(list(value.values())[0])
            }
        raise RuntimeError(f'Unhandled type {kind}')

    @classmethod
    def _decode_value(cls, val_type: str, value):
        """
        Decode the Leshan representation of a value back to a Python value.

        BOOLEAN, INTEGER and FLOAT are converted; other types are returned
        unchanged.
        """
        if val_type == 'BOOLEAN':
            # bool('false') would be True, so textual booleans are parsed explicitly.
            if isinstance(value, str):
                return value.strip().lower() in ('true', '1')
            return bool(value)
        if val_type == 'INTEGER':
            return int(value)
        if val_type == 'FLOAT':
            return float(value)
        return value

    @classmethod
    def _decode_resource(cls, content: dict):
        """
        Decode the Leshan representation of a resource back to a Python dictionary.

        Returns {id: value} for single resources and resource instances, and
        {id: {instance_id: value}} for multi-instance resources.
        """
        kind = content['kind']
        if kind in ('singleResource', 'resourceInstance'):
            return {content['id']: cls._decode_value(content['type'], content['value'])}
        if kind == 'multiResource':
            values = {
                int(riid): cls._decode_value(content['type'], value)
                for riid, value in content['values'].items()
            }
            return {content['id']: values}
        raise RuntimeError(f'Unhandled type {content["kind"]}')

    @classmethod
    def _decode_obj_inst(cls, content):
        """
        Decode the Leshan representation of an object instance back to a Python dictionary.
        """
        resources = {}
        for resource in content['resources']:
            resources.update(cls._decode_resource(resource))
        return {content['id']: resources}

    @classmethod
    def _decode_obj(cls, content):
        """
        Decode the Leshan representation of an object back to a Python dictionary.
        """
        instances = {}
        for instance in content['instances']:
            instances.update(cls._decode_obj_inst(instance))
        return {content['id']: instances}

    def read(self, endpoint: str, path: str):
        """
        Send LwM2M READ command and decode the response to a Python dictionary.

        If the response is not marked successful it is returned undecoded. A
        single resource or resource instance is returned as a bare value.
        """
        resp = self.get(f'/clients/{endpoint}/{path}')
        if not resp['success']:
            return resp
        content = resp['content']
        kind = content['kind']
        if kind == 'obj':
            return self._decode_obj(content)
        if kind == 'instance':
            return self._decode_obj_inst(content)
        if kind in ('singleResource', 'resourceInstance'):
            return self._decode_value(content['type'], content['value'])
        if kind == 'multiResource':
            return self._decode_resource(content)
        raise RuntimeError(f'Unhandled type {content["kind"]}')

    @classmethod
    def parse_composite(cls, payload: dict):
        """
        Decode the Leshan's response to composite query back to a Python dictionary.

        Parameters:
        - payload: Either the full response (with 'status' and 'content') or
          just the mapping of path to node.

        Returns:
        - dict: Nested dictionary keyed by object ID, instance ID, resource ID
          and, for four-level paths, resource instance ID.

        Raises:
        - RuntimeError: If the status is not 'CONTENT(205)', content is
          missing, or a path has more than four levels.
        """
        data = {}
        if 'status' in payload:
            if payload['status'] != 'CONTENT(205)' or 'content' not in payload:
                raise RuntimeError('No content received')
            payload = payload['content']
        for path, content in payload.items():
            if path == "/":
                for obj in content['objects']:
                    data.update(cls._decode_obj(obj))
                continue
            keys = [int(key) for key in path.lstrip("/").split('/')]
            depth = len(keys)
            if depth == 1:
                data.update(cls._decode_obj(content))
            elif depth == 2:
                data.setdefault(keys[0], {}).update(cls._decode_obj_inst(content))
            elif depth == 3:
                instance = data.setdefault(keys[0], {}).setdefault(keys[1], {})
                instance.update(cls._decode_resource(content))
            elif depth == 4:
                instance = data.setdefault(keys[0], {}).setdefault(keys[1], {})
                instance.setdefault(keys[2], {}).update(cls._decode_resource(content))
            else:
                raise RuntimeError(f'Unhandled path {path}')
        return data

    def _composite_params(self, paths: list[str] | None = None):
        """Common URI parameters for composite query"""
        parameters = {
            'pathformat': self.format,
            'nodeformat': self.format,
            'timeout': self.timeout
        }
        if paths is not None:
            paths = [path if path.startswith('/') else '/' + path for path in paths]
            parameters['paths'] = ','.join(paths)
        return parameters

    def composite_read(self, endpoint: str, paths: list[str]):
        """Send LwM2M Composite-Read command and decode the response to a Python dictionary"""
        parameters = self._composite_params(paths)
        resp = self._s.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout)
        payload = Leshan.handle_response(resp)
        return self.parse_composite(payload)

    def composite_write(self, endpoint: str, resources: dict):
        """
        Send LwM2m Composite-Write operation.

        Targeted resources are defined as a dictionary with the following structure:
        {
            "/1/0/1": 60,
            "/1/0/6": True,
            "/16/0/0": {
                "0": "aa",
                "1": "bb",
                "2": "cc",
                "3": "dd"
            }
        }

        Objects or object instances cannot be targeted.

        Raises:
        - RuntimeError: If a path is not three or four levels deep.
        """
        data = {}
        parameters = self._composite_params()
        for path, value in resources.items():
            path = path if path.startswith('/') else '/' + path
            parts = path.split('/')
            level = len(parts) - 1  # the leading '/' yields an empty first element
            rid = int(parts[-1])
            if level == 3:
                kind = 'multiResource' if isinstance(value, dict) else 'singleResource'
            elif level == 4:
                kind = 'resourceInstance'
            else:
                raise RuntimeError(f'Unhandled path {path}')
            data[path] = self._define_resource(rid, value, kind=kind)

        resp = self._s.put(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, json=data, timeout=self.timeout)
        return Leshan.handle_response(resp)

    def discover(self, endpoint: str, path: str):
        """Query the discover URL of given path and return a dictionary of link URL to attributes"""
        resp = self.handle_response(self._s.get(f'{self.api_url}/clients/{endpoint}/{path}/discover', timeout=self.timeout))
        return {obj['url']: obj['attributes'] for obj in resp['objectLinks']}

    def create_psk_device(self, endpoint: str, passwd: str):
        """Register PSK security info for the endpoint, using the endpoint name as the PSK identity"""
        psk = binascii.b2a_hex(passwd.encode()).decode()
        # A dict is serialized by put(), so quotes in the endpoint cannot break the JSON.
        self.put('/security/clients/', {
            "endpoint": endpoint,
            "tls": {"mode": "psk", "details": {"identity": endpoint, "key": psk}},
        })

    def delete_device(self, endpoint: str):
        """Remove the security info of the endpoint"""
        self.delete_raw(f'/security/clients/{endpoint}')

    def create_bs_device(self, endpoint: str, server_uri: str, bs_passwd: str, passwd: str):
        """
        Register PSK security info and a bootstrap configuration for the endpoint.

        Parameters:
        - endpoint: Endpoint name, also used as the PSK identity.
        - server_uri: URI placed in the security entry of the bootstrap configuration.
        - bs_passwd: Pre-shared key stored under /security/clients/.
        - passwd: Pre-shared key placed in the bootstrap configuration.
        """
        psk = binascii.b2a_hex(bs_passwd.encode()).decode()
        self.put('/security/clients/', {
            "tls": {"mode": "psk", "details": {"identity": endpoint, "key": psk}},
            "endpoint": endpoint,
        })
        # Identity and key are sent as lists of character codes.
        content = {
            "servers": {
                "0": {
                    "binding": "U",
                    "defaultMinPeriod": 1,
                    "lifetime": 86400,
                    "notifIfDisabled": False,
                    "shortId": 1,
                }
            },
            "security": {
                "1": {
                    "bootstrapServer": False,
                    "clientOldOffTime": 1,
                    "publicKeyOrId": [ord(n) for n in endpoint],
                    "secretKey": [ord(n) for n in passwd],
                    "securityMode": "PSK",
                    "serverId": 1,
                    "serverSmsNumber": "",
                    "smsBindingKeyParam": [],
                    "smsBindingKeySecret": [],
                    "smsSecurityMode": "NO_SEC",
                    "uri": server_uri,
                }
            },
            "oscore": {},
            "toDelete": ["/0", "/1"],
        }
        self.post(f'/bootstrap/{endpoint}', content)

    def delete_bs_device(self, endpoint: str):
        """Remove both the security info and the bootstrap configuration of the endpoint"""
        self.delete_raw(f'/security/clients/{endpoint}')
        self.delete_raw(f'/bootstrap/{endpoint}')

    def observe(self, endpoint: str, path: str):
        """Start observing given path"""
        # An empty body (not None) makes post() add the default query parameters.
        return self.post(f'/clients/{endpoint}/{path}/observe', data="")

    def cancel_observe(self, endpoint: str, path: str):
        """Cancel observation of given path using the 'active' query option"""
        return self.delete_raw(f'/clients/{endpoint}/{path}/observe?active')

    def passive_cancel_observe(self, endpoint: str, path: str):
        """Cancel observation of given path without the 'active' query option"""
        return self.delete_raw(f'/clients/{endpoint}/{path}/observe')

    def composite_observe(self, endpoint: str, paths: list[str]):
        """Start a composite observation of given paths and decode the response"""
        parameters = self._composite_params(paths)
        resp = self._s.post(f'{self.api_url}/clients/{endpoint}/composite/observe', params=parameters, timeout=self.timeout)
        payload = Leshan.handle_response(resp)
        return self.parse_composite(payload)

    def cancel_composite_observe(self, endpoint: str, paths: list[str]):
        """Cancel composite observation of given paths using the 'active' query option"""
        paths = [path if path.startswith('/') else '/' + path for path in paths]
        return self.delete_raw(f'/clients/{endpoint}/composite/observe?paths=' + ','.join(paths) + '&active')

    def passive_cancel_composite_observe(self, endpoint: str, paths: list[str]):
        """Cancel composite observation of given paths without the 'active' query option"""
        paths = [path if path.startswith('/') else '/' + path for path in paths]
        return self.delete_raw(f'/clients/{endpoint}/composite/observe?paths=' + ','.join(paths))

    @contextmanager
    def get_event_stream(self, endpoint: str, timeout: int | None = None):
        """
        Get stream of events regarding the given endpoint.

        Events are notifications, updates and sends.

        The event stream must be closed after the use, so this must be used in 'with' statement like this:
            with leshan.get_event_stream('native_posix') as events:
                data = events.next_event('SEND')

        If timeout happens, the event stream returns None.

        Parameters:
        - endpoint: Endpoint name, sent as the query string of the event URL.
        - timeout: Timeout in seconds; defaults to the client's timeout.
        """
        if timeout is None:
            timeout = self.timeout
        # NOTE(review): the endpoint is sent as a bare query string; confirm
        # whether the server expects it as a named parameter instead.
        r = requests.get(f'{self.api_url}/event?{endpoint}', stream=True, headers={'Accept': 'text/event-stream'}, timeout=timeout)
        if r.encoding is None:
            r.encoding = 'utf-8'
        try:
            yield LeshanEventsIterator(r, timeout)
        finally:
            r.close()
|
|
|
|
class LeshanEventsIterator:
    """Iterator for Leshan event stream"""

    # Prefix of the payload line that follows an 'event: <name>' line.
    _DATA_PREFIX = 'data: '

    def __init__(self, req: requests.Response, timeout: int):
        """
        Initialize the iterator in line mode.

        Parameters:
        - req: Streaming response whose lines are consumed one by one.
        - timeout: Time budget in seconds for each next_event() call.
        """
        self._it = req.iter_lines(chunk_size=1, decode_unicode=True)
        self._timeout = timeout

    def next_event(self, event: str):
        """
        Finds the next occurrence of a specific event in the stream.

        SEND events and composite NOTIFICATION events are decoded with
        Leshan.parse_composite(); other NOTIFICATION events are wrapped as
        {resource path: value} and decoded the same way. Any other event
        returns its JSON payload as-is.

        If timeout happens, or the stream ends, returns None. The deadline is
        only checked after a line has been received.
        """
        deadline = time.time() + self._timeout
        prefix = self._DATA_PREFIX
        try:
            for line in self._it:
                if line == f'event: {event}':
                    for data_line in self._it:
                        if data_line.startswith(prefix):
                            # Slice the prefix off; str.lstrip() would strip a
                            # character set and eat payloads such as 'true'.
                            data = json.loads(data_line[len(prefix):])
                            if event == 'SEND' or (event == 'NOTIFICATION' and data['kind'] == 'composite'):
                                return Leshan.parse_composite(data['val'])
                            if event == 'NOTIFICATION':
                                return Leshan.parse_composite({data['res']: data['val']})
                            return data
                        if time.time() > deadline:
                            return None
                if time.time() > deadline:
                    return None
        except requests.exceptions.Timeout:
            return None
        return None
|