This fixes ruff linting error E501, indicating overlong lines. Signed-off-by: Lukasz Mrugala <lukaszx.mrugala@intel.com>
961 lines
38 KiB
Python
961 lines
38 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from collections import OrderedDict
|
|
from enum import Enum
|
|
|
|
from pytest import ExitCode
|
|
from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST
|
|
from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE
|
|
from twisterlib.error import ConfigurationError, StatusAttributeError
|
|
from twisterlib.handlers import Handler, terminate_process
|
|
from twisterlib.reports import ReportStatus
|
|
from twisterlib.statuses import TwisterStatus
|
|
from twisterlib.testinstance import TestInstance
|
|
|
|
# Module logger, registered under the shared 'twister' logger name.
logger = logging.getLogger('twister')
|
|
# Set the logger threshold to DEBUG so the debug messages emitted below are not
# dropped at the logger level.
logger.setLevel(logging.DEBUG)
|
|
|
|
# True when the host OS is Windows; used below to pick the shell syntax shown
# when the pytest command line is logged.
_WINDOWS = platform.system() == 'Windows'
|
|
|
|
|
|
class Harness:
    """Base harness: inspects a test image's console output line by line.

    The harness subclasses in this module provide ``handle(line)``; this base
    class keeps the shared state (overall status, failure reason, coverage
    capture flag, recorded data) and the checks common to them in
    ``process_test``.
    """

    GCOV_START = "GCOV_COVERAGE_DUMP_START"
    GCOV_END = "GCOV_COVERAGE_DUMP_END"
    FAULT = "ZEPHYR FATAL ERROR"
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    run_id_pattern = r"RunID: (?P<run_id>.*)"

    def __init__(self):
        # Overall verdict and the explanation attached to a failure.
        self._status = TwisterStatus.NONE
        self.reason = None

        # Identity of the observed test instance (filled in by configure()).
        self.instance: TestInstance | None = None
        self.id = None
        self.run_id = None
        self.run_id_exists = False
        self.matched_run_id = False

        # Pattern-matching configuration and progress.
        self.type = None
        self.regex = []
        self.matches = OrderedDict()
        self.ordered = True
        self.next_pattern = 0

        # Fault and coverage tracking.
        self.fail_on_fault = True
        self.fault = False
        self.capture_coverage = False

        # Optional recording of structured data found in the output.
        self.record = None
        self.record_pattern = None
        self.record_as_json = None
        self.recording = []

        # Suite / case bookkeeping used by the suite-aware harnesses.
        self.ztest = False
        self.detected_suite_names = []
        self.started_suites = {}
        self.started_cases = {}
        self.testcase_output = ""
        self._match = False

    @property
    def trace(self) -> bool:
        """True when the handler's verbosity option is greater than 2."""
        return self.instance.handler.options.verbose > 2

    @property
    def status(self) -> TwisterStatus:
        """Current overall harness status."""
        return self._status

    @status.setter
    def status(self, value: TwisterStatus) -> None:
        # Accept either an Enum member or a member name; anything that does
        # not name a TwisterStatus member is rejected.
        key = value.name if isinstance(value, Enum) else value
        try:
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def configure(self, instance):
        """Bind the harness to *instance* and load its ``harness_config``.

        Without a (truthy) harness configuration only the instance identity
        and the ``ignore_faults`` flag are applied.
        """
        self.instance = instance
        config = instance.testsuite.harness_config
        self.id = instance.testsuite.id
        self.run_id = instance.run_id
        if instance.testsuite.ignore_faults:
            self.fail_on_fault = False

        if not config:
            return

        self.type = config.get('type', None)
        self.regex = config.get('regex', [])
        self.ordered = config.get('ordered', True)
        self.record = config.get('record', {})
        if self.record:
            self.record_pattern = re.compile(self.record.get("regex", ""))
            self.record_as_json = self.record.get("as_json")

    def build(self):
        """Build-phase hook; the base harness has nothing to do."""

    def get_testcase_name(self):
        """
        Get current TestCase name.
        """
        return self.id

    def translate_record(self, record: dict) -> dict:
        """Decode, in place, the fields of *record* listed in ``record_as_json``.

        An empty field becomes ``{}``. A field that is not valid JSON is
        replaced by an ``ERROR`` entry carrying the parser message and the
        original text. Returns the same *record* object.
        """
        for key in self.record_as_json or ():
            if key not in record:
                continue
            raw = record[key]
            try:
                record[key] = json.loads(raw) if raw else {}
            except json.JSONDecodeError as parse_error:
                logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
                               f" {parse_error} for '{key}':'{raw}'")
                # Don't set the Harness state to failed for recordings.
                record[key] = {'ERROR': {'msg': str(parse_error), 'doc': raw}}
        return record

    def parse_record(self, line) -> re.Match:
        """Search *line* with the record pattern and store any hit.

        Named groups of a match are stripped, passed through
        ``translate_record`` and appended to ``recording``. Returns the match
        object, or None when no pattern is configured or nothing matched.
        """
        if not self.record_pattern:
            return None
        match = self.record_pattern.search(line)
        if match:
            fields = {name: text.strip()
                      for name, text in match.groupdict(default="").items()}
            self.recording.append(self.translate_record(fields))
        return match

    def process_test(self, line):
        """Apply the checks shared by the harnesses to one output *line*."""
        self.parse_record(line)

        runid_match = re.search(self.run_id_pattern, line)
        if runid_match:
            self.run_id_exists = True
            if runid_match.group("run_id") == str(self.run_id):
                self.matched_run_id = True

        if self.RUN_PASSED in line:
            if self.fault:
                self.status = TwisterStatus.FAIL
                self.reason = "Fault detected while running test"
            else:
                self.status = TwisterStatus.PASS

        if self.RUN_FAILED in line:
            self.status = TwisterStatus.FAIL
            self.reason = "Testsuite failed"

        # NOTE(review): this is an exact comparison, so only a line consisting
        # solely of the FAULT marker sets the flag - confirm this is intended.
        if self.fail_on_fault and line == self.FAULT:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False
