Status errors previously logged an error but did not fail the running test. This commit changes that and introduces a new StatusAttributeError to use there. One test is modified so that it follows the proper status form, and one test for the new error has been added. Status errors will now properly mark the Instance as ERROR and its TestCases that were not run as SKIP. This necessitated some code layout changes in runner.py. Signed-off-by: Lukasz Mrugala <lukaszx.mrugala@intel.com>
386 lines
14 KiB
Python
386 lines
14 KiB
Python
# vim: set syntax=python ts=4 :
|
|
#
|
|
# Copyright (c) 2018-2022 Intel Corporation
|
|
# Copyright 2022 NXP
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
from __future__ import annotations
|
|
from enum import Enum
|
|
import os
|
|
import hashlib
|
|
import random
|
|
import logging
|
|
import shutil
|
|
import glob
|
|
import csv
|
|
|
|
from twisterlib.testsuite import TestCase, TestSuite
|
|
from twisterlib.platform import Platform
|
|
from twisterlib.error import BuildError, StatusAttributeError
|
|
from twisterlib.size_calc import SizeCalculator
|
|
from twisterlib.statuses import TwisterStatus
|
|
from twisterlib.handlers import (
|
|
Handler,
|
|
SimulationHandler,
|
|
BinaryHandler,
|
|
QEMUHandler,
|
|
QEMUWinHandler,
|
|
DeviceHandler,
|
|
SUPPORTED_SIMS,
|
|
SUPPORTED_SIMS_IN_PYTEST,
|
|
)
|
|
|
|
logger = logging.getLogger('twister')
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
class TestInstance:
    """Class representing the execution of a particular TestSuite on a platform

    @param testsuite The TestSuite object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param outdir Base directory for all test results. The actual
        out directory used is <outdir>/<platform>/<test case name>
    """

    # The class name starts with "Test"; keep pytest from collecting it.
    __test__ = False

    def __init__(self, testsuite, platform, outdir):
        self.testsuite: TestSuite = testsuite
        self.platform: Platform = platform

        self._status = TwisterStatus.NONE
        self.reason = "Unknown"
        self.metrics = dict()
        self.handler = None
        self.recording = None
        self.outdir = outdir
        self.execution_time = 0
        self.build_time = 0
        self.retries = 0

        self.name = os.path.join(platform.name, testsuite.name)
        self.dut = None

        if testsuite.detailed_test_id:
            self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name)
        else:
            # if suite is not in zephyr, keep only the part after ".." in reconstructed dir structure
            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir + os.path.sep, 1)[-1]
            self.build_dir = os.path.join(
                outdir, platform.normalized_name, source_dir_rel, testsuite.name
            )
        # NOTE: _get_run_id() reads/creates a file inside build_dir, so it
        # must run after build_dir has been computed.
        self.run_id = self._get_run_id()
        self.domains = None
        # Instance need to use sysbuild if a given suite or a platform requires it
        self.sysbuild = testsuite.sysbuild or platform.sysbuild

        self.run = False
        self.testcases: list[TestCase] = []
        self.init_cases()
        self.filters = []
        self.filter_type = None

    def record(self, recording, fname_csv="recording.csv"):
        """Accumulate recorded data and dump all of it to a CSV file.

        Nothing happens when ``recording`` is empty or None. Otherwise the new
        records are appended to ``self.recording`` and the complete recording
        is (re)written to ``<build_dir>/<fname_csv>``.

        @param recording List of dictionaries; the keys of the first stored
            record are used as the CSV header
        @param fname_csv Name of the CSV file created in the build directory
        """
        if not recording:
            return

        if self.recording is None:
            self.recording = recording.copy()
        else:
            self.recording.extend(recording)

        filename = os.path.join(self.build_dir, fname_csv)
        # newline='' lets the csv writer emit exactly `lineterminator`;
        # without it text mode would turn os.linesep into "\r\r\n" on Windows.
        with open(filename, "wt", newline="") as csvfile:
            cw = csv.DictWriter(csvfile,
                                fieldnames=self.recording[0].keys(),
                                lineterminator=os.linesep,
                                quoting=csv.QUOTE_NONNUMERIC)
            cw.writeheader()
            cw.writerows(self.recording)

    @property
    def status(self) -> TwisterStatus:
        """Current TwisterStatus of this instance."""
        return self._status

    @status.setter
    def status(self, value: TwisterStatus) -> None:
        """Set the status, accepting an Enum member or a status name.

        @raises StatusAttributeError: the value does not name a TwisterStatus
        """
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except (KeyError, TypeError) as err:
            # KeyError: unknown name; TypeError: unhashable value.
            raise StatusAttributeError(self.__class__, value) from err

    def add_filter(self, reason, filter_type):
        """Register a filter hit and mark the instance as filtered.

        @param reason Why the instance was filtered
        @param filter_type Kind of filter that matched
        """
        self.filters.append({'type': filter_type, 'reason': reason})
        self.status = TwisterStatus.FILTER
        self.reason = reason
        self.filter_type = filter_type

    # Fix an issue with copying objects from testsuite, need better solution.
    def init_cases(self):
        """Create a fresh TestCase in this instance for each testsuite case."""
        for c in self.testsuite.testcases:
            self.add_testcase(c.name, freeform=c.freeform)

    def _get_run_id(self):
        """ generate run id from instance unique identifier and a random
        number
        If exist, get cached run id from previous run."""
        run_id_file = os.path.join(self.build_dir, "run_id.txt")
        if os.path.exists(run_id_file):
            with open(run_id_file, "r") as fp:
                return fp.read()

        hash_object = hashlib.md5(self.name.encode())
        random_str = f"{random.getrandbits(64)}".encode()
        hash_object.update(random_str)
        run_id = hash_object.hexdigest()
        # Cache the id so that a later run over the same build_dir reuses it.
        os.makedirs(self.build_dir, exist_ok=True)
        with open(run_id_file, 'w+') as fp:
            fp.write(run_id)
        return run_id

    def add_missing_case_status(self, status, reason=None):
        """Resolve the status of test cases that never reached a final state.

        Cases still marked STARTED become FAIL. Cases with status NONE get
        ``status`` and a reason (``reason`` if given, else the instance's own).

        @param status Status assigned to cases whose status is NONE
        @param reason Optional reason for those cases
        """
        for case in self.testcases:
            if case.status == TwisterStatus.STARTED:
                case.status = TwisterStatus.FAIL
            elif case.status == TwisterStatus.NONE:
                case.status = status
                case.reason = reason if reason else self.reason

    def __getstate__(self):
        return self.__dict__.copy()

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __lt__(self, other):
        # Instances are ordered by their "<platform>/<testsuite>" name.
        return self.name < other.name

    def set_case_status_by_name(self, name, status, reason=None):
        """Set the status (and optionally reason) of a named test case.

        The case is created when it does not exist yet.

        @return The affected TestCase
        """
        tc = self.get_case_or_create(name)
        tc.status = status
        if reason:
            tc.reason = reason
        return tc

    def add_testcase(self, name, freeform=False):
        """Append a new TestCase called ``name`` and return it."""
        tc = TestCase(name=name)
        tc.freeform = freeform
        self.testcases.append(tc)
        return tc

    def get_case_by_name(self, name):
        """Return the first test case called ``name``, or None if absent."""
        for c in self.testcases:
            if c.name == name:
                return c
        return None

    def get_case_or_create(self, name):
        """Return the test case called ``name``, creating it when missing."""
        existing = self.get_case_by_name(name)
        if existing is not None:
            return existing

        logger.debug(f"Could not find a matching testcase for {name}")
        tc = TestCase(name=name)
        self.testcases.append(tc)
        return tc

    @staticmethod
    def testsuite_runnable(testsuite, fixtures):
        """Tell whether the suite's harness (and fixture) allow running it.

        @param testsuite Suite providing ``harness`` and ``harness_config``
        @param fixtures Available fixtures, optionally in "name:extra" form;
            None is treated as no fixtures
        @return True if the suite can be run, False if it can only be built
        """
        # console harness allows us to run the test and capture data.
        if testsuite.harness not in ('console', 'ztest', 'pytest', 'test', 'gtest', 'robot'):
            return False

        # if we have a fixture that is also being supplied on the
        # command-line, then we need to run the test, not just build it.
        fixture = testsuite.harness_config.get('fixture')
        if not fixture:
            return True
        return any(fixture == f.split(sep=':')[0] for f in fixtures or ())

    def setup_handler(self, env):
        """Select and configure the handler used to execute this instance.

        Does nothing when a handler is already set.

        @param env Environment providing ``options`` and ``generator_cmd``
        """
        if self.handler:
            return

        options = env.options
        handler = Handler(self, "")
        if options.device_testing:
            handler = DeviceHandler(self, "device")
            handler.call_make_run = False
            handler.ready = True
        elif self.platform.simulation != "na":
            if self.platform.simulation == "qemu":
                if os.name != "nt":
                    handler = QEMUHandler(self, "qemu")
                else:
                    handler = QEMUWinHandler(self, "qemu")
                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
                handler.ready = True
            else:
                handler = SimulationHandler(self, self.platform.simulation)

            # A simulator executable found on PATH also makes the handler ready.
            if self.platform.simulation_exec and shutil.which(self.platform.simulation_exec):
                handler.ready = True
        elif self.testsuite.type == "unit":
            handler = BinaryHandler(self, "unit")
            handler.binary = os.path.join(self.build_dir, "testbinary")
            if options.enable_coverage:
                handler.args.append("COVERAGE=1")
            handler.call_make_run = False
            handler.ready = True

        if handler:
            handler.options = options
            handler.generator_cmd = env.generator_cmd
            handler.suite_name_check = not options.disable_suite_name_check
        self.handler = handler

    # Global testsuite parameters
    def check_runnable(self, enable_slow=False, filter='buildable', fixtures=None, hardware_map=None):
        """Decide whether this instance can be executed, not only built.

        @param enable_slow Allow suites marked as slow to run
        @param filter 'runnable' forces the target to be considered ready
        @param fixtures Fixtures supplied by the user (defaults to none)
        @param hardware_map Optional hardware map; a DUT of the same platform
            with a matching fixture makes the suite runnable
        @return True if the instance can be run
        """
        if fixtures is None:
            fixtures = []

        if os.name == 'nt':
            # running on simulators is currently supported only for QEMU on Windows
            if self.platform.simulation not in ('na', 'qemu'):
                return False

            # check presence of QEMU on Windows
            if self.platform.simulation == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
                return False

        # we asked for build-only on the command line
        if self.testsuite.build_only:
            return False

        # Do not run slow tests:
        skip_slow = self.testsuite.slow and not enable_slow
        if skip_slow:
            return False

        target_ready = bool(
            self.testsuite.type == "unit"
            or self.platform.type == "native"
            or (self.platform.simulation in SUPPORTED_SIMS
                and self.platform.simulation not in self.testsuite.simulation_exclude)
            or filter == 'runnable'
        )

        # check if test is runnable in pytest
        if self.testsuite.harness == 'pytest':
            target_ready = bool(
                filter == 'runnable' or self.platform.simulation in SUPPORTED_SIMS_IN_PYTEST
            )

        # These simulators need their executable on PATH to be usable.
        SUPPORTED_SIMS_WITH_EXEC = ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native']
        if (filter != 'runnable'
                and self.platform.simulation in SUPPORTED_SIMS_WITH_EXEC
                and self.platform.simulation_exec):
            if not shutil.which(self.platform.simulation_exec):
                target_ready = False

        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)

        if hardware_map:
            for h in hardware_map.duts:
                if (h.platform == self.platform.name and
                        self.testsuite_runnable(self.testsuite, h.fixtures)):
                    testsuite_runnable = True
                    break

        return testsuite_runnable and target_ready

    def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=None):
        """Build the extra Kconfig overlay and write it if it is non-empty.

        @param platform Platform checked for coverage/sanitizer eligibility
        @param enable_asan Add CONFIG_ASAN=y for native platforms
        @param enable_ubsan Add CONFIG_UBSAN=y for native platforms
        @param enable_coverage Add coverage options for listed platforms
        @param coverage_platform Names of platforms with coverage enabled
            (defaults to none)
        @return The overlay content (empty string if nothing was written)
        """
        if coverage_platform is None:
            coverage_platform = []

        # Create this in a "twister/" subdirectory otherwise this
        # will pass this overlay to kconfig.py *twice* and kconfig.cmake
        # will silently give that second time precedence over any
        # --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "twister")

        content = ""

        if self.testsuite.extra_configs:
            new_config_list = []
            # some configs might be conditional on arch or platform, see if we
            # have a namespace defined and apply only if the namespace matches.
            # we currently support both arch: and platform:
            for config in self.testsuite.extra_configs:
                cond_config = config.split(":")
                if cond_config[0] == "arch" and len(cond_config) == 3:
                    if self.platform.arch == cond_config[1]:
                        new_config_list.append(cond_config[2])
                elif cond_config[0] == "platform" and len(cond_config) == 3:
                    if self.platform.name == cond_config[1]:
                        new_config_list.append(cond_config[2])
                else:
                    new_config_list.append(config)

            content = "\n".join(new_config_list)

        if enable_coverage and platform.name in coverage_platform:
            content = content + "\nCONFIG_COVERAGE=y"
            content = content + "\nCONFIG_COVERAGE_DUMP=y"

        if enable_asan and platform.type == "native":
            content = content + "\nCONFIG_ASAN=y"

        if enable_ubsan and platform.type == "native":
            content = content + "\nCONFIG_UBSAN=y"

        if content:
            os.makedirs(subdir, exist_ok=True)
            file = os.path.join(subdir, "testsuite_extra.conf")
            with open(file, "w", encoding='utf-8') as f:
                f.write(content)

        return content

    def calculate_sizes(self, from_buildlog: bool = False, generate_warning: bool = True) -> SizeCalculator:
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @param from_buildlog Also pass the path of build.log to the calculator
        @param generate_warning Forwarded to SizeCalculator
        @raises BuildError: The output binary (or build.log) cannot be located.
        @return A SizeCalculator object
        """
        elf_filepath = self.get_elf_file()
        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
        return SizeCalculator(elf_filename=elf_filepath,
                              extra_sections=self.testsuite.extra_sections,
                              buildlog_filepath=buildlog_filepath,
                              generate_warning=generate_warning)

    def get_elf_file(self) -> str:
        """Get path to the output binary of this instance.

        @raises BuildError: No candidate binary was found.
        @return: Path to the first matching binary (str).
        """
        if self.sysbuild:
            build_dir = self.domains.get_default_domain().build_dir
        else:
            build_dir = self.build_dir

        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
        blocklist = [
            'remapped',  # used for xtensa platforms
            'zefi',  # EFI for Zephyr
            'qemu',  # elf files generated after running in qemu
            '_pre']
        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
        if not fns:
            raise BuildError("Missing output binary")
        elif len(fns) > 1:
            logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
        return fns[0]

    def get_buildlog_file(self) -> str:
        """Get path to build.log file.

        @raises BuildError: Incorrect amount (!=1) of build logs.
        @return: Path to build.log (str).
        """
        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
        if len(buildlog_paths) != 1:
            raise BuildError("Missing/multiple build.log file.")
        return buildlog_paths[0]

    def __repr__(self):
        return "<TestSuite %s on %s>" % (self.testsuite.name, self.platform.name)
|