net: lwm2m: Add functional tests for LwM2M against Leshan

Use the new Pytest integration to run testcases against Leshan.

Test with:
twister -T tests/net/lib/lwm2m/functional -p native_posix -vv

This requires Leshan running in localhost using following setup:
tcp/8080 Leshan web interface and REST API
tcp/8081 Leshan bootstrap server REST API
udp/5683 Leshan non-secure CoAP
udp/5684 Leshan DTLS CoAP
udp/5783 non-secure Bootstrap CoAP
udp/5684 DTLS Bootstrap CoAP

Leshan and Boostrap server must be reachable from native_posix
run using IP address 192.0.2.2 as in most of the examples.

Tests are written from test spec;
OMA Enabler Test Specification (Interoperability) for
Lightweight M2M

Following tests are implemented in this commit:
* LightweightM2M-1.1-int-0 – Client Initiated Bootstrap
* LightweightM2M-1.1-int-1 – Client Initiated Bootstrap Full (PSK)
* LightweightM2M-1.1-int-101 – Initial Registration
* LightweightM2M-1.1-int-102 – Registration Update
* LightweightM2M-1.1-int-104 – Registration Update Trigge
* LightweightM2M-1.1-int-105 - Discarded Register Update
* LightweightM2M-1.1-int-107 – Extending the lifetime of a registration
* LightweightM2M-1.1-int-108 – Turn on Queue Mode
* LightweightM2M-1.1-int-109 – Behavior in Queue Mode
* LightweightM2M-1.1-int-201 – Querying basic information in Plain Text
* LightweightM2M-1.1-int-203 – Querying basic information in TLV format
* LightweightM2M-1.1-int-204 – Querying basic information in JSON format
* LightweightM2M-1.1-int-205 – Setting basic information in Plain Text
* LightweightM2M-1.1-int-211 – Querying basic information in CBOR format
* LightweightM2M-1.1-int-212 – Setting basic information in CBOR format
* LightweightM2M-1.1-int-215 – Setting basic information in TLV format
* LightweightM2M-1.1-int-220 – Setting basic information in JSON format
* LightweightM2M-1.1-int-221 – Attempt to perform operations on Security
* LightweightM2M-1.1-int-401 – UDP Channel Security – PSK Mode

Signed-off-by: Seppo Takalo <seppo.takalo@nordicsemi.no>
This commit is contained in:
Seppo Takalo 2023-06-01 12:00:44 +03:00 committed by Carles Cufí
commit 7b4f22edd1
10 changed files with 966 additions and 0 deletions

View file

@ -0,0 +1,10 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(lwm2m_interop_tests)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})
include(${ZEPHYR_BASE}/samples/net/common/common.cmake)

View file