|
|
|
|
class Robot(Harness):
    """Harness for test suites executed through Robot Framework."""

    is_robot_test = True

    def __init__(self):
        super().__init__()
        # Defaults so run_robot_test() can report a missing 'robot_testsuite'
        # properly even when the test suite has no harness_config at all.
        self.path = None
        self.option = None

    def configure(self, instance):
        """Bind to *instance* and read the Robot-specific configuration keys."""
        super().configure(instance)

        config = instance.testsuite.harness_config
        if config:
            self.path = config.get('robot_testsuite', None)
            self.option = config.get('robot_option', None)

    def handle(self, line):
        ''' Test cases that make use of this harness care about results given
            by Robot Framework which is called in run_robot_test(), so works of this
            handle is trying to give a PASS or FAIL to avoid timeout, nothing
            is written into handler.log
        '''
        self.instance.status = TwisterStatus.PASS
        tc = self.instance.get_case_or_create(self.id)
        tc.status = TwisterStatus.PASS

    def run_robot_test(self, command, handler):
        """Run *command* extended with the configured options and test suites.

        *command* is extended in place. The instance and its first test case
        are marked PASS when the process exits with code 0 and FAIL otherwise;
        captured output, if any, is written to the handler log in the build
        directory.

        Raises:
            PytestHarnessException: when no 'robot_testsuite' is configured.
        """
        start_time = time.time()
        env = os.environ.copy()

        if self.option:
            options = self.option if isinstance(self.option, list) else [self.option]
            for option in options:
                # Each configured option may carry several space-separated arguments.
                command.extend(str(option).split())

        if self.path is None:
            raise PytestHarnessException('The parameter robot_testsuite is mandatory')

        suites = self.path if isinstance(self.path, list) else [self.path]
        for suite in suites:
            command.append(os.path.join(handler.sourcedir, suite))

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, cwd=self.instance.build_dir,
                              env=env) as renode_test_proc:
            out, _ = renode_test_proc.communicate()

            self.instance.execution_time = time.time() - start_time

            # all tests in one Robot file are treated as a single test case,
            # so its status should be set accordingly to the instance status
            # please note that there should be only one testcase in testcases list
            if renode_test_proc.returncode == 0:
                self.instance.status = TwisterStatus.PASS
                self.instance.testcases[0].status = TwisterStatus.PASS
            else:
                logger.error(
                    f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}"
                )
                self.instance.status = TwisterStatus.FAIL
                self.instance.testcases[0].status = TwisterStatus.FAIL

            if out:
                with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log:
                    log_msg = out.decode(sys.getdefaultencoding())
                    log.write(log_msg)
