scripts: pytest: unify timeouts
Add possibility of passing timeouts from Twister to pytest and unify various timeouts used in adapters to point to one main timeout. Signed-off-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
This commit is contained in:
parent
2990586b45
commit
b486b2acab
9 changed files with 52 additions and 55 deletions
|
@ -23,6 +23,7 @@ class DeviceAbstract(abc.ABC):
|
|||
:param device_config: device configuration
|
||||
"""
|
||||
self.device_config: DeviceConfig = device_config
|
||||
self.connection_timeout: float = device_config.connection_timeout
|
||||
self.handler_log_file: LogFile = NullLogFile.create()
|
||||
self.device_log_file: LogFile = NullLogFile.create()
|
||||
self.iter_object: Generator[str, None, None] | None = None
|
||||
|
@ -36,7 +37,7 @@ class DeviceAbstract(abc.ABC):
|
|||
return env
|
||||
|
||||
@abc.abstractmethod
|
||||
def connect(self, timeout: float = 1) -> None:
|
||||
def connect(self) -> None:
|
||||
"""Connect with the device (e.g. via UART)"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -45,16 +46,10 @@ class DeviceAbstract(abc.ABC):
|
|||
|
||||
@abc.abstractmethod
|
||||
def generate_command(self) -> None:
|
||||
"""
|
||||
Generate command which will be used during flashing or running device.
|
||||
"""
|
||||
"""Generate command which will be used during flashing or running device."""
|
||||
|
||||
def flash_and_run(self, timeout: float = 60.0) -> None:
|
||||
"""
|
||||
Flash and run application on a device.
|
||||
|
||||
:param timeout: time out in seconds
|
||||
"""
|
||||
def flash_and_run(self) -> None:
|
||||
"""Flash and run application on a device."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def write(self, data: bytes) -> None:
|
||||
|
@ -62,9 +57,7 @@ class DeviceAbstract(abc.ABC):
|
|||
|
||||
@abc.abstractmethod
|
||||
def initialize_log_files(self):
|
||||
"""
|
||||
Initialize file to store logs.
|
||||
"""
|
||||
"""Initialize file to store logs."""
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop device."""
|
||||
|
|
|
@ -38,18 +38,15 @@ class HardwareAdapter(DeviceAbstract):
|
|||
}
|
||||
self.serial_pty_proc: subprocess.Popen | None = None
|
||||
|
||||
def connect(self, timeout: float = 1) -> None:
|
||||
"""
|
||||
Open serial connection.
|
||||
def connect(self) -> None:
|
||||
"""Open serial connection."""
|
||||
|
||||
:param timeout: Read timeout value in seconds
|
||||
"""
|
||||
if self.connection:
|
||||
# already opened
|
||||
return
|
||||
|
||||
if self.device_config.pre_script:
|
||||
self.run_custom_script(self.device_config.pre_script, 30)
|
||||
self.run_custom_script(self.device_config.pre_script, self.connection_timeout)
|
||||
|
||||
serial_name = self._open_serial_pty() or self.device_config.serial
|
||||
logger.info('Opening serial connection for %s', serial_name)
|
||||
|
@ -60,7 +57,7 @@ class HardwareAdapter(DeviceAbstract):
|
|||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
timeout=timeout
|
||||
timeout=self.connection_timeout
|
||||
)
|
||||
except serial.SerialException as e:
|
||||
logger.exception('Cannot open connection: %s', e)
|
||||
|
@ -80,7 +77,7 @@ class HardwareAdapter(DeviceAbstract):
|
|||
|
||||
def stop(self) -> None:
|
||||
if self.device_config.post_script:
|
||||
self.run_custom_script(self.device_config.post_script, 30)
|
||||
self.run_custom_script(self.device_config.post_script, self.connection_timeout)
|
||||
|
||||
def _open_serial_pty(self) -> str | None:
|
||||
"""Open a pty pair, run process and return tty name"""
|
||||
|
@ -152,7 +149,7 @@ class HardwareAdapter(DeviceAbstract):
|
|||
self.command = command
|
||||
|
||||
@staticmethod
|
||||
def run_custom_script(script, timeout):
|
||||
def run_custom_script(script, timeout: float) -> None:
|
||||
with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
|
||||
try:
|
||||
stdout, stderr = proc.communicate(timeout=timeout)
|
||||
|
@ -165,7 +162,7 @@ class HardwareAdapter(DeviceAbstract):
|
|||
proc.communicate()
|
||||
logger.error("{} timed out".format(script))
|
||||
|
||||
def flash_and_run(self, timeout: float = 60.0) -> None:
|
||||
def flash_and_run(self) -> None:
|
||||
if not self.command:
|
||||
msg = 'Flash command is empty, please verify if it was generated properly.'
|
||||
logger.error(msg)
|
||||
|
@ -184,7 +181,7 @@ class HardwareAdapter(DeviceAbstract):
|
|||
else:
|
||||
stdout = stderr = None
|
||||
try:
|
||||
stdout, stderr = process.communicate(timeout=self.device_config.flashing_timeout)
|
||||
stdout, stderr = process.communicate(timeout=self.connection_timeout)
|
||||
except subprocess.TimeoutExpired:
|
||||
process.kill()
|
||||
finally:
|
||||
|
@ -199,7 +196,7 @@ class HardwareAdapter(DeviceAbstract):
|
|||
raise TwisterHarnessException(f'Could not flash device {self.device_config.id}')
|
||||
finally:
|
||||
if self.device_config.post_flash_script:
|
||||
self.run_custom_script(self.device_config.post_flash_script, 30)
|
||||
self.run_custom_script(self.device_config.post_flash_script, self.connection_timeout)
|
||||
|
||||
def iter_stdout_lines(self) -> Generator[str, None, None]:
|
||||
"""Return output from serial."""
|
||||
|
|
|
@ -41,7 +41,6 @@ class QemuAdapter(DeviceAbstract):
|
|||
self._emulation_was_finished: bool = False
|
||||
self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME))
|
||||
self.command: list[str] = []
|
||||
self.timeout: float = 60 # running timeout in seconds
|
||||
self.booting_timeout_in_ms: int = 10_000 #: wait time for booting Qemu in milliseconds
|
||||
|
||||
def generate_command(self) -> None:
|
||||
|
@ -52,18 +51,17 @@ class QemuAdapter(DeviceAbstract):
|
|||
else:
|
||||
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
|
||||
|
||||
def connect(self, timeout: float = 1) -> None:
|
||||
def connect(self) -> None:
|
||||
logger.debug('Opening connection')
|
||||
self.connection.connect()
|
||||
|
||||
def flash_and_run(self, timeout: float = 60.0) -> None:
|
||||
self.timeout = timeout
|
||||
def flash_and_run(self) -> None:
|
||||
if not self.command:
|
||||
msg = 'Run simulation command is empty, please verify if it was generated properly.'
|
||||
logger.error(msg)
|
||||
raise TwisterHarnessException(msg)
|
||||
|
||||
self._thread = threading.Thread(target=self._run_command, args=(self.timeout,), daemon=True)
|
||||
self._thread = threading.Thread(target=self._run_command, args=(self.connection_timeout,), daemon=True)
|
||||
self._thread.start()
|
||||
# Give a time to start subprocess before test is executed
|
||||
time.sleep(0.1)
|
||||
|
@ -167,7 +165,7 @@ class QemuAdapter(DeviceAbstract):
|
|||
t = threading.Thread(target=read_lines, daemon=True)
|
||||
t.start()
|
||||
|
||||
end_time = time.time() + self.timeout
|
||||
end_time = time.time() + self.connection_timeout
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
|
|
|
@ -72,15 +72,15 @@ class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
|
|||
}
|
||||
self._data_to_send: bytes | None = None
|
||||
|
||||
def connect(self, timeout: float = 1) -> None:
|
||||
def connect(self) -> None:
|
||||
pass # pragma: no cover
|
||||
|
||||
def flash_and_run(self, timeout: float = 60.0) -> None:
|
||||
def flash_and_run(self) -> None:
|
||||
if not self.command:
|
||||
msg = 'Run simulation command is empty, please verify if it was generated properly.'
|
||||
logger.error(msg)
|
||||
raise TwisterHarnessException(msg)
|
||||
self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True)
|
||||
self._thread = threading.Thread(target=self._run_simulation, args=(self.connection_timeout,), daemon=True)
|
||||
self._thread.start()
|
||||
# Give a time to start subprocess before test is executed
|
||||
time.sleep(0.1)
|
||||
|
@ -112,7 +112,7 @@ class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
|
|||
finally:
|
||||
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
|
||||
|
||||
async def _run_command(self, timeout: float = 60.):
|
||||
async def _run_command(self, timeout: float):
|
||||
assert isinstance(self.command, (list, tuple, set))
|
||||
# to avoid stupid and difficult to debug mistakes
|
||||
# we are using asyncio to run subprocess to be able to read from stdout
|
||||
|
|
|
@ -87,10 +87,10 @@ def pytest_addoption(parser: pytest.Parser):
|
|||
'will translate to "west flash -- --board-id=foobar --erase"'
|
||||
)
|
||||
twister_harness_group.addoption(
|
||||
'--flashing-timeout',
|
||||
type=int,
|
||||
default=60,
|
||||
help='Set timeout for the device flash operation in seconds.'
|
||||
'--connection-timeout',
|
||||
type=float,
|
||||
default=60.0,
|
||||
help='Set timeout for the connection with device in seconds.'
|
||||
)
|
||||
twister_harness_group.addoption(
|
||||
'--build-dir',
|
||||
|
|
|
@ -24,7 +24,7 @@ class DeviceConfig:
|
|||
product: str = ''
|
||||
serial_pty: str = ''
|
||||
west_flash_extra_args: list[str] = field(default_factory=list, repr=False)
|
||||
flashing_timeout: int = 60 # [s]
|
||||
connection_timeout: float = 60.0 # [s]
|
||||
build_dir: Path | str = ''
|
||||
binary_file: Path | str = ''
|
||||
name: str = ''
|
||||
|
@ -59,7 +59,7 @@ class TwisterHarnessConfig:
|
|||
product=config.option.device_product,
|
||||
serial_pty=config.option.device_serial_pty,
|
||||
west_flash_extra_args=west_flash_extra_args,
|
||||
flashing_timeout=config.option.flashing_timeout,
|
||||
connection_timeout=config.option.connection_timeout,
|
||||
build_dir=config.option.build_dir,
|
||||
binary_file=config.option.binary_file,
|
||||
pre_script=config.option.pre_script,
|
||||
|
|
|
@ -45,13 +45,13 @@ def test_if_qemu_adapter_raises_exception_for_empty_command(device) -> None:
|
|||
device.command = []
|
||||
exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
|
||||
with pytest.raises(TwisterHarnessException, match=exception_msg):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
|
||||
|
||||
def test_if_qemu_adapter_raises_exception_file_not_found(device) -> None:
|
||||
device.command = ['dummy']
|
||||
with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
device.stop()
|
||||
assert device._exc is not None
|
||||
assert isinstance(device._exc, TwisterHarnessException)
|
||||
|
@ -61,7 +61,7 @@ def test_if_qemu_adapter_raises_exception_file_not_found(device) -> None:
|
|||
def test_if_qemu_adapter_raises_exception_when_subprocess_raised_an_error(patched_run, device):
|
||||
device.command = ['echo', 'TEST']
|
||||
with pytest.raises(TwisterHarnessException, match='Exception message'):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
device.stop()
|
||||
|
||||
|
||||
|
@ -69,11 +69,12 @@ def test_if_qemu_adapter_runs_without_errors(resources, tmp_path) -> None:
|
|||
fifo_file_path = str(tmp_path / 'qemu-fifo')
|
||||
script_path = resources.joinpath('fifo_mock.py')
|
||||
device = QemuAdapter(DeviceConfig(build_dir=str(tmp_path)))
|
||||
device.connection_timeout = 1
|
||||
device.booting_timeout_in_ms = 1000
|
||||
device.command = ['python', str(script_path), fifo_file_path]
|
||||
device.connect()
|
||||
device.initialize_log_files()
|
||||
device.flash_and_run(timeout=1)
|
||||
device.flash_and_run()
|
||||
lines = list(device.iter_stdout)
|
||||
assert 'Readability counts.' in lines
|
||||
assert os.path.isfile(device.handler_log_file.filename)
|
||||
|
@ -84,8 +85,9 @@ def test_if_qemu_adapter_runs_without_errors(resources, tmp_path) -> None:
|
|||
|
||||
|
||||
def test_if_qemu_adapter_finishes_after_timeout(device) -> None:
|
||||
device.connection_timeout = 0.1
|
||||
device.command = ['sleep', '0.3']
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
device.stop()
|
||||
assert device._process_ended_with_timeout is True
|
||||
|
||||
|
|
|
@ -42,9 +42,10 @@ def test_if_native_simulator_adapter_runs_without_errors(
|
|||
"""
|
||||
script_path = resources.joinpath('mock_script.py')
|
||||
# patching original command by mock_script.py to simulate same behaviour as zephyr.exe
|
||||
device.connection_timeout = 4
|
||||
device.command = ['python3', str(script_path)]
|
||||
device.initialize_log_files()
|
||||
device.flash_and_run(timeout=4)
|
||||
device.flash_and_run()
|
||||
lines = list(device.iter_stdout) # give it time before close thread
|
||||
device.stop()
|
||||
assert device._process_ended_with_timeout is False
|
||||
|
@ -60,9 +61,10 @@ def test_if_native_simulator_adapter_finishes_after_timeout_while_there_is_no_da
|
|||
) -> None:
|
||||
"""Test if thread finishes after timeout when there is no data on stdout, but subprocess is still running"""
|
||||
script_path = resources.joinpath('mock_script.py')
|
||||
device.connection_timeout = 0.5
|
||||
device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
|
||||
device.initialize_log_files()
|
||||
device.flash_and_run(timeout=0.5)
|
||||
device.flash_and_run()
|
||||
lines = list(device.iter_stdout)
|
||||
device.stop()
|
||||
assert device._process_ended_with_timeout is True
|
||||
|
@ -72,19 +74,21 @@ def test_if_native_simulator_adapter_finishes_after_timeout_while_there_is_no_da
|
|||
|
||||
|
||||
def test_if_native_simulator_adapter_raises_exception_file_not_found(device: NativeSimulatorAdapter) -> None:
|
||||
device.connection_timeout = 0.1
|
||||
device.command = ['dummy']
|
||||
with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
device.stop()
|
||||
assert device._exc is not None
|
||||
assert isinstance(device._exc, TwisterHarnessException)
|
||||
|
||||
|
||||
def test_if_simulator_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
|
||||
device.connection_timeout = 0.1
|
||||
device.command = []
|
||||
exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
|
||||
with pytest.raises(TwisterHarnessException, match=exception_msg):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
|
||||
|
||||
def test_handler_and_device_log_correct_initialized_on_simulators(device: NativeSimulatorAdapter) -> None:
|
||||
|
@ -98,9 +102,10 @@ def test_handler_and_device_log_correct_initialized_on_simulators(device: Native
|
|||
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess_error(
|
||||
patched_run, device: NativeSimulatorAdapter
|
||||
):
|
||||
device.connection_timeout = 0.1
|
||||
device.command = ['echo', 'TEST']
|
||||
with pytest.raises(TwisterHarnessException, match='Exception message'):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
device.stop()
|
||||
|
||||
|
||||
|
@ -108,9 +113,10 @@ def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess
|
|||
def test_if_simulator_adapter_raises_exception_when_subprocess_raised_an_error(
|
||||
patched_run, device: NativeSimulatorAdapter
|
||||
):
|
||||
device.connection_timeout = 0.1
|
||||
device.command = ['echo', 'TEST']
|
||||
with pytest.raises(TwisterHarnessException, match='Raised other exception'):
|
||||
device.flash_and_run(timeout=0.1)
|
||||
device.flash_and_run()
|
||||
device.stop()
|
||||
|
||||
|
||||
|
|
|
@ -231,7 +231,7 @@ class Pytest(Harness):
|
|||
|
||||
def pytest_run(self, timeout):
|
||||
try:
|
||||
cmd = self.generate_command()
|
||||
cmd = self.generate_command(timeout)
|
||||
self.run_command(cmd, timeout)
|
||||
except PytestHarnessException as pytest_exception:
|
||||
logger.error(str(pytest_exception))
|
||||
|
@ -242,7 +242,7 @@ class Pytest(Harness):
|
|||
self.instance.handler.make_device_available(self.reserved_serial)
|
||||
self._update_test_status()
|
||||
|
||||
def generate_command(self):
|
||||
def generate_command(self, timeout):
|
||||
config = self.instance.testsuite.harness_config
|
||||
pytest_root = config.get('pytest_root', 'pytest') if config else 'pytest'
|
||||
pytest_args = config.get('pytest_args', []) if config else []
|
||||
|
@ -253,7 +253,8 @@ class Pytest(Harness):
|
|||
'-q',
|
||||
os.path.join(self.source_dir, pytest_root),
|
||||
f'--build-dir={self.running_dir}',
|
||||
f'--junit-xml={self.report_file}'
|
||||
f'--junit-xml={self.report_file}',
|
||||
f'--connection-timeout={timeout}'
|
||||
]
|
||||
command.extend(pytest_args)
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue