tests: drivers: can: add host communication test suite

Add test suite using python-can for testing Controller Area Network (CAN)
communication between a host PC and a device under test running Zephyr.

Signed-off-by: Henrik Brix Andersen <hebad@vestas.com>
This commit is contained in:
Henrik Brix Andersen 2024-05-27 21:02:46 +00:00 committed by Maureen Helm
commit 127cb9edb6
10 changed files with 568 additions and 0 deletions

View file

@ -0,0 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(can_host)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})

View file

@ -0,0 +1,125 @@
.. _can_host_tests:
Controller Area Network (CAN) Host Tests
########################################
Overview
********
This test suite uses `python-can`_ for testing Controller Area Network (CAN) communication between a
host PC (running :ref:`Twister <twister_script>`) and a device under test (DUT) running Zephyr.
Prerequisites
*************
The test suite has the following prerequisites:
* The python-can library installed on the host PC.
* A CAN fixture creating a CAN bus between the host PC and the DUT.
The Zephyr end of the CAN fixture can be configured as follows:
* The CAN controller to be used is set using the ``zephyr,canbus`` chosen devicetree node.
* The CAN bitrates are set using :kconfig:option:`CONFIG_CAN_DEFAULT_BITRATE` and
:kconfig:option:`CONFIG_CAN_DEFAULT_BITRATE_DATA`, but can be overridden on a board level using
the ``bus-speed`` and ``bus-speed-data`` CAN controller devicetree properties if needed. Default
bitrates are 125 kbits/s for the arbitration phase/CAN classic and 1 Mbit/s for the CAN FD data
phase when using bitrate switching (BRS).
The host end of the CAN fixture can be configured through python-can. Available configuration
options depend on the type of host CAN adapter used. The python-can library provides a lot of
flexibility for configuration as decribed in the `python-can configuration`_ page. By default, the
python-can configuration context is not specified, causing python-can to use the default
configuration context. The context can be overridden using the ``--can-context`` test suite argument
(see examples below).
Building and Running
********************
Running on native_sim
=====================
Running the test suite on :ref:`native_sim` relies on the `Linux SocketCAN`_ virtual CAN driver
(vcan) providing a virtual CAN interface named ``zcan0``.
On the host PC, a virtual SocketCAN interface needs to be created and brought up before running the
test suite:
.. code-block:: shell
sudo ip link add dev zcan0 type vcan
sudo ip link set up zcan0
Next, python-can needs to be configured for the ``zcan0`` interface. One option is to use a
dedicated ``zcan0`` context in the ``~/.canrc`` configuration file as shown here:
.. code-block:: ini
[zcan0]
interface = socketcan
channel = zcan0
fd = True
Once the virtual SocketCAN interface has been created, brought up, and configured the test suite can
be launched using Twister:
.. code-block:: shell
west twister -v -p native_sim/native/64 -X can -T tests/drivers/can/host/ --pytest-args=--can-context=zcan0
After the test suite has completed, the virtual SocketCAN interface can be removed again:
.. code-block:: shell
sudo ip link del zcan0
Running on Hardware
===================
Running the test suite on hardware requires a physical CAN adapter connected to the host PC. The CAN
adapter must be supported by python-can. The examples below assumes using a Linux SocketCAN
interface named ``can0``. For other platforms/adapters, please see the `python-can`_ documentation.
The CAN bus of the CAN adapter must be connected to the CAN connector of the device under test.
Make sure the CAN bus is terminated with 120 ohm resistors at both ends. The termination resistor
may already be present on the device under test, but CAN adapters typically require external bus
termination.
.. code-block:: shell
# Leave out "dbitrate 1000000 fd on" if can0 does not support CAN FD
sudo ip link set can0 type can restart-ms 1000 bitrate 125000 dbitrate 1000000 fd on
sudo ip link set up can0
Next, python-can needs to be configured for the ``can0`` interface. One option is to use a dedicated
``can0`` context in the ``~/.canrc`` configuration file as shown here:
.. code-block:: ini
[can0]
interface = socketcan
channel = can0
# Set "fd = False" if can0 does not support CAN FD
fd = True
Once the SocketCAN interface has been brought up and configured the test suite can be launched using
Twister. Below is an example for running on the :ref:`lpcxpresso55s36`:
.. code-block:: shell
west twister -v -p lpcxpresso55s36/lpc55s36 --device-testing --device-serial /dev/ttyACM0 -X can -T tests/drivers/can/host/ --pytest-args=--can-context=can0
After the test suite has completed, the SocketCAN interface can be brought down again:
.. code-block:: shell
sudo ip link set down can0
.. _python-can:
https://python-can.readthedocs.io
.. _python-can configuration:
https://python-can.readthedocs.io/en/stable/configuration.html
.. _Linux SocketCAN:
https://www.kernel.org/doc/html/latest/networking/can.html

