scripts: twister: adaptation for pytest plugin

Making the necessary changes to enable the new pytest plugin.
By default Twister should work without the pytest-twister-harness
plugin installed. To achieve this, each time Twister calls pytest,
the PYTHONPATH environment variable is expanded and the
`-p twister_harness.plugin` option is added to the pytest command.

Co-authored-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
Signed-off-by: Grzegorz Chwierut <grzegorz.chwierut@nordicsemi.no>
This commit is contained in:
Grzegorz Chwierut 2023-05-26 11:43:36 +02:00 committed by Anas Nashif
commit f1f305f4ae
5 changed files with 207 additions and 117 deletions

View file

@ -6,6 +6,7 @@
# SPDX-License-Identifier: Apache-2.0
import os
import pkg_resources
import sys
from pathlib import Path
import json
@ -38,6 +39,9 @@ import zephyr_module
# Note "normalization" is different from canonicalization, see os.path.
canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE)
installed_packages = [pkg.project_name for pkg in pkg_resources.working_set] # pylint: disable=not-an-iterable
PYTEST_PLUGIN_INSTALLED = 'pytest-twister-harness' in installed_packages
def add_parse_arguments(parser = None):
if parser is None:
@ -240,6 +244,11 @@ Artificially long but functional example:
files in the directory will be processed. The directory should have the same
structure in the main Zephyr tree: boards/<arch>/<board_name>/""")
parser.add_argument(
"--allow-installed-plugin", action="store_true", default=None,
help="Allow to use pytest plugin installed by pip for pytest tests."
)
parser.add_argument(
"-a", "--arch", action="append",
help="Arch filter for testing. Takes precedence over --platform. "
@ -765,6 +774,16 @@ def parse_arguments(parser, args, options = None):
# Strip off the initial "--" following validation.
options.extra_test_args = options.extra_test_args[1:]
if not options.allow_installed_plugin and PYTEST_PLUGIN_INSTALLED:
logger.error("By default Twister should work without pytest-twister-harness "
"plugin being installed, so please, uninstall it by "
"`pip uninstall pytest-twister-harness` and `git clean "
"-dxf scripts/pylib/pytest-twister-harness`.")
sys.exit(1)
elif options.allow_installed_plugin and PYTEST_PLUGIN_INSTALLED:
logger.warning("You work with installed version of "
"pytest-twister-harness plugin.")
return options

View file

@ -41,17 +41,6 @@ logger.setLevel(logging.DEBUG)
SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native"]
class HarnessImporter:
def __init__(self, name):
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister/twisterlib"))
module = __import__("harness")
if name:
my_class = getattr(module, name)
else:
my_class = getattr(module, "Test")
self.instance = my_class()
class Handler:
def __init__(self, instance, type_str="build"):
@ -187,10 +176,6 @@ class BinaryHandler(Handler):
self.line = proc.stdout.readline()
def _output_handler(self, proc, harness):
if harness.is_pytest:
harness.handle(None)
return
with open(self.log, "wt") as log_out_fp:
timeout_extended = False
timeout_time = time.time() + self.timeout
@ -225,12 +210,7 @@ class BinaryHandler(Handler):
except subprocess.TimeoutExpired:
self.terminate(proc)
def handle(self):
harness_name = self.instance.testsuite.harness.capitalize()
harness_import = HarnessImporter(harness_name)
harness = harness_import.instance
harness.configure(self.instance)
def handle(self, harness):
robot_test = getattr(harness, "is_robot_test", False)
@ -304,9 +284,6 @@ class BinaryHandler(Handler):
if sys.stdout.isatty():
subprocess.call(["stty", "sane"], stdin=sys.stdout)
if harness.is_pytest:
harness.pytest_run(self.log)
self.instance.execution_time = handler_time
if not self.terminated and self.returncode != 0:
self.instance.status = "failed"
@ -353,10 +330,6 @@ class DeviceHandler(Handler):
super().__init__(instance, type_str)
def monitor_serial(self, ser, halt_event, harness):
if harness.is_pytest:
harness.handle(None)
return
log_out_fp = open(self.log, "wt")
if self.options.coverage:
@ -465,9 +438,7 @@ class DeviceHandler(Handler):
proc.communicate()
logger.error("{} timed out".format(script))
def handle(self):
runner = None
def get_hardware(self):
try:
hardware = self.device_is_available(self.instance)
while not hardware:
@ -477,6 +448,12 @@ class DeviceHandler(Handler):
self.instance.status = "failed"
self.instance.reason = str(error)
logger.error(self.instance.reason)
return hardware
def handle(self, harness):
runner = None
hardware = self.get_hardware()
if not hardware:
return
runner = hardware.runner or self.options.west_runner
@ -583,10 +560,6 @@ class DeviceHandler(Handler):
self.make_device_available(serial_device)
return
harness_name = self.instance.testsuite.harness.capitalize()
harness_import = HarnessImporter(harness_name)
harness = harness_import.instance
harness.configure(self.instance)
halt_monitor_evt = threading.Event()
t = threading.Thread(target=self.monitor_serial, daemon=True,
@ -656,9 +629,6 @@ class DeviceHandler(Handler):
handler_time = time.time() - start_time
if harness.is_pytest:
harness.pytest_run(self.log)
self.instance.execution_time = handler_time
if harness.state:
self.instance.status = harness.state
@ -778,11 +748,6 @@ class QEMUHandler(Handler):
if pid == 0 and os.path.exists(pid_fn):
pid = int(open(pid_fn).read())
if harness.is_pytest:
harness.handle(None)
out_state = harness.state
break
try:
c = in_fp.read(1).decode("utf-8")
except UnicodeDecodeError:
@ -826,10 +791,6 @@ class QEMUHandler(Handler):
timeout_time = time.time() + 2
line = ""
if harness.is_pytest:
harness.pytest_run(logfile)
out_state = harness.state
handler_time = time.time() - start_time
logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")
@ -861,7 +822,7 @@ class QEMUHandler(Handler):
os.unlink(fifo_in)
os.unlink(fifo_out)
def handle(self):
def handle(self, harness):
self.results = {}
self.run = True
@ -889,10 +850,6 @@ class QEMUHandler(Handler):
self.log_fn = self.log
harness_import = HarnessImporter(self.instance.testsuite.harness.capitalize())
harness = harness_import.instance
harness.configure(self.instance)
self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
args=(self, self.timeout, self.build_dir,
self.log_fn, self.fifo_fn,

View file

@ -1,18 +1,27 @@
# SPDX-License-Identifier: Apache-2.0
from asyncio.log import logger
import platform
import re
import os
import sys
import subprocess
import shlex
from collections import OrderedDict
import xml.etree.ElementTree as ET
import logging
import time
import sys
from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
_WINDOWS = platform.system() == 'Windows'
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']
# pylint: disable=anomalous-backslash-in-string
result_re = re.compile(".*(PASS|FAIL|SKIP) - (test_)?(.*) in (\\d*[.,]?\\d*) seconds")
class Harness:
@ -48,7 +57,6 @@ class Harness:
self.recording = []
self.fieldnames = []
self.ztest = False
self.is_pytest = False
self.detected_suite_names = []
self.run_id = None
self.matched_run_id = False
@ -207,64 +215,120 @@ class Console(Harness):
else:
tc.status = "failed"
class PytestHarnessException(Exception):
"""General exception for pytest."""
class Pytest(Harness):
def configure(self, instance):
super(Pytest, self).configure(instance)
self.running_dir = instance.build_dir
self.source_dir = instance.testsuite.source_dir
self.pytest_root = 'pytest'
self.pytest_args = []
self.is_pytest = True
config = instance.testsuite.harness_config
self.report_file = os.path.join(self.running_dir, 'report.xml')
self.reserved_serial = None
if config:
self.pytest_root = config.get('pytest_root', 'pytest')
self.pytest_args = config.get('pytest_args', [])
def pytest_run(self):
try:
cmd = self.generate_command()
if not cmd:
logger.error('Pytest command not generated, check logs')
return
self.run_command(cmd)
except PytestHarnessException as pytest_exception:
logger.error(str(pytest_exception))
finally:
if self.reserved_serial:
self.instance.handler.make_device_available(self.reserved_serial)
self._apply_instance_status()
def handle(self, line):
''' Test cases that make use of pytest more care about results given
by pytest tool which is called in pytest_run(), so works of this
handle is trying to give a PASS or FAIL to avoid timeout, nothing
is writen into handler.log
'''
self.state = "passed"
tc = self.instance.get_case_or_create(self.id)
tc.status = "passed"
def pytest_run(self, log_file):
''' To keep artifacts of pytest in self.running_dir, pass this directory
by "--cmdopt". On pytest end, add a command line option and provide
the cmdopt through a fixture function
If pytest harness report failure, twister will direct user to see
handler.log, this method writes test result in handler.log
'''
cmd = [
'pytest',
'-s',
os.path.join(self.source_dir, self.pytest_root),
'--cmdopt',
self.running_dir,
'--junit-xml',
os.path.join(self.running_dir, 'report.xml'),
'-q'
def generate_command(self):
config = self.instance.testsuite.harness_config
pytest_root = config.get('pytest_root', 'pytest') if config else 'pytest'
pytest_args = config.get('pytest_args', []) if config else []
command = [
'pytest',
'--twister-harness',
'-s',
'-q',
os.path.join(self.source_dir, pytest_root),
f'--build-dir={self.running_dir}',
f'--junit-xml={self.report_file}'
]
command.extend(pytest_args)
for arg in self.pytest_args:
cmd.append(arg)
handler = self.instance.handler
log = open(log_file, "a")
outs = []
errs = []
if handler.options.verbose > 1:
command.append('--log-level=DEBUG')
if handler.type_str == 'device':
command.extend(
self._generate_parameters_for_hardware(handler)
)
elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
command.append(f'--device-type={handler.type_str}')
elif handler.type_str == 'build':
command.append('--device-type=custom')
else:
raise PytestHarnessException(f'Handling of handler {handler.type_str} not implemented yet')
return command
def _generate_parameters_for_hardware(self, handler):
command = ['--device-type=hardware']
hardware = handler.get_hardware()
if not hardware:
raise PytestHarnessException('Hardware is not available')
self.reserved_serial = hardware.serial_pty or hardware.serial
if hardware.serial_pty:
command.append(f'--device-serial-pty={hardware.serial_pty}')
else:
command.extend([
f'--device-serial={hardware.serial}',
f'--device-serial-baud={hardware.baud}'
])
options = handler.options
if runner := hardware.runner or options.west_runner:
command.append(f'--runner={runner}')
if options.west_flash and options.west_flash != []:
command.append(f'--west-flash-extra-args={options.west_flash}')
if board_id := hardware.probe_id or hardware.id:
command.append(f'--device-id={board_id}')
if hardware.product:
command.append(f'--device-product={hardware.product}')
if hardware.pre_script:
command.append(f'--pre-script={hardware.pre_script}')
if hardware.post_flash_script:
command.append(f'--post-flash-script={hardware.post_flash_script}')
if hardware.post_script:
command.append(f'--post-script={hardware.post_script}')
return command
def run_command(self, cmd):
cmd, env = self._update_command_with_env_dependencies(cmd)
logger.debug(
"Running pytest command: %s",
" ".join(shlex.quote(a) for a in cmd))
with subprocess.Popen(cmd,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE) as proc:
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env) as proc:
try:
outs, errs = proc.communicate()
tree = ET.parse(os.path.join(self.running_dir, "report.xml"))
while proc.stdout.readable() and proc.poll() is None:
line = proc.stdout.readline().decode().strip()
if not line:
continue
logger.debug("PYTEST: %s", line)
proc.communicate()
tree = ET.parse(self.report_file)
root = tree.getroot()
for child in root:
if child.tag == 'testsuite':
@ -273,34 +337,62 @@ class Pytest(Harness):
elif child.attrib['skipped'] != '0':
self.state = "skipped"
elif child.attrib['errors'] != '0':
self.state = "errors"
self.state = "error"
else:
self.state = "passed"
self.instance.execution_time = float(child.attrib['time'])
except subprocess.TimeoutExpired:
proc.kill()
self.state = "failed"
except ET.ParseError:
self.state = "failed"
except IOError:
log.write("Can't access report.xml\n")
logger.warning("Can't access report.xml")
self.state = "failed"
tc = self.instance.get_case_or_create(self.id)
if self.state == "passed":
tc.status = "passed"
log.write("Pytest cases passed\n")
logger.debug("Pytest cases passed")
elif self.state == "skipped":
tc.status = "skipped"
log.write("Pytest cases skipped\n")
log.write("Please refer report.xml for detail")
logger.debug("Pytest cases skipped.")
else:
tc.status = "failed"
log.write("Pytest cases failed\n")
logger.info("Pytest cases failed.")
log.write("\nOutput from pytest:\n")
log.write(outs.decode('UTF-8'))
log.write(errs.decode('UTF-8'))
log.close()
@staticmethod
def _update_command_with_env_dependencies(cmd):
'''
If python plugin wasn't installed by pip, then try to indicate it to
pytest by update PYTHONPATH and append -p argument to pytest command.
'''
env = os.environ.copy()
if not PYTEST_PLUGIN_INSTALLED:
cmd.extend(['-p', 'twister_harness.plugin'])
pytest_plugin_path = os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'pytest-twister-harness', 'src')
env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
if _WINDOWS:
cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
else:
cmd_append_python_path = f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
else:
cmd_append_python_path = ''
cmd_to_print = cmd_append_python_path + shlex.join(cmd)
logger.debug('Running pytest command: %s', cmd_to_print)
return cmd, env
def _apply_instance_status(self):
if self.state:
self.instance.status = self.state
if self.state in ["error", "failed"]:
self.instance.reason = "Pytest failed"
else:
self.instance.status = "failed"
self.instance.reason = "Pytest timeout"
if self.instance.status in ["error", "failed"]:
self.instance.add_missing_case_status("blocked", self.instance.reason)
class Gtest(Harness):
@ -439,5 +531,18 @@ class Test(Harness):
else:
tc.status = "failed"
class Ztest(Test):
pass
class HarnessImporter:
@staticmethod
def get_harness(harness_name):
thismodule = sys.modules[__name__]
if harness_name:
harness_class = getattr(thismodule, harness_name)
else:
harness_class = getattr(thismodule, 'Test')
return harness_class()

View file

@ -40,6 +40,7 @@ if sys.platform == 'linux':
from twisterlib.log_helper import log_command
from twisterlib.testinstance import TestInstance
from twisterlib.testplan import change_skip_to_error_if_integration
from twisterlib.harness import HarnessImporter, Pytest
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
@ -1027,7 +1028,12 @@ class ProjectBuilder(FilterBuilder):
if self.options.extra_test_args and instance.platform.arch == "posix":
instance.handler.extra_test_args = self.options.extra_test_args
instance.handler.handle()
harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
harness.configure(instance)
if isinstance(harness, Pytest):
harness.pytest_run()
else:
instance.handler.handle(harness)
sys.stdout.flush()

View file

@ -15,6 +15,7 @@ from twisterlib.testsuite import TestCase
from twisterlib.error import BuildError
from twisterlib.size_calc import SizeCalculator
from twisterlib.handlers import Handler, SimulationHandler, BinaryHandler, QEMUHandler, DeviceHandler, SUPPORTED_SIMS
from twisterlib.harness import SUPPORTED_SIMS_IN_PYTEST
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
@ -195,14 +196,16 @@ class TestInstance:
self.platform.simulation in SUPPORTED_SIMS or \
filter == 'runnable')
for sim in ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native']:
if self.platform.simulation == sim and self.platform.simulation_exec:
if not shutil.which(self.platform.simulation_exec):
target_ready = False
break
else:
target_ready = True
# check if test is runnable in pytest
if self.testsuite.harness == 'pytest':
target_ready = bool(filter == 'runnable' or self.platform.simulation in SUPPORTED_SIMS_IN_PYTEST)
SUPPORTED_SIMS_WITH_EXEC = ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native']
if filter != 'runnable' and \
self.platform.simulation in SUPPORTED_SIMS_WITH_EXEC and \
self.platform.simulation_exec:
if not shutil.which(self.platform.simulation_exec):
target_ready = False
testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)