@ -0,0 +1,103 @@
# LwM2M Interoperability tests using Leshan demo server
This directory contains list of testcases that use
the Twister's Pytest integration to run testcases against Leshan demo server.
These tests use emulated hardware (native_posix).
These tests require setup that is not done in Twister run, so follow this documentation to set
up the test environment.
## Network setup
As with typical network samples, host machine uses IP address `192.0.2.2` and the emulated device
running Zephyr is using address `192.0.2.1`.
Follow [Networking with the host system](https://docs.zephyrproject.org/latest/connectivity/networking/networking_with_host.html#networking-with-the-host-system)
from Zephyr's documentation how to set it up, or follow [Create NAT and routing for Zephyr native network on Linux](https://github.com/zephyrproject-rtos/net-tools/blob/master/README%20NAT.md).
### Leshan server setup
* Leshan server must be reachable from the device using IP address `192.0.2.2`.
Configure the port forwarding, if you use Docker to run Leshan.
* Leshan demo server REST API must be reachable from localhost.
* tcp/8080 Leshan web interface and REST API
* tcp/8081 Leshan bootstrap server REST API
* udp/5683 Leshan non-secure CoAP
* udp/5684 Leshan DTLS CoAP
* udp/5783 non-secure Bootstrap CoAP
* udp/5684 DTLS Bootstrap CoAP
* Download Leshan JAR file from https://ci.eclipse.org/leshan/job/leshan/lastSuccessfulBuild/artifact/leshan-server-demo.jar
* Download Leshan Bootstrap server JAR file from https://ci.eclipse.org/leshan/job/leshan/lastSuccessfulBuild/artifact/leshan-bsserver-demo.jar
Both server can be started like this:
```
java -jar ./leshan-server-demo.jar -wp 8080 -vv
java -jar ./leshan-bsserver-demo.jar -lp=5783 -slp=5784 -wp 8081
```
Or create a helper script that does everything, including download:
```
#!/bin/sh -eu
# Download Leshan if needed
if [ ! -f leshan-server-demo.jar ]; then
wget https://ci.eclipse.org/leshan/job/leshan/lastSuccessfulBuild/artifact/leshan-server-demo.jar
fi
if [ ! -f leshan-bsserver-demo.jar ]; then
wget 'https://ci.eclipse.org/leshan/job/leshan/lastSuccessfulBuild/artifact/leshan-bsserver-demo.jar'
fi
mkdir -p log
start-stop-daemon --make-pidfile --pidfile log/leshan.pid --chdir $(pwd) --background --start \
--startas /bin/bash -- -c "exec java -jar ./leshan-server-demo.jar -wp 8080 -vv --models-folder objects >log/leshan.log 2>&1"
start-stop-daemon --make-pidfile --pidfile log/leshan_bs.pid --chdir $(pwd) --background --start \
--startas /bin/bash -- -c "exec java -jar ./leshan-bsserver-demo.jar -lp=5783 -slp=5784 -wp 8081 -vv >log/leshan_bs.log 2>&1"
```
Then stopping would require similar script:
```
#!/bin/sh -eu
start-stop-daemon --remove-pidfile --pidfile log/leshan.pid --stop
start-stop-daemon --remove-pidfile --pidfile log/leshan_bs.pid --stop
```
## Python package requirements
These tests require extra package that is not installed when you follow the Zephyr's setup.
Install with `pip install CoAPthon3`
## Running tests
```
twister -p native_posix -vv --enable-slow -T tests/net/lib/lwm2m/interop
``````
## Test specification
Tests are written from test spec;
[OMA Enabler Test Specification (Interoperability) for Lightweight M2M](https://www.openmobilealliance.org/release/LightweightM2M/ETS/OMA-ETS-LightweightM2M-V1_1-20190912-D.pdf)
Following tests are implemented:
* LightweightM2M-1.1-int-0 Client Initiated Bootstrap
* LightweightM2M-1.1-int-1 Client Initiated Bootstrap Full (PSK)
* LightweightM2M-1.1-int-101 Initial Registration
* LightweightM2M-1.1-int-102 Registration Update
* LightweightM2M-1.1-int-104 Registration Update Trigge
* LightweightM2M-1.1-int-105 - Discarded Register Update
* LightweightM2M-1.1-int-107 Extending the lifetime of a registration
* LightweightM2M-1.1-int-108 Turn on Queue Mode
* LightweightM2M-1.1-int-109 Behavior in Queue Mode
* LightweightM2M-1.1-int-201 Querying basic information in Plain Text
* LightweightM2M-1.1-int-203 Querying basic information in TLV format
* LightweightM2M-1.1-int-204 Querying basic information in JSON format
* LightweightM2M-1.1-int-205 Setting basic information in Plain Text
* LightweightM2M-1.1-int-211 Querying basic information in CBOR format
* LightweightM2M-1.1-int-212 Setting basic information in CBOR format
* LightweightM2M-1.1-int-215 Setting basic information in TLV format
* LightweightM2M-1.1-int-220 Setting basic information in JSON format
* LightweightM2M-1.1-int-221 Attempt to perform operations on Security
* LightweightM2M-1.1-int-401 UDP Channel Security PSK Mode

View file

@ -0,0 +1,7 @@
CONFIG_DNS_RESOLVER=y
CONFIG_DNS_SERVER_IP_ADDRESSES=y
CONFIG_DNS_SERVER1="192.0.2.2"
CONFIG_LWM2M_DNS_SUPPORT=y
CONFIG_NET_CONFIG_MY_IPV4_GW="192.0.2.2"
CONFIG_NATIVE_POSIX_SLOWDOWN_TO_REAL_TIME=y
CONFIG_NATIVE_UART_0_ON_STDINOUT=y

View file

@ -0,0 +1,20 @@
CONFIG_NET_L2_ETHERNET=y
CONFIG_ETH_DRIVER=y
CONFIG_ETH_STELLARIS=y
CONFIG_NET_QEMU_ETHERNET=y
# RAM/ROM tuning
CONFIG_IDLE_STACK_SIZE=128
CONFIG_MBEDTLS_HEAP_SIZE=7000
CONFIG_ISR_STACK_SIZE=512
CONFIG_SYSTEM_WORKQUEUE_STACK_SIZE=1024
CONFIG_LWM2M_ENGINE_STACK_SIZE=2000
CONFIG_LWM2M_LOG_LEVEL_INF=y
CONFIG_LWM2M_ENGINE_MAX_MESSAGES=3
CONFIG_LWM2M_ENGINE_VALIDATION_BUFFER_SIZE=0
CONFIG_LWM2M_ENGINE_MAX_OBSERVER=5
CONFIG_LWM2M_SECURITY_DTLS_TLS_CIPHERSUITE_MAX=3
CONFIG_LWM2M_DEVICE_PWRSRC_MAX=2
CONFIG_LWM2M_DEVICE_ERROR_CODE_MAX=5
CONFIG_LWM2M_DEVICE_EXT_DEV_INFO_MAX=2
CONFIG_LWM2M_NUM_ATTR=10

View file

@ -0,0 +1,90 @@
CONFIG_NETWORKING=y
CONFIG_LOG=y
CONFIG_LWM2M_LOG_LEVEL_DBG=y
CONFIG_TEST_RANDOM_GENERATOR=y
CONFIG_NET_IPV6=y
CONFIG_NET_IF_UNICAST_IPV6_ADDR_COUNT=3
CONFIG_NET_IF_MCAST_IPV6_ADDR_COUNT=2
CONFIG_NET_IPV4=y
CONFIG_NET_DHCPV4=n
CONFIG_NET_IF_UNICAST_IPV4_ADDR_COUNT=3
CONFIG_NET_IF_MCAST_IPV4_ADDR_COUNT=2
CONFIG_PRINTK=y
CONFIG_NET_PKT_RX_COUNT=10
CONFIG_NET_PKT_TX_COUNT=10
CONFIG_NET_BUF_RX_COUNT=10
CONFIG_NET_BUF_TX_COUNT=10
CONFIG_NET_MAX_CONTEXTS=5
CONFIG_NET_CONFIG_MY_IPV6_ADDR="2001:db8::1"
CONFIG_NET_CONFIG_PEER_IPV6_ADDR="2001:db8::2"
CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1"
CONFIG_NET_CONFIG_MY_IPV4_GW="192.0.2.2"
CONFIG_NET_LOG=y
CONFIG_NET_CONFIG_NEED_IPV6=y
CONFIG_NET_CONFIG_NEED_IPV4=y
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_LWM2M=y
CONFIG_LWM2M_COAP_BLOCK_SIZE=512
CONFIG_LWM2M_IPSO_SUPPORT=y
CONFIG_LWM2M_SHELL=y
CONFIG_LWM2M_ACCESS_CONTROL_ENABLE=n
#Enable Portfolio object
CONFIG_LWM2M_PORTFOLIO_OBJ_SUPPORT=y
#LwM2M v1.1 configure
CONFIG_LWM2M_VERSION_1_1=y
CONFIG_LWM2M_DTLS_SUPPORT=y
CONFIG_LWM2M_RD_CLIENT_SUPPORT_BOOTSTRAP=y
#Enable SenML JSON content format
CONFIG_JSON_LIBRARY=y
CONFIG_BASE64=y
CONFIG_LWM2M_RW_SENML_JSON_SUPPORT=y
#Enable SenML CBOR content format
CONFIG_LWM2M_RW_SENML_CBOR_SUPPORT=y
CONFIG_LWM2M_RW_SENML_CBOR_RECORDS=60
CONFIG_ZCBOR_CANONICAL=y
#Enable legacy content formats
CONFIG_LWM2M_RW_JSON_SUPPORT=y
CONFIG_LWM2M_RW_OMA_TLV_SUPPORT=y
# Longer endpoint name might be returned in a registration reply
CONFIG_COAP_EXTENDED_OPTIONS_LEN=y
CONFIG_COAP_EXTENDED_OPTIONS_LEN_VALUE=40
# Use QUEUE mode by default
CONFIG_LWM2M_QUEUE_MODE_ENABLED=y
CONFIG_LWM2M_QUEUE_MODE_UPTIME=20
# LwM2M configuration as OMA-ETS-LightweightM2M_INT-V1_1-20190912-D Configuration 3
CONFIG_LWM2M_ENGINE_DEFAULT_LIFETIME=30
CONFIG_LWM2M_SERVER_DEFAULT_PMIN=1
CONFIG_LWM2M_SERVER_DEFAULT_PMAX=10
CONFIG_MBEDTLS=y
CONFIG_MBEDTLS_TLS_VERSION_1_2=y
# Special MbedTLS changes
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_HEAP_SIZE=8192
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=1500
CONFIG_MBEDTLS_CIPHER_CCM_ENABLED=y
# Disable RSA, we don't parse certs: saves flash/memory
CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_ENABLED=n
# Enable PSK instead
CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
CONFIG_NET_SOCKETS_TLS_MAX_CONTEXTS=4
CONFIG_NET_SOCKETS_ENABLE_DTLS=y
# MbedTLS needs a larger stack
CONFIG_MAIN_STACK_SIZE=2048
CONFIG_SYSTEM_WORKQUEUE_STACK_SIZE=2048

View file

@ -0,0 +1,104 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import json
import requests
import binascii
class Leshan:
def __init__(self, url: str):
self.api_url = url
self.timeout = 10
self.format = 'TLV'
# self.format = "SENML_CBOR"
try:
resp = self.get('/security/clients')
if not isinstance(resp, list):
raise RuntimeError('Did not receive list of endpoints')
except requests.exceptions.ConnectionError:
raise RuntimeError('Leshan not responding')
@staticmethod
def handle_response(resp: requests.models.Response):
"""Generic response handler for all queries"""
if resp.status_code >= 300 or resp.status_code < 200:
raise RuntimeError(f'Error {resp.status_code}: {resp.text}')
if len(resp.text):
obj = json.loads(resp.text)
return obj
else:
return None
def get(self, path: str):
"""Send HTTP GET query"""
resp = requests.get(f'{self.api_url}{path}?timeout={self.timeout}&format={self.format}')
return Leshan.handle_response(resp)
def put_raw(self, path: str, data: str | dict | None = None, headers: dict | None = None):
resp = requests.put(f'{self.api_url}{path}', data=data, headers=headers)
return Leshan.handle_response(resp)
def put(self, path: str, data: str | dict, uri_options: str = ''):
if isinstance(data, dict):
data = json.dumps(data)
return self.put_raw(f'{path}?timeout={self.timeout}&format={self.format}' + uri_options, data=data, headers={'content-type': 'application/json'})
def post(self, path: str, data: str | dict | None = None):
resp = requests.post(f'{self.api_url}{path}', data=data, headers={'content-type': 'application/json'})
return Leshan.handle_response(resp)
def delete(self, path: str):
resp = requests.delete(f'{self.api_url}{path}')
return Leshan.handle_response(resp)
def execute(self, endpoint: str, path: str):
return self.post(f'/clients/{endpoint}/{path}')
def write(self, endpoint: str, path: str, value: bool | int | str):
if isinstance(value, bool):
type = 'boolean'
value = "true" if value else "false"
elif isinstance(value, int):
type = 'integer'
value = str(value)
elif isinstance(value, str):
type = 'string'
value = '"' + value + '"'
id = path.split('/')[2]
return self.put(f'/clients/{endpoint}/{path}', f'{{"id":{id},"kind":"singleResource","value":{value},"type":"{type}"}}')
def read(self, endpoint: str, path: str):
resp = self.get(f'/clients/{endpoint}/{path}')
if not resp['success']:
return resp
content = resp['content']
if content['kind'] == 'instance':
return content['resources']
elif content['kind'] == 'singleResource':
return content['value']
elif content['kind'] == 'multiResource':
return content['values']
raise RuntimeError(f'Unhandled type {content["kind"]}')
def create_psk_device(self, endpoint: str, passwd: str):
psk = binascii.b2a_hex(passwd.encode()).decode()
self.put('/security/clients/', f'{{"endpoint":"{endpoint}","tls":{{"mode":"psk","details":{{"identity":"{endpoint}","key":"{psk}"}} }} }}')
def delete_device(self, endpoint: str):
self.delete(f'/security/clients/{endpoint}')
def create_bs_device(self, endpoint: str, server_uri: str, passwd: str):
psk = binascii.b2a_hex(passwd.encode()).decode()
data = f'{{"tls":{{"mode":"psk","details":{{"identity":"{endpoint}","key":"{psk}"}}}},"endpoint":"{endpoint}"}}'
self.put('/security/clients/', data)
id = str([ord(n) for n in endpoint])
key = str([ord(n) for n in passwd])
content = '{"servers":{"0":{"binding":"U","defaultMinPeriod":1,"lifetime":86400,"notifIfDisabled":false,"shortId":1}},"security":{"1":{"bootstrapServer":false,"clientOldOffTime":1,"publicKeyOrId":' + id + ',"secretKey":' + key + ',"securityMode":"PSK","serverId":1,"serverSmsNumber":"","smsBindingKeyParam":[],"smsBindingKeySecret":[],"smsSecurityMode":"NO_SEC","uri":"'+server_uri+'"}},"oscore":{},"toDelete":["/0","/1"]}'
self.post(f'/bootstrap/{endpoint}', content)
def delete_bs_device(self, endpoint: str):
self.delete(f'/security/clients/{endpoint}')
self.delete(f'/bootstrap/{endpoint}')

View file

@ -0,0 +1,446 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import time
import logging
import pytest
from leshan import Leshan
import os
import binascii
import random
import string
from twister_harness import Shell
LESHAN_IP: str = '192.0.2.2'
COAP_PORT: int = 5683
COAPS_PORT: int = 5684
BOOTSTRAP_COAPS_PORT: int = 5784
logger = logging.getLogger(__name__)
@pytest.fixture(scope='module')
def helperclient() -> object:
try:
from coapthon.client.helperclient import HelperClient
except ModuleNotFoundError:
pytest.skip('CoAPthon3 package not installed')
return HelperClient(server=('127.0.0.1', COAP_PORT))
@pytest.fixture(scope='session')
def leshan() -> Leshan:
try:
return Leshan("http://localhost:8080/api")
except RuntimeError:
pytest.skip('Leshan server not available')
@pytest.fixture(scope='session')
def leshan_bootstrap() -> Leshan:
try:
return Leshan("http://localhost:8081/api")
except RuntimeError:
pytest.skip('Leshan Bootstrap server not available')
#
# Test specification:
# https://www.openmobilealliance.org/release/LightweightM2M/ETS/OMA-ETS-LightweightM2M-V1_1-20190912-D.pdf
#
def verify_LightweightM2M_1_1_int_0(shell: Shell):
logger.info("LightweightM2M-1.1-int-0 - Client Initiated Bootstrap")
shell._device.readlines_until(regex='.*Bootstrap started with endpoint', timeout=5.0)
shell._device.readlines_until(regex='.*Bootstrap registration done', timeout=5.0)
shell._device.readlines_until(regex='.*Bootstrap data transfer done', timeout=5.0)
def verify_LightweightM2M_1_1_int_1(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-1 - Client Initiated Bootstrap Full (PSK)")
verify_LightweightM2M_1_1_int_0(shell)
verify_LightweightM2M_1_1_int_101(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_401(shell, leshan, endpoint)
def verify_LightweightM2M_1_1_int_101(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-101 - Initial Registration")
shell._device.readlines_until(regex='.*Registration Done', timeout=5.0)
assert leshan.get(f'/clients/{endpoint}')
def verify_LightweightM2M_1_1_int_102(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-102 - Registration Update")
lines = shell.get_filtered_output(shell.exec_command('lwm2m read 1/0/1 -u32'))
litetime = int(lines[0])
lifetime = litetime + 10
start_time = time.time() * 1000
leshan.write(endpoint, '1/0/1', lifetime)
shell._device.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)
latest = leshan.get(f'/clients/{endpoint}')
assert latest["lastUpdate"] > start_time
assert latest["lastUpdate"] <= time.time()*1000
assert latest["lifetime"] == lifetime
shell.exec_command('lwm2m write 1/0/1 -u32 86400')
def verify_LightweightM2M_1_1_int_103():
"""LightweightM2M-1.1-int-103 - Deregistration"""
# Unsupported. We don't have "disabled" functionality in server object
def verify_LightweightM2M_1_1_int_104(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-104 - Registration Update Trigger")
shell.exec_command('lwm2m update')
shell._device.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)
leshan.execute(endpoint, '1/0/8')
shell._device.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)
def verify_LightweightM2M_1_1_int_105(shell: Shell, leshan: Leshan, endpoint: str, helperclient: object):
logger.info("LightweightM2M-1.1-int-105 - Discarded Register Update")
status = leshan.get(f'/clients/{endpoint}')
if status["secure"]:
logger.debug("Skip, requires non-secure connection")
return
id = status["registrationId"]
assert id
# Fake unregister message
helperclient.delete(f'rd/{id}', timeout=0.1)
helperclient.stop()
time.sleep(1)
shell.exec_command('lwm2m update')
shell._device.readlines_until(regex=r'.*Failed with code 4\.4', timeout=5.0)
shell._device.readlines_until(regex='.*Registration Done', timeout=10.0)
def verify_LightweightM2M_1_1_int_107(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-107 - Extending the lifetime of a registration")
leshan.write(endpoint, '1/0/1', 120)
shell._device.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)
lines = shell.get_filtered_output(shell.exec_command('lwm2m read 1/0/1 -u32'))
lifetime = int(lines[0])
assert lifetime == 120
logger.debug(f'sleeping for {lifetime} s')
shell._device.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=lifetime)
assert leshan.get(f'/clients/{endpoint}')
def verify_LightweightM2M_1_1_int_108(leshan, endpoint):
logger.info("LightweightM2M-1.1-int-108 - Turn on Queue Mode")
assert leshan.get(f'/clients/{endpoint}')["queuemode"]
def verify_LightweightM2M_1_1_int_109(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-109 - Behavior in Queue Mode")
verify_LightweightM2M_1_1_int_107(shell, leshan, endpoint)
shell._device.readlines_until(regex='.*Queue mode RX window closed', timeout=120)
# Restore previous value
shell.exec_command('lwm2m write 1/0/1 -u32 86400')
shell._device.readlines_until(regex='.*Registration update complete', timeout=10)
def verify_LightweightM2M_1_1_int_201(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-201 - Querying basic information in Plain Text format")
fmt = leshan.format
leshan.format = 'TEXT'
assert leshan.get(f'/clients/{endpoint}/3/0/0')['content']['value'] == 'Zephyr'
assert leshan.get(f'/clients/{endpoint}/3/0/1')['content']['value'] == 'client-1'
assert leshan.get(f'/clients/{endpoint}/3/0/2')['content']['value'] == 'serial-1'
leshan.format = fmt
def verify_device_object(resp):
''' Verify that Device object match Configuration 3 '''
assert resp['valid'] is True
found = 0
for res in resp['content']['resources']:
if res['id'] == 0:
assert res['value'] == 'Zephyr'
found += 1
elif res['id'] == 1:
assert res['value'] == 'client-1'
found += 1
elif res['id'] == 2:
assert res['value'] == 'serial-1'
found += 1
elif res['id'] == 3:
assert res['value'] == '1.2.3'
found += 1
elif res['id'] == 11:
assert res['kind'] == 'multiResource'
assert res['values']['0'] == '0'
found += 1
elif res['id'] == 16:
assert res['value'] == 'U'
found += 1
assert found == 6
def verify_server_object(obj):
''' Verify that server object match Configuration 3 '''
found = 0
for res in obj['resources']:
if res['id'] == 0:
assert res['value'] == '1'
found += 1
elif res['id'] == 1:
assert res['value'] == '86400'
found += 1
elif res['id'] == 2:
assert res['value'] == '1'
found += 1
elif res['id'] == 3:
assert res['value'] == '10'
found += 1
elif res['id'] == 5:
assert res['value'] == '86400'
found += 1
elif res['id'] == 6:
assert res['value'] is False
found += 1
elif res['id'] == 7:
assert res['value'] == 'U'
found += 1
assert found == 7
def verify_LightweightM2M_1_1_int_203(shell: Shell, leshan: Leshan, endpoint: str):
shell.exec_command('lwm2m update')
logger.info('LightweightM2M-1.1-int-203 - Querying basic information in TLV format')
fmt = leshan.format
leshan.format = 'TLV'
resp = leshan.get(f'/clients/{endpoint}/3/0')
verify_device_object(resp)
leshan.format = fmt
def verify_LightweightM2M_1_1_int_204(shell: Shell, leshan: Leshan, endpoint: str):
shell.exec_command('lwm2m update')
logger.info('LightweightM2M-1.1-int-204 - Querying basic information in JSON format')
fmt = leshan.format
leshan.format = 'JSON'
resp = leshan.get(f'/clients/{endpoint}/3/0')
verify_device_object(resp)
leshan.format = fmt
def verify_LightweightM2M_1_1_int_205(shell: Shell, leshan: Leshan, endpoint: str):
logger.info('LightweightM2M-1.1-int-205 - Setting basic information in Plain Text format')
fmt = leshan.format
leshan.format = 'TEXT'
leshan.write(endpoint, '1/0/2', 101)
leshan.write(endpoint, '1/0/3', 1010)
leshan.write(endpoint, '1/0/5', 2000)
assert leshan.read(endpoint, '1/0/2') == '101'
assert leshan.read(endpoint, '1/0/3') == '1010'
assert leshan.read(endpoint, '1/0/5') == '2000'
leshan.write(endpoint, '1/0/2', 1)
leshan.write(endpoint, '1/0/3', 10)
leshan.write(endpoint, '1/0/5', 86400)
assert leshan.read(endpoint, '1/0/2') == '1'
assert leshan.read(endpoint, '1/0/3') == '10'
assert leshan.read(endpoint, '1/0/5') == '86400'
leshan.format = fmt
def verify_LightweightM2M_1_1_int_211(shell: Shell, leshan: Leshan, endpoint: str):
logger.info('LightweightM2M-1.1-int-211 - Querying basic information in CBOR format')
fmt = leshan.format
leshan.format = 'CBOR'
lines = shell.get_filtered_output(shell.exec_command('lwm2m read 1/0/0 -u16'))
id = lines[0]
assert leshan.read(endpoint, '1/0/0') == id
assert leshan.read(endpoint, '1/0/6') is False
assert leshan.read(endpoint, '1/0/7') == 'U'
leshan.format = fmt
def verify_LightweightM2M_1_1_int_212(shell: Shell, leshan: Leshan, endpoint: str):
logger.info('LightweightM2M-1.1-int-212 - Setting basic information in CBOR format')
fmt = leshan.format
leshan.format = 'CBOR'
leshan.write(endpoint, '1/0/2', 101)
leshan.write(endpoint, '1/0/3', 1010)
leshan.write(endpoint, '1/0/6', True)
assert leshan.read(endpoint, '1/0/2') == '101'
assert leshan.read(endpoint, '1/0/3') == '1010'
assert leshan.read(endpoint, '1/0/6') is True
leshan.write(endpoint, '1/0/2', 1)
leshan.write(endpoint, '1/0/3', 10)
leshan.write(endpoint, '1/0/6', False)
leshan.format = fmt
def verify_setting_basic_in_format(shell, leshan, endpoint, format):
fmt = leshan.format
leshan.format = format
server_obj = leshan.get(f'/clients/{endpoint}/1/0')['content']
verify_server_object(server_obj)
# Remove Read-Only resources, so we don't end up writing those
for res in server_obj['resources']:
if res['id'] in (0, 11, 12):
server_obj['resources'].remove(res)
data = '''{
"kind": "instance",
"id": 0,
"resources": [
{
"id": 2,
"kind": "singleResource",
"value": "101",
"type": "integer"
},
{
"id": 3,
"kind": "singleResource",
"value": "1010",
"type": "integer"
},
{
"id": 5,
"kind": "singleResource",
"value": "2000",
"type": "integer"
},
{
"id": 6,
"kind": "singleResource",
"value": true,
"type": "boolean"
},
{
"id": 7,
"kind": "singleResource",
"value": "U",
"type": "string"
}
]
}'''
assert leshan.put(f'/clients/{endpoint}/1/0', data, uri_options = '&replace=false')['status'] == 'CHANGED(204)'
resp = leshan.get(f'/clients/{endpoint}/1/0')
assert resp['valid'] is True
found = 0
for res in resp['content']['resources']:
if res['id'] == 2:
assert res['value'] == '101'
found += 1
elif res['id'] == 3:
assert res['value'] == '1010'
found += 1
elif res['id'] == 5:
assert res['value'] == '2000'
found += 1
elif res['id'] == 6:
assert res['value'] is True
found += 1
elif res['id'] == 7:
assert res['value'] == 'U'
found += 1
assert found == 5
assert leshan.put(f'/clients/{endpoint}/1/0', data = server_obj, uri_options = '&replace=true')['status'] == 'CHANGED(204)'
server_obj = leshan.get(f'/clients/{endpoint}/1/0')['content']
verify_server_object(server_obj)
leshan.format = fmt
def verify_LightweightM2M_1_1_int_215(shell: Shell, leshan: Leshan, endpoint: str):
logger.info('LightweightM2M-1.1-int-215 - Setting basic information in TLV format')
verify_setting_basic_in_format(shell, leshan, endpoint, 'TLV')
def verify_LightweightM2M_1_1_int_220(shell: Shell, leshan: Leshan, endpoint: str):
logger.info('LightweightM2M-1.1-int-220 - Setting basic information in JSON format')
verify_setting_basic_in_format(shell, leshan, endpoint, 'JSON')
def verify_LightweightM2M_1_1_int_221(shell: Shell, leshan: Leshan, endpoint: str):
logger.info('LightweightM2M-1.1-int-221 - Attempt to perform operations on Security')
assert leshan.read(endpoint, '0/0')['status'] == 'UNAUTHORIZED(401)'
assert leshan.write(endpoint, '0/0/0', 'coap://localhost')['status'] == 'UNAUTHORIZED(401)'
assert leshan.put_raw(f'/clients/{endpoint}/0/attributes?pmin=10')['status'] == 'UNAUTHORIZED(401)'
def verify_LightweightM2M_1_1_int_401(shell: Shell, leshan: Leshan, endpoint: str):
logger.info("LightweightM2M-1.1-int-401 - UDP Channel Security - Pre-shared Key Mode")
lines = shell.get_filtered_output(shell.exec_command('lwm2m read 0/0/0 -s'))
host = lines[0]
assert 'coaps://' in host
lines = shell.get_filtered_output(shell.exec_command('lwm2m read 0/0/2 -u8'))
mode = int(lines[0])
assert mode == 0
resp = leshan.get(f'/clients/{endpoint}')
assert resp["secure"]
def test_lwm2m_bootstrap_psk(shell: Shell, leshan, leshan_bootstrap):
try:
# Generate randon device id and password (PSK key)
endpoint = 'client_' + binascii.b2a_hex(os.urandom(1)).decode()
passwd = ''.join(random.choice(string.ascii_lowercase) for i in range(16))
# Create device entries in Leshan and Bootstrap server
leshan_bootstrap.create_bs_device(endpoint, f'coaps://{LESHAN_IP}:{COAPS_PORT}', passwd)
leshan.create_psk_device(endpoint, passwd)
# Allow engine to start & stop once.
time.sleep(2)
#
# Verify PSK security using Bootstrap
#
# Write bootsrap server information and PSK keys
shell.exec_command(f'lwm2m write 0/0/0 -s coaps://{LESHAN_IP}:{BOOTSTRAP_COAPS_PORT}')
shell.exec_command('lwm2m write 0/0/1 -b 1')
shell.exec_command('lwm2m write 0/0/2 -u8 0')
shell.exec_command(f'lwm2m write 0/0/3 -s {endpoint}')
shell.exec_command(f'lwm2m write 0/0/5 -s {passwd}')
shell.exec_command(f'lwm2m start {endpoint} -b 1')
#
# Bootstrap Interface test cases
# LightweightM2M-1.1-int-0 (included)
# LightweightM2M-1.1-int-401 (included)
verify_LightweightM2M_1_1_int_1(shell, leshan, endpoint)
#
# Registration Interface test cases (using PSK security)
#
verify_LightweightM2M_1_1_int_102(shell, leshan, endpoint)
# skip, not implemented verify_LightweightM2M_1_1_int_103()
verify_LightweightM2M_1_1_int_104(shell, leshan, endpoint)
# skip, included in 109: verify_LightweightM2M_1_1_int_107(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_108(leshan, endpoint)
verify_LightweightM2M_1_1_int_109(shell, leshan, endpoint)
#
# Device management & Service Enablement Interface test cases
#
verify_LightweightM2M_1_1_int_201(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_203(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_204(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_205(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_211(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_212(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_215(shell, leshan, endpoint)
shell.exec_command('lwm2m stop')
shell._device.readlines_until(regex=r'.*Deregistration success', timeout=10.0)
finally:
# Remove device and bootstrap information
# Leshan does not accept non-secure connection if device information is provided with PSK
leshan.delete_device(endpoint)
leshan_bootstrap.delete_bs_device(endpoint)
def test_lwm2m_nosecure(shell: Shell, leshan, helperclient):
# Allow engine to start & stop once.
time.sleep(2)
# Generate randon device id and password (PSK key)
endpoint = 'client_' + binascii.b2a_hex(os.urandom(1)).decode()
#
# Registration Interface test cases (using Non-secure mode)
#
shell.exec_command(f'lwm2m write 0/0/0 -s coap://{LESHAN_IP}:{COAP_PORT}')
shell.exec_command('lwm2m write 0/0/1 -b 0')
shell.exec_command('lwm2m write 0/0/2 -u8 3')
shell.exec_command(f'lwm2m write 0/0/3 -s {endpoint}')
shell.exec_command('lwm2m create 1/0')
shell.exec_command('lwm2m write 0/0/10 -u16 1')
shell.exec_command('lwm2m write 1/0/0 -u16 1')
shell.exec_command('lwm2m write 1/0/1 -u32 86400')
shell.exec_command(f'lwm2m start {endpoint} -b 0')
shell._device.readlines_until(regex=f"RD Client started with endpoint '{endpoint}'", timeout=10.0)
verify_LightweightM2M_1_1_int_101(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_105(shell, leshan, endpoint, helperclient) # needs no-security
verify_LightweightM2M_1_1_int_215(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_220(shell, leshan, endpoint)
verify_LightweightM2M_1_1_int_221(shell, leshan, endpoint)
# All done
shell.exec_command('lwm2m stop')
shell._device.readlines_until(regex=r'.*Deregistration success', timeout=10.0)

View file

@ -0,0 +1 @@
CoAPthon3

View file

@ -0,0 +1,171 @@
/*
* Copyright (c) 2017 Linaro Limited
* Copyright (c) 2017-2019 Foundries.io
*
* SPDX-License-Identifier: Apache-2.0
*/
#define LOG_MODULE_NAME net_lwm2m_client_app
#define LOG_LEVEL LOG_LEVEL_DBG
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(LOG_MODULE_NAME);
#include <stdlib.h>
#include <zephyr/drivers/hwinfo.h>
#include <zephyr/kernel.h>
#include <zephyr/drivers/gpio.h>
#include <zephyr/drivers/sensor.h>
#include <zephyr/net/lwm2m.h>
#define APP_BANNER "Run LWM2M client"
#define WAIT_TIME K_SECONDS(10)
#define CONNECT_TIME K_SECONDS(10)
#define NAME "Zephyr"
#define MODEL "client-1"
#define SERIAL "serial-1"
#define VERSION "1.2.3"
static struct lwm2m_ctx client;
static int device_reboot_cb(uint16_t obj_inst_id,
uint8_t *args, uint16_t args_len)
{
LOG_INF("DEVICE: REBOOT");
return 0;
}
static int lwm2m_setup(void)
{
/* setup DEVICE object */
lwm2m_set_res_buf(&LWM2M_OBJ(3, 0, 0), NAME, sizeof(NAME),
sizeof(NAME), LWM2M_RES_DATA_FLAG_RO);
lwm2m_set_res_buf(&LWM2M_OBJ(3, 0, 1), MODEL, sizeof(MODEL),
sizeof(MODEL), LWM2M_RES_DATA_FLAG_RO);
lwm2m_set_res_buf(&LWM2M_OBJ(3, 0, 2), SERIAL, sizeof(SERIAL),
sizeof(SERIAL), LWM2M_RES_DATA_FLAG_RO);
lwm2m_set_res_buf(&LWM2M_OBJ(3, 0, 3), VERSION, sizeof(VERSION),
sizeof(VERSION), LWM2M_RES_DATA_FLAG_RO);
lwm2m_register_exec_callback(&LWM2M_OBJ(3, 0, 4), device_reboot_cb);
lwm2m_set_res_buf(&LWM2M_OBJ(3, 0, 17), CONFIG_BOARD, sizeof(CONFIG_BOARD),
sizeof(CONFIG_BOARD), LWM2M_RES_DATA_FLAG_RO);
return 0;
}
static void rd_client_event(struct lwm2m_ctx *client,
enum lwm2m_rd_client_event client_event)
{
switch (client_event) {
case LWM2M_RD_CLIENT_EVENT_NONE:
/* do nothing */
break;
case LWM2M_RD_CLIENT_EVENT_BOOTSTRAP_REG_FAILURE:
LOG_DBG("Bootstrap registration failure!");
break;
case LWM2M_RD_CLIENT_EVENT_BOOTSTRAP_REG_COMPLETE:
LOG_DBG("Bootstrap registration complete");
break;
case LWM2M_RD_CLIENT_EVENT_BOOTSTRAP_TRANSFER_COMPLETE:
LOG_DBG("Bootstrap transfer complete");
break;
case LWM2M_RD_CLIENT_EVENT_REGISTRATION_FAILURE:
LOG_DBG("Registration failure!");
break;
case LWM2M_RD_CLIENT_EVENT_REGISTRATION_COMPLETE:
LOG_DBG("Registration complete");
break;
case LWM2M_RD_CLIENT_EVENT_REG_TIMEOUT:
LOG_DBG("Registration timeout!");
break;
case LWM2M_RD_CLIENT_EVENT_REG_UPDATE_COMPLETE:
LOG_DBG("Registration update complete");
break;
case LWM2M_RD_CLIENT_EVENT_DEREGISTER_FAILURE:
LOG_DBG("Deregister failure!");
break;
case LWM2M_RD_CLIENT_EVENT_DISCONNECT:
LOG_DBG("Disconnected");
break;
case LWM2M_RD_CLIENT_EVENT_QUEUE_MODE_RX_OFF:
LOG_DBG("Queue mode RX window closed");
break;
case LWM2M_RD_CLIENT_EVENT_ENGINE_SUSPENDED:
LOG_DBG("LwM2M engine suspended");
break;
case LWM2M_RD_CLIENT_EVENT_NETWORK_ERROR:
LOG_ERR("LwM2M engine reported a network error.");
lwm2m_rd_client_stop(client, rd_client_event, true);
break;
case LWM2M_RD_CLIENT_EVENT_REG_UPDATE:
LOG_DBG("Registration update");
break;
}
}
static void observe_cb(enum lwm2m_observe_event event,
struct lwm2m_obj_path *path, void *user_data)
{
char buf[LWM2M_MAX_PATH_STR_SIZE];
switch (event) {
case LWM2M_OBSERVE_EVENT_OBSERVER_ADDED:
LOG_INF("Observer added for %s", lwm2m_path_log_buf(buf, path));
break;
case LWM2M_OBSERVE_EVENT_OBSERVER_REMOVED:
LOG_INF("Observer removed for %s", lwm2m_path_log_buf(buf, path));
break;
case LWM2M_OBSERVE_EVENT_NOTIFY_ACK:
LOG_INF("Notify acknowledged for %s", lwm2m_path_log_buf(buf, path));
break;
case LWM2M_OBSERVE_EVENT_NOTIFY_TIMEOUT:
LOG_INF("Notify timeout for %s, trying registration update",
lwm2m_path_log_buf(buf, path));
lwm2m_rd_client_update();
break;
}
}
int main(void)
{
int ret;
#if defined(CONFIG_BOARD_NATIVE_POSIX)
srandom(time(NULL));
#endif
ret = lwm2m_setup();
if (ret < 0) {
LOG_ERR("Cannot setup LWM2M fields (%d)", ret);
return 0;
}
client.tls_tag = 1;
lwm2m_rd_client_start(&client, CONFIG_BOARD, 0, rd_client_event, observe_cb);
lwm2m_rd_client_stop(&client, rd_client_event, false);
return 0;
}

View file

@ -0,0 +1,14 @@
tests:
net.lwm2m.interop:
harness: pytest
timeout: 300
slow: true
integration_platforms:
- native_posix
platform_allow:
- native_posix
- qemu_cortex_m3
tags:
- testing
- pytest
- shell