scripts: pytest: add readlines_until

Add readlines_until method to DeviceAdapter class to simplify using
adapters in tests by give user possibility to read output from device
until following conditions:
- provided regex was found
- provided number of lines was already read
- timeout was occurred

Signed-off-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
This commit is contained in:
Piotr Golyzniak 2023-08-04 16:39:23 +02:00 committed by Anas Nashif
commit a4a550888e
5 changed files with 50 additions and 26 deletions

View file

@ -11,16 +11,6 @@ from twister_harness.exceptions import TwisterHarnessTimeoutException
logger = logging.getLogger(__name__)
def wait_for_message(dut: DeviceAdapter, message, timeout=20):
time_started = time.time()
while True:
line = dut.readline(timeout=timeout)
if message in line:
return True
if time.time() > time_started + timeout:
return False
def wait_for_prompt(dut: DeviceAdapter, prompt='uart:~$', timeout=20):
time_started = time.time()
while True:
@ -40,10 +30,10 @@ def wait_for_prompt(dut: DeviceAdapter, prompt='uart:~$', timeout=20):
def test_shell_print_help(dut: DeviceAdapter):
wait_for_prompt(dut)
dut.write(b'help\n')
assert wait_for_message(dut, "Available commands")
dut.readlines_until(regex="Available commands", timeout=5)
def test_shell_print_version(dut: DeviceAdapter):
wait_for_prompt(dut)
dut.write(b'kernel version\n')
assert wait_for_message(dut, "Zephyr version")
dut.readlines_until(regex="Zephyr version", timeout=5)

View file

@ -8,6 +8,7 @@ import abc
import logging
import os
import queue
import re
import shutil
import threading
import time
@ -112,6 +113,50 @@ class DeviceAdapter(abc.ABC):
logger.debug('#: %s', data)
return data
def readlines_until(
self,
regex: str | None = None,
num_of_lines: int | None = None,
timeout: float | None = None,
print_output: bool = True,
) -> list[str]:
"""
Read available output lines produced by device from internal buffer
until following conditions:
1. If regex is provided - read until regex regex is found in read
line (or until timeout)
2. If num_of_lines is provided - read until number of read lines is
equal to num_of_lines (or until timeout)
3. If none of above is provided - return immediately lines collected so
far in internal queue
If timeout is not provided, then use base_timeout
"""
timeout = timeout or self.base_timeout
if regex:
regex_compiled = re.compile(regex)
lines: list[str] = []
if regex or num_of_lines:
timeout_time: float = time.time() + timeout
while time.time() < timeout_time:
try:
line = self.readline(0.1, print_output)
except TwisterHarnessTimeoutException:
continue
lines.append(line)
if regex and regex_compiled.search(line):
break
if num_of_lines and len(lines) == num_of_lines:
break
else:
msg = 'Read from device timeout occurred'
logger.error(msg)
raise TwisterHarnessTimeoutException(msg)
else:
lines = self.readlines(print_output)
return lines
def readlines(self, print_output: bool = True) -> list[str]:
"""
Read all available output lines produced by device from internal buffer.

View file

@ -21,12 +21,3 @@ def copy_example(pytester) -> Path:
resources_dir = Path(__file__).parent / 'data'
pytester.copy_example(str(resources_dir))
return pytester.path
def readlines_until(device, line_pattern: str):
lines = []
while True:
line = device.readline()
lines.append(line)
if line_pattern in line:
return lines

View file

@ -9,7 +9,6 @@ from unittest import mock
import pytest
from conftest import readlines_until
from twister_harness.device.binary_adapter import (
CustomSimulatorAdapter,
NativeSimulatorAdapter,
@ -35,7 +34,7 @@ def test_if_simulator_adapter_runs_without_errors(
# patching original command by mock_script.py to simulate same behaviour as zephyr.exe
device.command = ['python3', str(script_path)]
device.launch()
lines = readlines_until(device=device, line_pattern='Returns with code')
lines = device.readlines_until(regex='Returns with code')
device.close()
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_path)
@ -53,7 +52,7 @@ def test_if_simulator_adapter_finishes_after_timeout_while_there_is_no_data_from
device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
device.launch()
with pytest.raises(TwisterHarnessTimeoutException, match='Read from device timeout occurred'):
readlines_until(device=device, line_pattern='Returns with code')
device.readlines_until(regex='Returns with code')
device.close()
assert device._process is None
with open(device.handler_log_path, 'r') as file:

View file

@ -9,7 +9,6 @@ from unittest.mock import patch
import pytest
from conftest import readlines_until
from twister_harness.device.qemu_adapter import QemuAdapter
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.twister_harness_config import DeviceConfig
@ -38,7 +37,7 @@ def test_if_qemu_adapter_runs_without_errors(resources, device: QemuAdapter) ->
script_path = resources.joinpath('fifo_mock.py')
device.command = ['python', str(script_path), fifo_file_path]
device.launch()
lines = readlines_until(device=device, line_pattern='Namespaces are one honking great idea')
lines = device.readlines_until(regex='Namespaces are one honking great idea')
device.close()
assert 'Readability counts.' in lines
assert os.path.isfile(device.handler_log_path)