scripts: add pytest plugin

Adding pytest plugin dedicated to running pytest tests in Zephyr
project. This plugin provides a dut fixture which allows to handle
bidirectional communication with the device under test. This version
of plugin can be used for tests dedicated to real hardware, QEMU and
native_posix simulator.

Co-authored-by: Lukasz Fundakowski <lukasz.fundakowski@nordicsemi.no>
Co-authored-by: Grzegorz Chwierut <grzegorz.chwierut@nordicsemi.no>
Co-authored-by: Katarzyna Giadla <katarzyna.giadla@nordicsemi.no>
Signed-off-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
This commit is contained in:
Piotr Golyzniak 2023-05-26 11:33:25 +02:00 committed by Anas Nashif
commit 8c4bfcf324
30 changed files with 2189 additions and 0 deletions

View file

@ -0,0 +1,62 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Pycharm
.idea/
# VSCode
.vscode/

View file

@ -0,0 +1,47 @@
==============
Pytest Twister harness
==============
Installation
------------
If you plan to use this plugin with Twister, then you don't need to install it
separately by pip. When Twister uses this plugin for pytest tests, it updates
`PYTHONPATH` variable, and then extends pytest command by
`-p twister_harness.plugin` argument.
Usage
-----
Run exemplary test shell application by Twister:
.. code-block:: sh
cd ${ZEPHYR_BASE}
# native_posix & QEMU
./scripts/twister -p native_posix -p qemu_x86 -T samples/subsys/testsuite/pytest/shell
# hardware
./scripts/twister -p nrf52840dk_nrf52840 --device-testing --device-serial /dev/ttyACM0 -T samples/subsys/testsuite/pytest/shell
or build shell application by west and call pytest directly:
.. code-block:: sh
export PYTHONPATH=${ZEPHYR_BASE}/scripts/pylib/pytest-twister-harness/src:${PYTHONPATH}
cd ${ZEPHYR_BASE}/samples/subsys/testsuite/pytest/shell
# native_posix
west build -p -b native_posix -- -DCONFIG_NATIVE_UART_0_ON_STDINOUT=y
pytest --twister-harness --device-type=native --build-dir=build -p twister_harness.plugin
# QEMU
west build -p -b qemu_x86 -- -DQEMU_PIPE=qemu-fifo
pytest --twister-harness --device-type=qemu --build-dir=build -p twister_harness.plugin
# hardware
west build -p -b nrf52840dk_nrf52840
pytest --twister-harness --device-type=hardware --device-serial=/dev/ttyACM0 --build-dir=build -p twister_harness.plugin

View file

@ -0,0 +1,6 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = [
"setuptools >= 48.0.0",
"wheel",
]

View file

@ -0,0 +1,33 @@
[metadata]
name = pytest-twister-harness
version = attr: twister_harness.__version__
description = Plugin for pytest to run tests which require interaction with real and simulated devices
long_description = file: README.rst
python_requires = ~=3.8
classifiers =
Development Status :: 3 - Alpha
Intended Audience :: Developers
Topic :: Software Development :: Embedded Systems
Topic :: Software Development :: Quality Assurance
Operating System :: Posix :: Linux
Operating System :: Microsoft :: Windows
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
[options]
packages = find:
package_dir =
=src
install_requires =
psutil
pyserial
pytest>=7.0.0
[options.packages.find]
where = src
[options.entry_points]
pytest11 =
twister_harness = twister_harness.plugin

View file

@ -0,0 +1,7 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import setuptools
setuptools.setup()

View file

@ -0,0 +1,5 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
__version__ = '0.0.1'

View file

@ -0,0 +1,8 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
QEMU_FIFO_FILE_NAME: str = 'qemu-fifo'
END_OF_DATA = object() #: used for indicating that there will be no more data in queue

View file

@ -0,0 +1,3 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

View file

@ -0,0 +1,94 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import abc
import logging
import os
from typing import Generator
from twister_harness.log_files.log_file import LogFile, NullLogFile
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
class DeviceAbstract(abc.ABC):
"""Class defines an interface for all devices."""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
"""
:param device_config: device configuration
"""
self.device_config: DeviceConfig = device_config
self.handler_log_file: LogFile = NullLogFile.create()
self.device_log_file: LogFile = NullLogFile.create()
def __repr__(self) -> str:
return f'{self.__class__.__name__}()'
@property
def env(self) -> dict[str, str]:
env = os.environ.copy()
return env
@abc.abstractmethod
def connect(self, timeout: float = 1) -> None:
"""Connect with the device (e.g. via UART)"""
@abc.abstractmethod
def disconnect(self) -> None:
"""Close a connection with the device"""
@abc.abstractmethod
def generate_command(self) -> None:
"""
Generate command which will be used during flashing or running device.
"""
def flash_and_run(self, timeout: float = 60.0) -> None:
"""
Flash and run application on a device.
:param timeout: time out in seconds
"""
@property
@abc.abstractmethod
def iter_stdout(self) -> Generator[str, None, None]:
"""Iterate stdout from a device."""
@abc.abstractmethod
def write(self, data: bytes) -> None:
"""Write data bytes to device"""
@abc.abstractmethod
def initialize_log_files(self):
"""
Initialize file to store logs.
"""
def stop(self) -> None:
"""Stop device."""
# @abc.abstractmethod
# def read(self, size=1) -> None:
# """Read size bytes from device"""
# def read_until(self, expected, size=None):
# """Read until an expected bytes sequence is found"""
# lenterm = len(expected)
# line = bytearray()
# while True:
# c = self.read(1)
# if c:
# line += c
# if line[-lenterm:] == expected:
# break
# if size is not None and len(line) >= size:
# break
# else:
# break
# return bytes(line)

View file

@ -0,0 +1,49 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
from typing import Type
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.device.hardware_adapter import HardwareAdapter
from twister_harness.device.qemu_adapter import QemuAdapter
from twister_harness.device.simulator_adapter import (
CustomSimulatorAdapter,
NativeSimulatorAdapter,
UnitSimulatorAdapter,
)
from twister_harness.exceptions import TwisterHarnessException
logger = logging.getLogger(__name__)
class DeviceFactory:
_devices: dict[str, Type[DeviceAbstract]] = {}
@classmethod
def discover(cls):
"""Return available devices."""
@classmethod
def register_device_class(cls, name: str, klass: Type[DeviceAbstract]):
if name not in cls._devices:
cls._devices[name] = klass
@classmethod
def get_device(cls, name: str) -> Type[DeviceAbstract]:
logger.debug('Get device type "%s"', name)
try:
return cls._devices[name]
except KeyError as e:
logger.error('There is no device with name "%s"', name)
raise TwisterHarnessException(f'There is no device with name "{name}"') from e
DeviceFactory.register_device_class('custom', CustomSimulatorAdapter)
DeviceFactory.register_device_class('native', NativeSimulatorAdapter)
DeviceFactory.register_device_class('unit', UnitSimulatorAdapter)
DeviceFactory.register_device_class('hardware', HardwareAdapter)
DeviceFactory.register_device_class('qemu', QemuAdapter)