|
|
|
|
class Console(Harness):
    """Harness that passes a test when configured regex patterns show up in the output."""

    def get_testcase_name(self):
        '''
        Get current TestCase name.

        Console Harness id has only TestSuite id without TestCase name suffix.
        Only the first TestCase name might be taken if available when a Ztest with
        a single test case is configured to use this harness type for simplified
        output parsing instead of the Ztest harness as Ztest suite should do.
        '''
        if self.instance and len(self.instance.testcases) == 1:
            return self.instance.testcases[0].name
        return super().get_testcase_name()

    def _reject_configuration(self, detail):
        # Mark harness and test case as failed, then abort configuration.
        self.status = TwisterStatus.FAIL
        tc = self.instance.set_case_status_by_name(
            self.get_testcase_name(),
            TwisterStatus.FAIL,
            f"HARNESS:{self.__class__.__name__}:{detail}"
        )
        raise ConfigurationError(self.instance.name, tc.reason)

    def configure(self, instance):
        """Compile the configured patterns.

        Raises:
            ConfigurationError: when no regex is configured or the harness
                type is neither "one_line" nor "multi_line".
        """
        super().configure(instance)
        if self.regex is None or len(self.regex) == 0:
            self._reject_configuration("no regex patterns configured.")

        if self.type == "one_line":
            self.pattern = re.compile(self.regex[0])
            self.patterns_expected = 1
        elif self.type == "multi_line":
            self.patterns = [re.compile(expr) for expr in self.regex]
            self.patterns_expected = len(self.patterns)
        else:
            self._reject_configuration(f"incorrect type={self.type}")

    def handle(self, line):
        """Match one output *line* against the expected patterns and update statuses."""
        if self.type == "one_line":
            if self.pattern.search(line):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
                             f"'{self.pattern.pattern}'")
                self.next_pattern += 1
                self.status = TwisterStatus.PASS
        elif self.type == "multi_line":
            if self.ordered:
                # Only the next pattern in sequence is eligible to match.
                if (self.next_pattern < len(self.patterns) and
                        self.patterns[self.next_pattern].search(line)):
                    expected = self.patterns[self.next_pattern]
                    logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                                 f"{self.next_pattern + 1}/{self.patterns_expected}):"
                                 f"'{expected.pattern}'")
                    self.next_pattern += 1
                    if self.next_pattern >= len(self.patterns):
                        self.status = TwisterStatus.PASS
            else:
                # Any not-yet-seen pattern may match, in any order.
                for expr, pattern in zip(self.regex, self.patterns):
                    if expr in self.matches or not pattern.search(line):
                        continue
                    self.matches[expr] = line
                    logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                                 f"{len(self.matches)}/{self.patterns_expected}):"
                                 f"'{pattern.pattern}'")
                if len(self.matches) == len(self.regex):
                    self.status = TwisterStatus.PASS
        else:
            logger.error("Unknown harness_config type")

        if self.fail_on_fault and self.FAULT in line:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

        self.process_test(line)
        # Reset the resulting test state to FAIL when not all of the patterns were
        # found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'.
        # It might happen because of the pattern sequence diverged from the
        # test code, the test platform has console issues, or even some other
        # test image was executed.
        # TODO: Introduce explicit match policy type to reject
        #       unexpected console output, allow missing patterns, deny duplicates.
        if self.status == TwisterStatus.PASS:
            if self.ordered:
                if self.next_pattern < self.patterns_expected:
                    logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                                 f" {self.next_pattern} of {self.patterns_expected}"
                                 f" expected ordered patterns.")
                    self.status = TwisterStatus.FAIL
                    self.reason = "patterns did not match (ordered)"
            elif len(self.matches) < self.patterns_expected:
                logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                             f" {len(self.matches)} of {self.patterns_expected}"
                             f" expected unordered patterns.")
                self.status = TwisterStatus.FAIL
                self.reason = "patterns did not match (unordered)"

        # The test case mirrors the harness: anything other than PASS is FAIL.
        tc = self.instance.get_case_or_create(self.get_testcase_name())
        tc.status = (
            TwisterStatus.PASS if self.status == TwisterStatus.PASS else TwisterStatus.FAIL
        )
|
|
|
|
|
|
class PytestHarnessException(Exception):
    """Error raised by the harnesses in this module when a test run cannot be set up."""
|
|
|
|
|
|
class Pytest(Harness):
    """Harness that delegates the test to a pytest run and reads its JUnit XML report."""

    def configure(self, instance: TestInstance):
        """Bind to *instance* and derive the report and log file locations."""
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self.reserved_dut = None
        self._output = []

    def pytest_run(self, timeout):
        """Build the pytest command, run it and publish the results on the instance.

        Recorded data, the final status and the release of a reserved DUT are
        handled in ``finally`` so they happen whether or not the run raised.
        """
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except PytestHarnessException as pytest_exception:
            logger.error(str(pytest_exception))
            self.status = TwisterStatus.FAIL
            self.instance.reason = str(pytest_exception)
        finally:
            self.instance.record(self.recording)
            self._update_test_status()
            if self.reserved_dut:
                self.instance.handler.make_dut_available(self.reserved_dut)

    def generate_command(self):
        """Return the pytest command line (list of str) for this instance.

        Raises:
            PytestHarnessException: when the handler type is not supported or
                (for device handlers) no hardware is available.
        """
        config = self.instance.testsuite.harness_config or {}
        handler: Handler = self.instance.handler
        pytest_root = config.get('pytest_root', ['pytest'])
        pytest_args_yaml = config.get('pytest_args', [])
        pytest_dut_scope = config.get('pytest_dut_scope', None)
        command = [
            'pytest',
            '--twister-harness',
            '-s', '-v',
            f'--build-dir={self.running_dir}',
            f'--junit-xml={self.report_file}',
            '--log-file-level=DEBUG',
            '--log-file-format=%(asctime)s.%(msecs)d:%(levelname)s:%(name)s: %(message)s',
            f'--log-file={self.pytest_log_file_path}',
            f'--platform={self.instance.platform.name}'
        ]
        # Test roots are resolved relative to the test suite's source directory.
        command.extend(
            os.path.normpath(
                os.path.join(self.source_dir, os.path.expanduser(os.path.expandvars(src)))
            )
            for src in pytest_root
        )

        if pytest_dut_scope:
            command.append(f'--dut-scope={pytest_dut_scope}')

        # Always pass output from the pytest test and the test image up to Twister log.
        command.extend([
            '--log-cli-level=DEBUG',
            '--log-cli-format=%(levelname)s: %(message)s'
        ])

        # Use the test timeout as the base timeout for pytest
        base_timeout = handler.get_test_timeout()
        command.append(f'--base-timeout={base_timeout}')

        if handler.type_str == 'device':
            command.extend(
                self._generate_parameters_for_hardware(handler)
            )
        elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
            command.append(f'--device-type={handler.type_str}')
        elif handler.type_str == 'build':
            command.append('--device-type=custom')
        else:
            raise PytestHarnessException(
                f'Support for handler {handler.type_str} not implemented yet'
            )

        # For device handlers the fixtures come from the hardware entry instead.
        if handler.type_str != 'device':
            for fixture in handler.options.fixture:
                command.append(f'--twister-fixture={fixture}')

        command.extend(pytest_args_yaml)

        if handler.options.pytest_args:
            command.extend(handler.options.pytest_args)

        return command

    def _generate_parameters_for_hardware(self, handler: Handler):
        """Reserve a hardware entry from *handler* and return its pytest options.

        Raises:
            PytestHarnessException: when the handler returns no hardware.
        """
        command = ['--device-type=hardware']
        hardware = handler.get_hardware()
        if not hardware:
            raise PytestHarnessException('Hardware is not available')
        # update the instance with the device id to have it in the summary report
        self.instance.dut = hardware.id

        # Remembered so pytest_run() can hand the device back afterwards.
        self.reserved_dut = hardware
        if hardware.serial_pty:
            command.append(f'--device-serial-pty={hardware.serial_pty}')
        else:
            command.extend([
                f'--device-serial={hardware.serial}',
                f'--device-serial-baud={hardware.baud}'
            ])

        if hardware.flash_timeout:
            command.append(f'--flash-timeout={hardware.flash_timeout}')

        options = handler.options
        if runner := hardware.runner or options.west_runner:
            command.append(f'--runner={runner}')

        if hardware.runner_params:
            for param in hardware.runner_params:
                command.append(f'--runner-params={param}')

        if options.west_flash and options.west_flash != []:
            command.append(f'--west-flash-extra-args={options.west_flash}')

        if board_id := hardware.probe_id or hardware.id:
            command.append(f'--device-id={board_id}')

        if hardware.product:
            command.append(f'--device-product={hardware.product}')

        if hardware.pre_script:
            command.append(f'--pre-script={hardware.pre_script}')

        if hardware.post_flash_script:
            command.append(f'--post-flash-script={hardware.post_flash_script}')

        if hardware.post_script:
            command.append(f'--post-script={hardware.post_script}')

        if hardware.flash_before:
            command.append(f'--flash-before={hardware.flash_before}')

        for fixture in hardware.fixtures:
            command.append(f'--twister-fixture={fixture}')

        return command

    def run_command(self, cmd, timeout):
        """Run *cmd*, collecting its output on a reader thread bounded by *timeout* seconds.

        A reader that is still alive after *timeout* causes the process to be
        terminated and the harness to be marked FAIL. The command line and the
        captured output are written to the pytest log file afterwards.
        """
        cmd, env = self._update_command_with_env_dependencies(cmd)
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
                                   f'Currently set to {timeout} seconds.')
                    self.instance.reason = 'Pytest timeout'
                    self.status = TwisterStatus.FAIL
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.status = TwisterStatus.FAIL
                proc.kill()

        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
            self.status = TwisterStatus.ERROR
            self.instance.reason = f'Pytest error - return code {proc.returncode}'
        with open(self.pytest_log_file_path, 'w') as log_file:
            log_file.write(shlex.join(cmd) + '\n\n')
            log_file.write('\n'.join(self._output))

    @staticmethod
    def _update_command_with_env_dependencies(cmd):
        '''
        If python plugin wasn't installed by pip, then try to indicate it to
        pytest by update PYTHONPATH and append -p argument to pytest command.

        Returns the (in-place extended) command and the environment to run it in.
        '''
        env = os.environ.copy()
        if not PYTEST_PLUGIN_INSTALLED:
            cmd.extend(['-p', 'twister_harness.plugin'])
            pytest_plugin_path = os.path.join(
                ZEPHYR_BASE,
                'scripts',
                'pylib',
                'pytest-twister-harness',
                'src'
            )
            env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
            # The prefix below is only for the logged, copy-pasteable command line.
            if _WINDOWS:
                cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
            else:
                cmd_append_python_path = (
                    f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
                )
        else:
            cmd_append_python_path = ''
        cmd_to_print = cmd_append_python_path + shlex.join(cmd)
        logger.debug(f'Running pytest command: {cmd_to_print}')

        return cmd, env

    def _record_output_line(self, raw_line):
        # Decode one raw output line, then store, log and scan it for records.
        # Undecodable bytes are replaced so a stray byte cannot kill the reader.
        line = raw_line.decode(errors='replace').strip()
        if not line:
            return
        self._output.append(line)
        logger.debug(f'PYTEST: {line}')
        self.parse_record(line)

    def _output_reader(self, proc):
        """Collect the output of *proc* line by line until it exits.

        Whatever is still buffered in the pipe when the process exits is
        returned by ``communicate()`` and processed too, so the final lines
        of the pytest output are not lost.
        """
        self._output = []
        while proc.stdout.readable() and proc.poll() is None:
            self._record_output_line(proc.stdout.readline())
        remaining, _ = proc.communicate()
        for raw_line in (remaining or b'').splitlines():
            self._record_output_line(raw_line)

    def _update_test_status(self):
        """Derive the instance status, reading the pytest report when still undecided."""
        if self.status == TwisterStatus.NONE:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.status = TwisterStatus.FAIL
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        # A status that is still NONE at this point is reported as a failure.
        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
                               TwisterStatus.FAIL
        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
            self.instance.reason = self.instance.reason or 'Pytest failed'
            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)

    def _parse_report_file(self, report):
        """Translate the JUnit XML *report* into harness, instance and test case results."""
        tree = ET.parse(report)
        root = tree.getroot()

        if (elem_ts := root.find('testsuite')) is not None:
            if elem_ts.get('failures') != '0':
                self.status = TwisterStatus.FAIL
                self.instance.reason = (
                    f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
                )
            elif elem_ts.get('errors') != '0':
                self.status = TwisterStatus.ERROR
                self.instance.reason = 'Error during pytest execution'
            elif elem_ts.get('skipped') == elem_ts.get('tests'):
                self.status = TwisterStatus.SKIP
            else:
                self.status = TwisterStatus.PASS
            self.instance.execution_time = float(elem_ts.get('time'))

            for elem_tc in elem_ts.findall('testcase'):
                tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
                tc.duration = float(elem_tc.get('time'))
                # A <testcase> without any child element is a passed test; the
                # first child otherwise tells how it ended.
                elem = elem_tc.find('*')
                if elem is None:
                    tc.status = TwisterStatus.PASS
                else:
                    if elem.tag == ReportStatus.SKIP:
                        tc.status = TwisterStatus.SKIP
                    elif elem.tag == ReportStatus.FAIL:
                        tc.status = TwisterStatus.FAIL
                    else:
                        tc.status = TwisterStatus.ERROR
                    tc.reason = elem.get('message')
                    tc.output = elem.text
        else:
            self.status = TwisterStatus.SKIP
            self.instance.reason = 'No tests collected'
