scripts: pytest: align adapters API

Select and place common code of three adapters (HardwareAdapter,
BinaryAdapterBase and QemuAdapter) into basic DeviceAdapter class.

Introduce new way of reading device output by run separate thread which
try to read device output and place it into internal python queue.
Thanks to this, now it is possible to create readline method for all
adapters, which can be unblock when timeout occur.

Collect all common steps which have to be done before setup device in
launch method. The same was done for teardown operations which were
placed into close method.

Additionally some protection mechanisms were introduced to prevent for
undesirable side-effects when user could try to launch to already
launched device or try to send some data to disconnected device.

iter_stdout method was replaced by two new methods: readline and
readlines. To make it possible to remove all read output from internal
buffer (queue), clear_buffer method was introduced.

Also unit tests were rewritten to work properly with current version
of adapters.

Signed-off-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
This commit is contained in:
Piotr Golyzniak 2023-08-04 15:23:34 +02:00 committed by Anas Nashif
commit f22c2d6388
17 changed files with 754 additions and 780 deletions

View file

@ -6,15 +6,15 @@ import time
import logging
from twister_harness import DeviceAdapter
from twister_harness.exceptions import TwisterHarnessTimeoutException
logger = logging.getLogger(__name__)
def wait_for_message(dut: DeviceAdapter, message, timeout=20):
time_started = time.time()
for line in dut.iter_stdout:
if line:
logger.debug("#: " + line)
while True:
line = dut.readline(timeout=timeout)
if message in line:
return True
if time.time() > time_started + timeout:
@ -25,10 +25,14 @@ def wait_for_prompt(dut: DeviceAdapter, prompt='uart:~$', timeout=20):
time_started = time.time()
while True:
dut.write(b'\n')
for line in dut.iter_stdout:
if prompt in line:
logger.debug('Got prompt')
return True
try:
line = dut.readline(timeout=0.5)
except TwisterHarnessTimeoutException:
# ignore read timeout and try to send enter once again
continue
if prompt in line:
logger.debug('Got prompt')
return True
if time.time() > time_started + timeout:
return False

View file