View file

@ -0,0 +1,91 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import io
import logging
import os
import threading
from pathlib import Path
logger = logging.getLogger(__name__)
class FifoHandler:
"""Creates FIFO file for reading and writing."""
def __init__(self, fifo: str | Path):
"""
:param fifo: path to fifo file
"""
self._fifo_in = str(fifo) + '.in'
self._fifo_out = str(fifo) + '.out'
self.file_in: io.BytesIO | None = None
self.file_out: io.BytesIO | None = None
self._threads: list[threading.Thread] = []
@staticmethod
def _make_fifo_file(filename: str) -> None:
if os.path.exists(filename):
os.unlink(filename)
os.mkfifo(filename)
logger.debug('Created new fifo file: %s', filename)
@property
def is_open(self) -> bool:
try:
return bool(
self.file_in is not None and self.file_out is not None
and self.file_in.fileno() and self.file_out.fileno()
)
except ValueError:
return False
def connect(self):
self._make_fifo_file(self._fifo_in)
self._make_fifo_file(self._fifo_out)
self._threads = [
threading.Thread(target=self._open_fifo_in, daemon=True),
threading.Thread(target=self._open_fifo_out, daemon=True)
]
for t in self._threads:
t.start()
def _open_fifo_in(self):
self.file_in = open(self._fifo_in, 'wb', buffering=0)
def _open_fifo_out(self):
self.file_out = open(self._fifo_out, 'rb', buffering=0)
def disconnect(self):
if self.file_in is not None:
self.file_in.close()
if self.file_out is not None:
self.file_out.close()
for t in self._threads:
t.join(timeout=1)
logger.debug(f'Unlink {self._fifo_in}')
os.unlink(self._fifo_in)
logger.debug(f'Unlink {self._fifo_out}')
os.unlink(self._fifo_out)
def read(self, __size: int | None = None) -> bytes:
return self.file_out.read(__size) # type: ignore[union-attr]
def readline(self, __size: int | None = None) -> bytes:
line = self.file_out.readline(__size) # type: ignore[union-attr]
return line
def write(self, __buffer: bytes) -> int:
return self.file_in.write(__buffer) # type: ignore[union-attr]
def flush(self):
if self.file_in:
self.file_in.flush()
if self.file_out:
self.file_out.flush()
def fileno(self) -> int:
return self.file_out.fileno() # type: ignore[union-attr]

View file

@ -0,0 +1,226 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
import pty
import re
import shutil
import subprocess
from datetime import datetime
from typing import Generator
import serial
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
class HardwareAdapter(DeviceAbstract):
"""Adapter class for real device."""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
super().__init__(device_config, **kwargs)
self.connection: serial.Serial | None = None
self.command: list[str] = []
self.process_kwargs: dict = {
'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT,
'env': self.env,
}
self.serial_pty_proc: subprocess.Popen | None = None
def connect(self, timeout: float = 1) -> None:
"""
Open serial connection.
:param timeout: Read timeout value in seconds
"""
if self.connection:
# already opened
return
if self.device_config.pre_script:
self.run_custom_script(self.device_config.pre_script, 30)
serial_name = self._open_serial_pty() or self.device_config.serial
logger.info('Opening serial connection for %s', serial_name)
try:
self.connection = serial.Serial(
serial_name,
baudrate=self.device_config.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=timeout
)
except serial.SerialException as e:
logger.exception('Cannot open connection: %s', e)
self._close_serial_pty()
raise
self.connection.flush()
def disconnect(self) -> None:
"""Close serial connection."""
if self.connection:
serial_name = self.connection.port
self.connection.close()
self.connection = None
logger.info('Closed serial connection for %s', serial_name)
self._close_serial_pty()
def stop(self) -> None:
if self.device_config.post_script:
self.run_custom_script(self.device_config.post_script, 30)
def _open_serial_pty(self) -> str | None:
"""Open a pty pair, run process and return tty name"""
if not self.device_config.serial_pty:
return None
master, slave = pty.openpty()
try:
self.serial_pty_proc = subprocess.Popen(
re.split(',| ', self.device_config.serial_pty),
stdout=master,
stdin=master,
stderr=master
)
except subprocess.CalledProcessError as e:
logger.exception('Failed to run subprocess %s, error %s',
self.device_config.serial_pty, str(e))
raise
return os.ttyname(slave)
def _close_serial_pty(self) -> None:
"""Terminate the process opened for serial pty script"""
if self.serial_pty_proc:
self.serial_pty_proc.terminate()
self.serial_pty_proc.communicate()
logger.info('Process %s terminated', self.device_config.serial_pty)
self.serial_pty_proc = None
def generate_command(self) -> None:
"""Return command to flash."""
west = shutil.which('west')
if west is None:
raise TwisterHarnessException('west not found')
command = [
west,
'flash',
'--skip-rebuild',
'--build-dir', str(self.device_config.build_dir),
]
command_extra_args = []
if self.device_config.west_flash_extra_args:
command_extra_args.extend(self.device_config.west_flash_extra_args)
if runner := self.device_config.runner:
command.extend(['--runner', runner])
if board_id := self.device_config.id:
if runner == 'pyocd':
command_extra_args.append('--board-id')
command_extra_args.append(board_id)
elif runner == 'nrfjprog':
command_extra_args.append('--dev-id')
command_extra_args.append(board_id)
elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']:
command_extra_args.append('--cmd-pre-init')
command_extra_args.append(f'hla_serial {board_id}')
elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP':
command_extra_args.append('--cmd-pre-init')
command_extra_args.append(f'cmsis_dap_serial {board_id}')
elif runner == 'jlink':
command.append(f'--tool-opt=-SelectEmuBySN {board_id}')
elif runner == 'stm32cubeprogrammer':
command.append(f'--tool-opt=sn={board_id}')
if command_extra_args:
command.append('--')
command.extend(command_extra_args)
self.command = command
@staticmethod
def run_custom_script(script, timeout):
with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
try:
stdout, stderr = proc.communicate(timeout=timeout)
logger.debug(stdout.decode())
if proc.returncode != 0:
logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
logger.error("{} timed out".format(script))
def flash_and_run(self, timeout: float = 60.0) -> None:
if not self.command:
msg = 'Flash command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
if self.device_config.id:
logger.info('Flashing device %s', self.device_config.id)
log_command(logger, 'Flashing command', self.command, level=logging.INFO)
try:
process = subprocess.Popen(
self.command,
**self.process_kwargs
)
except subprocess.CalledProcessError:
logger.error('Error while flashing device')
raise TwisterHarnessException('Could not flash device')
else:
stdout = stderr = None
try:
stdout, stderr = process.communicate(timeout=self.device_config.flashing_timeout)
except subprocess.TimeoutExpired:
process.kill()
finally:
if stdout:
self.device_log_file.handle(data=stdout)
logger.debug(stdout.decode(errors='ignore'))
if stderr:
self.device_log_file.handle(data=stderr)
if process.returncode == 0:
logger.info('Flashing finished')
else:
raise TwisterHarnessException(f'Could not flash device {self.device_config.id}')
finally:
if self.device_config.post_flash_script:
self.run_custom_script(self.device_config.post_flash_script, 30)
@property
def iter_stdout(self) -> Generator[str, None, None]:
"""Return output from serial."""
if not self.connection:
return
self.connection.flush()
self.connection.reset_input_buffer()
while self.connection and self.connection.is_open:
stream = self.connection.readline()
self.handler_log_file.handle(data=stream)
yield stream.decode(errors='ignore').strip()
def write(self, data: bytes) -> None:
"""Write data to serial"""
if self.connection:
self.connection.write(data)
def initialize_log_files(self) -> None:
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
self.device_log_file = DeviceLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
self.device_log_file.handle(start_msg)

