From 944ad183dc2402c6ee74e9af6f3e5f0a1ea01a55 Mon Sep 17 00:00:00 2001 From: Seppo Takalo Date: Thu, 26 Oct 2023 13:13:42 +0300 Subject: [PATCH] tests: lwm2m: Implement event stream support for Leshan Implement support for reading stream of events from Leshan. This allows testing the LwM2M SEND/Notify/Update operations and reading content of those. Also convert the whole Leshan class to use requests.session() so it pools up connections and uses keep-alive. Signed-off-by: Seppo Takalo --- tests/net/lib/lwm2m/interop/pytest/leshan.py | 160 ++++++++++++------- 1 file changed, 106 insertions(+), 54 deletions(-) diff --git a/tests/net/lib/lwm2m/interop/pytest/leshan.py b/tests/net/lib/lwm2m/interop/pytest/leshan.py index ecd14b2481c..307a390bf9b 100644 --- a/tests/net/lib/lwm2m/interop/pytest/leshan.py +++ b/tests/net/lib/lwm2m/interop/pytest/leshan.py @@ -8,6 +8,8 @@ import json import binascii import requests from datetime import datetime +import time +from contextlib import contextmanager class Leshan: def __init__(self, url: str): @@ -15,6 +17,7 @@ class Leshan: self.timeout = 10 #self.format = 'TLV' self.format = "SENML_CBOR" + self._s = requests.Session() try: resp = self.get('/security/clients') if not isinstance(resp, list): @@ -41,19 +44,18 @@ class Leshan: if len(resp.text): obj = json.loads(resp.text) return obj - else: - return None + return None def get(self, path: str): """Send HTTP GET query""" params = {'timeout': self.timeout} if self.format is not None: params['format'] = self.format - resp = requests.get(f'{self.api_url}{path}', params=params, timeout=self.timeout) + resp = self._s.get(f'{self.api_url}{path}', params=params, timeout=self.timeout) return Leshan.handle_response(resp) def put_raw(self, path: str, data: str | dict | None = None, headers: dict | None = None): - resp = requests.put(f'{self.api_url}{path}', data=data, headers=headers, timeout=self.timeout) + resp = self._s.put(f'{self.api_url}{path}', data=data, headers=headers, timeout=self.timeout) return Leshan.handle_response(resp) def put(self, path: str, data: str | dict, uri_options: str = ''): @@ -70,11 +72,11 @@ class Leshan: else: headers=None uri_options = '' - resp = requests.post(f'{self.api_url}{path}' + uri_options, data=data, headers=headers, timeout=self.timeout) + resp = self._s.post(f'{self.api_url}{path}' + uri_options, data=data, headers=headers, timeout=self.timeout) return Leshan.handle_response(resp) def delete(self, path: str): - resp = requests.delete(f'{self.api_url}{path}', timeout=self.timeout) + resp = self._s.delete(f'{self.api_url}{path}', timeout=self.timeout) return Leshan.handle_response(resp) def execute(self, endpoint: str, path: str): @@ -127,7 +129,8 @@ class Leshan: else: return value - def _define_obj_inst(self, path: str, resources: dict): + @classmethod + def _define_obj_inst(cls, path: str, resources: dict): data = { "kind": "instance", "id": int(path.split('/')[-1]), # ID is last element of path @@ -138,65 +141,70 @@ class Leshan: kind = 'multiResource' else: kind = 'singleResource' - data['resources'].append(self._define_resource(key, value, kind)) + data['resources'].append(cls._define_resource(key, value, kind)) return data - def _define_resource(self, rid, value, kind='singleResource'): + @classmethod + def _define_resource(cls, rid, value, kind='singleResource'): if kind in ('singleResource', 'resourceInstance'): return { "id": rid, "kind": kind, - "value": self._convert_type(value), - "type": self._type_to_string(value) + "value": cls._convert_type(value), + "type": cls._type_to_string(value) } if kind == 'multiResource': return { "id": rid, "kind": kind, "values": value, - "type": self._type_to_string(list(value.values())[0]) + "type": cls._type_to_string(list(value.values())[0]) } raise RuntimeError(f'Unhandled type {kind}') - def _decode_value(self, type, value): + @classmethod + def _decode_value(cls, val_type: str, value: str): """ Decode the Leshan representation of a value back to a Python value. """ - if type == 'BOOLEAN': + if val_type == 'BOOLEAN': return bool(value) - if type == 'INTEGER': + if val_type == 'INTEGER': return int(value) return value - def _decode_resource(self, content): + @classmethod + def _decode_resource(cls, content: dict): """ Decode the Leshan representation of a resource back to a Python dictionary. """ if content['kind'] == 'singleResource' or content['kind'] == 'resourceInstance': - return {content['id']: self._decode_value(content['type'], content['value'])} + return {content['id']: cls._decode_value(content['type'], content['value'])} elif content['kind'] == 'multiResource': values = {} for riid, value in content['values'].items(): - values.update({int(riid): self._decode_value(content['type'], value)}) + values.update({int(riid): cls._decode_value(content['type'], value)}) return {content['id']: values} raise RuntimeError(f'Unhandled type {content["kind"]}') - def _decode_obj_inst(self, content): + @classmethod + def _decode_obj_inst(cls, content): """ Decode the Leshan representation of an object instance back to a Python dictionary. """ resources = {} for resource in content['resources']: - resources.update(self._decode_resource(resource)) + resources.update(cls._decode_resource(resource)) return {content['id']: resources} - def _decode_obj(self, content): + @classmethod + def _decode_obj(cls, content): """ Decode the Leshan representation of an object back to a Python dictionary. """ instances = {} for instance in content['instances']: - instances.update(self._decode_obj_inst(instance)) + instances.update(cls._decode_obj_inst(instance)) return {content['id']: instances} def read(self, endpoint: str, path: str): @@ -214,6 +222,35 @@ class Leshan: return self._decode_resource(content) raise RuntimeError(f'Unhandled type {content["kind"]}') + @classmethod + def parse_composite(cls, content: dict): + data = {} + for path, content in content.items(): + keys = [int(key) for key in path.lstrip("/").split('/')] + if len(keys) == 1: + data.update(cls._decode_obj(content)) + elif len(keys) == 2: + if keys[0] not in data: + data[keys[0]] = {} + data[keys[0]].update(cls._decode_obj_inst(content)) + elif len(keys) == 3: + if keys[0] not in data: + data[keys[0]] = {} + if keys[1] not in data[keys[0]]: + data[keys[0]][keys[1]] = {} + data[keys[0]][keys[1]].update(cls._decode_resource(content)) + elif len(keys) == 4: + if keys[0] not in data: + data[keys[0]] = {} + if keys[1] not in data[keys[0]]: + data[keys[0]][keys[1]] = {} + if keys[2] not in data[keys[0]][keys[1]]: + data[keys[0]][keys[1]][keys[2]] = {} + data[keys[0]][keys[1]][keys[2]].update(cls._decode_resource(content)) + else: + raise RuntimeError(f'Unhandled path {path}') + return data + def composite_read(self, endpoint: str, paths: list[str]): paths = [path if path.startswith('/') else '/' + path for path in paths] parameters = { @@ -222,39 +259,11 @@ class Leshan: 'timeout': self.timeout, 'paths': ','.join(paths) } - resp = requests.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout) + resp = self._s.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout) payload = Leshan.handle_response(resp) if not payload['status'] == 'CONTENT(205)': raise RuntimeError(f'No content received') - data = {} - for path, content in payload['content'].items(): - keys = [int(key) for key in path.lstrip("/").split('/')] - if len(keys) == 1: - data.update(self._decode_obj(content)) - elif len(keys) == 2: - if keys[0] not in data: - data[keys[0]] = {} - data[keys[0]].update(self._decode_obj_inst(content)) - elif len(keys) == 3: - if keys[0] not in data: - data[keys[0]] = {} - if keys[1] not in data[keys[0]]: - data[keys[0]][keys[1]] = {} - data[keys[0]][keys[1]].update(self._decode_resource(content)) - elif len(keys) == 4: - if keys[0] not in data: - data[keys[0]] = {} - if keys[1] not in data[keys[0]]: - data[keys[0]][keys[1]] = {} - if keys[2] not in data[keys[0]][keys[1]]: - data[keys[0]][keys[1]][keys[2]] = {} - data[keys[0]][keys[1]][keys[2]].update(self._decode_resource(content)) - else: - raise RuntimeError(f'Unhandled path {path}') - print(f'Requested paths: {paths}') - print(data) - return data - + return self.parse_composite(payload['content']) def composite_write(self, endpoint: str, resources: dict): """ @@ -295,11 +304,11 @@ class Leshan: raise RuntimeError(f'Unhandled path {path}') data[path] = value - resp = requests.put(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, json=data, timeout=self.timeout) + resp = self._s.put(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, json=data, timeout=self.timeout) return Leshan.handle_response(resp) def discover(self, endpoint: str, path: str): - resp = self.handle_response(requests.get(f'{self.api_url}/clients/{endpoint}/{path}/discover', timeout=self.timeout)) + resp = self.handle_response(self._s.get(f'{self.api_url}/clients/{endpoint}/{path}/discover', timeout=self.timeout)) data = {} for obj in resp['objectLinks']: data[obj['url']] = obj['attributes'] @@ -324,3 +333,46 @@ class Leshan: def delete_bs_device(self, endpoint: str): self.delete(f'/security/clients/{endpoint}') self.delete(f'/bootstrap/{endpoint}') + + @contextmanager + def get_event_stream(self, endpoint: str): + """ + Get stream of events regarding the given endpoint. + + Events are notifications, updates and sends. + + The event stream must be closed after the use, so this must be used in 'with' statement like this: + with leshan.get_event_stream('native_posix') as events: + data = events.next_event('SEND') + + If timeout happens, the event streams returns None. + """ + r = self._s.get(f'{self.api_url}/event?{endpoint}', stream=True, headers={'Accept': 'text/event-stream'}, timeout=self.timeout) + if r.encoding is None: + r.encoding = 'utf-8' + try: + yield LeshanEventsIterator(r, self.timeout) + finally: + r.close() + +class LeshanEventsIterator: + def __init__(self, req: requests.Response, timeout: int): + self._it = req.iter_lines(chunk_size=1, decode_unicode=True) + self._timeout = timeout + + def next_event(self, event: str): + timeout = time.time() + self._timeout + try: + for line in self._it: + if line == f'event: {event}': + for line in self._it: + if not line.startswith('data: '): + continue + data = json.loads(line.lstrip('data: ')) + if event == 'SEND': + return Leshan.parse_composite(data['val']) + return data + if time.time() > timeout: + return None + except requests.exceptions.Timeout: + return None