@ -5,216 +5,132 @@
from __future__ import annotations
import abc
import asyncio
import asyncio.subprocess
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from asyncio.base_subprocess import BaseSubprocessTransport
from datetime import datetime
from functools import wraps
from pathlib import Path
from queue import Queue
from typing import Generator
import psutil
from twister_harness.constants import END_OF_DATA
from twister_harness.device.device_adapter import DeviceAdapter
from twister_harness.helper import log_command, terminate_process
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
# Workaround for RuntimeError: Event loop is closed
# https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/
def silence_event_loop_closed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RuntimeError as e:
if str(e) != 'Event loop is closed':
raise
return wrapper
BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__) # type: ignore
logger = logging.getLogger(__name__)
class BinaryAdapterBase(DeviceAdapter, abc.ABC):
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
def __init__(self, device_config: DeviceConfig) -> None:
"""
:param twister_config: twister configuration
"""
super().__init__(device_config, **kwargs)
self._process: asyncio.subprocess.Process | None = None
self._process_ended_with_timeout: bool = False
self.queue: Queue = Queue()
self._stop_job: bool = False
self._exc: Exception | None = None #: store any exception which appeared running this thread
self._thread: threading.Thread | None = None
self.command: list[str] = []
super().__init__(device_config)
self._process: subprocess.Popen | None = None
self.process_kwargs: dict = {
'stdout': asyncio.subprocess.PIPE,
'stderr': asyncio.subprocess.STDOUT,
'stdin': asyncio.subprocess.PIPE,
'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT,
'stdin': subprocess.PIPE,
'env': self.env,
}
self._data_to_send: bytes | None = None
def connect(self) -> None:
pass # pragma: no cover
@abc.abstractmethod
def generate_command(self) -> None:
"""Generate and set command which will be used during running device."""
def flash_and_run(self) -> None:
def _flash_and_run(self) -> None:
self._run_subprocess()
def _run_subprocess(self) -> None:
if not self.command:
msg = 'Run simulation command is empty, please verify if it was generated properly.'
msg = 'Run command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
self._thread = threading.Thread(target=self._run_simulation, args=(self.connection_timeout,), daemon=True)
self._thread.start()
# Give a time to start subprocess before test is executed
time.sleep(0.1)
# Check if subprocess (simulation) has started without errors
if self._exc is not None:
logger.error('Simulation failed due to an exception: %s', self._exc)
raise self._exc
def _run_simulation(self, timeout: float) -> None:
log_command(logger, 'Running command', self.command, level=logging.INFO)
log_command(logger, 'Running command', self.command, level=logging.DEBUG)
try:
return_code: int = asyncio.run(self._run_command(timeout=timeout))
except subprocess.SubprocessError as e:
logger.error('Running simulation failed due to subprocess error %s', e)
self._exc = TwisterHarnessException(e.args)
except FileNotFoundError as e:
logger.error(f'Running simulation failed due to file not found: {e.filename}')
self._exc = TwisterHarnessException(f'File not found: {e.filename}')
except Exception as e:
logger.error('Running simulation failed: %s', e)
self._exc = TwisterHarnessException(e.args)
else:
if return_code == 0:
logger.info('Running simulation finished with return code %s', return_code)
elif return_code == -15:
logger.info('Running simulation stopped interrupted by user')
else:
logger.warning('Running simulation finished with return code %s', return_code)
finally:
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
self._process = subprocess.Popen(self.command, **self.process_kwargs)
except subprocess.SubprocessError as exc:
msg = f'Running subprocess failed due to SubprocessError {exc}'
logger.error(msg)
raise TwisterHarnessException(msg) from exc
except FileNotFoundError as exc:
msg = f'Running subprocess failed due to file not found: {exc.filename}'
logger.error(msg)
raise TwisterHarnessException(msg) from exc
except Exception as exc:
msg = f'Running subprocess failed {exc}'
logger.error(msg)
raise TwisterHarnessException(msg) from exc
async def _run_command(self, timeout: float):
assert isinstance(self.command, (list, tuple, set))
# to avoid stupid and difficult to debug mistakes
# we are using asyncio to run subprocess to be able to read from stdout
# without blocking while loop (readline with timeout)
self._process = await asyncio.create_subprocess_exec(
*self.command,
**self.process_kwargs
)
logger.debug('Started subprocess with PID %s', self._process.pid)
end_time = time.time() + timeout
while not self._stop_job and not self._process.stdout.at_eof(): # type: ignore[union-attr]
if line := await self._read_line(timeout=0.1):
self.queue.put(line.decode('utf-8').strip())
if time.time() > end_time:
self._process_ended_with_timeout = True
logger.info(f'Finished process with PID {self._process.pid} after {timeout} seconds timeout')
break
if self._data_to_send:
self._process.stdin.write(self._data_to_send) # type: ignore[union-attr]
await self._process.stdin.drain() # type: ignore[union-attr]
self._data_to_send = None
def _connect_device(self) -> None:
"""
This method was implemented only to imitate standard connect behavior
like in Serial class.
"""
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
return await self._process.wait()
def _disconnect_device(self) -> None:
"""
This method was implemented only to imitate standard disconnect behavior
like in serial connection.
"""
async def _read_line(self, timeout=0.1) -> bytes | None:
try:
return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout) # type: ignore[union-attr]
except asyncio.TimeoutError:
return None
def _close_device(self) -> None:
"""Terminate subprocess"""
self._stop_subprocess()
def disconnect(self):
pass # pragma: no cover
def _stop_subprocess(self) -> None:
if self._process is None:
# subprocess already stopped
return
return_code: int | None = self._process.poll()
if return_code is None:
terminate_process(self._process)
return_code = self._process.wait(self.base_timeout)
self._process = None
logger.debug('Running subprocess finished with return code %s', return_code)
def stop(self) -> None:
"""Stop device."""
self._stop_job = True
time.sleep(0.1) # give a time to end while loop in running simulation
if self._process is not None and self._process.returncode is None:
logger.debug('Stopping all running processes for PID %s', self._process.pid)
# kill subprocess if it is still running
for child in psutil.Process(self._process.pid).children(recursive=True):
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError:
pass
# kill subprocess if it is still running
os.kill(self._process.pid, signal.SIGTERM)
if self._thread is not None:
self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
if self._exc:
raise self._exc
def _read_device_output(self) -> bytes:
return self._process.stdout.readline()
def iter_stdout_lines(self) -> Generator[str, None, None]:
"""Return output from serial."""
while True:
stream = self.queue.get()
if stream == END_OF_DATA:
logger.debug('No more data from running process')
break
self.handler_log_file.handle(data=stream + '\n')
yield stream
self.queue.task_done()
self.iter_object = None
def _write_to_device(self, data: bytes) -> None:
self._process.stdin.write(data)
self._process.stdin.flush()
def write(self, data: bytes) -> None:
"""Write data to serial"""
while self._data_to_send:
# wait data will be write to self._process.stdin.write
time.sleep(0.1)
self._data_to_send = data
def _flush_device_output(self) -> None:
if self.is_device_running():
self._process.stdout.flush()
def initialize_log_files(self):
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
def is_device_running(self) -> bool:
return self._device_run.is_set() and self._is_binary_running()
def _is_binary_running(self) -> bool:
if self._process is None or self._process.poll() is not None:
return False
return True
def is_device_connected(self) -> bool:
"""Return true if device is connected."""
return self.is_device_running() and self._device_connected.is_set()
def _clear_internal_resources(self) -> None:
super()._clear_internal_resources()
self._process = None
class NativeSimulatorAdapter(BinaryAdapterBase):
"""Simulator adapter to run `zephyr.exe` simulation"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [
str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve())
]
"""Set command to run."""
self.command = [str(Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe')]
class UnitSimulatorAdapter(BinaryAdapterBase):
"""Simulator adapter to run unit tests"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())]
"""Set command to run."""
self.command = [str(Path(self.device_config.build_dir) / 'testbinary')]
class CustomSimulatorAdapter(BinaryAdapterBase):
def generate_command(self) -> None:
"""Return command to run."""
if (west := shutil.which('west')) is None:
logger.error('west not found')
self.command = []
else:
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
"""Set command to run."""
self.command = [self.west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']

View file

@ -7,9 +7,17 @@ from __future__ import annotations
import abc
import logging
import os
from typing import Generator
import queue
import shutil
import threading
import time
from datetime import datetime
from pathlib import Path
from twister_harness.log_files.log_file import LogFile, NullLogFile
from twister_harness.exceptions import (
TwisterHarnessException,
TwisterHarnessTimeoutException,
)
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
@ -18,15 +26,21 @@ logger = logging.getLogger(__name__)
class DeviceAdapter(abc.ABC):
"""Class defines an interface for all devices."""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
def __init__(self, device_config: DeviceConfig) -> None:
"""
:param device_config: device configuration
"""
self.device_config: DeviceConfig = device_config
self.connection_timeout: float = device_config.connection_timeout
self.handler_log_file: LogFile = NullLogFile.create()
self.device_log_file: LogFile = NullLogFile.create()
self.iter_object: Generator[str, None, None] | None = None
self.base_timeout: float = device_config.base_timeout
self._device_read_queue: queue.Queue = queue.Queue()
self._reader_thread: threading.Thread | None = None
self._device_run: threading.Event = threading.Event()
self._device_connected: threading.Event = threading.Event()
self.command: list[str] = []
self._west: str | None = None
self.handler_log_path: Path = Path(device_config.build_dir) / 'handler.log'
self._initialize_log_file(self.handler_log_path)
def __repr__(self) -> str:
return f'{self.__class__.__name__}()'
@ -36,43 +50,197 @@ class DeviceAdapter(abc.ABC):
env = os.environ.copy()
return env
@abc.abstractmethod
def connect(self) -> None:
"""Connect with the device (e.g. via UART)"""
def launch(self) -> None:
"""
Start by closing previously running application (no effect if not
needed). Then, flash and run test application. Finally, start a reader
thread capturing an output from a device.
"""
self.close()
self._clear_internal_resources()
if not self.command:
self.generate_command()
self._flash_and_run()
self._device_run.set()
self._start_reader_thread()
self.connect()
def close(self) -> None:
"""Disconnect, close device and close reader thread."""
if not self._device_run.is_set():
# device already closed
return
self.disconnect()
self._close_device()
self._device_run.clear()
self._join_reader_thread()
def connect(self) -> None:
"""Connect to device - allow for output gathering."""
if self.is_device_connected():
logger.debug('Device already connected')
return
if not self.is_device_running():
msg = 'Cannot connect to not working device'
logger.error(msg)
raise TwisterHarnessException(msg)
self._connect_device()
self._device_connected.set()
@abc.abstractmethod
def disconnect(self) -> None:
"""Close a connection with the device"""
"""Disconnect device - block output gathering."""
if not self.is_device_connected():
logger.debug("Device already disconnected")
return
self._disconnect_device()
self._device_connected.clear()
def readline(self, timeout: float | None = None, print_output: bool = True) -> str:
"""
Read line from device output. If timeout is not provided, then use
base_timeout
"""
timeout = timeout or self.base_timeout
if self.is_device_connected() or not self._device_read_queue.empty():
data = self._read_from_queue(timeout)
else:
msg = 'No connection to the device and no more data to read.'
logger.error(msg)
raise TwisterHarnessException('No connection to the device and no more data to read.')
if print_output:
logger.debug('#: %s', data)
return data
def readlines(self, print_output: bool = True) -> list[str]:
"""
Read all available output lines produced by device from internal buffer.
"""
lines: list[str] = []
while not self._device_read_queue.empty():
line = self.readline(0.1, print_output)
lines.append(line)
return lines
def clear_buffer(self) -> None:
"""
Remove all available output produced by device from internal buffer
(queue).
"""
self.readlines(print_output=False)
def write(self, data: bytes) -> None:
"""Write data bytes to device."""
if not self.is_device_connected():
msg = 'No connection to the device'
logger.error(msg)
raise TwisterHarnessException(msg)
self._write_to_device(data)
def _initialize_log_file(self, log_file_path: Path) -> None:
with open(log_file_path, 'a+') as log_file:
log_file.write(f'\n==== Test started at {datetime.now()} ====\n')
def _start_reader_thread(self) -> None:
self._reader_thread = threading.Thread(target=self._handle_device_output, daemon=True)
self._reader_thread.start()
def _handle_device_output(self) -> None:
"""
This method is dedicated to run it in separate thread to read output
from device and put them into internal queue and save to log file.
"""
with open(self.handler_log_path, 'a+') as log_file:
while self.is_device_running():
if self.is_device_connected():
output = self._read_device_output().decode().strip()
if output:
self._device_read_queue.put(output)
log_file.write(f'{output}\n')
log_file.flush()
else:
# ignore output from device
self._flush_device_output()
time.sleep(0.1)
def _read_from_queue(self, timeout: float) -> str:
"""Read data from internal queue"""
try:
data: str | object = self._device_read_queue.get(timeout=timeout)
except queue.Empty as exc:
raise TwisterHarnessTimeoutException(f'Read from device timeout occurred ({timeout}s)') from exc
return data
def _join_reader_thread(self) -> None:
if self._reader_thread is not None:
self._reader_thread.join(self.base_timeout)
self._reader_thread = None
def _clear_internal_resources(self) -> None:
self._reader_thread = None
self._device_read_queue = queue.Queue()
self._device_run.clear()
self._device_connected.clear()
@property
def west(self) -> str:
"""
Return a path to west or if not found - raise an error. Once found
west path is stored as internal property to save time of looking for it
in the next time.
"""
if self._west is None:
self._west = shutil.which('west')
if self._west is None:
msg = 'west not found'
logger.error(msg)
raise TwisterHarnessException(msg)
return self._west
@abc.abstractmethod
def generate_command(self) -> None:
"""Generate command which will be used during flashing or running device."""
"""
Generate and set command which will be used during flashing or running
device.
"""
def flash_and_run(self) -> None:
@abc.abstractmethod
def _flash_and_run(self) -> None:
"""Flash and run application on a device."""
@abc.abstractmethod
def write(self, data: bytes) -> None:
"""Write data bytes to device"""
def _connect_device(self) -> None:
"""Connect with the device (e.g. via serial port)."""
@abc.abstractmethod
def initialize_log_files(self):
"""Initialize file to store logs."""
def stop(self) -> None:
"""Stop device."""
def _disconnect_device(self) -> None:
"""Disconnect from the device (e.g. from serial port)."""
@abc.abstractmethod
def iter_stdout_lines(self) -> Generator[str, None, None]:
"""A generator that yields lines read from device"""
def _close_device(self) -> None:
"""Stop application"""
@property
def iter_stdout(self) -> Generator[str, None, None]:
@abc.abstractmethod
def _read_device_output(self) -> bytes:
"""
Get generator object to iterate stdout from a device.
This wrapper method is added to avoid problems, when
user creates an instance of generator multiple times.
Read device output directly through serial, subprocess, FIFO, etc.
Even if device is not connected, this method has to return something
(e.g. empty bytes string). This assumption is made to maintain
compatibility between various adapters and their reading technique.
"""
if not self.iter_object:
self.iter_object = self.iter_stdout_lines()
return self.iter_object
@abc.abstractmethod
def _write_to_device(self, data: bytes) -> None:
"""Write to device directly through serial, subprocess, FIFO, etc."""
@abc.abstractmethod
def _flush_device_output(self) -> None:
"""Flush device connection (serial, subprocess output, FIFO, etc.)"""
@abc.abstractmethod
def is_device_running(self) -> bool:
"""Return true if application is running on device."""
@abc.abstractmethod
def is_device_connected(self) -> bool:
"""Return true if device is connected."""

View file

@ -8,17 +8,17 @@ import logging
import os
import pty
import re
import shutil
import subprocess
from datetime import datetime
from typing import Generator
import time
from pathlib import Path
import serial
from twister_harness.device.device_adapter import DeviceAdapter
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
from twister_harness.exceptions import (
TwisterHarnessException,
TwisterHarnessTimeoutException,
)
from twister_harness.helper import log_command, terminate_process
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
@ -27,92 +27,20 @@ logger = logging.getLogger(__name__)
class HardwareAdapter(DeviceAdapter):
"""Adapter class for real device."""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
super().__init__(device_config, **kwargs)
self.connection: serial.Serial | None = None
self.command: list[str] = []
self.process_kwargs: dict = {
'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT,
'env': self.env,
}
self.serial_pty_proc: subprocess.Popen | None = None
def __init__(self, device_config: DeviceConfig) -> None:
super().__init__(device_config)
self._flashing_timeout: float = self.base_timeout
self._serial_connection: serial.Serial | None = None
self._serial_pty_proc: subprocess.Popen | None = None
self._serial_buffer: bytearray = bytearray()
def connect(self) -> None:
"""Open serial connection."""
if self.connection:
# already opened
return
if self.device_config.pre_script:
self.run_custom_script(self.device_config.pre_script, self.connection_timeout)
serial_name = self._open_serial_pty() or self.device_config.serial
logger.info('Opening serial connection for %s', serial_name)
try:
self.connection = serial.Serial(
serial_name,
baudrate=self.device_config.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=self.connection_timeout
)
except serial.SerialException as e:
logger.exception('Cannot open connection: %s', e)
self._close_serial_pty()
raise
self.connection.flush()
def disconnect(self) -> None:
"""Close serial connection."""
if self.connection:
serial_name = self.connection.port
self.connection.close()
self.connection = None
logger.info('Closed serial connection for %s', serial_name)
self._close_serial_pty()
def stop(self) -> None:
if self.device_config.post_script:
self.run_custom_script(self.device_config.post_script, self.connection_timeout)
def _open_serial_pty(self) -> str | None:
"""Open a pty pair, run process and return tty name"""
if not self.device_config.serial_pty:
return None
master, slave = pty.openpty()
try:
self.serial_pty_proc = subprocess.Popen(
re.split(',| ', self.device_config.serial_pty),
stdout=master,
stdin=master,
stderr=master
)
except subprocess.CalledProcessError as e:
logger.exception('Failed to run subprocess %s, error %s',
self.device_config.serial_pty, str(e))
raise
return os.ttyname(slave)
def _close_serial_pty(self) -> None:
"""Terminate the process opened for serial pty script"""
if self.serial_pty_proc:
self.serial_pty_proc.terminate()
self.serial_pty_proc.communicate()
logger.info('Process %s terminated', self.device_config.serial_pty)
self.serial_pty_proc = None
self.device_log_path: Path = Path(device_config.build_dir) / 'device.log'
self._initialize_log_file(self.device_log_path)
def generate_command(self) -> None:
"""Return command to flash."""
west = shutil.which('west')
if west is None:
raise TwisterHarnessException('west not found')
command = [
west,
self.west,
'flash',
'--skip-rebuild',
'--build-dir', str(self.device_config.build_dir),
@ -122,102 +50,221 @@ class HardwareAdapter(DeviceAdapter):
if self.device_config.west_flash_extra_args:
command_extra_args.extend(self.device_config.west_flash_extra_args)
if runner := self.device_config.runner:
command.extend(['--runner', runner])
if board_id := self.device_config.id:
if runner == 'pyocd':
command_extra_args.append('--board-id')
command_extra_args.append(board_id)
elif runner == 'nrfjprog':
command_extra_args.append('--dev-id')
command_extra_args.append(board_id)
elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']:
command_extra_args.append('--cmd-pre-init')
command_extra_args.append(f'hla_serial {board_id}')
elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP':
command_extra_args.append('--cmd-pre-init')
command_extra_args.append(f'cmsis_dap_serial {board_id}')
elif runner == 'jlink':
command.append(f'--tool-opt=-SelectEmuBySN {board_id}')
elif runner == 'stm32cubeprogrammer':
command.append(f'--tool-opt=sn={board_id}')
if self.device_config.runner:
runner_base_args, runner_extra_args = self._prepare_runner_args()
command.extend(runner_base_args)
command_extra_args.extend(runner_extra_args)
if command_extra_args:
command.append('--')
command.extend(command_extra_args)
self.command = command
@staticmethod
def run_custom_script(script, timeout: float) -> None:
with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
try:
stdout, stderr = proc.communicate(timeout=timeout)
logger.debug(stdout.decode())
if proc.returncode != 0:
logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
def _prepare_runner_args(self) -> tuple[list[str], list[str]]:
base_args: list[str] = []
extra_args: list[str] = []
runner = self.device_config.runner
base_args.extend(['--runner', runner])
if board_id := self.device_config.id:
if runner == 'pyocd':
extra_args.append('--board-id')
extra_args.append(board_id)
elif runner == 'nrfjprog':
extra_args.append('--dev-id')
extra_args.append(board_id)
elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']:
extra_args.append('--cmd-pre-init')
extra_args.append(f'hla_serial {board_id}')
elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP':
extra_args.append('--cmd-pre-init')
extra_args.append(f'cmsis_dap_serial {board_id}')
elif runner == 'jlink':
base_args.append(f'--tool-opt=-SelectEmuBySN {board_id}')
elif runner == 'stm32cubeprogrammer':
base_args.append(f'--tool-opt=sn={board_id}')
return base_args, extra_args
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
logger.error("{} timed out".format(script))
def flash_and_run(self) -> None:
def _flash_and_run(self) -> None:
"""Flash application on a device."""
if not self.command:
msg = 'Flash command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
if self.device_config.pre_script:
self._run_custom_script(self.device_config.pre_script, self.base_timeout)
if self.device_config.id:
logger.info('Flashing device %s', self.device_config.id)
log_command(logger, 'Flashing command', self.command, level=logging.INFO)
logger.debug('Flashing device %s', self.device_config.id)
log_command(logger, 'Flashing command', self.command, level=logging.DEBUG)
process = stdout = None
try:
process = subprocess.Popen(
self.command,
**self.process_kwargs
)
except subprocess.CalledProcessError:
logger.error('Error while flashing device')
raise TwisterHarnessException('Could not flash device')
else:
stdout = stderr = None
try:
stdout, stderr = process.communicate(timeout=self.connection_timeout)
except subprocess.TimeoutExpired:
process.kill()
finally:
if stdout:
self.device_log_file.handle(data=stdout)
logger.debug(stdout.decode(errors='ignore'))
if stderr:
self.device_log_file.handle(data=stderr)
if process.returncode == 0:
logger.info('Flashing finished')
else:
raise TwisterHarnessException(f'Could not flash device {self.device_config.id}')
process = subprocess.Popen(self.command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.env)
stdout, _ = process.communicate(timeout=self._flashing_timeout)
except subprocess.TimeoutExpired as exc:
process.kill()
msg = f'Timeout occurred ({self._flashing_timeout}s) during flashing.'
logger.error(msg)
raise TwisterHarnessTimeoutException(msg) from exc
except subprocess.SubprocessError as exc:
msg = f'Flashing subprocess failed due to SubprocessError {exc}'
logger.error(msg)
raise TwisterHarnessTimeoutException(msg) from exc
finally:
if stdout is not None:
stdout_decoded = stdout.decode(errors='ignore')
with open(self.device_log_path, 'a+') as log_file:
log_file.write(stdout_decoded)
logger.debug(f'Flashing output:\n{stdout_decoded}')
if self.device_config.post_flash_script:
self.run_custom_script(self.device_config.post_flash_script, self.connection_timeout)
self._run_custom_script(self.device_config.post_flash_script, self.base_timeout)
if process is not None and process.returncode == 0:
logger.debug('Flashing finished')
else:
msg = f'Could not flash device {self.device_config.id}'
logger.error(msg)
raise TwisterHarnessException(msg)
def iter_stdout_lines(self) -> Generator[str, None, None]:
"""Return output from serial."""
if not self.connection:
return
self.connection.flush()
self.connection.reset_input_buffer()
while self.connection and self.connection.is_open:
stream = self.connection.readline()
self.handler_log_file.handle(data=stream)
yield stream.decode(errors='ignore').strip()
self.iter_object = None
def _connect_device(self) -> None:
serial_name = self._open_serial_pty() or self.device_config.serial
logger.debug('Opening serial connection for %s', serial_name)
try:
self._serial_connection = serial.Serial(
serial_name,
baudrate=self.device_config.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=self.base_timeout,
)
except serial.SerialException as exc:
logger.exception('Cannot open connection: %s', exc)
self._close_serial_pty()
raise
def write(self, data: bytes) -> None:
"""Write data to serial"""
if self.connection:
self.connection.write(data)
self._serial_connection.flush()
self._serial_connection.reset_input_buffer()
self._serial_connection.reset_output_buffer()
def initialize_log_files(self) -> None:
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
self.device_log_file = DeviceLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
self.device_log_file.handle(start_msg)
def _open_serial_pty(self) -> str | None:
"""Open a pty pair, run process and return tty name"""
if not self.device_config.serial_pty:
return None
master, slave = pty.openpty()
try:
self._serial_pty_proc = subprocess.Popen(
re.split(',| ', self.device_config.serial_pty),
stdout=master,
stdin=master,
stderr=master
)
except subprocess.CalledProcessError as exc:
logger.exception('Failed to run subprocess %s, error %s', self.device_config.serial_pty, str(exc))
raise
return os.ttyname(slave)
def _disconnect_device(self) -> None:
if self._serial_connection:
serial_name = self._serial_connection.port
self._serial_connection.close()
# self._serial_connection = None
logger.debug('Closed serial connection for %s', serial_name)
self._close_serial_pty()
def _close_serial_pty(self) -> None:
"""Terminate the process opened for serial pty script"""
if self._serial_pty_proc:
self._serial_pty_proc.terminate()
self._serial_pty_proc.communicate(timeout=self.base_timeout)
logger.debug('Process %s terminated', self.device_config.serial_pty)
self._serial_pty_proc = None
def _close_device(self) -> None:
if self.device_config.post_script:
self._run_custom_script(self.device_config.post_script, self.base_timeout)
def is_device_running(self) -> bool:
return self._device_run.is_set()
def is_device_connected(self) -> bool:
return bool(
self.is_device_running()
and self._device_connected.is_set()
and self._serial_connection
and self._serial_connection.is_open
)
def _read_device_output(self) -> bytes:
try:
output = self._readline_serial()
except (serial.SerialException, TypeError, IOError):
# serial was probably disconnected
output = b''
return output
def _readline_serial(self) -> bytes:
"""
This method was created to avoid using PySerial built-in readline
method which cause blocking reader thread even if there is no data to
read. Instead for this, following implementation try to read data only
if they are available. Inspiration for this code was taken from this
comment:
https://github.com/pyserial/pyserial/issues/216#issuecomment-369414522
"""
line = self._readline_from_serial_buffer()
if line is not None:
return line
while True:
if self._serial_connection is None or not self._serial_connection.is_open:
return b''
elif self._serial_connection.in_waiting == 0:
time.sleep(0.05)
continue
else:
bytes_to_read = max(1, min(2048, self._serial_connection.in_waiting))
output = self._serial_connection.read(bytes_to_read)
self._serial_buffer.extend(output)
line = self._readline_from_serial_buffer()
if line is not None:
return line
def _readline_from_serial_buffer(self) -> bytes | None:
idx = self._serial_buffer.find(b"\n")
if idx >= 0:
line = self._serial_buffer[:idx+1]
self._serial_buffer = self._serial_buffer[idx+1:]
return bytes(line)
else:
return None
def _write_to_device(self, data: bytes) -> None:
self._serial_connection.write(data)
def _flush_device_output(self) -> None:
if self.is_device_connected():
self._serial_connection.flush()
self._serial_connection.reset_input_buffer()
def _clear_internal_resources(self) -> None:
super()._clear_internal_resources()
self._serial_connection = None
self._serial_pty_proc = None
@staticmethod
def _run_custom_script(script_path: str | Path, timeout: float) -> None:
with subprocess.Popen(str(script_path), stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
try:
stdout, stderr = proc.communicate(timeout=timeout)
logger.debug(stdout.decode())
if proc.returncode != 0:
msg = f'Custom script failure: \n{stderr.decode(errors="ignore")}'
logger.error(msg)
raise TwisterHarnessException(msg)
except subprocess.TimeoutExpired as exc:
terminate_process(proc)
proc.communicate(timeout=timeout)
msg = f'Timeout occurred ({timeout}s) during execution custom script: {script_path}'
logger.error(msg)
raise TwisterHarnessTimeoutException(msg) from exc

View file

@ -5,135 +5,40 @@
from __future__ import annotations
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import Generator
import psutil
from twister_harness.constants import QEMU_FIFO_FILE_NAME
from twister_harness.device.device_adapter import DeviceAdapter
from twister_harness.device.fifo_handler import FifoHandler
from twister_harness.device.binary_adapter import BinaryAdapterBase
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
class QemuAdapter(DeviceAdapter):
"""Adapter for Qemu simulator"""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
super().__init__(device_config, **kwargs)
self._process: subprocess.Popen | None = None
self._process_ended_with_timeout: bool = False
self._exc: Exception | None = None #: store any exception which appeared running this thread
self._thread: threading.Thread | None = None
self._emulation_was_finished: bool = False
self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME))
self.command: list[str] = []
self.booting_timeout_in_ms: int = 10_000 #: wait time for booting Qemu in milliseconds
class QemuAdapter(BinaryAdapterBase):
def __init__(self, device_config: DeviceConfig) -> None:
super().__init__(device_config)
qemu_fifo_file_path = Path(self.device_config.build_dir) / 'qemu-fifo'
self._fifo_connection: FifoHandler = FifoHandler(qemu_fifo_file_path)
def generate_command(self) -> None:
"""Return command to flash."""
if (west := shutil.which('west')) is None:
logger.error('west not found')
self.command = []
else:
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
"""Set command to run."""
self.command = [self.west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
if 'stdin' in self.process_kwargs:
self.process_kwargs.pop('stdin')
def connect(self) -> None:
logger.debug('Opening connection')
self.connection.connect()
def _flash_and_run(self) -> None:
super()._flash_and_run()
self._create_fifo_connection()
def flash_and_run(self) -> None:
if not self.command:
msg = 'Run simulation command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
self._thread = threading.Thread(target=self._run_command, args=(self.connection_timeout,), daemon=True)
self._thread.start()
# Give a time to start subprocess before test is executed
time.sleep(0.1)
# Check if subprocess (simulation) has started without errors
if self._exc is not None:
logger.error('Simulation failed due to an exception: %s', self._exc)
raise self._exc
def _run_command(self, timeout: float) -> None:
log_command(logger, 'Running command', self.command, level=logging.INFO)
try:
self._process = subprocess.Popen(
self.command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=self.env
)
stdout, _ = self._process.communicate(timeout=timeout)
return_code: int = self._process.returncode
except subprocess.TimeoutExpired:
logger.error('Running simulation finished after timeout: %s seconds', timeout)
self._process_ended_with_timeout = True
# we don't want to raise Timeout exception, but allowed a test to parse the output
# and set proper status
except subprocess.SubprocessError as e:
logger.error('Running simulation failed due to subprocess error %s', e)
self._exc = TwisterHarnessException(e.args)
except FileNotFoundError as e:
logger.error(f'Running simulation failed due to file not found: {e.filename}')
self._exc = TwisterHarnessException(f'File not found: {e.filename}')
except Exception as e:
logger.error('Running simulation failed: %s', e)
self._exc = TwisterHarnessException(e.args)
else:
if return_code == 0:
logger.info('Running simulation finished with return code %s', return_code)
elif return_code == -15:
logger.info('Running simulation terminated')
else:
logger.warning('Running simulation finished with return code %s', return_code)
for line in stdout.decode('utf-8').split('\n'):
logger.info(line)
finally:
self._emulation_was_finished = True
def disconnect(self):
logger.debug('Closing connection')
self.connection.disconnect()
def stop(self) -> None:
"""Stop device."""
time.sleep(0.1) # give a time to end while loop in running simulation
if self._process is not None and self._process.returncode is None:
logger.debug('Stopping all running processes for PID %s', self._process.pid)
# kill all child subprocesses
for child in psutil.Process(self._process.pid).children(recursive=True):
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError:
pass
# kill subprocess if it is still running
os.kill(self._process.pid, signal.SIGTERM)
if self._thread is not None:
self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
if self._exc:
raise self._exc
def _wait_for_fifo(self):
for _ in range(int(self.booting_timeout_in_ms / 10) or 1):
if self.connection.is_open:
def _create_fifo_connection(self) -> None:
self._fifo_connection.connect()
booting_timeout_in_ms: int = 10_000 #: wait time for booting Qemu in milliseconds
for _ in range(int(booting_timeout_in_ms / 10) or 1):
if self._fifo_connection.is_open:
break
elif self._emulation_was_finished:
elif not self._is_binary_running():
msg = 'Problem with starting QEMU'
logger.error(msg)
raise TwisterHarnessException(msg)
@ -143,55 +48,25 @@ class QemuAdapter(DeviceAdapter):
logger.error(msg)
raise TwisterHarnessException(msg)
def iter_stdout_lines(self) -> Generator[str, None, None]:
if not self.connection:
return
if not self.connection.is_open:
self._wait_for_fifo()
def _stop_subprocess(self) -> None:
super()._stop_subprocess()
self._fifo_connection.disconnect()
# create unblocking reading from fifo file
q: Queue = Queue()
def read_lines():
while self.connection and self.connection.is_open:
try:
line = self.connection.readline().decode('UTF-8').strip()
except (OSError, ValueError):
# file could be closed already so we should stop reading
break
if len(line) != 0:
q.put(line)
t = threading.Thread(target=read_lines, daemon=True)
t.start()
end_time = time.time() + self.connection_timeout
def _read_device_output(self) -> bytes:
try:
while True:
try:
stream = q.get(timeout=0.1)
self.handler_log_file.handle(data=stream + '\n')
yield stream
except Empty: # timeout appeared
pass
if time.time() > end_time:
break
except KeyboardInterrupt:
# let thread to finish smoothly
pass
finally:
t.join(1)
self.iter_object = None
output = self._fifo_connection.readline()
except (OSError, ValueError):
# emulation was probably finished and thus fifo file was closed too
output = b''
return output
def write(self, data: bytes) -> None:
"""Write data to serial"""
if not self.connection:
return
if not self.connection.is_open:
self._wait_for_fifo()
self.connection.write(data)
def _write_to_device(self, data: bytes) -> None:
self._fifo_connection.write(data)
def initialize_log_files(self):
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
def _flush_device_output(self) -> None:
if self.is_device_running():
self._fifo_connection.flush()
def is_device_connected(self) -> bool:
"""Return true if device is connected."""
return bool(super().is_device_connected() and self._fifo_connection.is_open)

View file

@ -4,3 +4,7 @@
class TwisterHarnessException(Exception):
"""General Twister harness exception."""
class TwisterHarnessTimeoutException(TwisterHarnessException):
"""Twister harness timeout exception"""

View file

@ -26,14 +26,9 @@ def dut(request: pytest.FixtureRequest) -> Generator[DeviceAdapter, None, None]:
device = device_class(device_config)
try:
device.connect()
device.generate_command()
device.initialize_log_files()
device.flash_and_run()
device.connect()
device.launch()
yield device
except KeyboardInterrupt:
pass
finally: # to make sure we close all running processes after user broke execution
device.disconnect()
device.stop()
device.close()

View file

@ -5,9 +5,14 @@
from __future__ import annotations
import logging
import os.path
import os
import platform
import shlex
import signal
import subprocess
import time
import psutil
_WINDOWS = platform.system() == 'Windows'
@ -38,3 +43,18 @@ def normalize_filename(filename: str) -> str:
filename = os.path.expanduser(os.path.expandvars(filename))
filename = os.path.normpath(os.path.abspath(filename))
return filename
def terminate_process(proc: subprocess.Popen) -> None:
"""
Try to terminate provided process and all its subprocesses recursively.
"""
for child in psutil.Process(proc.pid).children(recursive=True):
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError:
pass
proc.terminate()
# sleep for a while before attempting to kill
time.sleep(0.5)
proc.kill()

View file

@ -87,10 +87,12 @@ def pytest_addoption(parser: pytest.Parser):
'will translate to "west flash -- --board-id=foobar --erase"'
)
twister_harness_group.addoption(
'--connection-timeout',
'--base-timeout',
type=float,
default=60.0,
help='Set timeout for the connection with device in seconds.'
help='Set base timeout (in seconds) used during monitoring if some '
'operations are finished in a finite amount of time (e.g. waiting '
'for flashing).'
)
twister_harness_group.addoption(
'--build-dir',

View file

@ -24,7 +24,7 @@ class DeviceConfig:
product: str = ''
serial_pty: str = ''
west_flash_extra_args: list[str] = field(default_factory=list, repr=False)
connection_timeout: float = 60.0 # [s]
base_timeout: float = 60.0 # [s]
build_dir: Path | str = ''
binary_file: Path | str = ''
name: str = ''
@ -59,7 +59,7 @@ class TwisterHarnessConfig:
product=config.option.device_product,
serial_pty=config.option.device_serial_pty,
west_flash_extra_args=west_flash_extra_args,
connection_timeout=config.option.connection_timeout,
base_timeout=config.option.base_timeout,
build_dir=config.option.build_dir,
binary_file=config.option.binary_file,
pre_script=config.option.pre_script,

View file

@ -21,3 +21,12 @@ def copy_example(pytester) -> Path:
resources_dir = Path(__file__).parent / 'data'
pytester.copy_example(str(resources_dir))
return pytester.path
def readlines_until(device, line_pattern: str):
lines = []
while True:
line = device.readline()
lines.append(line)
if line_pattern in line:
return lines

View file

@ -9,13 +9,13 @@ from unittest import mock
import pytest
from conftest import readlines_until
from twister_harness.device.binary_adapter import (
CustomSimulatorAdapter,
NativeSimulatorAdapter,
UnitSimulatorAdapter,
)
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
from twister_harness.exceptions import TwisterHarnessException, TwisterHarnessTimeoutException
from twister_harness.twister_harness_config import DeviceConfig
@ -24,6 +24,78 @@ def fixture_adapter(tmp_path) -> NativeSimulatorAdapter:
return NativeSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
def test_if_simulator_adapter_runs_without_errors(
resources: Path, device: NativeSimulatorAdapter
) -> None:
"""
Run script which prints text line by line and ends without errors.
Verify if subprocess was ended without errors, and without timeout.
"""
script_path = resources.joinpath('mock_script.py')
# patching original command by mock_script.py to simulate same behaviour as zephyr.exe
device.command = ['python3', str(script_path)]
device.launch()
lines = readlines_until(device=device, line_pattern='Returns with code')
device.close()
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_path)
with open(device.handler_log_path, 'r') as file:
file_lines = [line.strip() for line in file.readlines()]
assert file_lines[-2:] == lines[-2:]
def test_if_simulator_adapter_finishes_after_timeout_while_there_is_no_data_from_subprocess(
resources: Path, device: NativeSimulatorAdapter
) -> None:
"""Test if thread finishes after timeout when there is no data on stdout, but subprocess is still running"""
script_path = resources.joinpath('mock_script.py')
device.base_timeout = 1.0
device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
device.launch()
with pytest.raises(TwisterHarnessTimeoutException, match='Read from device timeout occurred'):
readlines_until(device=device, line_pattern='Returns with code')
device.close()
assert device._process is None
with open(device.handler_log_path, 'r') as file:
file_lines = [line.strip() for line in file.readlines()]
# this message should not be printed because script has been terminated due to timeout
assert 'End of script' not in file_lines, 'Script has not been terminated before end'
def test_if_simulator_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
device.command = []
exception_msg = 'Run command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device._flash_and_run()
@mock.patch('subprocess.Popen', side_effect=subprocess.SubprocessError(1, 'Exception message'))
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess_error(
patched_popen, device: NativeSimulatorAdapter
) -> None:
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Exception message'):
device._flash_and_run()
@mock.patch('subprocess.Popen', side_effect=FileNotFoundError(1, 'File not found', 'fake_file.txt'))
def test_if_simulator_adapter_raises_exception_file_not_found(
patched_popen, device: NativeSimulatorAdapter
) -> None:
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='fake_file.txt'):
device._flash_and_run()
@mock.patch('subprocess.Popen', side_effect=Exception(1, 'Raised other exception'))
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_an_error(
patched_run, device: NativeSimulatorAdapter
) -> None:
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Raised other exception'):
device._flash_and_run()
def test_if_native_simulator_adapter_get_command_returns_proper_string(
device: NativeSimulatorAdapter, resources: Path
) -> None:
@ -33,111 +105,23 @@ def test_if_native_simulator_adapter_get_command_returns_proper_string(
assert device.command == [str(resources.joinpath('zephyr', 'zephyr.exe'))]
def test_if_native_simulator_adapter_runs_without_errors(
resources: Path, device: NativeSimulatorAdapter
) -> None:
"""
Run script which prints text line by line and ends without errors.
Verify if subprocess was ended without errors, and without timeout.
"""
script_path = resources.joinpath('mock_script.py')
# patching original command by mock_script.py to simulate same behaviour as zephyr.exe
device.connection_timeout = 4
device.command = ['python3', str(script_path)]
device.initialize_log_files()
device.flash_and_run()
lines = list(device.iter_stdout) # give it time before close thread
device.stop()
assert device._process_ended_with_timeout is False
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_file.filename)
with open(device.handler_log_file.filename, 'r') as file:
file_lines = [line.strip() for line in file.readlines()]
assert file_lines[-2:] == lines[-2:]
def test_if_native_simulator_adapter_finishes_after_timeout_while_there_is_no_data_from_subprocess(
resources: Path, device: NativeSimulatorAdapter
) -> None:
"""Test if thread finishes after timeout when there is no data on stdout, but subprocess is still running"""
script_path = resources.joinpath('mock_script.py')
device.connection_timeout = 0.5
device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
device.initialize_log_files()
device.flash_and_run()
lines = list(device.iter_stdout)
device.stop()
assert device._process_ended_with_timeout is True
assert device._exc is None
# this message should not be printed because script has been terminated due to timeout
assert 'End of script' not in lines, 'Script has not been terminated before end'
def test_if_native_simulator_adapter_raises_exception_file_not_found(device: NativeSimulatorAdapter) -> None:
device.connection_timeout = 0.1
device.command = ['dummy']
with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
device.flash_and_run()
device.stop()
assert device._exc is not None
assert isinstance(device._exc, TwisterHarnessException)
def test_if_simulator_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
device.connection_timeout = 0.1
device.command = []
exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device.flash_and_run()
def test_handler_and_device_log_correct_initialized_on_simulators(device: NativeSimulatorAdapter) -> None:
device.initialize_log_files()
assert isinstance(device.handler_log_file, HandlerLogFile)
assert isinstance(device.device_log_file, NullLogFile)
assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
@mock.patch('asyncio.run', side_effect=subprocess.SubprocessError(1, 'Exception message'))
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess_error(
patched_run, device: NativeSimulatorAdapter
):
device.connection_timeout = 0.1
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Exception message'):
device.flash_and_run()
device.stop()
@mock.patch('asyncio.run', side_effect=Exception(1, 'Raised other exception'))
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_an_error(
patched_run, device: NativeSimulatorAdapter
):
device.connection_timeout = 0.1
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Raised other exception'):
device.flash_and_run()
device.stop()
@mock.patch('shutil.which', return_value='west')
def test_if_custom_simulator_adapter_get_command_returns_proper_string(patched_which) -> None:
device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
def test_if_custom_simulator_adapter_get_command_returns_proper_string(patched_which, tmp_path: Path) -> None:
device = CustomSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
device.generate_command()
assert isinstance(device.command, list)
assert device.command == ['west', 'build', '-d', 'build_dir', '-t', 'run']
assert device.command == ['west', 'build', '-d', str(tmp_path), '-t', 'run']
@mock.patch('shutil.which', return_value=None)
def test_if_custom_simulator_adapter_get_command_returns_empty_string(patched_which) -> None:
device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
device.generate_command()
assert isinstance(device.command, list)
assert device.command == []
def test_if_custom_simulator_adapter_raise_exception_when_west_not_found(patched_which, tmp_path: Path) -> None:
device = CustomSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
with pytest.raises(TwisterHarnessException, match='west not found'):
device.generate_command()
def test_if_unit_simulator_adapter_get_command_returns_proper_string(resources: Path) -> None:
device = UnitSimulatorAdapter(DeviceConfig(build_dir=resources))
def test_if_unit_simulator_adapter_get_command_returns_proper_string(tmp_path: Path) -> None:
device = UnitSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [str(resources.joinpath('testbinary'))]
assert device.command == [str(tmp_path / 'testbinary')]