View file

@ -0,0 +1,196 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import Generator
import psutil
from twister_harness.constants import QEMU_FIFO_FILE_NAME
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.device.fifo_handler import FifoHandler
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
class QemuAdapter(DeviceAbstract):
"""Adapter for Qemu simulator"""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
super().__init__(device_config, **kwargs)
self._process: subprocess.Popen | None = None
self._process_ended_with_timeout: bool = False
self._exc: Exception | None = None #: store any exception which appeared running this thread
self._thread: threading.Thread | None = None
self._emulation_was_finished: bool = False
self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME))
self.command: list[str] = []
self.timeout: float = 60 # running timeout in seconds
self.booting_timeout_in_ms: int = 10_000 #: wait time for booting Qemu in milliseconds
def generate_command(self) -> None:
"""Return command to flash."""
if (west := shutil.which('west')) is None:
logger.error('west not found')
self.command = []
else:
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
def connect(self, timeout: float = 1) -> None:
logger.debug('Opening connection')
self.connection.connect()
def flash_and_run(self, timeout: float = 60.0) -> None:
self.timeout = timeout
if not self.command:
msg = 'Run simulation command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
self._thread = threading.Thread(target=self._run_command, args=(self.timeout,), daemon=True)
self._thread.start()
# Give a time to start subprocess before test is executed
time.sleep(0.1)
# Check if subprocess (simulation) has started without errors
if self._exc is not None:
logger.error('Simulation failed due to an exception: %s', self._exc)
raise self._exc
def _run_command(self, timeout: float) -> None:
log_command(logger, 'Running command', self.command, level=logging.INFO)
try:
self._process = subprocess.Popen(
self.command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=self.env
)
stdout, _ = self._process.communicate(timeout=timeout)
return_code: int = self._process.returncode
except subprocess.TimeoutExpired:
logger.error('Running simulation finished after timeout: %s seconds', timeout)
self._process_ended_with_timeout = True
# we don't want to raise Timeout exception, but allowed a test to parse the output
# and set proper status
except subprocess.SubprocessError as e:
logger.error('Running simulation failed due to subprocess error %s', e)
self._exc = TwisterHarnessException(e.args)
except FileNotFoundError as e:
logger.error(f'Running simulation failed due to file not found: {e.filename}')
self._exc = TwisterHarnessException(f'File not found: {e.filename}')
except Exception as e:
logger.error('Running simulation failed: %s', e)
self._exc = TwisterHarnessException(e.args)
else:
if return_code == 0:
logger.info('Running simulation finished with return code %s', return_code)
elif return_code == -15:
logger.info('Running simulation terminated')
else:
logger.warning('Running simulation finished with return code %s', return_code)
for line in stdout.decode('utf-8').split('\n'):
logger.info(line)
finally:
self._emulation_was_finished = True
def disconnect(self):
logger.debug('Closing connection')
self.connection.disconnect()
def stop(self) -> None:
"""Stop device."""
time.sleep(0.1) # give a time to end while loop in running simulation
if self._process is not None and self._process.returncode is None:
logger.debug('Stopping all running processes for PID %s', self._process.pid)
# kill all child subprocesses
for child in psutil.Process(self._process.pid).children(recursive=True):
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError:
pass
# kill subprocess if it is still running
os.kill(self._process.pid, signal.SIGTERM)
if self._thread is not None:
self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
if self._exc:
raise self._exc
def _wait_for_fifo(self):
for _ in range(int(self.booting_timeout_in_ms / 10) or 1):
if self.connection.is_open:
break
elif self._emulation_was_finished:
msg = 'Problem with starting QEMU'
logger.error(msg)
raise TwisterHarnessException(msg)
time.sleep(0.1)
else:
msg = 'Problem with starting QEMU - fifo file was not created yet'
logger.error(msg)
raise TwisterHarnessException(msg)
@property
def iter_stdout(self) -> Generator[str, None, None]:
if not self.connection:
return
# fifo file can be not create yet, so we need to wait for a while
self._wait_for_fifo()
# create unblocking reading from fifo file
q: Queue = Queue()
def read_lines():
while self.connection and self.connection.is_open:
try:
line = self.connection.readline().decode('UTF-8').strip()
except (OSError, ValueError):
# file could be closed already so we should stop reading
break
if len(line) != 0:
q.put(line)
t = threading.Thread(target=read_lines, daemon=True)
t.start()
end_time = time.time() + self.timeout
try:
while True:
try:
stream = q.get(timeout=0.1)
self.handler_log_file.handle(data=stream + '\n')
yield stream
except Empty: # timeout appeared
pass
if time.time() > end_time:
break
except KeyboardInterrupt:
# let thread to finish smoothly
pass
finally:
t.join(1)
def write(self, data: bytes) -> None:
"""Write data to serial"""
if self.connection:
self.connection.write(data)
def initialize_log_files(self):
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)