|
|
|
|
|
|
class Gtest(Harness):
    """Harness that follows GoogleTest console output.

    The status tags are matched with flexible whitespace inside the brackets,
    so both column-padded tags (e.g. ``[ RUN      ]``) and single-spaced ones
    are recognised.
    """

    ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*"
    _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})"
    TEST_START_PATTERN = f".*\\[\\s+RUN\\s+\\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_PASS_PATTERN = f".*\\[\\s+OK\\s+\\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_SKIP_PATTERN = f".*\\[\\s+DISABLED\\s+\\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_FAIL_PATTERN = f".*\\[\\s+FAILED\\s+\\] {_SUITE_TEST_NAME_PATTERN}"
    FINISHED_PATTERN = (
        ".*(?:\\[==========\\] Done running all tests\\.|"
        + "\\[----------\\] Global test environment tear-down)"
    )

    def __init__(self):
        super().__init__()
        # Test case currently running (None between tests).
        self.tc = None
        # Set once any test case has reported FAIL.
        self.has_failures = False

    def handle(self, line):
        """Process one output *line*: test start, end of the whole run, or a test result.

        Lines are ignored once the harness has a final status. Inconsistent
        output (overlapping or repeated tests, a result for a test other than
        the running one) trips an assertion.
        """
        # Strip the ANSI characters, they mess up the patterns
        non_ansi_line = self.ANSI_ESCAPE.sub('', line)

        if self.status != TwisterStatus.NONE:
            return

        # Check if we started running a new test
        test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
        if test_start_match:
            # Add the suite name
            suite_name = test_start_match.group("suite_name")
            if suite_name not in self.detected_suite_names:
                self.detected_suite_names.append(suite_name)

            # Generate the internal name of the test
            name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))

            # Assert that we don't already have a running test
            assert (
                self.tc is None
            ), f"gTest error, {self.tc} didn't finish"

            # Check that the instance doesn't exist yet (prevents re-running)
            tc = self.instance.get_case_by_name(name)
            assert tc is None, f"gTest error, {tc} running twice"

            # Create the test instance and set the context
            tc = self.instance.get_case_or_create(name)
            self.tc = tc
            self.tc.status = TwisterStatus.STARTED
            self.testcase_output += line + "\n"
            self._match = True

        # Check if the test run finished
        finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
        if finished_match:
            tc = self.instance.get_case_or_create(self.id)
            # A test that is still running at the end of the run counts as a failure.
            if self.has_failures or self.tc is not None:
                self.status = TwisterStatus.FAIL
                tc.status = TwisterStatus.FAIL
            else:
                self.status = TwisterStatus.PASS
                tc.status = TwisterStatus.PASS
            return

        # Check if the individual test finished
        state, name = self._check_result(non_ansi_line)
        if state == TwisterStatus.NONE or name is None:
            # Nothing finished, keep processing lines
            return

        # Get the matching test and make sure it's the same as the current context
        tc = self.instance.get_case_by_name(name)
        assert (
            tc is not None and tc == self.tc
        ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}"

        # Test finished, clear the context
        self.tc = None

        # Update the status of the test
        tc.status = state
        if tc.status == TwisterStatus.FAIL:
            self.has_failures = True
            tc.output = self.testcase_output
        self.testcase_output = ""
        self._match = False

    def _check_result(self, line):
        """Return ``(status, test case name)`` for a result line, else ``(None, None)``."""
        outcomes = (
            (self.TEST_PASS_PATTERN, TwisterStatus.PASS),
            (self.TEST_SKIP_PATTERN, TwisterStatus.SKIP),
            (self.TEST_FAIL_PATTERN, TwisterStatus.FAIL),
        )
        for pattern, outcome in outcomes:
            if result_match := re.search(pattern, line):
                return outcome, "{}.{}.{}".format(
                    self.id,
                    result_match.group("suite_name"),
                    result_match.group("test_name"),
                )
        return None, None