View file

@ -0,0 +1,19 @@
/*
* Copyright 2024 Vestas Wind Systems A/S
*
* SPDX-License-Identifier: Apache-2.0
*/
/ {
chosen {
zephyr,canbus = &can0;
};
};
&can_loopback0 {
status = "disabled";
};
&can0 {
status = "okay";
};

View file

@ -0,0 +1,7 @@
/*
* Copyright 2024 Vestas Wind Systems A/S
*
* SPDX-License-Identifier: Apache-2.0
*/
#include "native_sim.overlay"

View file

@ -0,0 +1,5 @@
CONFIG_SHELL=y
CONFIG_CAN=y
CONFIG_CAN_FD_MODE=y
CONFIG_CAN_SHELL=y
CONFIG_CAN_SHELL_SCRIPTING_FRIENDLY=y

View file

@ -0,0 +1,197 @@
# Copyright (c) 2024 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0
"""
Zephyr CAN shell module support for providing a python-can bus interface for testing.
"""
import re
import logging
from typing import Optional, Tuple
from can import BusABC, CanProtocol, Message
from can.exceptions import CanInitializationError, CanOperationError
from can.typechecking import CanFilters
from twister_harness import DeviceAdapter, Shell
logger = logging.getLogger(__name__)
class CanShellBus(BusABC): # pylint: disable=abstract-method
"""
A CAN interface using the Zephyr CAN shell module.
"""
def __init__(self, dut: DeviceAdapter, shell: Shell, channel: str,
can_filters: Optional[CanFilters] = None, **kwargs) -> None:
self._dut = dut
self._shell = shell
self._device = channel
self._is_filtered = False
self._filter_ids = []
self.channel_info = f'Zephyr CAN shell, device "{self._device}"'
mode = 'normal'
if 'fd' in self._get_capabilities():
self._can_protocol = CanProtocol.CAN_FD
mode += ' fd'
else:
self._can_protocol = CanProtocol.CAN_20
self._set_mode(mode)
self._start()
super().__init__(channel=channel, can_filters=can_filters, **kwargs)
def _retval(self):
"""Get return value of last shell command."""
return int(self._shell.get_filtered_output(self._shell.exec_command('retval'))[0])
def _get_capabilities(self) -> list[str]:
cmd = f'can show {self._device}'
lines = self._shell.get_filtered_output(self._shell.exec_command(cmd))
regex_compiled = re.compile(r'capabilities:\s+(?P<caps>.*)')
for line in lines:
m = regex_compiled.match(line)
if m:
return m.group('caps').split()
raise CanOperationError('capabilities not found')
def _set_mode(self, mode: str) -> None:
self._shell.exec_command(f'can mode {self._device} {mode}')
retval = self._retval()
if retval != 0:
raise CanOperationError(f'failed to set mode "{mode}" (err {retval})')
def _start(self):
self._shell.exec_command(f'can start {self._device}')
retval = self._retval()
if retval != 0:
raise CanInitializationError(f'failed to start (err {retval})')
def _stop(self):
self._shell.exec_command(f'can stop {self._device}')
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
logger.debug('sending: %s', msg)
cmd = f'can send {self._device}'
cmd += ' -e' if msg.is_extended_id else ''
cmd += ' -r' if msg.is_remote_frame else ''
cmd += ' -f' if msg.is_fd else ''
cmd += ' -b' if msg.bitrate_switch else ''
if msg.is_extended_id:
cmd += f' {msg.arbitration_id:08x}'
else:
cmd += f' {msg.arbitration_id:03x}'
if msg.data:
cmd += ' ' + msg.data.hex(' ', 1)
lines = self._shell.exec_command(cmd)
regex_compiled = re.compile(r'enqueuing\s+CAN\s+frame\s+#(?P<id>\d+)')
frame_num = None
for line in lines:
m = regex_compiled.match(line)
if m:
frame_num = m.group('id')
break
if frame_num is None:
raise CanOperationError('frame not enqueued')
tx_regex = r'CAN\s+frame\s+#' + frame_num + r'\s+successfully\s+sent'
self._dut.readlines_until(regex=tx_regex, timeout=timeout)
def _add_filter(self, can_id: int, can_mask: int, extended: bool) -> None:
"""Add RX filter."""
cmd = f'can filter add {self._device}'
cmd += ' -e' if extended else ''
if extended:
cmd += f' {can_id:08x}'
cmd += f' {can_mask:08x}'
else:
cmd += f' {can_id:03x}'
cmd += f' {can_mask:03x}'
lines = self._shell.exec_command(cmd)
regex_compiled = re.compile(r'filter\s+ID:\s+(?P<id>\d+)')
for line in lines:
m = regex_compiled.match(line)
if m:
filter_id = int(m.group('id'))
self._filter_ids.append(filter_id)
return
raise CanOperationError('filter_id not found')
def _remove_filter(self, filter_id: int) -> None:
"""Remove RX filter."""
if filter_id in self._filter_ids:
self._filter_ids.remove(filter_id)
self._shell.exec_command(f'can filter remove {self._device} {filter_id}')
retval = self._retval()
if retval != 0:
raise CanOperationError(f'failed to remove filter ID {filter_id} (err {retval})')
def _remove_all_filters(self) -> None:
"""Remove all RX filters."""
for filter_id in self._filter_ids[:]:
self._remove_filter(filter_id)
def _apply_filters(self, filters: Optional[CanFilters]) -> None:
self._remove_all_filters()
if filters:
self._is_filtered = True
else:
# Accept all frames if no hardware filters provided
filters = [
{'can_id': 0x0, 'can_mask': 0x0},
{'can_id': 0x0, 'can_mask': 0x0, 'extended': True}
]
self._is_filtered = False
for can_filter in filters:
can_id = can_filter['can_id']
can_mask = can_filter['can_mask']
extended = can_filter['extended'] if 'extended' in can_filter else False
self._add_filter(can_id, can_mask, extended)
def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[Message], bool]:
frame_regex = r'.*' + re.escape(self._device) + \
r'\s+(?P<brs>\S)(?P<esi>\S)\s+(?P<can_id>\d+)\s+\[(?P<dlc>\d+)\]\s*(?P<data>[a-z0-9 ]*)'
lines = self._dut.readlines_until(regex=frame_regex, timeout=timeout)
msg = None
regex_compiled = re.compile(frame_regex)
for line in lines:
m = regex_compiled.match(line)
if m:
can_id = int(m.group('can_id'), 16)
ext = len(m.group('can_id')) == 8
dlc = int(m.group('dlc'))
fd = len(m.group('dlc')) == 2
brs = m.group('brs') == 'B'
esi = m.group('esi') == 'P'
data = bytearray.fromhex(m.group('data'))
msg = Message(arbitration_id=can_id,is_extended_id=ext,
data=data, dlc=dlc,
is_fd=fd, bitrate_switch=brs, error_state_indicator=esi,
channel=self._device, check=True)
logger.debug('received: %s', msg)
return msg, self._is_filtered
def shutdown(self) -> None:
if not self._is_shutdown:
super().shutdown()
self._stop()
self._remove_all_filters()