View file

@ -0,0 +1,220 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import abc
import asyncio
import asyncio.subprocess
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from asyncio.base_subprocess import BaseSubprocessTransport
from datetime import datetime
from functools import wraps
from pathlib import Path
from queue import Queue
from typing import Generator
import psutil
from twister_harness.constants import END_OF_DATA
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
# Workaround for RuntimeError: Event loop is closed
# https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/
def silence_event_loop_closed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RuntimeError as e:
if str(e) != 'Event loop is closed':
raise
return wrapper
BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__) # type: ignore
logger = logging.getLogger(__name__)
class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
"""
:param twister_config: twister configuration
"""
super().__init__(device_config, **kwargs)
self._process: asyncio.subprocess.Process | None = None
self._process_ended_with_timeout: bool = False
self.queue: Queue = Queue()
self._stop_job: bool = False
self._exc: Exception | None = None #: store any exception which appeared running this thread
self._thread: threading.Thread | None = None
self.command: list[str] = []
self.process_kwargs: dict = {
'stdout': asyncio.subprocess.PIPE,
'stderr': asyncio.subprocess.STDOUT,
'stdin': asyncio.subprocess.PIPE,
'env': self.env,
}
self._data_to_send: bytes | None = None
def connect(self, timeout: float = 1) -> None:
pass # pragma: no cover
def flash_and_run(self, timeout: float = 60.0) -> None:
if not self.command:
msg = 'Run simulation command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True)
self._thread.start()
# Give a time to start subprocess before test is executed
time.sleep(0.1)
# Check if subprocess (simulation) has started without errors
if self._exc is not None:
logger.error('Simulation failed due to an exception: %s', self._exc)
raise self._exc
def _run_simulation(self, timeout: float) -> None:
log_command(logger, 'Running command', self.command, level=logging.INFO)
try:
return_code: int = asyncio.run(self._run_command(timeout=timeout))
except subprocess.SubprocessError as e:
logger.error('Running simulation failed due to subprocess error %s', e)
self._exc = TwisterHarnessException(e.args)
except FileNotFoundError as e:
logger.error(f'Running simulation failed due to file not found: {e.filename}')
self._exc = TwisterHarnessException(f'File not found: {e.filename}')
except Exception as e:
logger.error('Running simulation failed: %s', e)
self._exc = TwisterHarnessException(e.args)
else:
if return_code == 0:
logger.info('Running simulation finished with return code %s', return_code)
elif return_code == -15:
logger.info('Running simulation stopped interrupted by user')
else:
logger.warning('Running simulation finished with return code %s', return_code)
finally:
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
async def _run_command(self, timeout: float = 60.):
assert isinstance(self.command, (list, tuple, set))
# to avoid stupid and difficult to debug mistakes
# we are using asyncio to run subprocess to be able to read from stdout
# without blocking while loop (readline with timeout)
self._process = await asyncio.create_subprocess_exec(
*self.command,
**self.process_kwargs
)
logger.debug('Started subprocess with PID %s', self._process.pid)
end_time = time.time() + timeout
while not self._stop_job and not self._process.stdout.at_eof(): # type: ignore[union-attr]
if line := await self._read_line(timeout=0.1):
self.queue.put(line.decode('utf-8').strip())
if time.time() > end_time:
self._process_ended_with_timeout = True
logger.info(f'Finished process with PID {self._process.pid} after {timeout} seconds timeout')
break
if self._data_to_send:
self._process.stdin.write(self._data_to_send) # type: ignore[union-attr]
await self._process.stdin.drain() # type: ignore[union-attr]
self._data_to_send = None
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
return await self._process.wait()
async def _read_line(self, timeout=0.1) -> bytes | None:
try:
return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout) # type: ignore[union-attr]
except asyncio.TimeoutError:
return None
def disconnect(self):
pass # pragma: no cover
def stop(self) -> None:
"""Stop device."""
self._stop_job = True
time.sleep(0.1) # give a time to end while loop in running simulation
if self._process is not None and self._process.returncode is None:
logger.debug('Stopping all running processes for PID %s', self._process.pid)
# kill subprocess if it is still running
for child in psutil.Process(self._process.pid).children(recursive=True):
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError:
pass
# kill subprocess if it is still running
os.kill(self._process.pid, signal.SIGTERM)
if self._thread is not None:
self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
if self._exc:
raise self._exc
@property
def iter_stdout(self) -> Generator[str, None, None]:
"""Return output from serial."""
while True:
stream = self.queue.get()
if stream == END_OF_DATA:
logger.debug('No more data from running process')
break
self.handler_log_file.handle(data=stream + '\n')
yield stream
self.queue.task_done()
def write(self, data: bytes) -> None:
"""Write data to serial"""
while self._data_to_send:
# wait data will be write to self._process.stdin.write
time.sleep(0.1)
self._data_to_send = data
def initialize_log_files(self):
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
class NativeSimulatorAdapter(SimulatorAdapterBase):
"""Simulator adapter to run `zephyr.exe` simulation"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [
str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve())
]
class UnitSimulatorAdapter(SimulatorAdapterBase):
"""Simulator adapter to run unit tests"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())]
class CustomSimulatorAdapter(SimulatorAdapterBase):
def generate_command(self) -> None:
"""Return command to run."""
if (west := shutil.which('west')) is None:
logger.error('west not found')
self.command = []
else:
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']

View file

@ -0,0 +1,6 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
class TwisterHarnessException(Exception):
"""General Twister harness exception."""

View file

@ -0,0 +1,3 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

View file

@ -0,0 +1,39 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Generator, Type
import pytest
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.device.factory import DeviceFactory
from twister_harness.twister_harness_config import DeviceConfig, TwisterHarnessConfig
logger = logging.getLogger(__name__)
@pytest.fixture(scope='function')
def dut(request: pytest.FixtureRequest) -> Generator[DeviceAbstract, None, None]:
"""Return device instance."""
twister_harness_config: TwisterHarnessConfig = request.config.twister_harness_config # type: ignore
device_config: DeviceConfig = twister_harness_config.devices[0]
device_type = device_config.type
device_class: Type[DeviceAbstract] = DeviceFactory.get_device(device_type)
device = device_class(device_config)
try:
device.connect()
device.generate_command()
device.initialize_log_files()
device.flash_and_run()
device.connect()
yield device
except KeyboardInterrupt:
pass
finally: # to make sure we close all running processes after user broke execution
device.disconnect()
device.stop()