View file

@ -92,7 +92,7 @@ def main():
with FifoFile(read_path, 'rb'), FifoFile(write_path, 'wb') as wf:
for line in content.split('\n'):
wf.write(f'{line}\n'.encode('utf-8'))
time.sleep(0.1)
time.sleep(1) # give a moment for external programs to collect all outputs
return 0

View file

@ -61,6 +61,7 @@ def main() -> int:
print('End of script', flush=True)
print('Returns with code', args.return_code, flush=True)
time.sleep(1) # give a moment for external programs to collect all outputs
return args.return_code

View file

@ -10,32 +10,39 @@ import pytest
from twister_harness.device.hardware_adapter import HardwareAdapter
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
@pytest.fixture(name='device')
def fixture_adapter() -> HardwareAdapter:
def fixture_adapter(tmp_path) -> HardwareAdapter:
build_dir = tmp_path / 'build_dir'
os.mkdir(build_dir)
device_config = DeviceConfig(
runner='runner',
build_dir=Path('build'),
build_dir=build_dir,
platform='platform',
id='p_id',
)
return HardwareAdapter(device_config)
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value=None)
def test_if_hardware_adapter_raise_exception_when_west_not_found(patched_which, device: HardwareAdapter) -> None:
with pytest.raises(TwisterHarnessException, match='west not found'):
device.generate_command()
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_1(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.generate_command()
assert isinstance(device.command, list)
assert device.command == ['west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'runner']
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_2(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'pyocd'
device.generate_command()
assert isinstance(device.command, list)
@ -44,16 +51,9 @@ def test_if_get_command_returns_proper_string_2(patched_which, device: HardwareA
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_raise_exception_if_west_is_not_installed(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = None
with pytest.raises(TwisterHarnessException, match='west not found'):
device.generate_command()
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_3(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'nrfjprog'
device.generate_command()
assert isinstance(device.command, list)
@ -62,9 +62,9 @@ def test_if_get_command_returns_proper_string_3(patched_which, device: HardwareA
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_4(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'openocd'
device.device_config.product = 'STM32 STLink'
device.generate_command()
@ -75,9 +75,9 @@ def test_if_get_command_returns_proper_string_4(patched_which, device: HardwareA
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_5(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'openocd'
device.device_config.product = 'EDBG CMSIS-DAP'
device.generate_command()
@ -88,9 +88,9 @@ def test_if_get_command_returns_proper_string_5(patched_which, device: HardwareA
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_6(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'jlink'
device.generate_command()
assert isinstance(device.command, list)
@ -100,9 +100,9 @@ def test_if_get_command_returns_proper_string_6(patched_which, device: HardwareA
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_7(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'stm32cubeprogrammer'
device.generate_command()
assert isinstance(device.command, list)
@ -112,9 +112,9 @@ def test_if_get_command_returns_proper_string_7(patched_which, device: HardwareA
]
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_8(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.build_dir = 'build'
device.device_config.runner = 'openocd'
device.device_config.product = 'STLINK-V3'
device.generate_command()
@ -125,37 +125,43 @@ def test_if_get_command_returns_proper_string_8(patched_which, device: HardwareA
]
@mock.patch('shutil.which', return_value='west')
def test_if_get_command_returns_proper_string_with_west_flash_extra_args(
patched_which, device: HardwareAdapter
) -> None:
device.device_config.build_dir = 'build'
device.device_config.west_flash_extra_args = ['--board-id=foobar', '--erase']
device.device_config.runner = 'pyocd'
device.device_config.id = ''
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd',
'--', '--board-id=foobar', '--erase'
]
def test_if_hardware_adapter_raises_exception_empty_command(device: HardwareAdapter) -> None:
device.command = []
exception_msg = 'Flash command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device.flash_and_run()
def test_handler_and_device_log_correct_initialized_on_hardware(device: HardwareAdapter, tmp_path: Path) -> None:
device.device_config.build_dir = tmp_path
device.initialize_log_files()
assert isinstance(device.handler_log_file, HandlerLogFile)
assert isinstance(device.device_log_file, DeviceLogFile)
assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
assert device.device_log_file.filename.endswith('device.log') # type: ignore[union-attr]
device._flash_and_run()
@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
def test_device_log_correct_error_handle(patched_popen, device: HardwareAdapter, tmp_path: Path) -> None:
popen_mock = mock.Mock()
popen_mock.communicate.return_value = (b'', b'flashing error')
popen_mock.communicate.return_value = (b'flashing error', b'')
patched_popen.return_value = popen_mock
device.device_config.build_dir = tmp_path
device.initialize_log_files()
device.command = [
'west', 'flash', '--skip-rebuild', '--build-dir', str(tmp_path),
'--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
]
with pytest.raises(expected_exception=TwisterHarnessException, match='Could not flash device p_id'):
device.flash_and_run()
assert os.path.isfile(device.device_log_file.filename)
with open(device.device_log_file.filename, 'r') as file:
device._flash_and_run()
assert os.path.isfile(device.device_log_path)
with open(device.device_log_path, 'r') as file:
assert 'flashing error' in file.readlines()
@ -177,9 +183,10 @@ def test_if_hardware_adapter_uses_serial_pty(
serial_mock.port = '/pty/ttytest/456'
patched_serial.return_value = serial_mock
device._device_run.set()
device.connect()
assert device.connection.port == '/pty/ttytest/456' # type: ignore[union-attr]
assert device.serial_pty_proc
assert device._serial_connection.port == '/pty/ttytest/456' # type: ignore[union-attr]
assert device._serial_pty_proc
patched_popen.assert_called_with(
['script.py'],
stdout=123,
@ -188,19 +195,4 @@ def test_if_hardware_adapter_uses_serial_pty(
)
device.disconnect()
assert not device.connection
assert not device.serial_pty_proc
@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
def test_if_get_command_returns_proper_string_with_west_flash(patched_which, device: HardwareAdapter) -> None:
patched_which.return_value = 'west'
device.device_config.west_flash_extra_args = ['--board-id=foobar', '--erase']
device.device_config.runner = 'pyocd'
device.device_config.id = ''
device.generate_command()
assert isinstance(device.command, list)
assert device.command == [
'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd',
'--', '--board-id=foobar', '--erase'
]
assert not device._serial_pty_proc

View file

@ -3,98 +3,56 @@
# SPDX-License-Identifier: Apache-2.0
import os
import subprocess
from typing import Generator
from unittest import mock
from unittest.mock import patch
import pytest
from conftest import readlines_until
from twister_harness.device.qemu_adapter import QemuAdapter
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
from twister_harness.twister_harness_config import DeviceConfig
@pytest.fixture(name='device')
def fixture_device_adapter(tmp_path) -> Generator[QemuAdapter, None, None]:
build_dir = tmp_path / 'build_dir'
adapter = QemuAdapter(DeviceConfig(build_dir=build_dir))
yield adapter
os.mkdir(build_dir)
device = QemuAdapter(DeviceConfig(build_dir=build_dir))
try:
adapter.stop() # to make sure all running processes are closed
except TwisterHarnessException:
pass
yield device
finally:
device.close() # to make sure all running processes are closed
@patch('shutil.which', return_value='/usr/bin/west')
def test_if_generate_command_creates_proper_command(patched_which):
adapter = QemuAdapter(DeviceConfig(build_dir='build_dir'))
adapter.generate_command()
assert adapter.command == ['/usr/bin/west', 'build', '-d', 'build_dir', '-t', 'run']
@patch('shutil.which', return_value='west')
def test_if_generate_command_creates_proper_command(patched_which, device: QemuAdapter):
device.device_config.build_dir = 'build_dir'
device.generate_command()
assert device.command == ['west', 'build', '-d', 'build_dir', '-t', 'run']
@patch('shutil.which', return_value=None)
def test_if_generate_command_creates_empty_listy_if_west_is_not_installed(patched_which):
adapter = QemuAdapter(DeviceConfig())
adapter.generate_command()
assert adapter.command == []
def test_if_qemu_adapter_raises_exception_for_empty_command(device) -> None:
device.command = []
exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
with pytest.raises(TwisterHarnessException, match=exception_msg):
device.flash_and_run()
def test_if_qemu_adapter_raises_exception_file_not_found(device) -> None:
device.command = ['dummy']
with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
device.flash_and_run()
device.stop()
assert device._exc is not None
assert isinstance(device._exc, TwisterHarnessException)
@mock.patch('subprocess.Popen', side_effect=subprocess.SubprocessError(1, 'Exception message'))
def test_if_qemu_adapter_raises_exception_when_subprocess_raised_an_error(patched_run, device):
device.command = ['echo', 'TEST']
with pytest.raises(TwisterHarnessException, match='Exception message'):
device.flash_and_run()
device.stop()
def test_if_qemu_adapter_runs_without_errors(resources, tmp_path) -> None:
fifo_file_path = str(tmp_path / 'qemu-fifo')
def test_if_qemu_adapter_runs_without_errors(resources, device: QemuAdapter) -> None:
fifo_file_path = str(device.device_config.build_dir / 'qemu-fifo')
script_path = resources.joinpath('fifo_mock.py')
device = QemuAdapter(DeviceConfig(build_dir=str(tmp_path)))
device.connection_timeout = 1
device.booting_timeout_in_ms = 1000
# device.base_timeout = 1
# device.booting_timeout_in_ms = 1000
device.command = ['python', str(script_path), fifo_file_path]
device.connect()
device.initialize_log_files()
device.flash_and_run()
lines = list(device.iter_stdout)
device.launch()
lines = readlines_until(device=device, line_pattern='Namespaces are one honking great idea')
device.close()
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_file.filename)
with open(device.handler_log_file.filename, 'r') as file:
assert os.path.isfile(device.handler_log_path)
with open(device.handler_log_path, 'r') as file:
file_lines = [line.strip() for line in file.readlines()]
assert file_lines[-2:] == lines[-2:]
device.disconnect()
def test_if_qemu_adapter_finishes_after_timeout(device) -> None:
device.connection_timeout = 0.1
device.command = ['sleep', '0.3']
device.flash_and_run()
device.stop()
assert device._process_ended_with_timeout is True
def test_handler_and_device_log_correct_initialized_on_qemu(device, tmp_path) -> None:
device.device_config.build_dir = tmp_path
device.initialize_log_files()
assert isinstance(device.handler_log_file, HandlerLogFile)
assert isinstance(device.device_log_file, NullLogFile)
assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
def test_if_qemu_adapter_raise_exception_due_to_no_fifo_connection(device: QemuAdapter) -> None:
device.base_timeout = 0.3
device.command = ['sleep', '1']
with pytest.raises(TwisterHarnessException, match='Problem with starting QEMU'):
device._flash_and_run()
device._close_device()
assert not os.path.exists(device._fifo_connection._fifo_in)
assert not os.path.exists(device._fifo_connection._fifo_out)

View file

@ -231,7 +231,7 @@ class Pytest(Harness):
def pytest_run(self, timeout):
try:
cmd = self.generate_command(timeout)
cmd = self.generate_command()
self.run_command(cmd, timeout)
except PytestHarnessException as pytest_exception:
logger.error(str(pytest_exception))
@ -242,7 +242,7 @@ class Pytest(Harness):
self.instance.handler.make_device_available(self.reserved_serial)
self._update_test_status()
def generate_command(self, timeout):
def generate_command(self):
config = self.instance.testsuite.harness_config
pytest_root = config.get('pytest_root', 'pytest') if config else 'pytest'
pytest_args = config.get('pytest_args', []) if config else []
@ -253,8 +253,7 @@ class Pytest(Harness):
'-q',
os.path.join(self.source_dir, pytest_root),
f'--build-dir={self.running_dir}',
f'--junit-xml={self.report_file}',
f'--connection-timeout={timeout}'
f'--junit-xml={self.report_file}'
]
command.extend(pytest_args)