From 89d4aa85592dac196e25a157566cd6f67877dd1d Mon Sep 17 00:00:00 2001 From: Lukasz Mrugala Date: Wed, 27 Nov 2024 17:50:07 +0000 Subject: [PATCH] scripts: Fix twisterlib for ruff - E501 This fixes ruff linting error E501, indicating overlong lines. Signed-off-by: Lukasz Mrugala --- .ruff-excludes.toml | 43 ---- .../pylib/twister/twisterlib/cmakecache.py | 5 +- scripts/pylib/twister/twisterlib/coverage.py | 17 +- .../pylib/twister/twisterlib/environment.py | 62 ++++-- scripts/pylib/twister/twisterlib/handlers.py | 94 ++++++-- .../pylib/twister/twisterlib/hardwaremap.py | 29 ++- scripts/pylib/twister/twisterlib/harness.py | 56 +++-- scripts/pylib/twister/twisterlib/platform.py | 12 +- .../pylib/twister/twisterlib/quarantine.py | 12 +- scripts/pylib/twister/twisterlib/reports.py | 188 ++++++++++++---- scripts/pylib/twister/twisterlib/runner.py | 203 +++++++++++++----- scripts/pylib/twister/twisterlib/size_calc.py | 38 +++- .../pylib/twister/twisterlib/testinstance.py | 29 ++- scripts/pylib/twister/twisterlib/testplan.py | 201 +++++++++++++---- scripts/pylib/twister/twisterlib/testsuite.py | 48 +++-- .../pylib/twister/twisterlib/twister_main.py | 10 +- 16 files changed, 778 insertions(+), 269 deletions(-) diff --git a/.ruff-excludes.toml b/.ruff-excludes.toml index e7ed7bb5520..651170422c4 100644 --- a/.ruff-excludes.toml +++ b/.ruff-excludes.toml @@ -756,53 +756,10 @@ "UP015", # https://docs.astral.sh/ruff/rules/redundant-open-modes "UP031", # https://docs.astral.sh/ruff/rules/printf-string-formatting ] -"./scripts/pylib/twister/twisterlib/cmakecache.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/coverage.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/environment.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/handlers.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/hardwaremap.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/harness.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/platform.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/quarantine.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/reports.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/runner.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/size_calc.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/testinstance.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] "./scripts/pylib/twister/twisterlib/testplan.py" = [ "E402", # https://docs.astral.sh/ruff/rules/module-import-not-at-top-of-file - "E501", # https://docs.astral.sh/ruff/rules/line-too-long "F401", # https://docs.astral.sh/ruff/rules/unused-import ] -"./scripts/pylib/twister/twisterlib/testsuite.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] -"./scripts/pylib/twister/twisterlib/twister_main.py" = [ - "E501", # https://docs.astral.sh/ruff/rules/line-too-long -] "./scripts/pylint/checkers/argparse-checker.py" = [ "F821", # https://docs.astral.sh/ruff/rules/undefined-name "I001", # https://docs.astral.sh/ruff/rules/unsorted-imports diff --git a/scripts/pylib/twister/twisterlib/cmakecache.py b/scripts/pylib/twister/twisterlib/cmakecache.py index 4a35006c85a..16e18669675 100644 --- a/scripts/pylib/twister/twisterlib/cmakecache.py +++ b/scripts/pylib/twister/twisterlib/cmakecache.py @@ -52,7 +52,10 @@ class CMakeCacheEntry: val = val.upper() if val in ('ON', 'YES', 'TRUE', 'Y'): return 1 - elif val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', '') or val.endswith('-NOTFOUND'): + elif ( + val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', '') + or val.endswith('-NOTFOUND') + ): return 0 else: try: diff --git a/scripts/pylib/twister/twisterlib/coverage.py b/scripts/pylib/twister/twisterlib/coverage.py index 48a8b729853..dd647c60ddc 100644 --- a/scripts/pylib/twister/twisterlib/coverage.py +++ b/scripts/pylib/twister/twisterlib/coverage.py @@ -388,13 +388,18 @@ class Gcovr(CoverageTool): "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"], "csv": ["--csv", os.path.join(subdir, "coverage.csv")], "txt": ["--txt", os.path.join(subdir, "coverage.txt")], - "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), "--coveralls-pretty"], + "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), + "--coveralls-pretty"], "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")] } - gcovr_options = self._flatten_list([report_options[r] for r in self.output_formats.split(',')]) + gcovr_options = self._flatten_list( + [report_options[r] for r in self.output_formats.split(',')] + ) - return subprocess.call(["gcovr", "-r", self.base_dir] + mode_options + gcovr_options + tracefiles, - stdout=coveragelog) + return subprocess.call( + ["gcovr", "-r", self.base_dir] \ + + mode_options + gcovr_options + tracefiles, stdout=coveragelog + ) @@ -427,7 +432,9 @@ def run_coverage(testplan, options): elif os.path.exists(zephyr_sdk_gcov_tool): gcov_tool = zephyr_sdk_gcov_tool else: - logger.error("Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR.") + logger.error( + "Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR." + ) sys.exit(1) else: gcov_tool = str(options.gcov_tool) diff --git a/scripts/pylib/twister/twisterlib/environment.py b/scripts/pylib/twister/twisterlib/environment.py index c1c6cb0a932..b161096d562 100644 --- a/scripts/pylib/twister/twisterlib/environment.py +++ b/scripts/pylib/twister/twisterlib/environment.py @@ -119,15 +119,18 @@ Artificially long but functional example: "--save-tests", metavar="FILENAME", action="store", - help="Write a list of tests and platforms to be run to %(metavar)s file and stop execution. " - "The resulting file will have the same content as 'testplan.json'.") + help="Write a list of tests and platforms to be run to %(metavar)s file and stop execution." + " The resulting file will have the same content as 'testplan.json'." + ) case_select.add_argument( "-F", "--load-tests", metavar="FILENAME", action="store", - help="Load a list of tests and platforms to be run from a JSON file ('testplan.json' schema).") + help="Load a list of tests and platforms to be run" + "from a JSON file ('testplan.json' schema)." + ) case_select.add_argument( "-T", "--testsuite-root", action="append", default=[], type = norm_path, @@ -219,8 +222,12 @@ Artificially long but functional example: """) test_or_build.add_argument( - "-b", "--build-only", action="store_true", default="--prep-artifacts-for-testing" in sys.argv, - help="Only build the code, do not attempt to run the code on targets.") + "-b", + "--build-only", + action="store_true", + default="--prep-artifacts-for-testing" in sys.argv, + help="Only build the code, do not attempt to run the code on targets." + ) test_or_build.add_argument( "--prep-artifacts-for-testing", action="store_true", @@ -353,15 +360,23 @@ structure in the main Zephyr tree: boards///""") parser.add_argument("--coverage-tool", choices=['lcov', 'gcovr'], default='gcovr', help="Tool to use to generate coverage report.") - parser.add_argument("--coverage-formats", action="store", default=None, # default behavior is set in run_coverage - help="Output formats to use for generated coverage reports, as a comma-separated list. " + - "Valid options for 'gcovr' tool are: " + - ','.join(supported_coverage_formats['gcovr']) + " (html - default)." + - " Valid options for 'lcov' tool are: " + - ','.join(supported_coverage_formats['lcov']) + " (html,lcov - default).") + parser.add_argument( + "--coverage-formats", + action="store", + default=None, # default behavior is set in run_coverage + help="Output formats to use for generated coverage reports, as a comma-separated list. " + + "Valid options for 'gcovr' tool are: " + + ','.join(supported_coverage_formats['gcovr']) + " (html - default)." + + " Valid options for 'lcov' tool are: " + + ','.join(supported_coverage_formats['lcov']) + " (html,lcov - default)." + ) - parser.add_argument("--test-config", action="store", default=os.path.join(ZEPHYR_BASE, "tests", "test_config.yaml"), - help="Path to file with plans and test configurations.") + parser.add_argument( + "--test-config", + action="store", + default=os.path.join(ZEPHYR_BASE, "tests", "test_config.yaml"), + help="Path to file with plans and test configurations." + ) parser.add_argument("--level", action="store", help="Test level to be used. By default, no levels are used for filtering" @@ -814,7 +829,12 @@ structure in the main Zephyr tree: boards///""") return parser -def parse_arguments(parser: argparse.ArgumentParser, args, options = None, on_init=True) -> argparse.Namespace: +def parse_arguments( + parser: argparse.ArgumentParser, + args, + options = None, + on_init=True +) -> argparse.Namespace: if options is None: options = parser.parse_args(args) @@ -875,11 +895,19 @@ def parse_arguments(parser: argparse.ArgumentParser, args, options = None, on_in logger.error("valgrind enabled but valgrind executable not found") sys.exit(1) - if (not options.device_testing) and (options.device_serial or options.device_serial_pty or options.hardware_map): - logger.error("Use --device-testing with --device-serial, or --device-serial-pty, or --hardware-map.") + if ( + (not options.device_testing) + and (options.device_serial or options.device_serial_pty or options.hardware_map) + ): + logger.error( + "Use --device-testing with --device-serial, or --device-serial-pty, or --hardware-map." + ) sys.exit(1) - if options.device_testing and (options.device_serial or options.device_serial_pty) and len(options.platform) != 1: + if ( + options.device_testing + and (options.device_serial or options.device_serial_pty) and len(options.platform) != 1 + ): logger.error("When --device-testing is used with --device-serial " "or --device-serial-pty, exactly one platform must " "be specified") diff --git a/scripts/pylib/twister/twisterlib/handlers.py b/scripts/pylib/twister/twisterlib/handlers.py index e9b4a9865c2..c67ec07349a 100755 --- a/scripts/pylib/twister/twisterlib/handlers.py +++ b/scripts/pylib/twister/twisterlib/handlers.py @@ -125,7 +125,7 @@ class Handler: set(_d_suite) != set(expected_suite_names) and not set(_d_suite).issubset(set(expected_suite_names)) ): - self._missing_suite_name(expected_suite_names, handler_time) + self._missing_suite_name(expected_suite_names, handler_time) def _missing_suite_name(self, expected_suite_names, handler_time): """ @@ -147,7 +147,11 @@ class Handler: # only for Ztest tests: harness_class_name = type(harness).__name__ if self.suite_name_check and harness_class_name == "Test": - self._verify_ztest_suite_name(harness.status, harness.detected_suite_names, handler_time) + self._verify_ztest_suite_name( + harness.status, + harness.detected_suite_names, + handler_time + ) if self.instance.status == TwisterStatus.FAIL: return if not harness.matched_run_id and harness.run_id_exists: @@ -174,8 +178,14 @@ class Handler: class BinaryHandler(Handler): - def __init__(self, instance, type_str: str, options: argparse.Namespace, generator_cmd: str | None = None, - suite_name_check: bool = True): + def __init__( + self, + instance, + type_str: str, + options: argparse.Namespace, + generator_cmd: str | None = None, + suite_name_check: bool = True + ): """Constructor @param instance Test Instance @@ -344,8 +354,12 @@ class BinaryHandler(Handler): return stderr_log = f"{self.instance.build_dir}/handler_stderr.log" - with open(stderr_log, "w+") as stderr_log_fp, subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=stderr_log_fp, cwd=self.build_dir, env=env) as proc: + with ( + open(stderr_log, "w+") as stderr_log_fp, + subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=stderr_log_fp, cwd=self.build_dir, env=env + ) as proc, + ): logger.debug(f"Spawning BinaryHandler Thread for {self.name}") t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True) t.start() @@ -373,8 +387,14 @@ class BinaryHandler(Handler): class SimulationHandler(BinaryHandler): - def __init__(self, instance, type_str: str, options: argparse.Namespace, generator_cmd: str | None = None, - suite_name_check: bool = True): + def __init__( + self, + instance, + type_str: str, + options: argparse.Namespace, + generator_cmd: str | None = None, + suite_name_check: bool = True, + ): """Constructor @param instance Test Instance @@ -484,7 +504,8 @@ class DeviceHandler(Handler): # Select an available DUT with less failures for d in sorted(duts_found, key=lambda _dut: _dut.failures): - duts_shared_hw = [_d for _d in self.duts if _d.id == d.id] # get all DUTs with the same id + # get all DUTs with the same id + duts_shared_hw = [_d for _d in self.duts if _d.id == d.id] with self.acquire_dut_locks(duts_shared_hw): avail = False if d.available: @@ -504,7 +525,8 @@ class DeviceHandler(Handler): dut.failures_increment() logger.debug(f"Release DUT:{dut.platform}, Id:{dut.id}, " f"counter:{dut.counter}, failures:{dut.failures}") - duts_shared_hw = [_d for _d in self.duts if _d.id == dut.id] # get all DUTs with the same id + # get all DUTs with the same id + duts_shared_hw = [_d for _d in self.duts if _d.id == dut.id] with self.acquire_dut_locks(duts_shared_hw): for _d in duts_shared_hw: _d.available = 1 @@ -548,7 +570,12 @@ class DeviceHandler(Handler): if runner in ("pyocd", "nrfjprog", "nrfutil"): command_extra_args.append("--dev-id") command_extra_args.append(board_id) - elif runner == "openocd" and product == "STM32 STLink" or runner == "openocd" and product == "STLINK-V3": + elif ( + runner == "openocd" + and product == "STM32 STLink" + or runner == "openocd" + and product == "STLINK-V3" + ): command_extra_args.append("--cmd-pre-init") command_extra_args.append(f"hla_serial {board_id}") elif runner == "openocd" and product == "EDBG CMSIS-DAP": @@ -794,7 +821,9 @@ class DeviceHandler(Handler): t.join(0.1) if t.is_alive(): - logger.debug(f"Timed out while monitoring serial output on {self.instance.platform.name}") + logger.debug( + f"Timed out while monitoring serial output on {self.instance.platform.name}" + ) if ser.isOpen(): ser.close() @@ -826,8 +855,14 @@ class QEMUHandler(Handler): for these to collect whether the test passed or failed. """ - def __init__(self, instance, type_str: str, options: argparse.Namespace, generator_cmd: str | None = None, - suite_name_check: bool = True): + def __init__( + self, + instance, + type_str: str, + options: argparse.Namespace, + generator_cmd: str | None = None, + suite_name_check: bool = True, + ): """Constructor @param instance Test instance @@ -1063,7 +1098,12 @@ class QEMUHandler(Handler): is_timeout = False qemu_pid = None - with subprocess.Popen(command, stdout=open(self.stdout_fn, "w"), stderr=open(self.stderr_fn, "w"), cwd=self.build_dir) as proc: + with subprocess.Popen( + command, + stdout=open(self.stdout_fn, "w"), + stderr=open(self.stderr_fn, "w"), + cwd=self.build_dir + ) as proc: logger.debug(f"Spawning QEMUHandler Thread for {self.name}") try: @@ -1116,8 +1156,14 @@ class QEMUWinHandler(Handler): for these to collect whether the test passed or failed. """ - def __init__(self, instance, type_str: str, options: argparse.Namespace, generator_cmd: str | None = None, - suite_name_check: bool = True): + def __init__( + self, + instance, + type_str: str, + options: argparse.Namespace, + generator_cmd: str | None = None, + suite_name_check: bool = True, + ): """Constructor @param instance Test instance @@ -1222,7 +1268,15 @@ class QEMUWinHandler(Handler): finally: queue.put(c) - def _monitor_output(self, queue, timeout, logfile, pid_fn, harness, ignore_unexpected_eof=False): + def _monitor_output( + self, + queue, + timeout, + logfile, + pid_fn, + harness, + ignore_unexpected_eof=False + ): start_time = time.time() timeout_time = start_time + timeout _status = TwisterStatus.NONE @@ -1318,7 +1372,9 @@ class QEMUWinHandler(Handler): self.stop_thread = True handler_time = time.time() - start_time - logger.debug(f"QEMU ({self.pid}) complete with {_status} ({_reason}) after {handler_time} seconds") + logger.debug( + f"QEMU ({self.pid}) complete with {_status} ({_reason}) after {handler_time} seconds" + ) self._monitor_update_instance_info(self, handler_time, _status, _reason) self._close_log_file(log_out_fp) self._stop_qemu_process(self.pid) diff --git a/scripts/pylib/twister/twisterlib/hardwaremap.py b/scripts/pylib/twister/twisterlib/hardwaremap.py index 637d1947144..1121301c577 100644 --- a/scripts/pylib/twister/twisterlib/hardwaremap.py +++ b/scripts/pylib/twister/twisterlib/hardwaremap.py @@ -234,10 +234,26 @@ class HardwareMap: print(tabulate(table, headers=header, tablefmt="github")) - def add_device(self, serial, platform, pre_script, is_pty, baud=None, flash_timeout=60, flash_with_test=False, flash_before=False): - device = DUT(platform=platform, connected=True, pre_script=pre_script, serial_baud=baud, - flash_timeout=flash_timeout, flash_with_test=flash_with_test, flash_before=flash_before - ) + def add_device( + self, + serial, + platform, + pre_script, + is_pty, + baud=None, + flash_timeout=60, + flash_with_test=False, + flash_before=False + ): + device = DUT( + platform=platform, + connected=True, + pre_script=pre_script, + serial_baud=baud, + flash_timeout=flash_timeout, + flash_with_test=flash_with_test, + flash_before=flash_before + ) if is_pty: device.serial_pty = serial else: @@ -330,7 +346,10 @@ class HardwareMap: serial_devices = list_ports.comports() logger.info("Scanning connected hardware...") for d in serial_devices: - if d.manufacturer and d.manufacturer.casefold() in [m.casefold() for m in self.manufacturer]: + if ( + d.manufacturer + and d.manufacturer.casefold() in [m.casefold() for m in self.manufacturer] + ): # TI XDS110 can have multiple serial devices for a single board # assume endpoint 0 is the serial, skip all others diff --git a/scripts/pylib/twister/twisterlib/harness.py b/scripts/pylib/twister/twisterlib/harness.py index a9a2134570e..f7d0e51fae4 100644 --- a/scripts/pylib/twister/twisterlib/harness.py +++ b/scripts/pylib/twister/twisterlib/harness.py @@ -129,7 +129,9 @@ class Harness: if self.record_pattern: match = self.record_pattern.search(line) if match: - rec = self.translate_record({ k:v.strip() for k,v in match.groupdict(default="").items() }) + rec = self.translate_record( + { k:v.strip() for k,v in match.groupdict(default="").items() } + ) self.recording.append(rec) return match # @@ -416,7 +418,9 @@ class Pytest(Harness): elif handler.type_str == 'build': command.append('--device-type=custom') else: - raise PytestHarnessException(f'Support for handler {handler.type_str} not implemented yet') + raise PytestHarnessException( + f'Support for handler {handler.type_str} not implemented yet' + ) if handler.type_str != 'device': for fixture in handler.options.fixture: @@ -522,12 +526,20 @@ class Pytest(Harness): env = os.environ.copy() if not PYTEST_PLUGIN_INSTALLED: cmd.extend(['-p', 'twister_harness.plugin']) - pytest_plugin_path = os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'pytest-twister-harness', 'src') + pytest_plugin_path = os.path.join( + ZEPHYR_BASE, + 'scripts', + 'pylib', + 'pytest-twister-harness', + 'src' + ) env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '') if _WINDOWS: cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && ' else: - cmd_append_python_path = f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && ' + cmd_append_python_path = ( + f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && ' + ) else: cmd_append_python_path = '' cmd_to_print = cmd_append_python_path + shlex.join(cmd) @@ -571,7 +583,9 @@ class Pytest(Harness): if (elem_ts := root.find('testsuite')) is not None: if elem_ts.get('failures') != '0': self.status = TwisterStatus.FAIL - self.instance.reason = f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed" + self.instance.reason = ( + f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed" + ) elif elem_ts.get('errors') != '0': self.status = TwisterStatus.ERROR self.instance.reason = 'Error during pytest execution' @@ -717,11 +731,20 @@ class Test(Harness): __test__ = False # for pytest to skip this class when collects tests test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P\S*)") - test_suite_end_pattern = re.compile(r"TESTSUITE (?P\S*)\s+(?Psucceeded|failed)") + test_suite_end_pattern = re.compile( + r"TESTSUITE (?P\S*)\s+(?Psucceeded|failed)" + ) test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)") - test_case_end_pattern = re.compile(r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds") - test_suite_summary_pattern = re.compile(r"SUITE (?P\S*) - .* \[(?P\S*)\]: .* duration = (\d*[.,]?\d*) seconds") - test_case_summary_pattern = re.compile(r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds") + test_case_end_pattern = re.compile( + r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds" + ) + test_suite_summary_pattern = re.compile( + r"SUITE (?P\S*) - .* \[(?P\S*)\]:" + r" .* duration = (\d*[.,]?\d*) seconds" + ) + test_case_summary_pattern = re.compile( + r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds" + ) def get_testcase(self, tc_name, phase, ts_name=None): @@ -740,7 +763,7 @@ class Test(Harness): self.detected_suite_names.append(ts_name) ts_names = [ ts_name ] if ts_name in ts_names else [] - # Firstly try to match the test case ID to the first running Ztest suite with this test name. + # First, try to match the test case ID to the first running Ztest suite with this test name. for ts_name_ in ts_names: if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1): continue @@ -749,7 +772,10 @@ class Test(Harness): if self.trace: logger.debug(f"On {phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}") return tc - logger.debug(f"On {phase}: Ztest case '{tc_name}' is not known in {self.started_suites} running suite(s).") + logger.debug( + f"On {phase}: Ztest case '{tc_name}' is not known" + f" in {self.started_suites} running suite(s)." + ) tc_id = f"{self.id}.{tc_name}" return self.instance.get_case_or_create(tc_id) @@ -773,7 +799,9 @@ class Test(Harness): if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0: return if self.started_suites[suite_name]['count'] < 1: - logger.error(f"Already ENDED {phase} suite '{suite_name}':{self.started_suites[suite_name]}") + logger.error( + f"Already ENDED {phase} suite '{suite_name}':{self.started_suites[suite_name]}" + ) elif self.trace: logger.debug(f"END {phase} suite '{suite_name}':{self.started_suites[suite_name]}") self.started_suites[suite_name]['count'] -= 1 @@ -796,7 +824,9 @@ class Test(Harness): if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0: return if self.started_cases[tc_name]['count'] < 1: - logger.error(f"Already ENDED {phase} case '{tc_name}':{self.started_cases[tc_name]}") + logger.error( + f"Already ENDED {phase} case '{tc_name}':{self.started_cases[tc_name]}" + ) elif self.trace: logger.debug(f"END {phase} case '{tc_name}':{self.started_cases[tc_name]}") self.started_cases[tc_name]['count'] -= 1 diff --git a/scripts/pylib/twister/twisterlib/platform.py b/scripts/pylib/twister/twisterlib/platform.py index 58ff4162445..325520d1513 100644 --- a/scripts/pylib/twister/twisterlib/platform.py +++ b/scripts/pylib/twister/twisterlib/platform.py @@ -45,8 +45,9 @@ class Platform: Maps directly to BOARD when building""" - platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE, - "scripts", "schemas", "twister", "platform-schema.yaml")) + platform_schema = scl.yaml_load( + os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "platform-schema.yaml") + ) def __init__(self): """Constructor. @@ -132,7 +133,12 @@ class Platform: self.tier = variant_data.get("tier", data.get("tier", self.tier)) self.type = variant_data.get('type', data.get('type', self.type)) - self.simulators = [Simulator(data) for data in variant_data.get('simulation', data.get('simulation', self.simulators))] + self.simulators = [ + Simulator(data) for data in variant_data.get( + 'simulation', + data.get('simulation', self.simulators) + ) + ] default_sim = self.simulator_by_name(None) if default_sim: self.simulation = default_sim.name diff --git a/scripts/pylib/twister/twisterlib/quarantine.py b/scripts/pylib/twister/twisterlib/quarantine.py index 88a13f4d7d0..c8295e5b6b3 100644 --- a/scripts/pylib/twister/twisterlib/quarantine.py +++ b/scripts/pylib/twister/twisterlib/quarantine.py @@ -125,11 +125,15 @@ class QuarantineData: if (qelem.platforms and (matched := _is_element_matched(platform, qelem.re_platforms)) is False): continue - if (qelem.architectures - and (matched := _is_element_matched(architecture, qelem.re_architectures)) is False): + if ( + qelem.architectures + and (matched := _is_element_matched(architecture, qelem.re_architectures)) is False + ): continue - if (qelem.simulations - and (matched := _is_element_matched(simulator_name, qelem.re_simulations)) is False): + if ( + qelem.simulations + and (matched := _is_element_matched(simulator_name, qelem.re_simulations)) is False + ): continue if matched: diff --git a/scripts/pylib/twister/twisterlib/reports.py b/scripts/pylib/twister/twisterlib/reports.py index 6519e97b0c3..c64ddfefef3 100644 --- a/scripts/pylib/twister/twisterlib/reports.py +++ b/scripts/pylib/twister/twisterlib/reports.py @@ -65,7 +65,19 @@ class Reporting: @staticmethod - def xunit_testcase(eleTestsuite, name, classname, status: TwisterStatus, ts_status: TwisterStatus, reason, duration, runnable, stats, log, build_only_as_skip): + def xunit_testcase( + eleTestsuite, + name, + classname, + status: TwisterStatus, + ts_status: TwisterStatus, + reason, + duration, + runnable, + stats, + log, + build_only_as_skip + ): fails, passes, errors, skips = stats if status in [TwisterStatus.SKIP, TwisterStatus.FILTER]: @@ -106,7 +118,12 @@ class Reporting: else: if status == TwisterStatus.NONE: logger.debug(f"{name}: No status") - ET.SubElement(eleTestcase, ReportStatus.SKIP, type="untested", message="No results captured, testsuite misconfiguration?") + ET.SubElement( + eleTestcase, + ReportStatus.SKIP, + type="untested", + message="No results captured, testsuite misconfiguration?" + ) else: logger.error(f"{name}: Unknown status '{status}'") @@ -129,7 +146,9 @@ class Reporting: suites_to_report = all_suites # do not create entry if everything is filtered out if not self.env.options.detailed_skipped_report: - suites_to_report = list(filter(lambda d: TwisterStatus(d.get('status')) != TwisterStatus.FILTER, all_suites)) + suites_to_report = list( + filter(lambda d: TwisterStatus(d.get('status')) != TwisterStatus.FILTER, all_suites) + ) for suite in suites_to_report: duration = 0 @@ -199,7 +218,9 @@ class Reporting: suites = list(filter(lambda d: d['platform'] == platform, all_suites)) # do not create entry if everything is filtered out if not self.env.options.detailed_skipped_report: - non_filtered = list(filter(lambda d: TwisterStatus(d.get('status')) != TwisterStatus.FILTER, suites)) + non_filtered = list( + filter(lambda d: TwisterStatus(d.get('status')) != TwisterStatus.FILTER, suites) + ) if not non_filtered: continue @@ -225,7 +246,10 @@ class Reporting: ts_status = TwisterStatus(ts.get('status')) # Do not report filtered testcases - if ts_status == TwisterStatus.FILTER and not self.env.options.detailed_skipped_report: + if ( + ts_status == TwisterStatus.FILTER + and not self.env.options.detailed_skipped_report + ): continue if full_report: for tc in ts.get("testcases", []): @@ -289,13 +313,17 @@ class Reporting: continue if (filters and 'allow_status' in filters and \ instance.status not in [TwisterStatus[s] for s in filters['allow_status']]): - logger.debug(f"Skip test suite '{instance.testsuite.name}' status '{instance.status}' " - f"not allowed for {filename}") + logger.debug( + f"Skip test suite '{instance.testsuite.name}'" + f" status '{instance.status}' not allowed for {filename}" + ) continue if (filters and 'deny_status' in filters and \ instance.status in [TwisterStatus[s] for s in filters['deny_status']]): - logger.debug(f"Skip test suite '{instance.testsuite.name}' status '{instance.status}' " - f"denied for {filename}") + logger.debug( + f"Skip test suite '{instance.testsuite.name}'" + f" status '{instance.status}' denied for {filename}" + ) continue suite = {} handler_log = os.path.join(instance.build_dir, "handler.log") @@ -377,7 +405,11 @@ class Reporting: # if we discover those at runtime, the fallback testcase wont be # needed anymore and can be removed from the output, it does # not have a status and would otherwise be reported as skipped. - if case.freeform and case.status == TwisterStatus.NONE and len(instance.testcases) > 1: + if ( + case.freeform + and case.status == TwisterStatus.NONE + and len(instance.testcases) > 1 + ): continue testcase = {} testcase['identifier'] = case.name @@ -408,9 +440,15 @@ class Reporting: if instance.recording is not None: suite['recording'] = instance.recording - if (instance.status not in [TwisterStatus.NONE, TwisterStatus.ERROR, TwisterStatus.FILTER] - and self.env.options.create_rom_ram_report - and self.env.options.footprint_report is not None): + if ( + instance.status not in [ + TwisterStatus.NONE, + TwisterStatus.ERROR, + TwisterStatus.FILTER + ] + and self.env.options.create_rom_ram_report + and self.env.options.footprint_report is not None + ): # Init as empty data preparing for filtering properties. suite['footprint'] = {} @@ -506,7 +544,9 @@ class Reporting: if show_footprint: logger.log( logging.INFO if all_deltas else logging.WARNING, - f"{i.platform.name:<25} {i.testsuite.name:<60} {metric} {delta:<+4}, is now {value:6} {percentage:+.2%}") + f"{i.platform.name:<25} {i.testsuite.name:<60} {metric} {delta:<+4}," + f" is now {value:6} {percentage:+.2%}" + ) warnings += 1 @@ -523,7 +563,9 @@ class Reporting: count = self.env.options.report_summary log_txt = "The following issues were found " if count > self.instance_fail_count: - log_txt += f"(presenting {self.instance_fail_count} out of the {count} items requested):" + log_txt += ( + f"(presenting {self.instance_fail_count} out of the {count} items requested):" + ) else: log_txt += f"(showing the {count} of {self.instance_fail_count} items):" else: @@ -533,7 +575,12 @@ class Reporting: example_instance = None detailed_test_id = self.env.options.detailed_test_id for instance in self.instances.values(): - if instance.status not in [TwisterStatus.PASS, TwisterStatus.FILTER, TwisterStatus.SKIP, TwisterStatus.NOTRUN]: + if instance.status not in [ + TwisterStatus.PASS, + TwisterStatus.FILTER, + TwisterStatus.SKIP, + TwisterStatus.NOTRUN + ]: cnt += 1 if cnt == 1: logger.info("-+" * 40) @@ -543,7 +590,10 @@ class Reporting: if self.env.options.report_summary is not None and \ status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: status = Fore.RED + status.upper() + Fore.RESET - logger.info(f"{cnt}) {instance.testsuite.name} on {instance.platform.name} {status} ({instance.reason})") + logger.info( + f"{cnt}) {instance.testsuite.name} on {instance.platform.name}" + f" {status} ({instance.reason})" + ) example_instance = instance if cnt == count: break @@ -559,10 +609,16 @@ class Reporting: extra_parameters = '' if detailed_test_id else ' --no-detailed-test-id' logger.info(f"west twister -p -s {extra_parameters}, for example:") logger.info("") - logger.info(f"west twister -p {example_instance.platform.name} -s {example_instance.testsuite.name}" - f"{extra_parameters}") + logger.info( + f"west twister -p {example_instance.platform.name}" + f" -s {example_instance.testsuite.name}" + f"{extra_parameters}" + ) logger.info("or with west:") - logger.info(f"west build -p -b {example_instance.platform.name} {cwd_rel_path} -T {example_instance.testsuite.id}") + logger.info( + f"west build -p -b {example_instance.platform.name} {cwd_rel_path}" + f" -T {example_instance.testsuite.id}" + ) logger.info("-+" * 40) def summary(self, results, ignore_unrecognized_sections, duration): @@ -589,45 +645,97 @@ class Reporting: else: pass_rate = 0 + passed_color = ( + TwisterStatus.get_color(TwisterStatus.FAIL) + if failed + else TwisterStatus.get_color(TwisterStatus.PASS) + ) + unfiltered_configs = results.total - results.filtered_configs + notrun_number_section = ( + f'{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun}{Fore.RESET}' + if results.notrun + else f'{results.notrun}' + ) + failed_number_section = ( + f'{TwisterStatus.get_color(TwisterStatus.FAIL)}{results.failed}{Fore.RESET}' + if results.failed + else f'{results.failed}' + ) + error_number_section = ( + f'{TwisterStatus.get_color(TwisterStatus.ERROR)}{results.error}{Fore.RESET}' + if results.error + else f'{results.error}' + ) + warnings_number_section = ( + f'{Fore.YELLOW}{self.plan.warnings + results.warnings}{Fore.RESET}' + if (self.plan.warnings + results.warnings) + else 'no' + ) logger.info( - f"{TwisterStatus.get_color(TwisterStatus.FAIL) if failed else TwisterStatus.get_color(TwisterStatus.PASS)}{results.passed}" - f" of {results.total - results.filtered_configs}{Fore.RESET}" + f"{passed_color}{results.passed} of {unfiltered_configs}{Fore.RESET}" f" executed test configurations passed ({pass_rate:.2%})," - f" {f'{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun}{Fore.RESET}' if results.notrun else f'{results.notrun}'} built (not run)," - f" {f'{TwisterStatus.get_color(TwisterStatus.FAIL)}{results.failed}{Fore.RESET}' if results.failed else f'{results.failed}'} failed," - f" {f'{TwisterStatus.get_color(TwisterStatus.ERROR)}{results.error}{Fore.RESET}' if results.error else f'{results.error}'} errored," - f" with {f'{Fore.YELLOW}{self.plan.warnings + results.warnings}{Fore.RESET}' if (self.plan.warnings + results.warnings) else 'no'} warnings" + f" {notrun_number_section} built (not run)," + f" {failed_number_section} failed," + f" {error_number_section} errored," + f" with {warnings_number_section} warnings" f" in {duration:.2f} seconds." ) total_platforms = len(self.platforms) - filtered_platforms = set(instance.platform.name for instance in self.instances.values() - if instance.status not in[TwisterStatus.FILTER, TwisterStatus.NOTRUN, TwisterStatus.SKIP]) + filtered_platforms = set( + instance.platform.name for instance in self.instances.values() + if instance.status not in [ + TwisterStatus.FILTER, + TwisterStatus.NOTRUN, + TwisterStatus.SKIP + ] + ) # if we are only building, do not report about tests being executed. if self.platforms and not self.env.options.build_only: - executed_cases = results.cases - results.filtered_cases - results.skipped_cases - results.notrun_cases + executed_cases = ( + results.cases + - results.filtered_cases + - results.skipped_cases + - results.notrun_cases + ) pass_rate = 100 * (float(results.passed_cases) / float(executed_cases)) \ if executed_cases != 0 else 0 platform_rate = (100 * len(filtered_platforms) / len(self.platforms)) + blocked_after_comma = ", " + str(results.blocked_cases) + " blocked" + failed_after_comma = ", " + str(results.failed_cases) + " failed" + error_after_comma = ", " + str(results.error_cases) + " errored" + none_after_comma = ", " + str(results.none_cases) + " without a status" logger.info( - f'{results.passed_cases} of {executed_cases} executed test cases passed ({pass_rate:02.2f}%)' - f'{", " + str(results.blocked_cases) + " blocked" if results.blocked_cases else ""}' - f'{", " + str(results.failed_cases) + " failed" if results.failed_cases else ""}' - f'{", " + str(results.error_cases) + " errored" if results.error_cases else ""}' - f'{", " + str(results.none_cases) + " without a status" if results.none_cases else ""}' - f' on {len(filtered_platforms)} out of total {total_platforms} platforms ({platform_rate:02.2f}%).' + f'{results.passed_cases} of {executed_cases} executed test cases passed' + f' ({pass_rate:02.2f}%)' + f'{blocked_after_comma if results.blocked_cases else ""}' + f'{failed_after_comma if results.failed_cases else ""}' + f'{error_after_comma if results.error_cases else ""}' + f'{none_after_comma if results.none_cases else ""}' + f' on {len(filtered_platforms)} out of total {total_platforms} platforms' + f' ({platform_rate:02.2f}%).' ) if results.skipped_cases or results.notrun_cases: + not_executed = results.skipped_cases + results.notrun_cases + skipped_after_colon = " " + str(results.skipped_cases) + " skipped" + notrun_after_comma = ( + (", " if results.skipped_cases else " ") + + str(results.notrun_cases) + + " not run (built only)" + ) logger.info( - f'{results.skipped_cases + results.notrun_cases} selected test cases not executed:' \ - f'{" " + str(results.skipped_cases) + " skipped" if results.skipped_cases else ""}' \ - f'{(", " if results.skipped_cases else " ") + str(results.notrun_cases) + " not run (built only)" if results.notrun_cases else ""}' \ + f'{not_executed} selected test cases not executed:' \ + f'{skipped_after_colon if results.skipped_cases else ""}' \ + f'{notrun_after_comma if results.notrun_cases else ""}' \ f'.' ) built_only = results.total - run - results.filtered_configs - logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \ -{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{built_only}{Fore.RESET} test configurations were only built.") + logger.info( + f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms," + f" {TwisterStatus.get_color(TwisterStatus.NOTRUN)}{built_only}{Fore.RESET}" + " test configurations were only built." + ) def save_reports(self, name, suffix, report_dir, no_update, platform_reports): if not self.instances: diff --git a/scripts/pylib/twister/twisterlib/runner.py b/scripts/pylib/twister/twisterlib/runner.py index 2d4b64dfc30..76fe29b9b85 100644 --- a/scripts/pylib/twister/twisterlib/runner.py +++ b/scripts/pylib/twister/twisterlib/runner.py @@ -153,7 +153,10 @@ class ExecutionCounter: Node(f"Total test suites: {self.total}", parent=root) processed_suites = Node(f"Processed test suites: {self.done}", parent=root) - filtered_suites = Node(f"Filtered test suites: {self.filtered_configs}", parent=processed_suites) + filtered_suites = Node( + f"Filtered test suites: {self.filtered_configs}", + parent=processed_suites + ) Node(f"Filtered test suites (static): {self.filtered_static}", parent=filtered_suites) Node(f"Filtered test suites (at runtime): {self.filtered_runtime}", parent=filtered_suites) selected_suites = Node(f"Selected test suites: {selected_configs}", parent=processed_suites) @@ -171,10 +174,16 @@ class ExecutionCounter: Node(f"Built only test cases: {self.notrun_cases}", parent=selected_cases_node) Node(f"Blocked test cases: {self.blocked_cases}", parent=selected_cases_node) Node(f"Failed test cases: {self.failed_cases}", parent=selected_cases_node) - error_cases_node = Node(f"Errors in test cases: {self.error_cases}", parent=selected_cases_node) + error_cases_node = Node( + f"Errors in test cases: {self.error_cases}", + parent=selected_cases_node + ) if self.none_cases or self.started_cases: - Node("The following test case statuses should not appear in a proper execution", parent=error_cases_node) + Node( + "The following test case statuses should not appear in a proper execution", + parent=error_cases_node + ) if self.none_cases: Node(f"Statusless test cases: {self.none_cases}", parent=error_cases_node) if self.started_cases: @@ -550,7 +559,10 @@ class CMake: duration = time.time() - start_time self.instance.build_time += duration if p.returncode == 0: - msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds" + msg = ( + f"Finished building {self.source_dir} for {self.platform.name}" + f" in {duration:.2f} seconds" + ) logger.debug(msg) if not self.instance.run: @@ -562,7 +574,11 @@ class CMake: if out: log_msg = out.decode(self.default_encoding) - with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log: + with open( + os.path.join(self.build_dir, self.log), + "a", + encoding=self.default_encoding + ) as log: log.write(log_msg) else: return None @@ -571,12 +587,22 @@ class CMake: log_msg = "" if out: log_msg = out.decode(self.default_encoding) - with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log: + with open( + os.path.join(self.build_dir, self.log), + "a", + encoding=self.default_encoding + ) as log: log.write(log_msg) if log_msg: - overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram\\d_\\d_seg)' overflowed by", log_msg) - imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg) + overflow_found = re.findall( + "region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram\\d_\\d_seg)' overflowed by", + log_msg + ) + imgtool_overflow_found = re.findall( + r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", + log_msg + ) if overflow_found and not self.options.overflow_as_errors: logger.debug(f"Test skipped due to {overflow_found[0]} Overflow") self.instance.status = TwisterStatus.SKIP @@ -650,7 +676,9 @@ class CMake: cmake_args.extend(cmake_opts) if self.instance.testsuite.required_snippets: - cmake_opts = ['-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))] + cmake_opts = [ + '-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets)) + ] cmake_args.extend(cmake_opts) cmake = shutil.which('cmake') @@ -683,7 +711,10 @@ class CMake: if p.returncode == 0: filter_results = self.parse_generated(filter_stages) - msg = f"Finished running cmake {self.source_dir} for {self.platform.name} in {duration:.2f} seconds" + msg = ( + f"Finished running cmake {self.source_dir} for {self.platform.name}" + f" in {duration:.2f} seconds" + ) logger.debug(msg) ret = { 'returncode': p.returncode, @@ -701,7 +732,11 @@ class CMake: if out: os.makedirs(self.build_dir, exist_ok=True) - with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log: + with open( + os.path.join(self.build_dir, self.log), + "a", + encoding=self.default_encoding + ) as log: log_msg = out.decode(self.default_encoding) log.write(log_msg) @@ -734,11 +769,12 @@ class FilterBuilder(CMake): edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle") else: cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt") - # .config is only available after kconfig stage in cmake. If only dt based filtration is required - # package helper call won't produce .config + # .config is only available after kconfig stage in cmake. + # If only dt based filtration is required package helper call won't produce .config if not filter_stages or "kconfig" in filter_stages: defconfig_path = os.path.join(self.build_dir, "zephyr", ".config") - # dt is compiled before kconfig, so edt_pickle is available regardless of choice of filter stages + # dt is compiled before kconfig, + # so edt_pickle is available regardless of choice of filter stages edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle") @@ -811,7 +847,13 @@ class FilterBuilder(CMake): class ProjectBuilder(FilterBuilder): def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs): - super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver) + super().__init__( + instance.testsuite, + instance.platform, + instance.testsuite.source_dir, + instance.build_dir, + jobserver + ) self.log = "build.log" self.instance = instance @@ -976,14 +1018,22 @@ class ProjectBuilder(FilterBuilder): # due to ram/rom overflow. if self.instance.status == TwisterStatus.SKIP: results.skipped_increment() - self.instance.add_missing_case_status(TwisterStatus.SKIP, self.instance.reason) + self.instance.add_missing_case_status( + TwisterStatus.SKIP, + self.instance.reason + ) if ret.get('returncode', 1) > 0: - self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason) + self.instance.add_missing_case_status( + TwisterStatus.BLOCK, + self.instance.reason + ) next_op = 'report' else: if self.instance.testsuite.harness in ['ztest', 'test']: - logger.debug(f"Determine test cases for test instance: {self.instance.name}") + logger.debug( + f"Determine test cases for test instance: {self.instance.name}" + ) try: self.determine_testcases(results) next_op = 'gather_metrics' @@ -1015,9 +1065,15 @@ class ProjectBuilder(FilterBuilder): next_op = 'run' else: if self.instance.status == TwisterStatus.NOTRUN: - run_conditions = f"(run:{self.instance.run}, handler.ready:{self.instance.handler.ready})" + run_conditions = ( + f"(run:{self.instance.run}," + f" handler.ready:{self.instance.handler.ready})" + ) logger.debug(f"Instance {self.instance.name} can't run {run_conditions}") - self.instance.add_missing_case_status(TwisterStatus.NOTRUN, "Nowhere to run") + self.instance.add_missing_case_status( + TwisterStatus.NOTRUN, + "Nowhere to run" + ) next_op = 'report' except StatusAttributeError as sae: logger.error(str(sae)) @@ -1090,7 +1146,10 @@ class ProjectBuilder(FilterBuilder): mode = message.get("mode") if mode == "device": self.cleanup_device_testing_artifacts() - elif mode == "passed" or (mode == "all" and self.instance.reason != "CMake build failure"): + elif ( + mode == "passed" + or (mode == "all" and self.instance.reason != "CMake build failure") + ): self.cleanup_artifacts() except StatusAttributeError as sae: logger.error(str(sae)) @@ -1120,7 +1179,9 @@ class ProjectBuilder(FilterBuilder): yaml_testsuite_name = self.instance.testsuite.id logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}") - logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.") + logger.debug( + f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases." + ) new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)") detected_cases = [] @@ -1154,7 +1215,10 @@ class ProjectBuilder(FilterBuilder): if detected_cases: logger.debug(f"Detected Ztest cases: [{', '.join(detected_cases)}] in {elf_file}") - tc_keeper = {tc.name: {'status': tc.status, 'reason': tc.reason} for tc in self.instance.testcases} + tc_keeper = { + tc.name: {'status': tc.status, 'reason': tc.reason} + for tc in self.instance.testcases + } self.instance.testcases.clear() self.instance.testsuite.testcases.clear() @@ -1400,8 +1464,10 @@ class ProjectBuilder(FilterBuilder): f' test case {tc.name}.') results.warnings_increment(1) case _: - logger.warning(f'An unknown status "{tc.status}" detected in instance {instance.name},' - f' test case {tc.name}.') + logger.warning( + f'An unknown status "{tc.status}" detected in instance {instance.name},' + f' test case {tc.name}.' + ) results.warnings_increment(1) @@ -1415,7 +1481,9 @@ class ProjectBuilder(FilterBuilder): self._add_instance_testcases_to_status_counts(instance, results) - status = f'{TwisterStatus.get_color(instance.status)}{str.upper(instance.status)}{Fore.RESET}' + status = ( + f'{TwisterStatus.get_color(instance.status)}{str.upper(instance.status)}{Fore.RESET}' + ) if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: if instance.status == TwisterStatus.ERROR: @@ -1426,7 +1494,9 @@ class ProjectBuilder(FilterBuilder): status += " " + instance.reason else: logger.error( - f"{instance.platform.name:<25} {instance.testsuite.name:<50} {status}: {instance.reason}") + f"{instance.platform.name:<25} {instance.testsuite.name:<50}" + f" {status}: {instance.reason}" + ) if not self.options.verbose: self.log_info_file(self.options.inline_logs) elif instance.status == TwisterStatus.SKIP: @@ -1479,35 +1549,46 @@ class ProjectBuilder(FilterBuilder): else: completed_perc = 0 if total_to_do > 0: - completed_perc = int((float(results.done - results.filtered_static) / total_to_do) * 100) + completed_perc = int( + (float(results.done - results.filtered_static) / total_to_do) * 100 + ) unfiltered = results.done - results.filtered_static + complete_section = ( + f"{TwisterStatus.get_color(TwisterStatus.PASS)}" + f"{unfiltered:>4}/{total_to_do:>4}" + f"{Fore.RESET} {completed_perc:>2}%" + ) + notrun_section = ( + f"{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun:>4}{Fore.RESET}" + ) filtered_section_color = ( TwisterStatus.get_color(TwisterStatus.SKIP) if results.filtered_configs > 0 else Fore.RESET ) + filtered_section = ( + f"{filtered_section_color}{results.filtered_configs:>4}{Fore.RESET}" + ) failed_section_color = ( TwisterStatus.get_color(TwisterStatus.FAIL) if results.failed > 0 else Fore.RESET ) + failed_section = ( + f"{failed_section_color}{results.failed:>4}{Fore.RESET}" + ) error_section_color = ( TwisterStatus.get_color(TwisterStatus.ERROR) if results.error > 0 else Fore.RESET ) - sys.stdout.write( - f"INFO - Total complete: " - f"{TwisterStatus.get_color(TwisterStatus.PASS)}" - f"{unfiltered:>4}/{total_to_do:>4}" - f"{Fore.RESET} {completed_perc:>2}%" - " built (not run):" - f" {TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun:>4}{Fore.RESET}," - " filtered:" - f" {filtered_section_color}{results.filtered_configs:>4}{Fore.RESET}," - " failed:" - f" {failed_section_color}{results.failed:>4}{Fore.RESET}," - " error:" - f" {error_section_color}{results.error:>4}{Fore.RESET}\r" + error_section = ( + f"{error_section_color}{results.error:>4}{Fore.RESET}" + ) + sys.stdout.write( + f"INFO - Total complete: {complete_section}" + f" built (not run): {notrun_section}," + f" filtered: {filtered_section}," + f" failed: {failed_section}," + f" error: {error_section}\r" ) - sys.stdout.flush() @staticmethod @@ -1648,7 +1729,10 @@ class ProjectBuilder(FilterBuilder): if instance.status not in [TwisterStatus.ERROR, TwisterStatus.FAIL, TwisterStatus.SKIP]: if instance.platform.type not in ["native", "qemu", "unit"]: generate_warning = bool(instance.platform.type == "mcu") - size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning) + size_calc = instance.calculate_sizes( + from_buildlog=from_buildlog, + generate_warning=generate_warning + ) instance.metrics["used_ram"] = size_calc.get_used_ram() instance.metrics["used_rom"] = size_calc.get_used_rom() instance.metrics["available_rom"] = size_calc.get_available_rom() @@ -1773,12 +1857,23 @@ class TwisterRunner: f" {self.results.filtered_configs - self.results.filtered_static} at runtime)." ) - def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False): + def add_tasks_to_queue( + self, + pipeline, + build_only=False, + test_only=False, + retry_build_errors=False + ): for instance in self.instances.values(): if build_only: instance.run = False - no_retry_statuses = [TwisterStatus.PASS, TwisterStatus.SKIP, TwisterStatus.FILTER, TwisterStatus.NOTRUN] + no_retry_statuses = [ + TwisterStatus.PASS, + TwisterStatus.SKIP, + TwisterStatus.FILTER, + TwisterStatus.NOTRUN + ] if not retry_build_errors: no_retry_statuses.append(TwisterStatus.ERROR) @@ -1789,12 +1884,19 @@ class TwisterRunner: instance.status = TwisterStatus.NONE # Previous states should be removed from the stats if self.results.iteration > 1: - ProjectBuilder._add_instance_testcases_to_status_counts(instance, self.results, decrement=True) + ProjectBuilder._add_instance_testcases_to_status_counts( + instance, + self.results, + decrement=True + ) # Check if cmake package_helper script can be run in advance. instance.filter_stages = [] if instance.testsuite.filter: - instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys()) + instance.filter_stages = self.get_cmake_filter_stages( + instance.testsuite.filter, + expr_parser.reserved.keys() + ) if test_only and instance.run: pipeline.put({"op": "run", "test": instance}) @@ -1870,13 +1972,16 @@ class TwisterRunner: @staticmethod def get_cmake_filter_stages(filt, logic_keys): - """ Analyze filter expressions from test yaml and decide if dts and/or kconfig based filtering will be needed.""" + """Analyze filter expressions from test yaml + and decide if dts and/or kconfig based filtering will be needed. + """ dts_required = False kconfig_required = False full_required = False filter_stages = [] - # Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespaces + # Compress args in expressions like "function('x', 'y')" + # so they are not split when splitting by whitespaces filt = filt.replace(", ", ",") # Remove logic words for k in logic_keys: diff --git a/scripts/pylib/twister/twisterlib/size_calc.py b/scripts/pylib/twister/twisterlib/size_calc.py index 3767dcb262c..8034dca44ee 100644 --- a/scripts/pylib/twister/twisterlib/size_calc.py +++ b/scripts/pylib/twister/twisterlib/size_calc.py @@ -301,7 +301,10 @@ class SizeCalculator: file_content = file.readlines() else: if self.generate_warning: - logger.error(msg=f"Incorrect path to build.log file to analyze footprints. Please check the path {self.buildlog_filename}.") + logger.error( + msg="Incorrect path to build.log file to analyze footprints." + f" Please check the path {self.buildlog_filename}." + ) file_content = [] return file_content @@ -325,13 +328,16 @@ class SizeCalculator: break # If the file does not contain information about memory footprint, the warning is raised. if result == -1: - logger.warning(msg=f"Information about memory footprint for this test configuration is not found. Please check file {self.buildlog_filename}.") + logger.warning( + msg="Information about memory footprint for this test configuration is not found." + f" Please check file {self.buildlog_filename}." + ) return result def _get_lines_with_footprint(self, start_offset: int, file_content: list[str]) -> list[str]: """Get lines from the file with a memory footprint. - @param start_offset (int) Offset with the first line of the information about memory footprint. + @param start_offset (int) Offset with the memory footprint's first line. @param file_content (list[str]) Content of the build.log file. @return Lines with information about memory footprint (list[str]) """ @@ -368,7 +374,12 @@ class SizeCalculator: result = [] PATTERN_SPLIT_COLUMNS = " +" for line in text_lines: - line = [column.rstrip(":") for column in re.split(pattern=PATTERN_SPLIT_COLUMNS, string=line)] + line = [ + column.rstrip(":") for column in re.split( + pattern=PATTERN_SPLIT_COLUMNS, + string=line + ) + ] result.append(list(filter(None, line))) else: result = [[]] @@ -384,7 +395,10 @@ class SizeCalculator: if len(data_lines) != self.USEFUL_LINES_AMOUNT: data_lines = [[]] if self.generate_warning: - logger.warning(msg=f"Incomplete information about memory footprint. Please check file {self.buildlog_filename}") + logger.warning( + msg="Incomplete information about memory footprint." + f" Please check file {self.buildlog_filename}" + ) else: for idx, line in enumerate(data_lines): # Line with description of the columns @@ -423,13 +437,18 @@ class SizeCalculator: @return Table with information about memory usage (list[list[str]]) """ file_content = self._get_buildlog_file_content() - data_line_start_idx = self._find_offset_of_last_pattern_occurrence(file_content=file_content) + data_line_start_idx = self._find_offset_of_last_pattern_occurrence( + file_content=file_content + ) if data_line_start_idx < 0: data_from_content = [[]] else: # Clean lines and separate information to columns - information_lines = self._get_lines_with_footprint(start_offset=data_line_start_idx, file_content=file_content) + information_lines = self._get_lines_with_footprint( + start_offset=data_line_start_idx, + file_content=file_content + ) information_lines = self._clear_whitespaces_from_lines(text_lines=information_lines) data_from_content = self._divide_text_lines_into_columns(text_lines=information_lines) data_from_content = self._unify_prefixes_on_all_values(data_lines=data_from_content) @@ -446,7 +465,10 @@ class SizeCalculator: self.available_ram = 0 self.available_rom = 0 if self.generate_warning: - logger.warning(msg=f"Missing information about memory footprint. Check file {self.buildlog_filename}.") + logger.warning( + msg="Missing information about memory footprint." + f" Check file {self.buildlog_filename}." + ) else: ROW_RAM_IDX = 2 ROW_ROM_IDX = 1 diff --git a/scripts/pylib/twister/twisterlib/testinstance.py b/scripts/pylib/twister/twisterlib/testinstance.py index 7d309dd6c52..fcfc7562345 100644 --- a/scripts/pylib/twister/twisterlib/testinstance.py +++ b/scripts/pylib/twister/twisterlib/testinstance.py @@ -70,9 +70,15 @@ class TestInstance: if testsuite.detailed_test_id: self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name) else: - # if suite is not in zephyr, keep only the part after ".." in reconstructed dir structure + # if suite is not in zephyr, + # keep only the part after ".." in reconstructed dir structure source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1] - self.build_dir = os.path.join(outdir, platform.normalized_name, source_dir_rel, testsuite.name) + self.build_dir = os.path.join( + outdir, + platform.normalized_name, + source_dir_rel, + testsuite.name + ) self.run_id = self._get_run_id() self.domains = None # Instance need to use sysbuild if a given suite or a platform requires it @@ -281,7 +287,9 @@ class TestInstance: # check if test is runnable in pytest if self.testsuite.harness == 'pytest': - target_ready = bool(filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST) + target_ready = bool( + filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST + ) if filter != 'runnable' and \ simulator and \ @@ -300,7 +308,14 @@ class TestInstance: return testsuite_runnable and target_ready - def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=None): + def create_overlay( + self, + platform, + enable_asan=False, + enable_ubsan=False, + enable_coverage=False, + coverage_platform=None + ): if coverage_platform is None: coverage_platform = [] # Create this in a "twister/" subdirectory otherwise this @@ -349,7 +364,11 @@ class TestInstance: return content - def calculate_sizes(self, from_buildlog: bool = False, generate_warning: bool = True) -> SizeCalculator: + def calculate_sizes( + self, + from_buildlog: bool = False, + generate_warning: bool = True + ) -> SizeCalculator: """Get the RAM/ROM sizes of a test case. This can only be run after the instance has been executed by diff --git a/scripts/pylib/twister/twisterlib/testplan.py b/scripts/pylib/twister/twisterlib/testplan.py index 09a9c336574..336307f0c0b 100755 --- a/scripts/pylib/twister/twisterlib/testplan.py +++ b/scripts/pylib/twister/twisterlib/testplan.py @@ -91,7 +91,13 @@ class TestPlan: os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "quarantine-schema.yaml")) - tc_schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "test-config-schema.yaml") + tc_schema_path = os.path.join( + ZEPHYR_BASE, + "scripts", + "schemas", + "twister", + "test-config-schema.yaml" + ) SAMPLE_FILENAME = 'sample.yaml' TESTSUITE_FILENAME = 'testcase.yaml' @@ -189,7 +195,9 @@ class TestPlan: if num == 0: raise TwisterRuntimeError("No test cases found at the specified location...") if self.load_errors: - raise TwisterRuntimeError(f"Found {self.load_errors} errors loading {num} test configurations.") + raise TwisterRuntimeError( + f"Found {self.load_errors} errors loading {num} test configurations." + ) self.find_subtests() # get list of scenarios we have parsed into one list @@ -217,7 +225,10 @@ class TestPlan: def load(self): if self.options.report_suffix: - last_run = os.path.join(self.options.outdir, f"twister_{self.options.report_suffix}.json") + last_run = os.path.join( + self.options.outdir, + f"twister_{self.options.report_suffix}.json" + ) else: last_run = os.path.join(self.options.outdir, "twister.json") @@ -264,7 +275,9 @@ class TestPlan: if int(subset) > 0 and int(sets) >= int(subset): logger.info(f"Running only a subset: {subset}/{sets}") else: - raise TwisterRuntimeError(f"You have provided a wrong subset value: {self.options.subset}.") + raise TwisterRuntimeError( + f"You have provided a wrong subset value: {self.options.subset}." + ) self.generate_subset(subset, int(sets)) @@ -584,7 +597,13 @@ class TestPlan: for name in parsed_data.scenarios: suite_dict = parsed_data.get_scenario(name) - suite = TestSuite(root, suite_path, name, data=suite_dict, detailed_test_id=self.options.detailed_test_id) + suite = TestSuite( + root, + suite_path, + name, + data=suite_dict, + detailed_test_id=self.options.detailed_test_id + ) # convert to fully qualified names _integration = [] @@ -619,14 +638,21 @@ class TestPlan: if testsuite_filter: scenario = os.path.basename(suite.name) - if suite.name and (suite.name in testsuite_filter or scenario in testsuite_filter): + if ( + suite.name + and (suite.name in testsuite_filter or scenario in testsuite_filter) + ): self.testsuites[suite.name] = suite elif suite.name in self.testsuites: - msg = f"test suite '{suite.name}' in '{suite.yamlfile}' is already added" + msg = ( + f"test suite '{suite.name}' in '{suite.yamlfile}' is already added" + ) if suite.yamlfile == self.testsuites[suite.name].yamlfile: logger.debug(f"Skip - {msg}") else: - msg = f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'" + msg = ( + f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'" + ) raise TwisterRuntimeError(msg) else: self.testsuites[suite.name] = suite @@ -651,7 +677,10 @@ class TestPlan: if self.quarantine: simulator = plat.simulator_by_name(self.options) matched_quarantine = self.quarantine.get_matched_quarantine( - instance.testsuite.id, plat.name, plat.arch, simulator.name if simulator is not None else 'na' + instance.testsuite.id, + plat.name, + plat.arch, + simulator.name if simulator is not None else 'na' ) if matched_quarantine and not self.options.quarantine_verify: instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE) @@ -722,7 +751,11 @@ class TestPlan: if instance.status != TwisterStatus.NONE: tc_reason = tc.get('reason') if tc_status != TwisterStatus.NONE: - case = instance.set_case_status_by_name(identifier, tc_status, tc_reason) + case = instance.set_case_status_by_name( + identifier, + tc_status, + tc_reason + ) case.duration = tc.get('execution_time', 0) if tc.get('log'): case.output = tc.get('log') @@ -795,7 +828,9 @@ class TestPlan: platform_filter = _platforms platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) elif emu_filter: - platforms = list(filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms)) + platforms = list( + filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms) + ) elif vendor_filter: platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms)) logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}") @@ -820,11 +855,16 @@ class TestPlan: keyed_tests = {} for ts_name, ts in self.testsuites.items(): - if ts.build_on_all and not platform_filter and platform_config.get('increased_platform_scope', True): + if ( + ts.build_on_all + and not platform_filter + and platform_config.get('increased_platform_scope', True) + ): platform_scope = self.platforms elif ts.integration_platforms: - integration_platforms = list(filter(lambda item: item.name in ts.integration_platforms, - self.platforms)) + integration_platforms = list( + filter(lambda item: item.name in ts.integration_platforms, self.platforms) + ) if self.options.integration: self.verify_platforms_existence( ts.integration_platforms, f"{ts_name} - integration_platforms") @@ -844,14 +884,20 @@ class TestPlan: # If there isn't any overlap between the platform_allow list and the platform_scope # we set the scope to the platform_allow list - if ts.platform_allow and not platform_filter and not integration and platform_config.get('increased_platform_scope', True): + if ( + ts.platform_allow + and not platform_filter + and not integration + and platform_config.get('increased_platform_scope', True) + ): self.verify_platforms_existence(ts.platform_allow, f"{ts_name} - platform_allow") a = set(platform_scope) b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms)) c = a.intersection(b) if not c: - platform_scope = list(filter(lambda item: item.name in ts.platform_allow, \ - self.platforms)) + platform_scope = list( + filter(lambda item: item.name in ts.platform_allow, self.platforms) + ) # list of instances per testsuite, aka configurations. instance_list = [] for plat in platform_scope: @@ -869,21 +915,34 @@ class TestPlan: continue if ts.modules and self.modules and not set(ts.modules).issubset(set(self.modules)): - instance.add_filter(f"one or more required modules not available: {','.join(ts.modules)}", Filters.MODULE) + instance.add_filter( + f"one or more required modules not available: {','.join(ts.modules)}", + Filters.MODULE + ) if self.options.level: tl = self.get_level(self.options.level) if tl is None: - instance.add_filter(f"Unknown test level '{self.options.level}'", Filters.TESTPLAN) + instance.add_filter( + f"Unknown test level '{self.options.level}'", + Filters.TESTPLAN + ) else: planned_scenarios = tl.scenarios - if ts.id not in planned_scenarios and not set(ts.levels).intersection(set(tl.levels)): + if ( + ts.id not in planned_scenarios + and not set(ts.levels).intersection(set(tl.levels)) + ): instance.add_filter("Not part of requested test plan", Filters.TESTPLAN) if runnable and not instance.run: instance.add_filter("Not runnable on device", Filters.CMD_LINE) - if self.options.integration and ts.integration_platforms and plat.name not in ts.integration_platforms: + if ( + self.options.integration + and ts.integration_platforms + and plat.name not in ts.integration_platforms + ): instance.add_filter("Not part of integration platforms", Filters.TESTSUITE) if ts.skip: @@ -915,7 +974,10 @@ class TestPlan: instance.add_filter("In test case arch exclude", Filters.TESTSUITE) if ts.vendor_allow and plat.vendor not in ts.vendor_allow: - instance.add_filter("Not in test suite vendor allow list", Filters.TESTSUITE) + instance.add_filter( + "Not in test suite vendor allow list", + Filters.TESTSUITE + ) if ts.vendor_exclude and plat.vendor in ts.vendor_exclude: instance.add_filter("In test suite vendor exclude", Filters.TESTSUITE) @@ -923,7 +985,10 @@ class TestPlan: if ts.platform_exclude and plat.name in ts.platform_exclude: # works only when we have all platforms parsed, -p limits parsing... if not platform_filter: - self.verify_platforms_existence(ts.platform_exclude, f"{ts_name} - platform_exclude") + self.verify_platforms_existence( + ts.platform_exclude, + f"{ts_name} - platform_exclude" + ) instance.add_filter("In test case platform exclude", Filters.TESTSUITE) if ts.toolchain_exclude and toolchain in ts.toolchain_exclude: @@ -944,7 +1009,10 @@ class TestPlan: instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN) if not plat.env_satisfied: - instance.add_filter("Environment ({}) not satisfied".format(", ".join(plat.env)), Filters.PLATFORM) + instance.add_filter( + "Environment ({}) not satisfied".format(", ".join(plat.env)), + Filters.PLATFORM + ) if not force_toolchain \ and toolchain and (toolchain not in plat.supported_toolchains) \ @@ -958,7 +1026,10 @@ class TestPlan: if ts.harness: sim = plat.simulator_by_name(self.options.sim_name) if ts.harness == 'robot' and not (sim and sim.name == 'renode'): - instance.add_filter("No robot support for the selected platform", Filters.SKIP) + instance.add_filter( + "No robot support for the selected platform", + Filters.SKIP + ) if ts.depends_on: dep_intersection = ts.depends_on.intersection(set(plat.supported)) @@ -969,7 +1040,10 @@ class TestPlan: instance.add_filter("Not enough FLASH", Filters.PLATFORM) if set(plat.ignore_tags) & ts.tags: - instance.add_filter("Excluded tags per platform (exclude_tags)", Filters.PLATFORM) + instance.add_filter( + "Excluded tags per platform (exclude_tags)", + Filters.PLATFORM + ) if plat.only_tags and not set(plat.only_tags) & ts.tags: instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM) @@ -977,7 +1051,10 @@ class TestPlan: if ts.required_snippets: missing_snippet = False snippet_args = {"snippets": ts.required_snippets} - found_snippets = snippets.find_snippets_in_roots(snippet_args, [*self.env.snippet_roots, Path(ts.source_dir)]) + found_snippets = snippets.find_snippets_in_roots( + snippet_args, + [*self.env.snippet_roots, Path(ts.source_dir)] + ) # Search and check that all required snippet files are found for this_snippet in snippet_args['snippets']: @@ -1019,19 +1096,25 @@ class TestPlan: # handle quarantined tests self.handle_quarantined_tests(instance, plat) - # platform_key is a list of unique platform attributes that form a unique key a test - # will match against to determine if it should be scheduled to run. A key containing a - # field name that the platform does not have will filter the platform. + # platform_key is a list of unique platform attributes that form a unique key + # a test will match against to determine if it should be scheduled to run. + # A key containing a field name that the platform does not have + # will filter the platform. # # A simple example is keying on arch and simulation # to run a test once per unique (arch, simulation) platform. - if not ignore_platform_key and hasattr(ts, 'platform_key') and len(ts.platform_key) > 0: + if ( + not ignore_platform_key + and hasattr(ts, 'platform_key') + and len(ts.platform_key) > 0 + ): key_fields = sorted(set(ts.platform_key)) keys = [getattr(plat, key_field, None) for key_field in key_fields] for key in keys: if key is None or key == 'na': instance.add_filter( - f"Excluded platform missing key fields demanded by test {key_fields}", + "Excluded platform missing key fields" + f" demanded by test {key_fields}", Filters.PLATFORM ) break @@ -1041,8 +1124,17 @@ class TestPlan: test_keys = tuple(test_keys) keyed_test = keyed_tests.get(test_keys) if keyed_test is not None: - plat_key = {key_field: getattr(keyed_test['plat'], key_field) for key_field in key_fields} - instance.add_filter(f"Already covered for key {key} by platform {keyed_test['plat'].name} having key {plat_key}", Filters.PLATFORM_KEY) + plat_key = { + key_field: getattr( + keyed_test['plat'], + key_field + ) for key_field in key_fields + } + instance.add_filter( + f"Already covered for key {key}" + f" by platform {keyed_test['plat'].name} having key {plat_key}", + Filters.PLATFORM_KEY + ) else: # do not add a platform to keyed tests if previously # filtered @@ -1066,7 +1158,12 @@ class TestPlan: _platform_allow = set(ts.platform_allow) _intersection = _default_p.intersection(_platform_allow) if _intersection: - aa = list(filter(lambda _scenario: _scenario.platform.name in _intersection, instance_list)) + aa = list( + filter( + lambda _scenario: _scenario.platform.name in _intersection, + instance_list + ) + ) self.add_instances(aa) else: self.add_instances(instance_list) @@ -1074,20 +1171,36 @@ class TestPlan: # add integration platforms to the list of default # platforms, even if we are not in integration mode _platforms = self.default_platforms + ts.integration_platforms - instances = list(filter(lambda ts: ts.platform.name in _platforms, instance_list)) + instances = list( + filter(lambda ts: ts.platform.name in _platforms, instance_list) + ) self.add_instances(instances) elif integration: - instances = list(filter(lambda item: item.platform.name in ts.integration_platforms, instance_list)) + instances = list( + filter( + lambda item: item.platform.name in ts.integration_platforms, + instance_list + ) + ) self.add_instances(instances) elif emulation_platforms: self.add_instances(instance_list) - for instance in list(filter(lambda inst: not - inst.platform.simulator_by_name(self.options.sim_name), instance_list)): + for instance in list( + filter( + lambda inst: not inst.platform.simulator_by_name(self.options.sim_name), + instance_list + ) + ): instance.add_filter("Not an emulated platform", Filters.CMD_LINE) elif vendor_platforms: self.add_instances(instance_list) - for instance in list(filter(lambda inst: inst.platform.vendor not in vendor_filter, instance_list)): + for instance in list( + filter( + lambda inst: inst.platform.vendor not in vendor_filter, + instance_list + ) + ): instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE) else: self.add_instances(instance_list) @@ -1101,7 +1214,9 @@ class TestPlan: self.selected_platforms = set(p.platform.name for p in self.instances.values()) - filtered_instances = list(filter(lambda item: item.status == TwisterStatus.FILTER, self.instances.values())) + filtered_instances = list( + filter(lambda item: item.status == TwisterStatus.FILTER, self.instances.values()) + ) for filtered_instance in filtered_instances: change_skip_to_error_if_integration(self.options, filtered_instance) @@ -1195,4 +1310,6 @@ def change_skip_to_error_if_integration(options, instance): return instance.status = TwisterStatus.ERROR instance.reason += " but is one of the integration platforms" - logger.debug(f"Changing status of {instance.name} to ERROR because it is an integration platform") + logger.debug( + f"Changing status of {instance.name} to ERROR because it is an integration platform" + ) diff --git a/scripts/pylib/twister/twisterlib/testsuite.py b/scripts/pylib/twister/twisterlib/testsuite.py index 2f3fe134358..62637172c02 100644 --- a/scripts/pylib/twister/twisterlib/testsuite.py +++ b/scripts/pylib/twister/twisterlib/testsuite.py @@ -107,8 +107,13 @@ def scan_file(inf_name): if os.name == 'nt': mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ} else: - mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ, - 'offset': 0} + mmap_args = { + 'fileno': inf.fileno(), + 'length': 0, + 'flags': mmap.MAP_PRIVATE, + 'prot': mmap.PROT_READ, + 'offset': 0 + } with contextlib.closing(mmap.mmap(**mmap_args)) as main_c: regular_suite_regex_matches = \ @@ -130,13 +135,19 @@ def scan_file(inf_name): if regular_suite_regex_matches: ztest_suite_names = \ _extract_ztest_suite_names(regular_suite_regex_matches) - testcase_names, warnings = \ - _find_regular_ztest_testcases(main_c, regular_suite_regex_matches, has_registered_test_suites) + testcase_names, warnings = _find_regular_ztest_testcases( + main_c, + regular_suite_regex_matches, + has_registered_test_suites + ) elif registered_suite_regex_matches: ztest_suite_names = \ _extract_ztest_suite_names(registered_suite_regex_matches) - testcase_names, warnings = \ - _find_regular_ztest_testcases(main_c, registered_suite_regex_matches, has_registered_test_suites) + testcase_names, warnings = _find_regular_ztest_testcases( + main_c, + registered_suite_regex_matches, + has_registered_test_suites + ) elif new_suite_regex_matches or new_suite_testcase_regex_matches: ztest_suite_names = \ _extract_ztest_suite_names(new_suite_regex_matches) @@ -248,10 +259,15 @@ def _find_ztest_testcases(search_area, testcase_regex): """ testcase_regex_matches = \ [m for m in testcase_regex.finditer(search_area)] - testcase_names = \ - [(m.group("suite_name") if m.groupdict().get("suite_name") else b'', m.group("testcase_name")) \ - for m in testcase_regex_matches] - testcase_names = [(ts_name.decode("UTF-8"), tc_name.decode("UTF-8")) for ts_name, tc_name in testcase_names] + testcase_names = [ + ( + m.group("suite_name") if m.groupdict().get("suite_name") else b'', + m.group("testcase_name") + ) for m in testcase_regex_matches + ] + testcase_names = [ + (ts_name.decode("UTF-8"), tc_name.decode("UTF-8")) for ts_name, tc_name in testcase_names + ] warnings = None for testcase_name in testcase_names: if not testcase_name[1].startswith("test_"): @@ -424,7 +440,9 @@ class TestSuite(DisablePyTestCollectionMixin): self.id = name self.source_dir = suite_path - self.source_dir_rel = os.path.relpath(os.path.realpath(suite_path), start=canonical_zephyr_base) + self.source_dir_rel = os.path.relpath( + os.path.realpath(suite_path), start=canonical_zephyr_base + ) self.yamlfile = suite_path self.testcases = [] self.integration_platforms = [] @@ -455,7 +473,9 @@ class TestSuite(DisablePyTestCollectionMixin): setattr(self, k, v) if self.harness == 'console' and not self.harness_config: - raise Exception('Harness config error: console harness defined without a configuration.') + raise Exception( + 'Harness config error: console harness defined without a configuration.' + ) def add_subcases(self, data, parsed_subcases=None, suite_names=None): testcases = data.get("testcases", []) @@ -492,7 +512,9 @@ class TestSuite(DisablePyTestCollectionMixin): relative_ts_root = "" # workdir can be "." - unique = os.path.normpath(os.path.join(relative_ts_root, workdir, name)).replace(os.sep, '/') + unique = os.path.normpath( + os.path.join(relative_ts_root, workdir, name) + ).replace(os.sep, '/') return unique @staticmethod diff --git a/scripts/pylib/twister/twisterlib/twister_main.py b/scripts/pylib/twister/twisterlib/twister_main.py index 8b6f206fc51..5644b981131 100644 --- a/scripts/pylib/twister/twisterlib/twister_main.py +++ b/scripts/pylib/twister/twisterlib/twister_main.py @@ -70,7 +70,12 @@ def main(options: argparse.Namespace, default_options: argparse.Namespace): previous_results = None # Cleanup - if options.no_clean or options.only_failed or options.test_only or options.report_summary is not None: + if ( + options.no_clean + or options.only_failed + or options.test_only + or options.report_summary is not None + ): if os.path.exists(options.outdir): print("Keeping artifacts untouched") elif options.last_metrics: @@ -141,7 +146,8 @@ def main(options: argparse.Namespace, default_options: argparse.Namespace): if options.platform and not tplan.check_platform(i.platform, options.platform): continue logger.debug( - f"{i.platform.name:<25} {i.testsuite.name:<50} {Fore.YELLOW}SKIPPED{Fore.RESET}: {i.reason}" + f"{i.platform.name:<25} {i.testsuite.name:<50}" + f" {Fore.YELLOW}SKIPPED{Fore.RESET}: {i.reason}" ) report = Reporting(tplan, env)