View file

@ -0,0 +1,40 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os.path
import platform
import shlex
_WINDOWS = platform.system() == 'Windows'
logger = logging.getLogger(__name__)
def log_command(logger: logging.Logger, msg: str, args: list, level: int = logging.DEBUG):
"""
Platform-independent helper for logging subprocess invocations.
Will log a command string that can be copy/pasted into a POSIX
shell on POSIX platforms. This is not available on Windows, so
the entire args array is logged instead.
:param logger: logging.Logger to use
:param msg: message to associate with the command
:param args: argument list as passed to subprocess module
:param level: log level
"""
msg = f'{msg}: %s'
if _WINDOWS:
logger.log(level, msg, str(args))
else:
logger.log(level, msg, shlex.join(args))
def normalize_filename(filename: str) -> str:
filename = os.path.expanduser(os.path.expandvars(filename))
filename = os.path.normpath(os.path.abspath(filename))
return filename

View file

@ -0,0 +1,71 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging.config
import os
import pytest
def configure_logging(config: pytest.Config) -> None:
"""Configure logging."""
output_dir = config.option.output_dir
os.makedirs(output_dir, exist_ok=True)
log_file = os.path.join(output_dir, 'twister_harness.log')
if hasattr(config, 'workerinput'):
worker_id = config.workerinput['workerid']
log_file = os.path.join(output_dir, f'twister_harness_{worker_id}.log')
log_format = '%(asctime)s:%(levelname)s:%(name)s: %(message)s'
log_level = config.getoption('--log-level') or config.getini('log_level') or logging.INFO
log_file = config.getoption('--log-file') or config.getini('log_file') or log_file
log_format = config.getini('log_cli_format') or log_format
default_config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': log_format,
},
'simply': {
'format': '%(asctime)s.%(msecs)d:%(levelname)s: %(message)s',
'datefmt': '%H:%M:%S'
}
},
'handlers': {
'file': {
'class': 'logging.FileHandler',
'level': 'DEBUG',
'formatter': 'standard',
'filters': [],
'filename': log_file,
'encoding': 'utf8',
'mode': 'w'
},
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'simply',
'filters': [],
}
},
'loggers': {
'': {
'handlers': ['console', 'file'],
'level': 'WARNING',
'propagate': False
},
'twister_harness': {
'handlers': ['console', 'file'],
'level': log_level,
'propagate': False,
}
}
}
logging.config.dictConfig(default_config)

View file

@ -0,0 +1,3 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

View file

@ -0,0 +1,71 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
import sys
from pathlib import Path
from twister_harness.helper import normalize_filename
logger = logging.getLogger(__name__)
class LogFile:
"""Base class for logging files."""
name = 'uninitialized'
def __init__(self, filename: str | Path) -> None:
self.default_encoding = sys.getdefaultencoding()
self.filename = filename
@staticmethod
def get_log_filename(build_dir: Path | str, name: str) -> str:
"""
:param build_dir: path to building directory.
:param name: name of the logging file.
:return: path to logging file
"""
if not build_dir:
filename = os.devnull
else:
name = name + '.log'
filename = os.path.join(build_dir, name)
filename = normalize_filename(filename=filename)
return filename
def handle(self, data: str | bytes) -> None:
"""Save information to logging file."""
if data:
data = data.decode(encoding=self.default_encoding) if isinstance(data, bytes) else data
with open(file=self.filename, mode='a+', encoding=self.default_encoding) as log_file:
log_file.write(data) # type: ignore[arg-type]
@classmethod
def create(cls, build_dir: Path | str = '') -> LogFile:
filename = cls.get_log_filename(build_dir=build_dir, name=cls.name)
return cls(filename)
class BuildLogFile(LogFile):
"""Save logs from the building."""
name = 'build'
class HandlerLogFile(LogFile):
"""Save output from a device."""
name = 'handler'
class DeviceLogFile(LogFile):
"""Save errors during flashing onto device."""
name = 'device'
class NullLogFile(LogFile):
"""Placeholder for no initialized log file"""
def handle(self, data: str | bytes) -> None:
"""This method does nothing."""

View file

@ -0,0 +1,148 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
from pathlib import Path
import pytest
from twister_harness.log import configure_logging
from twister_harness.twister_harness_config import TwisterHarnessConfig
logger = logging.getLogger(__name__)
pytest_plugins = (
'twister_harness.fixtures.dut'
)
def pytest_addoption(parser: pytest.Parser):
twister_harness_group = parser.getgroup('Twister harness')
twister_harness_group.addoption(
'--twister-harness',
action='store_true',
default=False,
help='Activate Twister harness plugin'
)
parser.addini(
'twister_harness',
'Activate Twister harness plugin',
type='bool'
)
twister_harness_group.addoption(
'-O',
'--outdir',
metavar='PATH',
dest='output_dir',
help='Output directory for logs. If not provided then use '
'--build-dir path as default.'
)
twister_harness_group.addoption(
'--platform',
help='Choose specific platform'
)
twister_harness_group.addoption(
'--device-type',
choices=('native', 'qemu', 'hardware', 'unit', 'custom'),
help='Choose type of device (hardware, qemu, etc.)'
)
twister_harness_group.addoption(
'--device-serial',
help='Serial device for accessing the board '
'(e.g., /dev/ttyACM0)'
)
twister_harness_group.addoption(
'--device-serial-baud',
type=int,
default=115200,
help='Serial device baud rate (default 115200)'
)
twister_harness_group.addoption(
'--runner',
help='use the specified west runner (pyocd, nrfjprog, etc)'
)
twister_harness_group.addoption(
'--device-id',
help='ID of connected hardware device (for example 000682459367)'
)
twister_harness_group.addoption(
'--device-product',
help='Product name of connected hardware device (for example "STM32 STLink")'
)
twister_harness_group.addoption(
'--device-serial-pty',
metavar='PATH',
help='Script for controlling pseudoterminal. '
'E.g --device-testing --device-serial-pty=<script>'
)
twister_harness_group.addoption(
'--west-flash-extra-args',
help='Extend parameters for west flash. '
'E.g. --west-flash-extra-args="--board-id=foobar,--erase" '
'will translate to "west flash -- --board-id=foobar --erase"'
)
twister_harness_group.addoption(
'--flashing-timeout',
type=int,
default=60,
help='Set timeout for the device flash operation in seconds.'
)
twister_harness_group.addoption(
'--build-dir',
dest='build_dir',
metavar='PATH',
help='Directory with built application.'
)
twister_harness_group.addoption(
'--binary-file',
metavar='PATH',
help='Path to file which should be flashed.'
)
twister_harness_group.addoption(
'--pre-script',
metavar='PATH'
)
twister_harness_group.addoption(
'--post-script',
metavar='PATH'
)
twister_harness_group.addoption(
'--post-flash-script',
metavar='PATH'
)
def pytest_configure(config: pytest.Config):
if config.getoption('help'):
return
if not (config.getoption('twister_harness') or config.getini('twister_harness')):
return
validate_options(config)
if config.option.output_dir is None:
config.option.output_dir = config.option.build_dir
config.option.output_dir = _normalize_path(config.option.output_dir)
# create output directory if not exists
os.makedirs(config.option.output_dir, exist_ok=True)
configure_logging(config)
config.twister_harness_config = TwisterHarnessConfig.create(config) # type: ignore
def validate_options(config: pytest.Config) -> None:
"""Verify if user provided proper options"""
# TBD
def _normalize_path(path: str | Path) -> str:
path = os.path.expanduser(os.path.expandvars(path))
path = os.path.normpath(os.path.abspath(path))
return path