|
|
|
|
|
|
class Test(Harness):
    """Harness that follows Ztest console output (suite/case start, end and summary lines)."""

    __test__ = False  # for pytest to skip this class when collects tests

    test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\S*)")
    test_suite_end_pattern = re.compile(
        r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)"
    )
    test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)")
    test_case_end_pattern = re.compile(
        r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds"
    )
    test_suite_summary_pattern = re.compile(
        r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:"
        r" .* duration = (\d*[.,]?\d*) seconds"
    )
    test_case_summary_pattern = re.compile(
        r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds"
    )

    @staticmethod
    def _parse_duration(text):
        # The duration patterns accept either '.' or ',' as decimal separator,
        # but float() only understands '.', so normalise before converting.
        return float(text.replace(',', '.'))

    def _record_case_result(self, tc, matched_status, duration_text):
        # Store a reported result on `tc` and reset the per-case output capture.
        tc.status = TwisterStatus[matched_status]
        if tc.status == TwisterStatus.SKIP:
            tc.reason = "ztest skip"
        tc.duration = self._parse_duration(duration_text)
        if tc.status == TwisterStatus.FAIL:
            tc.output = self.testcase_output
        self.testcase_output = ""
        self._match = False
        self.ztest = True

    def get_testcase(self, tc_name, phase, ts_name=None):
        """ Search a Ztest case among detected in the test image binary
            expecting the same test names as already known from the ELF.
            Track suites and cases unexpectedly found in the log.

            The search covers the started suites, or only `ts_name` when it
            is given and has been started. When no known case matches, the
            case `<harness id>.<tc_name>` is looked up or created instead.
        """
        ts_names = self.started_suites.keys()
        if ts_name:
            if ts_name not in self.instance.testsuite.ztest_suite_names:
                logger.warning(f"On {phase}: unexpected Ztest suite '{ts_name}' "
                               f"not present among: {self.instance.testsuite.ztest_suite_names}")
            if ts_name not in self.detected_suite_names:
                if self.trace:
                    logger.debug(f"On {phase}: detected new Ztest suite '{ts_name}'")
                self.detected_suite_names.append(ts_name)
            ts_names = [ts_name] if ts_name in ts_names else []

        # Suites that already ended are skipped, except while parsing the
        # summary, where any non-negative count is accepted.
        min_count = 0 if phase == 'TS_SUM' else 1

        # First, try to match the test case ID to the first running Ztest suite with this test name.
        for ts_name_ in ts_names:
            if self.started_suites[ts_name_]['count'] < min_count:
                continue
            tc_fq_id = f"{self.id}.{ts_name_}.{tc_name}"
            if tc := self.instance.get_case_by_name(tc_fq_id):
                if self.trace:
                    logger.debug(f"On {phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}")
                return tc
        logger.debug(
            f"On {phase}: Ztest case '{tc_name}' is not known"
            f" in {self.started_suites} running suite(s)."
        )
        tc_id = f"{self.id}.{tc_name}"
        return self.instance.get_case_or_create(tc_id)

    def start_suite(self, suite_name):
        """Register the start of *suite_name*, warning about unexpected or repeated starts."""
        if suite_name not in self.detected_suite_names:
            self.detected_suite_names.append(suite_name)
        if suite_name not in self.instance.testsuite.ztest_suite_names:
            logger.warning(f"Unexpected Ztest suite '{suite_name}'")
        if suite_name in self.started_suites:
            if self.started_suites[suite_name]['count'] > 0:
                logger.warning(f"Already STARTED '{suite_name}':{self.started_suites[suite_name]}")
            elif self.trace:
                logger.debug(f"START suite '{suite_name}'")
            self.started_suites[suite_name]['count'] += 1
            self.started_suites[suite_name]['repeat'] += 1
        else:
            self.started_suites[suite_name] = {'count': 1, 'repeat': 0}

    def end_suite(self, suite_name, phase='', suite_status=None):
        """Register the end of *suite_name* as seen in *phase*."""
        if suite_name in self.started_suites:
            # The summary line of a suite that already ended is not an error.
            if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0:
                return
            if self.started_suites[suite_name]['count'] < 1:
                logger.error(
                    f"Already ENDED {phase} suite '{suite_name}':{self.started_suites[suite_name]}"
                )
            elif self.trace:
                logger.debug(f"END {phase} suite '{suite_name}':{self.started_suites[suite_name]}")
            self.started_suites[suite_name]['count'] -= 1
        elif suite_status == 'SKIP':
            self.start_suite(suite_name)  # register skipped suites at their summary end
            self.started_suites[suite_name]['count'] -= 1
        else:
            logger.warning(f"END {phase} suite '{suite_name}' without START detected")

    def start_case(self, tc_name):
        """Register the start of test case *tc_name*, warning when it is already running."""
        if tc_name in self.started_cases:
            if self.started_cases[tc_name]['count'] > 0:
                logger.warning(f"Already STARTED '{tc_name}':{self.started_cases[tc_name]}")
            self.started_cases[tc_name]['count'] += 1
        else:
            self.started_cases[tc_name] = {'count': 1}

    def end_case(self, tc_name, phase=''):
        """Register the end of test case *tc_name* as seen in *phase*."""
        if tc_name in self.started_cases:
            # The summary line of a case that already ended is not an error.
            if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0:
                return
            if self.started_cases[tc_name]['count'] < 1:
                logger.error(
                    f"Already ENDED {phase} case '{tc_name}':{self.started_cases[tc_name]}"
                )
            elif self.trace:
                logger.debug(f"END {phase} case '{tc_name}':{self.started_cases[tc_name]}")
            self.started_cases[tc_name]['count'] -= 1
        elif phase != 'TS_SUM':
            logger.warning(f"END {phase} case '{tc_name}' without START detected")

    def handle(self, line):
        """Process one output *line*, updating suite/case tracking and results."""
        if self._match:
            self.testcase_output += line + "\n"

        if test_suite_start_match := re.search(self.test_suite_start_pattern, line):
            self.start_suite(test_suite_start_match.group("suite_name"))
        elif test_suite_end_match := re.search(self.test_suite_end_pattern, line):
            suite_name = test_suite_end_match.group("suite_name")
            self.end_suite(suite_name, 'TS_END')
        elif testcase_match := re.search(self.test_case_start_pattern, line):
            tc_name = testcase_match.group(2)
            tc = self.get_testcase(tc_name, 'TC_START')
            self.start_case(tc.name)
            # Mark the test as started, if something happens here, it is mostly
            # due to this tests, for example timeout. This should in this case
            # be marked as failed and not blocked (not run).
            tc.status = TwisterStatus.STARTED
            if not self._match:
                self.testcase_output += line + "\n"
                self._match = True
        # some testcases are skipped based on predicates and do not show up
        # during test execution, however they are listed in the summary. Parse
        # the summary for status and use that status instead.
        elif result_match := self.test_case_end_pattern.match(line):
            matched_status = result_match.group(1)
            tc_name = result_match.group(3)
            tc = self.get_testcase(tc_name, 'TC_END')
            self.end_case(tc.name)
            self._record_case_result(tc, matched_status, result_match.group(4))
        elif test_suite_summary_match := self.test_suite_summary_pattern.match(line):
            suite_name = test_suite_summary_match.group("suite_name")
            suite_status = test_suite_summary_match.group("suite_status")
            self._match = False
            self.ztest = True
            self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status)
        elif test_case_summary_match := self.test_case_summary_pattern.match(line):
            matched_status = test_case_summary_match.group(1)
            suite_name = test_case_summary_match.group(2)
            tc_name = test_case_summary_match.group(4)
            tc = self.get_testcase(tc_name, 'TS_SUM', suite_name)
            self.end_case(tc.name, 'TS_SUM')
            self._record_case_result(tc, matched_status, test_case_summary_match.group(5))

        self.process_test(line)

        # Output without any Ztest result lines: report the overall verdict on
        # a single test case named after the harness id.
        if not self.ztest and self.status != TwisterStatus.NONE:
            logger.debug(f"not a ztest and no state for {self.id}")
            tc = self.instance.get_case_or_create(self.id)
            if self.status == TwisterStatus.PASS:
                tc.status = TwisterStatus.PASS
            else:
                tc.status = TwisterStatus.FAIL
                tc.reason = "Test failure"