View file

@ -0,0 +1,61 @@
# Copyright (c) 2024 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0
"""
Configuration of Zephyr CAN <=> host CAN test suite.
"""
import re
import logging
import pytest
from twister_harness import DeviceAdapter, Shell
from can import Bus, BusABC
from can_shell import CanShellBus
logger = logging.getLogger(__name__)
def pytest_addoption(parser) -> None:
"""Add local parser options to pytest."""
parser.addoption('--can-context', default=None,
help='Configuration context to use for python-can (default: None)')
@pytest.fixture(name='context', scope='session')
def fixture_context(request) -> str:
"""Return the name of the python-can configuration context to use."""
ctx = request.config.getoption('--can-context')
logger.info('using python-can configuration context "%s"', ctx)
return ctx
@pytest.fixture(name='chosen', scope='module')
def fixture_chosen(shell: Shell) -> str:
"""Return the name of the zephyr,canbus devicetree chosen device."""
chosen_regex = re.compile(r'zephyr,canbus:\s+(\S+)')
lines = shell.get_filtered_output(shell.exec_command('can_host chosen'))
for line in lines:
m = chosen_regex.match(line)
if m:
chosen = m.groups()[0]
logger.info('testing on zephyr,canbus chosen device "%s"', chosen)
return chosen
pytest.fail('zephyr,canbus chosen device not found or not ready')
return None
@pytest.fixture
def can_dut(dut: DeviceAdapter, shell: Shell, chosen: str) -> BusABC:
"""Return DUT CAN bus."""
bus = CanShellBus(dut, shell, chosen)
yield bus
bus.shutdown()
dut.clear_buffer()
@pytest.fixture
def can_host(context: str) -> BusABC:
"""Return host CAN bus."""
bus = Bus(config_context = context)
yield bus
bus.shutdown()