View file

@ -0,0 +1,75 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
import pytest
logger = logging.getLogger(__name__)
@dataclass
class DeviceConfig:
platform: str = ''
type: str = ''
serial: str = ''
baud: int = 115200
runner: str = ''
id: str = ''
product: str = ''
serial_pty: str = ''
west_flash_extra_args: list[str] = field(default_factory=list, repr=False)
flashing_timeout: int = 60 # [s]
build_dir: Path | str = ''
binary_file: Path | str = ''
name: str = ''
pre_script: str = ''
post_script: str = ''
post_flash_script: str = ''
@dataclass
class TwisterHarnessConfig:
"""Store Twister harness configuration to have easy access in test."""
output_dir: Path = Path('twister_harness_out')
devices: list[DeviceConfig] = field(default_factory=list, repr=False)
@classmethod
def create(cls, config: pytest.Config) -> TwisterHarnessConfig:
"""Create new instance from pytest.Config."""
output_dir: Path = config.option.output_dir
devices = []
west_flash_extra_args: list[str] = []
if config.option.west_flash_extra_args:
west_flash_extra_args = [w.strip() for w in config.option.west_flash_extra_args.split(',')]
device_from_cli = DeviceConfig(
platform=config.option.platform,
type=config.option.device_type,
serial=config.option.device_serial,
baud=config.option.device_serial_baud,
runner=config.option.runner,
id=config.option.device_id,
product=config.option.device_product,
serial_pty=config.option.device_serial_pty,
west_flash_extra_args=west_flash_extra_args,
flashing_timeout=config.option.flashing_timeout,
build_dir=config.option.build_dir,
binary_file=config.option.binary_file,
pre_script=config.option.pre_script,
post_script=config.option.post_script,
post_flash_script=config.option.post_flash_script,
)
devices.append(device_from_cli)
return cls(
output_dir=output_dir,
devices=devices
)

View file

@ -0,0 +1,23 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from pathlib import Path
import pytest
# pytest_plugins = ['pytester']
@pytest.fixture
def resources(request: pytest.FixtureRequest) -> Path:
"""Return path to `data` folder"""
return Path(request.module.__file__).parent.joinpath('data')
@pytest.fixture(scope='function')
def copy_example(pytester) -> Path:
"""Copy example tests to temporary directory and return path the temp directory."""
resources_dir = Path(__file__).parent / 'data'
pytester.copy_example(str(resources_dir))
return pytester.path

View file

@ -0,0 +1,100 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import sys
import threading
import time
from argparse import ArgumentParser
content = """
The Zen of Python, by Tim Peters
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
"""
class FifoFile:
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.thread = None
self.file = None
self.logger = logging.getLogger(__name__)
def _open(self):
self.logger.info(f'Creating fifo file: {self.filename}')
end_time = time.time() + 2
while not os.path.exists(self.filename):
time.sleep(0.1)
if time.time() > end_time:
self.logger.error(f'Did not able create fifo file: {self.filename}')
return
self.file = open(self.filename, self.mode, buffering=0)
self.logger.info(f'File created: {self.filename}')
def open(self):
self.thread = threading.Thread(target=self._open(), daemon=True)
self.thread.start()
def write(self, data):
if self.file:
self.file.write(data)
def read(self):
if self.file:
return self.file.readline()
def close(self):
if self.file:
self.file.close()
self.thread.join(1)
self.logger.info(f'Closed file: {self.filename}')
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def main():
logging.basicConfig(level='DEBUG')
parser = ArgumentParser()
parser.add_argument('file')
args = parser.parse_args()
read_path = args.file + '.in'
write_path = args.file + '.out'
logger = logging.getLogger(__name__)
logger.info('Start')
with FifoFile(read_path, 'rb'), FifoFile(write_path, 'wb') as wf:
for line in content.split('\n'):
wf.write(f'{line}\n'.encode('utf-8'))
time.sleep(0.1)
return 0
if __name__ == '__main__':
sys.exit(main())

View file

@ -0,0 +1,68 @@
#!/usr/bin/python
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
"""
Simply mock for bash script to use with unit tests.
"""
import sys
import time
from argparse import ArgumentParser
s = """
The Zen of Python, by Tim Peters
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
"""
def main() -> int:
parser = ArgumentParser()
parser.add_argument('--sleep', action='store', default=0, type=float)
parser.add_argument('--long-sleep', action='store_true')
parser.add_argument('--return-code', action='store', default=0, type=int)
parser.add_argument('--exception', action='store_true')
args = parser.parse_args()
if args.exception:
# simulate crashing application
raise Exception
if args.long_sleep:
# prints data and wait for certain time
for line in s.split('\n'):
print(line, flush=True)
time.sleep(args.sleep)
else:
# prints lines with delay
for line in s.split('\n'):
print(line, flush=True)
time.sleep(args.sleep)
print('End of script', flush=True)
print('Returns with code', args.return_code, flush=True)
return args.return_code
if __name__ == '__main__':
sys.exit(main())

