diff --git a/tests/drivers/can/host/CMakeLists.txt b/tests/drivers/can/host/CMakeLists.txt new file mode 100644 index 00000000000..6ebd263b216 --- /dev/null +++ b/tests/drivers/can/host/CMakeLists.txt @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(can_host) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/drivers/can/host/README.rst b/tests/drivers/can/host/README.rst new file mode 100644 index 00000000000..b6a3fcef5c8 --- /dev/null +++ b/tests/drivers/can/host/README.rst @@ -0,0 +1,125 @@ +.. _can_host_tests: + +Controller Area Network (CAN) Host Tests +######################################## + +Overview +******** + +This test suite uses `python-can`_ for testing Controller Area Network (CAN) communication between a +host PC (running :ref:`Twister `) and a device under test (DUT) running Zephyr. + +Prerequisites +************* + +The test suite has the following prerequisites: + +* The python-can library installed on the host PC. +* A CAN fixture creating a CAN bus between the host PC and the DUT. + +The Zephyr end of the CAN fixture can be configured as follows: + +* The CAN controller to be used is set using the ``zephyr,canbus`` chosen devicetree node. +* The CAN bitrates are set using :kconfig:option:`CONFIG_CAN_DEFAULT_BITRATE` and + :kconfig:option:`CONFIG_CAN_DEFAULT_BITRATE_DATA`, but can be overridden on a board level using + the ``bus-speed`` and ``bus-speed-data`` CAN controller devicetree properties if needed. Default + bitrates are 125 kbits/s for the arbitration phase/CAN classic and 1 Mbit/s for the CAN FD data + phase when using bitrate switching (BRS). + +The host end of the CAN fixture can be configured through python-can. Available configuration +options depend on the type of host CAN adapter used. The python-can library provides a lot of +flexibility for configuration as decribed in the `python-can configuration`_ page. By default, the +python-can configuration context is not specified, causing python-can to use the default +configuration context. The context can be overridden using the ``--can-context`` test suite argument +(see examples below). + +Building and Running +******************** + +Running on native_sim +===================== + +Running the test suite on :ref:`native_sim` relies on the `Linux SocketCAN`_ virtual CAN driver +(vcan) providing a virtual CAN interface named ``zcan0``. + +On the host PC, a virtual SocketCAN interface needs to be created and brought up before running the +test suite: + +.. code-block:: shell + + sudo ip link add dev zcan0 type vcan + sudo ip link set up zcan0 + +Next, python-can needs to be configured for the ``zcan0`` interface. One option is to use a +dedicated ``zcan0`` context in the ``~/.canrc`` configuration file as shown here: + +.. code-block:: ini + + [zcan0] + interface = socketcan + channel = zcan0 + fd = True + +Once the virtual SocketCAN interface has been created, brought up, and configured the test suite can +be launched using Twister: + +.. code-block:: shell + + west twister -v -p native_sim/native/64 -X can -T tests/drivers/can/host/ --pytest-args=--can-context=zcan0 + +After the test suite has completed, the virtual SocketCAN interface can be removed again: + +.. code-block:: shell + + sudo ip link del zcan0 + +Running on Hardware +=================== + +Running the test suite on hardware requires a physical CAN adapter connected to the host PC. The CAN +adapter must be supported by python-can. The examples below assumes using a Linux SocketCAN +interface named ``can0``. For other platforms/adapters, please see the `python-can`_ documentation. + +The CAN bus of the CAN adapter must be connected to the CAN connector of the device under test. +Make sure the CAN bus is terminated with 120 ohm resistors at both ends. The termination resistor +may already be present on the device under test, but CAN adapters typically require external bus +termination. + +.. code-block:: shell + + # Leave out "dbitrate 1000000 fd on" if can0 does not support CAN FD + sudo ip link set can0 type can restart-ms 1000 bitrate 125000 dbitrate 1000000 fd on + sudo ip link set up can0 + +Next, python-can needs to be configured for the ``can0`` interface. One option is to use a dedicated +``can0`` context in the ``~/.canrc`` configuration file as shown here: + +.. code-block:: ini + + [can0] + interface = socketcan + channel = can0 + # Set "fd = False" if can0 does not support CAN FD + fd = True + +Once the SocketCAN interface has been brought up and configured the test suite can be launched using +Twister. Below is an example for running on the :ref:`lpcxpresso55s36`: + +.. code-block:: shell + + west twister -v -p lpcxpresso55s36/lpc55s36 --device-testing --device-serial /dev/ttyACM0 -X can -T tests/drivers/can/host/ --pytest-args=--can-context=can0 + +After the test suite has completed, the SocketCAN interface can be brought down again: + +.. code-block:: shell + + sudo ip link set down can0 + +.. _python-can: + https://python-can.readthedocs.io + +.. _python-can configuration: + https://python-can.readthedocs.io/en/stable/configuration.html + +.. _Linux SocketCAN: + https://www.kernel.org/doc/html/latest/networking/can.html diff --git a/tests/drivers/can/host/boards/native_sim.overlay b/tests/drivers/can/host/boards/native_sim.overlay new file mode 100644 index 00000000000..5eff0729acc --- /dev/null +++ b/tests/drivers/can/host/boards/native_sim.overlay @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Vestas Wind Systems A/S + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/ { + chosen { + zephyr,canbus = &can0; + }; +}; + +&can_loopback0 { + status = "disabled"; +}; + +&can0 { + status = "okay"; +}; diff --git a/tests/drivers/can/host/boards/native_sim_native_64.overlay b/tests/drivers/can/host/boards/native_sim_native_64.overlay new file mode 100644 index 00000000000..f8c12d391b9 --- /dev/null +++ b/tests/drivers/can/host/boards/native_sim_native_64.overlay @@ -0,0 +1,7 @@ +/* + * Copyright 2024 Vestas Wind Systems A/S + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "native_sim.overlay" diff --git a/tests/drivers/can/host/prj.conf b/tests/drivers/can/host/prj.conf new file mode 100644 index 00000000000..8005525fc2a --- /dev/null +++ b/tests/drivers/can/host/prj.conf @@ -0,0 +1,5 @@ +CONFIG_SHELL=y +CONFIG_CAN=y +CONFIG_CAN_FD_MODE=y +CONFIG_CAN_SHELL=y +CONFIG_CAN_SHELL_SCRIPTING_FRIENDLY=y diff --git a/tests/drivers/can/host/pytest/can_shell.py b/tests/drivers/can/host/pytest/can_shell.py new file mode 100644 index 00000000000..284b96f38d4 --- /dev/null +++ b/tests/drivers/can/host/pytest/can_shell.py @@ -0,0 +1,197 @@ +# Copyright (c) 2024 Vestas Wind Systems A/S +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Zephyr CAN shell module support for providing a python-can bus interface for testing. +""" + +import re +import logging +from typing import Optional, Tuple + +from can import BusABC, CanProtocol, Message +from can.exceptions import CanInitializationError, CanOperationError +from can.typechecking import CanFilters + +from twister_harness import DeviceAdapter, Shell + +logger = logging.getLogger(__name__) + +class CanShellBus(BusABC): # pylint: disable=abstract-method + """ + A CAN interface using the Zephyr CAN shell module. + """ + + def __init__(self, dut: DeviceAdapter, shell: Shell, channel: str, + can_filters: Optional[CanFilters] = None, **kwargs) -> None: + self._dut = dut + self._shell = shell + self._device = channel + self._is_filtered = False + self._filter_ids = [] + + self.channel_info = f'Zephyr CAN shell, device "{self._device}"' + + mode = 'normal' + if 'fd' in self._get_capabilities(): + self._can_protocol = CanProtocol.CAN_FD + mode += ' fd' + else: + self._can_protocol = CanProtocol.CAN_20 + + self._set_mode(mode) + self._start() + + super().__init__(channel=channel, can_filters=can_filters, **kwargs) + + def _retval(self): + """Get return value of last shell command.""" + return int(self._shell.get_filtered_output(self._shell.exec_command('retval'))[0]) + + def _get_capabilities(self) -> list[str]: + cmd = f'can show {self._device}' + + lines = self._shell.get_filtered_output(self._shell.exec_command(cmd)) + regex_compiled = re.compile(r'capabilities:\s+(?P.*)') + for line in lines: + m = regex_compiled.match(line) + if m: + return m.group('caps').split() + + raise CanOperationError('capabilities not found') + + def _set_mode(self, mode: str) -> None: + self._shell.exec_command(f'can mode {self._device} {mode}') + retval = self._retval() + if retval != 0: + raise CanOperationError(f'failed to set mode "{mode}" (err {retval})') + + def _start(self): + self._shell.exec_command(f'can start {self._device}') + retval = self._retval() + if retval != 0: + raise CanInitializationError(f'failed to start (err {retval})') + + def _stop(self): + self._shell.exec_command(f'can stop {self._device}') + + def send(self, msg: Message, timeout: Optional[float] = None) -> None: + logger.debug('sending: %s', msg) + + cmd = f'can send {self._device}' + cmd += ' -e' if msg.is_extended_id else '' + cmd += ' -r' if msg.is_remote_frame else '' + cmd += ' -f' if msg.is_fd else '' + cmd += ' -b' if msg.bitrate_switch else '' + + if msg.is_extended_id: + cmd += f' {msg.arbitration_id:08x}' + else: + cmd += f' {msg.arbitration_id:03x}' + + if msg.data: + cmd += ' ' + msg.data.hex(' ', 1) + + lines = self._shell.exec_command(cmd) + regex_compiled = re.compile(r'enqueuing\s+CAN\s+frame\s+#(?P\d+)') + frame_num = None + for line in lines: + m = regex_compiled.match(line) + if m: + frame_num = m.group('id') + break + + if frame_num is None: + raise CanOperationError('frame not enqueued') + + tx_regex = r'CAN\s+frame\s+#' + frame_num + r'\s+successfully\s+sent' + self._dut.readlines_until(regex=tx_regex, timeout=timeout) + + def _add_filter(self, can_id: int, can_mask: int, extended: bool) -> None: + """Add RX filter.""" + cmd = f'can filter add {self._device}' + cmd += ' -e' if extended else '' + + if extended: + cmd += f' {can_id:08x}' + cmd += f' {can_mask:08x}' + else: + cmd += f' {can_id:03x}' + cmd += f' {can_mask:03x}' + + lines = self._shell.exec_command(cmd) + regex_compiled = re.compile(r'filter\s+ID:\s+(?P\d+)') + for line in lines: + m = regex_compiled.match(line) + if m: + filter_id = int(m.group('id')) + self._filter_ids.append(filter_id) + return + + raise CanOperationError('filter_id not found') + + def _remove_filter(self, filter_id: int) -> None: + """Remove RX filter.""" + if filter_id in self._filter_ids: + self._filter_ids.remove(filter_id) + + self._shell.exec_command(f'can filter remove {self._device} {filter_id}') + retval = self._retval() + if retval != 0: + raise CanOperationError(f'failed to remove filter ID {filter_id} (err {retval})') + + def _remove_all_filters(self) -> None: + """Remove all RX filters.""" + for filter_id in self._filter_ids[:]: + self._remove_filter(filter_id) + + def _apply_filters(self, filters: Optional[CanFilters]) -> None: + self._remove_all_filters() + + if filters: + self._is_filtered = True + else: + # Accept all frames if no hardware filters provided + filters = [ + {'can_id': 0x0, 'can_mask': 0x0}, + {'can_id': 0x0, 'can_mask': 0x0, 'extended': True} + ] + self._is_filtered = False + + for can_filter in filters: + can_id = can_filter['can_id'] + can_mask = can_filter['can_mask'] + extended = can_filter['extended'] if 'extended' in can_filter else False + self._add_filter(can_id, can_mask, extended) + + def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[Message], bool]: + frame_regex = r'.*' + re.escape(self._device) + \ + r'\s+(?P\S)(?P\S)\s+(?P\d+)\s+\[(?P\d+)\]\s*(?P[a-z0-9 ]*)' + lines = self._dut.readlines_until(regex=frame_regex, timeout=timeout) + msg = None + + regex_compiled = re.compile(frame_regex) + for line in lines: + m = regex_compiled.match(line) + if m: + can_id = int(m.group('can_id'), 16) + ext = len(m.group('can_id')) == 8 + dlc = int(m.group('dlc')) + fd = len(m.group('dlc')) == 2 + brs = m.group('brs') == 'B' + esi = m.group('esi') == 'P' + data = bytearray.fromhex(m.group('data')) + msg = Message(arbitration_id=can_id,is_extended_id=ext, + data=data, dlc=dlc, + is_fd=fd, bitrate_switch=brs, error_state_indicator=esi, + channel=self._device, check=True) + logger.debug('received: %s', msg) + + return msg, self._is_filtered + + def shutdown(self) -> None: + if not self._is_shutdown: + super().shutdown() + self._stop() + self._remove_all_filters() diff --git a/tests/drivers/can/host/pytest/conftest.py b/tests/drivers/can/host/pytest/conftest.py new file mode 100644 index 00000000000..f7ffe763a69 --- /dev/null +++ b/tests/drivers/can/host/pytest/conftest.py @@ -0,0 +1,61 @@ +# Copyright (c) 2024 Vestas Wind Systems A/S +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Configuration of Zephyr CAN <=> host CAN test suite. +""" + +import re +import logging +import pytest + +from twister_harness import DeviceAdapter, Shell + +from can import Bus, BusABC +from can_shell import CanShellBus + +logger = logging.getLogger(__name__) + +def pytest_addoption(parser) -> None: + """Add local parser options to pytest.""" + parser.addoption('--can-context', default=None, + help='Configuration context to use for python-can (default: None)') + +@pytest.fixture(name='context', scope='session') +def fixture_context(request) -> str: + """Return the name of the python-can configuration context to use.""" + ctx = request.config.getoption('--can-context') + logger.info('using python-can configuration context "%s"', ctx) + return ctx + +@pytest.fixture(name='chosen', scope='module') +def fixture_chosen(shell: Shell) -> str: + """Return the name of the zephyr,canbus devicetree chosen device.""" + chosen_regex = re.compile(r'zephyr,canbus:\s+(\S+)') + lines = shell.get_filtered_output(shell.exec_command('can_host chosen')) + + for line in lines: + m = chosen_regex.match(line) + if m: + chosen = m.groups()[0] + logger.info('testing on zephyr,canbus chosen device "%s"', chosen) + return chosen + + pytest.fail('zephyr,canbus chosen device not found or not ready') + return None + +@pytest.fixture +def can_dut(dut: DeviceAdapter, shell: Shell, chosen: str) -> BusABC: + """Return DUT CAN bus.""" + bus = CanShellBus(dut, shell, chosen) + yield bus + bus.shutdown() + dut.clear_buffer() + +@pytest.fixture +def can_host(context: str) -> BusABC: + """Return host CAN bus.""" + bus = Bus(config_context = context) + yield bus + bus.shutdown() diff --git a/tests/drivers/can/host/pytest/test_can.py b/tests/drivers/can/host/pytest/test_can.py new file mode 100644 index 00000000000..3a91813f75e --- /dev/null +++ b/tests/drivers/can/host/pytest/test_can.py @@ -0,0 +1,94 @@ +# Copyright (c) 2024 Vestas Wind Systems A/S +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Test suites for testing Zephyr CAN <=> host CAN. +""" + +import logging +import pytest + +import can +from can import BusABC, CanProtocol + +# RX/TX timeout in seconds +TIMEOUT = 1.0 + +logger = logging.getLogger(__name__) + +@pytest.mark.parametrize('msg', [ + pytest.param( + can.Message(arbitration_id=0x10, + is_extended_id=False), + id='std_id_dlc_0' + ), + pytest.param( + can.Message(arbitration_id=0x20, + data=[0xaa, 0xbb, 0xcc, 0xdd], + is_extended_id=False), + id='std_id_dlc_4' + ), + pytest.param( + can.Message(arbitration_id=0x30, + data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff], + is_extended_id=True), + id='ext_id_dlc_8' + ), + pytest.param( + can.Message(arbitration_id=0x40, + data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, + 0x10, 0x11], + is_fd=True, is_extended_id=False), + id='std_id_fdf_dlc_9' + ), + pytest.param( + can.Message(arbitration_id=0x50, + data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, + 0x10, 0x11], + is_fd=True, bitrate_switch=True, is_extended_id=False), + id='std_id_fdf_brs_dlc_9' + ), +]) +class TestCanRxTx(): + """ + Class for testing CAN RX/TX between Zephyr DUT and host. + """ + + @staticmethod + def check_rx(tx: can.Message, rx: can.Message) -> None: + """Check if received message matches transmitted message.""" + # pylint: disable-next=unused-variable + __tracebackhide__ = True + + if rx is None: + pytest.fail('no message received') + + if not tx.equals(rx, timestamp_delta=None, check_channel=False, + check_direction=False): + pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"') + + @staticmethod + def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None: + """Skip test if message format is not supported by both DUT and host.""" + if msg.is_fd: + if can_dut.protocol == CanProtocol.CAN_20: + pytest.skip('CAN FD not supported by DUT') + if can_host.protocol == CanProtocol.CAN_20: + pytest.skip('CAN FD not supported by host') + + def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None: + """Test DUT to host communication.""" + self.skip_if_unsupported(can_dut, can_host, msg) + + can_dut.send(msg, timeout=TIMEOUT) + rx = can_host.recv(timeout=TIMEOUT) + self.check_rx(msg, rx) + + def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None: + """Test host to DUT communication.""" + self.skip_if_unsupported(can_dut, can_host, msg) + + can_host.send(msg, timeout=TIMEOUT) + rx = can_dut.recv(timeout=TIMEOUT) + self.check_rx(msg, rx) diff --git a/tests/drivers/can/host/src/main.c b/tests/drivers/can/host/src/main.c new file mode 100644 index 00000000000..f329fbe15f9 --- /dev/null +++ b/tests/drivers/can/host/src/main.c @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Vestas Wind Systems A/S + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +static const struct device *chosen = DEVICE_DT_GET(DT_CHOSEN(zephyr_canbus)); + +static int cmd_can_host_chosen(const struct shell *sh, size_t argc, char **argv) +{ + ARG_UNUSED(argc); + ARG_UNUSED(argv); + + if (!device_is_ready(chosen)) { + shell_error(sh, "zephyr,canbus device %s not ready", chosen->name); + return -ENODEV; + } + + shell_print(sh, "zephyr,canbus: %s", chosen->name); + + return 0; +} + +SHELL_STATIC_SUBCMD_SET_CREATE(sub_can_host_cmds, + SHELL_CMD(chosen, NULL, + "Get zephyr,canbus chosen device name\n" + "Usage: can_host chosen", + cmd_can_host_chosen), + SHELL_SUBCMD_SET_END +); + +SHELL_CMD_REGISTER(can_host, &sub_can_host_cmds, "CAN host test commands", NULL); diff --git a/tests/drivers/can/host/testcase.yaml b/tests/drivers/can/host/testcase.yaml new file mode 100644 index 00000000000..9c5ed9a1e37 --- /dev/null +++ b/tests/drivers/can/host/testcase.yaml @@ -0,0 +1,17 @@ +common: + tags: + - drivers + - can + depends_on: can +tests: + drivers.can.host: + filter: dt_chosen_enabled("zephyr,canbus") + harness: pytest + harness_config: + pytest_dut_scope: session + fixture: can + extra_configs: + - arch:posix:CONFIG_NATIVE_UART_0_ON_STDINOUT=y + integration_platforms: + - native_sim + - native_sim/native/64