View file

@ -0,0 +1,94 @@
# Copyright (c) 2024 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0
"""
Test suites for testing Zephyr CAN <=> host CAN.
"""
import logging
import pytest
import can
from can import BusABC, CanProtocol
# RX/TX timeout in seconds
TIMEOUT = 1.0
logger = logging.getLogger(__name__)
@pytest.mark.parametrize('msg', [
pytest.param(
can.Message(arbitration_id=0x10,
is_extended_id=False),
id='std_id_dlc_0'
),
pytest.param(
can.Message(arbitration_id=0x20,
data=[0xaa, 0xbb, 0xcc, 0xdd],
is_extended_id=False),
id='std_id_dlc_4'
),
pytest.param(
can.Message(arbitration_id=0x30,
data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff],
is_extended_id=True),
id='ext_id_dlc_8'
),
pytest.param(
can.Message(arbitration_id=0x40,
data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
0x10, 0x11],
is_fd=True, is_extended_id=False),
id='std_id_fdf_dlc_9'
),
pytest.param(
can.Message(arbitration_id=0x50,
data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
0x10, 0x11],
is_fd=True, bitrate_switch=True, is_extended_id=False),
id='std_id_fdf_brs_dlc_9'
),
])
class TestCanRxTx():
"""
Class for testing CAN RX/TX between Zephyr DUT and host.
"""
@staticmethod
def check_rx(tx: can.Message, rx: can.Message) -> None:
"""Check if received message matches transmitted message."""
# pylint: disable-next=unused-variable
__tracebackhide__ = True
if rx is None:
pytest.fail('no message received')
if not tx.equals(rx, timestamp_delta=None, check_channel=False,
check_direction=False):
pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"')
@staticmethod
def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
"""Skip test if message format is not supported by both DUT and host."""
if msg.is_fd:
if can_dut.protocol == CanProtocol.CAN_20:
pytest.skip('CAN FD not supported by DUT')
if can_host.protocol == CanProtocol.CAN_20:
pytest.skip('CAN FD not supported by host')
def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
"""Test DUT to host communication."""
self.skip_if_unsupported(can_dut, can_host, msg)
can_dut.send(msg, timeout=TIMEOUT)
rx = can_host.recv(timeout=TIMEOUT)
self.check_rx(msg, rx)
def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
"""Test host to DUT communication."""
self.skip_if_unsupported(can_dut, can_host, msg)
can_host.send(msg, timeout=TIMEOUT)
rx = can_dut.recv(timeout=TIMEOUT)
self.check_rx(msg, rx)

View file

@ -0,0 +1,35 @@
/*
* Copyright (c) 2024 Vestas Wind Systems A/S
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/device.h>
#include <zephyr/shell/shell.h>
static const struct device *chosen = DEVICE_DT_GET(DT_CHOSEN(zephyr_canbus));
static int cmd_can_host_chosen(const struct shell *sh, size_t argc, char **argv)
{
ARG_UNUSED(argc);
ARG_UNUSED(argv);
if (!device_is_ready(chosen)) {
shell_error(sh, "zephyr,canbus device %s not ready", chosen->name);
return -ENODEV;
}
shell_print(sh, "zephyr,canbus: %s", chosen->name);
return 0;
}
SHELL_STATIC_SUBCMD_SET_CREATE(sub_can_host_cmds,
SHELL_CMD(chosen, NULL,
"Get zephyr,canbus chosen device name\n"
"Usage: can_host chosen",
cmd_can_host_chosen),
SHELL_SUBCMD_SET_END
);
SHELL_CMD_REGISTER(can_host, &sub_can_host_cmds, "CAN host test commands", NULL);

View file

@ -0,0 +1,17 @@
common:
tags:
- drivers
- can
depends_on: can
tests:
drivers.can.host:
filter: dt_chosen_enabled("zephyr,canbus")
harness: pytest
harness_config:
pytest_dut_scope: session
fixture: can
extra_configs:
- arch:posix:CONFIG_NATIVE_UART_0_ON_STDINOUT=y
integration_platforms:
- native_sim
- native_sim/native/64