|
|
|
|
|
|
class Ztest(Test):
    """Alias of the ``Test`` harness under the ``Ztest`` name; adds no behaviour."""
|
|
|
|
|
|
class Bsim(Harness):
    """Harness for BabbleSim tests; only the build step is customised."""

    def build(self):
        """
        Copying the application executable to BabbleSim's bin directory enables
        running multidevice bsim tests after twister has built them.

        Nothing is copied (a warning is logged) when the built executable is
        missing or the BSIM_OUT_PATH environment variable is not set.
        """
        if self.instance is None:
            return

        original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe')
        if not os.path.exists(original_exe_path):
            logger.warning('Cannot copy bsim exe - cannot find original executable.')
            return

        bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
        if not bsim_out_path:
            logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
            return

        # Tolerate a missing harness_config the same way the other harnesses
        # in this module do, instead of failing on `None.get`.
        harness_config = self.instance.testsuite.harness_config or {}
        new_exe_name: str = harness_config.get('bsim_exe_name', '')
        if new_exe_name:
            new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
        else:
            new_exe_name = self.instance.name
            new_exe_name = f'bs_{new_exe_name}'

        # Flatten path separators, dots and '@' so the name is a single file name.
        new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')

        new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
        logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
        shutil.copy(original_exe_path, new_exe_path)
|
|
|
|
|
|
class HarnessImporter:
    """Factory that instantiates a harness class of this module by name."""

    @staticmethod
    def get_harness(harness_name):
        """Return a new instance of the class called *harness_name*.

        An empty or missing name selects ``Test``. When the lookup (or the
        construction) raises AttributeError, the problem is logged at debug
        level and None is returned.
        """
        thismodule = sys.modules[__name__]
        try:
            harness_class = (
                getattr(thismodule, harness_name) if harness_name else thismodule.Test
            )
            return harness_class()
        except AttributeError as e:
            logger.debug(f"harness {harness_name} not implemented: {e}")
            return None
|