View file

@ -0,0 +1,206 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import os
from pathlib import Path
from unittest import mock
import pytest
from twister_harness.device.hardware_adapter import HardwareAdapter
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
@pytest.fixture(name='device')
def fixture_adapter() -> HardwareAdapter:
device_config = DeviceConfig(
runner='runner',
build_dir=Path('build'),
platform='platform',
id='p_id',
)
return HardwareAdapter(device_config)
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_1(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == ['west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'runner']
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_2(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'pyocd'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd', '--', '--board-id', 'p_id'
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_raise_exception_if_west_is_not_installed(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = None
with pytest.raises(TwisterHarnessException, match='west not found'):
device.generate_command()
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_3(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'nrfjprog'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_4(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'openocd'
device.device_config.product = 'STM32 STLink'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'openocd',
'--', '--cmd-pre-init', 'hla_serial p_id'
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_5(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'openocd'
device.device_config.product = 'EDBG CMSIS-DAP'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'openocd',
'--', '--cmd-pre-init', 'cmsis_dap_serial p_id'
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_6(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'jlink'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'jlink',
'--tool-opt=-SelectEmuBySN p_id'
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_7(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'stm32cubeprogrammer'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'stm32cubeprogrammer',
'--tool-opt=sn=p_id'
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_8(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.runner = 'openocd'
device.device_config.product = 'STLINK-V3'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build',
'--runner', 'openocd', '--', '--cmd-pre-init', 'hla_serial p_id'
]
def test_if_hardware_adapter_raises_exception_empty_command(device: HardwareAdapter) -> None:
device.command = []
exception_msg = 'Flash command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device.flash_and_run()
def test_handler_and_device_log_correct_initialized_on_hardware(device: HardwareAdapter, tmp_path: Path) -> None:
device.device_config.build_dir = tmp_path
device.initialize_log_files()
assert isinstance(device.handler_log_file, HandlerLogFile)
assert isinstance(device.device_log_file, DeviceLogFile)
assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
assert device.device_log_file.filename.endswith('device.log') # type: ignore[union-attr]
@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
def test_device_log_correct_error_handle(patched_popen, device: HardwareAdapter, tmp_path: Path) -> None:
popen_mock = mock.Mock()
popen_mock.communicate.return_value = (b'', b'flashing error')
patched_popen.return_value = popen_mock
device.device_config.build_dir = tmp_path
device.initialize_log_files()
device.command = [
'west', 'flash', '--skip-rebuild', '--build-dir', str(tmp_path),
'--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
]
with pytest.raises(expected_exception=TwisterHarnessException, match='Could not flash device p_id'):
device.flash_and_run()
assert os.path.isfile(device.device_log_file.filename)
with open(device.device_log_file.filename, 'r') as file:
assert 'flashing error' in file.readlines()
@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
@mock.patch('twister_harness.device.hardware_adapter.serial.Serial')
def test_if_hardware_adapter_uses_serial_pty(
patched_serial, patched_popen, device: HardwareAdapter, monkeypatch: pytest.MonkeyPatch
):
device.device_config.serial_pty = 'script.py'
popen_mock = mock.Mock()
popen_mock.communicate.return_value = (b'output', b'error')
patched_popen.return_value = popen_mock
monkeypatch.setattr('twister_harness.device.hardware_adapter.pty.openpty', lambda: (123, 456))
monkeypatch.setattr('twister_harness.device.hardware_adapter.os.ttyname', lambda x: f'/pty/ttytest/{x}')
serial_mock = mock.Mock()
serial_mock.port = '/pty/ttytest/456'
patched_serial.return_value = serial_mock
device.connect()
assert device.connection.port == '/pty/ttytest/456' # type: ignore[union-attr]
assert device.serial_pty_proc
patched_popen.assert_called_with(
['script.py'],
stdout=123,
stdin=123,
stderr=123
)
device.disconnect()
assert not device.connection
assert not device.serial_pty_proc
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_with_west_flash(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.west_flash_extra_args = ['--board-id=foobar', '--erase']
device.device_config.runner = 'pyocd'
device.device_config.id = ''
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd',
'--', '--board-id=foobar', '--erase'
]

View file

@ -0,0 +1,98 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import os
import subprocess
from typing import Generator
from unittest import mock
from unittest.mock import patch
import pytest
from twister_harness.device.qemu_adapter import QemuAdapter
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
from twister_harness.twister_harness_config import DeviceConfig
@pytest.fixture(name='device')
def fixture_device_adapter(tmp_path) -> Generator[QemuAdapter, None, None]:
build_dir = tmp_path / 'build_dir'
adapter = QemuAdapter(DeviceConfig(build_dir=build_dir))
yield adapter
try:
adapter.stop() # to make sure all running processes are closed
except TwisterHarnessException:
pass
@patch('shutil.which', return_value='/usr/bin/west')
def test_if_generate_command_creates_proper_command(patched_which):
adapter = QemuAdapter(DeviceConfig(build_dir='build_dir'))
adapter.generate_command()
assert adapter.command == ['/usr/bin/west', 'build', '-d', 'build_dir', '-t', 'run']
@patch('shutil.which', return_value=None)
def test_if_generate_command_creates_empty_listy_if_west_is_not_installed(patched_which):
adapter = QemuAdapter(DeviceConfig())
adapter.generate_command()
assert adapter.command == []
def test_if_qemu_adapter_raises_exception_for_empty_command(device) -> None:
device.command = []
exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device.flash_and_run(timeout=0.1)
def test_if_qemu_adapter_raises_exception_file_not_found(device) -> None:
device.command = ['dummy']
with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
device.flash_and_run(timeout=0.1)
device.stop()
assert device._exc is not None
assert isinstance(device._exc, TwisterHarnessException)
@mock.patch('subprocess.Popen', side_effect=subprocess.SubprocessError(1, 'Exception message'))
def test_if_qemu_adapter_raises_exception_when_subprocess_raised_an_error(patched_run, device):
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Exception message'):
device.flash_and_run(timeout=0.1)
device.stop()
def test_if_qemu_adapter_runs_without_errors(resources, tmp_path) -> None:
fifo_file_path = str(tmp_path / 'qemu-fifo')
script_path = resources.joinpath('fifo_mock.py')
device = QemuAdapter(DeviceConfig(build_dir=str(tmp_path)))
device.booting_timeout_in_ms = 1000
device.command = ['python', str(script_path), fifo_file_path]
device.connect()
device.initialize_log_files()
device.flash_and_run(timeout=1)
lines = list(device.iter_stdout)
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_file.filename)
with open(device.handler_log_file.filename, 'r') as file:
file_lines = [line.strip() for line in file.readlines()]
assert file_lines[-2:] == lines[-2:]
device.disconnect()
def test_if_qemu_adapter_finishes_after_timeout(device) -> None:
device.command = ['sleep', '0.3']
device.flash_and_run(timeout=0.1)
device.stop()
assert device._process_ended_with_timeout is True
def test_handler_and_device_log_correct_initialized_on_qemu(device, tmp_path) -> None:
device.device_config.build_dir = tmp_path
device.initialize_log_files()
assert isinstance(device.handler_log_file, HandlerLogFile)
assert isinstance(device.device_log_file, NullLogFile)
assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]

View file

@ -0,0 +1,137 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import os
import subprocess
from pathlib import Path
from unittest import mock
import pytest
from twister_harness.device.simulator_adapter import (
CustomSimulatorAdapter,
NativeSimulatorAdapter,
UnitSimulatorAdapter,
)
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
from twister_harness.twister_harness_config import DeviceConfig
@pytest.fixture(name='device')
def fixture_adapter(tmp_path) -> NativeSimulatorAdapter:
return NativeSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
def test_if_native_simulator_adapter_get_command_returns_proper_string(
device: NativeSimulatorAdapter, resources: Path
) -> None:
device.device_config.build_dir = resources
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [str(resources.joinpath('zephyr', 'zephyr.exe'))]
def test_if_native_simulator_adapter_runs_without_errors(
resources: Path, device: NativeSimulatorAdapter
) -> None:
"""
Run script which prints text line by line and ends without errors.
Verify if subprocess was ended without errors, and without timeout.
"""
script_path = resources.joinpath('mock_script.py')
# patching original command by mock_script.py to simulate same behaviour as zephyr.exe
device.command = ['python3', str(script_path)]
device.initialize_log_files()
device.flash_and_run(timeout=4)
lines = list(device.iter_stdout) # give it time before close thread
device.stop()
assert device._process_ended_with_timeout is False
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_file.filename)
with open(device.handler_log_file.filename, 'r') as file:
file_lines = [line.strip() for line in file.readlines()]
assert file_lines[-2:] == lines[-2:]
def test_if_native_simulator_adapter_finishes_after_timeout_while_there_is_no_data_from_subprocess(
resources: Path, device: NativeSimulatorAdapter
) -> None:
"""Test if thread finishes after timeout when there is no data on stdout, but subprocess is still running"""
script_path = resources.joinpath('mock_script.py')
device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
device.initialize_log_files()
device.flash_and_run(timeout=0.5)
lines = list(device.iter_stdout)
device.stop()
assert device._process_ended_with_timeout is True
assert device._exc is None
# this message should not be printed because script has been terminated due to timeout
assert 'End of script' not in lines, 'Script has not been terminated before end'
def test_if_native_simulator_adapter_raises_exception_file_not_found(device: NativeSimulatorAdapter) -> None:
device.command = ['dummy']
with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
device.flash_and_run(timeout=0.1)
device.stop()
assert device._exc is not None
assert isinstance(device._exc, TwisterHarnessException)
def test_if_simulator_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
device.command = []
exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device.flash_and_run(timeout=0.1)
def test_handler_and_device_log_correct_initialized_on_simulators(device: NativeSimulatorAdapter) -> None:
device.initialize_log_files()
assert isinstance(device.handler_log_file, HandlerLogFile)
assert isinstance(device.device_log_file, NullLogFile)
assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
@mock.patch('asyncio.run', side_effect=subprocess.SubprocessError(1, 'Exception message'))
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess_error(
patched_run, device: NativeSimulatorAdapter
):
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Exception message'):
device.flash_and_run(timeout=0.1)
device.stop()
@mock.patch('asyncio.run', side_effect=Exception(1, 'Raised other exception'))
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_an_error(
patched_run, device: NativeSimulatorAdapter
):
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Raised other exception'):
device.flash_and_run(timeout=0.1)
device.stop()
@mock.patch('shutil.which', return_value='west')
def test_if_custom_simulator_adapter_get_command_returns_proper_string(patched_which) -> None:
device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
device.generate_command()
assert isinstance(device.command, list)
assert device.command == ['west', 'build', '-d', 'build_dir', '-t', 'run']
@mock.patch('shutil.which', return_value=None)
def test_if_custom_simulator_adapter_get_command_returns_empty_string(patched_which) -> None:
device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
device.generate_command()
assert isinstance(device.command, list)
assert device.command == []
def test_if_unit_simulator_adapter_get_command_returns_proper_string(resources: Path) -> None:
device = UnitSimulatorAdapter(DeviceConfig(build_dir=resources))
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [str(resources.joinpath('testbinary'))]

View file

@ -0,0 +1,54 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import pytest
from twister_harness.log_files.log_file import LogFile
logger = logging.getLogger(__name__)
@pytest.fixture
def sample_log_file(tmpdir):
log_file = LogFile.create(build_dir=tmpdir)
yield log_file
def test_if_filename_is_correct(sample_log_file: LogFile):
assert sample_log_file.filename.endswith('uninitialized.log') # type: ignore[union-attr]
def test_handle_data_is_str(sample_log_file: LogFile):
msg = 'str message'
sample_log_file.handle(data=msg)
assert os.path.exists(path=sample_log_file.filename)
with open(file=sample_log_file.filename, mode='r') as file:
assert file.readline() == 'str message'
def test_handle_data_is_byte(sample_log_file: LogFile):
msg = b'bytes message'
sample_log_file.handle(data=msg)
assert os.path.exists(path=sample_log_file.filename)
with open(file=sample_log_file.filename, mode='r') as file:
assert file.readline() == 'bytes message'
def test_handle_data_is_empty(sample_log_file: LogFile):
msg = ''
sample_log_file.handle(data=msg)
assert not os.path.exists(path=sample_log_file.filename)
def test_get_log_filename_null_filename():
log_file = LogFile.create()
assert log_file.filename == os.devnull
def test_get_log_filename_sample_filename(tmpdir):
log_file = LogFile.create(build_dir=tmpdir)
assert log_file.filename == os.path.join(tmpdir, 'uninitialized.log')