tests: lwm2m: Implement event stream support for Leshan

Implement support for reading stream of events from Leshan.
This allows testing the LwM2M SEND/Notify/Update operations
and reading content of those.

Also convert the whole Leshan class to use requests.session() so
it pools up connections and uses keep-alive.

Signed-off-by: Seppo Takalo <seppo.takalo@nordicsemi.no>
This commit is contained in:
Seppo Takalo 2023-10-26 13:13:42 +03:00 committed by Carles Cufí
commit 944ad183dc

View file

@ -8,6 +8,8 @@ import json
import binascii
import requests
from datetime import datetime
import time
from contextlib import contextmanager
class Leshan:
def __init__(self, url: str):
@ -15,6 +17,7 @@ class Leshan:
self.timeout = 10
#self.format = 'TLV'
self.format = "SENML_CBOR"
self._s = requests.Session()
try:
resp = self.get('/security/clients')
if not isinstance(resp, list):
@ -41,19 +44,18 @@ class Leshan:
if len(resp.text):
obj = json.loads(resp.text)
return obj
else:
return None
return None
def get(self, path: str):
"""Send HTTP GET query"""
params = {'timeout': self.timeout}
if self.format is not None:
params['format'] = self.format
resp = requests.get(f'{self.api_url}{path}', params=params, timeout=self.timeout)
resp = self._s.get(f'{self.api_url}{path}', params=params, timeout=self.timeout)
return Leshan.handle_response(resp)
def put_raw(self, path: str, data: str | dict | None = None, headers: dict | None = None):
resp = requests.put(f'{self.api_url}{path}', data=data, headers=headers, timeout=self.timeout)
resp = self._s.put(f'{self.api_url}{path}', data=data, headers=headers, timeout=self.timeout)
return Leshan.handle_response(resp)
def put(self, path: str, data: str | dict, uri_options: str = ''):
@ -70,11 +72,11 @@ class Leshan:
else:
headers=None
uri_options = ''
resp = requests.post(f'{self.api_url}{path}' + uri_options, data=data, headers=headers, timeout=self.timeout)
resp = self._s.post(f'{self.api_url}{path}' + uri_options, data=data, headers=headers, timeout=self.timeout)
return Leshan.handle_response(resp)
def delete(self, path: str):
resp = requests.delete(f'{self.api_url}{path}', timeout=self.timeout)
resp = self._s.delete(f'{self.api_url}{path}', timeout=self.timeout)
return Leshan.handle_response(resp)
def execute(self, endpoint: str, path: str):
@ -127,7 +129,8 @@ class Leshan:
else:
return value
def _define_obj_inst(self, path: str, resources: dict):
@classmethod
def _define_obj_inst(cls, path: str, resources: dict):
data = {
"kind": "instance",
"id": int(path.split('/')[-1]), # ID is last element of path
@ -138,65 +141,70 @@ class Leshan:
kind = 'multiResource'
else:
kind = 'singleResource'
data['resources'].append(self._define_resource(key, value, kind))
data['resources'].append(cls._define_resource(key, value, kind))
return data
def _define_resource(self, rid, value, kind='singleResource'):
@classmethod
def _define_resource(cls, rid, value, kind='singleResource'):
if kind in ('singleResource', 'resourceInstance'):
return {
"id": rid,
"kind": kind,
"value": self._convert_type(value),
"type": self._type_to_string(value)
"value": cls._convert_type(value),
"type": cls._type_to_string(value)
}
if kind == 'multiResource':
return {
"id": rid,
"kind": kind,
"values": value,
"type": self._type_to_string(list(value.values())[0])
"type": cls._type_to_string(list(value.values())[0])
}
raise RuntimeError(f'Unhandled type {kind}')
def _decode_value(self, type, value):
@classmethod
def _decode_value(cls, val_type: str, value: str):
"""
Decode the Leshan representation of a value back to a Python value.
"""
if type == 'BOOLEAN':
if val_type == 'BOOLEAN':
return bool(value)
if type == 'INTEGER':
if val_type == 'INTEGER':
return int(value)
return value
def _decode_resource(self, content):
@classmethod
def _decode_resource(cls, content: dict):
"""
Decode the Leshan representation of a resource back to a Python dictionary.
"""
if content['kind'] == 'singleResource' or content['kind'] == 'resourceInstance':
return {content['id']: self._decode_value(content['type'], content['value'])}
return {content['id']: cls._decode_value(content['type'], content['value'])}
elif content['kind'] == 'multiResource':
values = {}
for riid, value in content['values'].items():
values.update({int(riid): self._decode_value(content['type'], value)})
values.update({int(riid): cls._decode_value(content['type'], value)})
return {content['id']: values}
raise RuntimeError(f'Unhandled type {content["kind"]}')
def _decode_obj_inst(self, content):
@classmethod
def _decode_obj_inst(cls, content):
"""
Decode the Leshan representation of an object instance back to a Python dictionary.
"""
resources = {}
for resource in content['resources']:
resources.update(self._decode_resource(resource))
resources.update(cls._decode_resource(resource))
return {content['id']: resources}
def _decode_obj(self, content):
@classmethod
def _decode_obj(cls, content):
"""
Decode the Leshan representation of an object back to a Python dictionary.
"""
instances = {}
for instance in content['instances']:
instances.update(self._decode_obj_inst(instance))
instances.update(cls._decode_obj_inst(instance))
return {content['id']: instances}
def read(self, endpoint: str, path: str):
@ -214,6 +222,35 @@ class Leshan:
return self._decode_resource(content)
raise RuntimeError(f'Unhandled type {content["kind"]}')
@classmethod
def parse_composite(cls, content: dict):
data = {}
for path, content in content.items():
keys = [int(key) for key in path.lstrip("/").split('/')]
if len(keys) == 1:
data.update(cls._decode_obj(content))
elif len(keys) == 2:
if keys[0] not in data:
data[keys[0]] = {}
data[keys[0]].update(cls._decode_obj_inst(content))
elif len(keys) == 3:
if keys[0] not in data:
data[keys[0]] = {}
if keys[1] not in data[keys[0]]:
data[keys[0]][keys[1]] = {}
data[keys[0]][keys[1]].update(cls._decode_resource(content))
elif len(keys) == 4:
if keys[0] not in data:
data[keys[0]] = {}
if keys[1] not in data[keys[0]]:
data[keys[0]][keys[1]] = {}
if keys[2] not in data[keys[0]][keys[1]]:
data[keys[0]][keys[1]][keys[2]] = {}
data[keys[0]][keys[1]][keys[2]].update(cls._decode_resource(content))
else:
raise RuntimeError(f'Unhandled path {path}')
return data
def composite_read(self, endpoint: str, paths: list[str]):
paths = [path if path.startswith('/') else '/' + path for path in paths]
parameters = {
@ -222,39 +259,11 @@ class Leshan:
'timeout': self.timeout,
'paths': ','.join(paths)
}
resp = requests.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout)
resp = self._s.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout)
payload = Leshan.handle_response(resp)
if not payload['status'] == 'CONTENT(205)':
raise RuntimeError(f'No content received')
data = {}
for path, content in payload['content'].items():
keys = [int(key) for key in path.lstrip("/").split('/')]
if len(keys) == 1:
data.update(self._decode_obj(content))
elif len(keys) == 2:
if keys[0] not in data:
data[keys[0]] = {}
data[keys[0]].update(self._decode_obj_inst(content))
elif len(keys) == 3:
if keys[0] not in data:
data[keys[0]] = {}
if keys[1] not in data[keys[0]]:
data[keys[0]][keys[1]] = {}
data[keys[0]][keys[1]].update(self._decode_resource(content))
elif len(keys) == 4:
if keys[0] not in data:
data[keys[0]] = {}
if keys[1] not in data[keys[0]]:
data[keys[0]][keys[1]] = {}
if keys[2] not in data[keys[0]][keys[1]]:
data[keys[0]][keys[1]][keys[2]] = {}
data[keys[0]][keys[1]][keys[2]].update(self._decode_resource(content))
else:
raise RuntimeError(f'Unhandled path {path}')
print(f'Requested paths: {paths}')
print(data)
return data
return self.parse_composite(payload['content'])
def composite_write(self, endpoint: str, resources: dict):
"""
@ -295,11 +304,11 @@ class Leshan:
raise RuntimeError(f'Unhandled path {path}')
data[path] = value
resp = requests.put(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, json=data, timeout=self.timeout)
resp = self._s.put(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, json=data, timeout=self.timeout)
return Leshan.handle_response(resp)
def discover(self, endpoint: str, path: str):
resp = self.handle_response(requests.get(f'{self.api_url}/clients/{endpoint}/{path}/discover', timeout=self.timeout))
resp = self.handle_response(self._s.get(f'{self.api_url}/clients/{endpoint}/{path}/discover', timeout=self.timeout))
data = {}
for obj in resp['objectLinks']:
data[obj['url']] = obj['attributes']
@ -324,3 +333,46 @@ class Leshan:
def delete_bs_device(self, endpoint: str):
self.delete(f'/security/clients/{endpoint}')
self.delete(f'/bootstrap/{endpoint}')
@contextmanager
def get_event_stream(self, endpoint: str):
"""
Get stream of events regarding the given endpoint.
Events are notifications, updates and sends.
The event stream must be closed after the use, so this must be used in 'with' statement like this:
with leshan.get_event_stream('native_posix') as events:
data = events.next_event('SEND')
If timeout happens, the event streams returns None.
"""
r = self._s.get(f'{self.api_url}/event?{endpoint}', stream=True, headers={'Accept': 'text/event-stream'}, timeout=self.timeout)
if r.encoding is None:
r.encoding = 'utf-8'
try:
yield LeshanEventsIterator(r, self.timeout)
finally:
r.close()
class LeshanEventsIterator:
def __init__(self, req: requests.Response, timeout: int):
self._it = req.iter_lines(chunk_size=1, decode_unicode=True)
self._timeout = timeout
def next_event(self, event: str):
timeout = time.time() + self._timeout
try:
for line in self._it:
if line == f'event: {event}':
for line in self._it:
if not line.startswith('data: '):
continue
data = json.loads(line.lstrip('data: '))
if event == 'SEND':
return Leshan.parse_composite(data['val'])
return data
if time.time() > timeout:
return None
except requests.exceptions.Timeout:
return None