diff --git a/scripts/pylib/twister/enviornment.py b/scripts/pylib/twister/enviornment.py index afc3f64c6e2..ee4e0172983 100644 --- a/scripts/pylib/twister/enviornment.py +++ b/scripts/pylib/twister/enviornment.py @@ -32,10 +32,13 @@ canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE) class TwisterEnv: - def __init__(self) -> None: + def __init__(self, options) -> None: self.version = None self.toolchain = None + self.options = options + + def discover(self): self.check_zephyr_version() self.get_toolchain() diff --git a/scripts/pylib/twister/reports.py b/scripts/pylib/twister/reports.py new file mode 100644 index 00000000000..ab283242ce1 --- /dev/null +++ b/scripts/pylib/twister/reports.py @@ -0,0 +1,500 @@ +#!/usr/bin/env python3 +# vim: set syntax=python ts=4 : +# +# Copyright (c) 2018 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import json +import logging +from colorama import Fore +import xml.etree.ElementTree as ET +import string +from datetime import datetime + +logger = logging.getLogger('twister') +logger.setLevel(logging.DEBUG) + +class Reporting: + + def __init__(self, plan, env) -> None: + self.plan = plan #FIXME + self.instances = plan.instances + self.platforms = plan.platforms + self.selected_platforms = plan.selected_platforms + self.filtered_platforms = plan.filtered_platforms + self.env = env + self.timestamp = datetime.now().isoformat() + self.outdir = os.path.abspath(env.options.outdir) + + @staticmethod + def process_log(log_file): + filtered_string = "" + if os.path.exists(log_file): + with open(log_file, "rb") as f: + log = f.read().decode("utf-8") + filtered_string = ''.join(filter(lambda x: x in string.printable, log)) + + return filtered_string + + + @staticmethod + def xunit_testcase(eleTestsuite, name, classname, status, ts_status, reason, duration, runnable, stats, log, build_only_as_skip): + fails, passes, errors, skips = stats + + if status in ['skipped', 'filtered']: + duration = 0 + + eleTestcase = ET.SubElement( + eleTestsuite, "testcase", + classname=classname, + name=f"{name}", + time=f"{duration}") + + if status in ['skipped', 'filtered']: + skips += 1 + # temporarily add build_only_as_skip to restore existing CI report behaviour + if ts_status == "passed" and not runnable: + tc_type = "build" + else: + tc_type = status + ET.SubElement(eleTestcase, 'skipped', type=f"{tc_type}", message=f"{reason}") + elif status in ["failed", "blocked"]: + fails += 1 + el = ET.SubElement(eleTestcase, 'failure', type="failure", message=f"{reason}") + if log: + el.text = log + elif status == "error": + errors += 1 + el = ET.SubElement(eleTestcase, 'error', type="failure", message=f"{reason}") + if log: + el.text = log + elif status == 'passed': + if not runnable and build_only_as_skip: + ET.SubElement(eleTestcase, 'skipped', type="build", message="built only") + skips += 1 + else: + passes += 1 + else: + if not status: + logger.debug(f"{name}: No status") + ET.SubElement(eleTestcase, 'skipped', type=f"untested", message="No results captured, testsuite misconfiguration?") + else: + logger.error(f"{name}: Unknown status '{status}'") + + return (fails, passes, errors, skips) + + # Generate a report with all testsuites instead of doing this per platform + def xunit_report_suites(self, json_file, filename): + + json_data = {} + with open(json_file, "r") as json_results: + json_data = json.load(json_results) + + + env = json_data.get('environment', {}) + version = env.get('zephyr_version', None) + + eleTestsuites = ET.Element('testsuites') + all_suites = json_data.get("testsuites", []) + + suites_to_report = all_suites + # do not create entry if everything is filtered out + if not self.env.options.detailed_skipped_report: + suites_to_report = list(filter(lambda d: d.get('status') != "filtered", all_suites)) + + for suite in suites_to_report: + duration = 0 + eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', + name=suite.get("name"), time="0", + timestamp = self.timestamp, + tests="0", + failures="0", + errors="0", skipped="0") + eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') + # Multiple 'property' can be added to 'properties' + # differing by name and value + ET.SubElement(eleTSPropetries, 'property', name="version", value=version) + ET.SubElement(eleTSPropetries, 'property', name="platform", value=suite.get("platform")) + ET.SubElement(eleTSPropetries, 'property', name="architecture", value=suite.get("arch")) + + total = 0 + fails = passes = errors = skips = 0 + handler_time = suite.get('execution_time', 0) + runnable = suite.get('runnable', 0) + duration += float(handler_time) + ts_status = suite.get('status') + for tc in suite.get("testcases", []): + status = tc.get('status') + reason = tc.get('reason', suite.get('reason', 'Unknown')) + log = tc.get("log", suite.get("log")) + + tc_duration = tc.get('execution_time', handler_time) + name = tc.get("identifier") + classname = ".".join(name.split(".")[:2]) + fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, + name, classname, status, ts_status, reason, tc_duration, runnable, + (fails, passes, errors, skips), log, True) + + total = (errors + passes + fails + skips) + + eleTestsuite.attrib['time'] = f"{duration}" + eleTestsuite.attrib['failures'] = f"{fails}" + eleTestsuite.attrib['errors'] = f"{errors}" + eleTestsuite.attrib['skipped'] = f"{skips}" + eleTestsuite.attrib['tests'] = f"{total}" + + result = ET.tostring(eleTestsuites) + with open(filename, 'wb') as report: + report.write(result) + + def xunit_report(self, json_file, filename, selected_platform=None, full_report=False): + if selected_platform: + selected = [selected_platform] + logger.info(f"Writing target report for {selected_platform}...") + else: + logger.info(f"Writing xunit report {filename}...") + selected = self.selected_platforms + + json_data = {} + with open(json_file, "r") as json_results: + json_data = json.load(json_results) + + + env = json_data.get('environment', {}) + version = env.get('zephyr_version', None) + + eleTestsuites = ET.Element('testsuites') + all_suites = json_data.get("testsuites", []) + + for platform in selected: + suites = list(filter(lambda d: d['platform'] == platform, all_suites)) + # do not create entry if everything is filtered out + if not self.env.options.detailed_skipped_report: + non_filtered = list(filter(lambda d: d.get('status') != "filtered", suites)) + if not non_filtered: + continue + + duration = 0 + eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', + name=platform, + timestamp = self.timestamp, + time="0", + tests="0", + failures="0", + errors="0", skipped="0") + eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') + # Multiple 'property' can be added to 'properties' + # differing by name and value + ET.SubElement(eleTSPropetries, 'property', name="version", value=version) + + total = 0 + fails = passes = errors = skips = 0 + for ts in suites: + handler_time = ts.get('execution_time', 0) + runnable = ts.get('runnable', 0) + duration += float(handler_time) + + ts_status = ts.get('status') + # Do not report filtered testcases + if ts_status == 'filtered' and not self.env.options.detailed_skipped_report: + continue + if full_report: + for tc in ts.get("testcases", []): + status = tc.get('status') + reason = tc.get('reason', ts.get('reason', 'Unknown')) + log = tc.get("log", ts.get("log")) + + tc_duration = tc.get('execution_time', handler_time) + name = tc.get("identifier") + classname = ".".join(name.split(".")[:2]) + fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, + name, classname, status, ts_status, reason, tc_duration, runnable, + (fails, passes, errors, skips), log, True) + else: + reason = ts.get('reason', 'Unknown') + name = ts.get("name") + classname = f"{platform}:{name}" + log = ts.get("log") + fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, + name, classname, ts_status, ts_status, reason, duration, runnable, + (fails, passes, errors, skips), log, False) + + total = (errors + passes + fails + skips) + + eleTestsuite.attrib['time'] = f"{duration}" + eleTestsuite.attrib['failures'] = f"{fails}" + eleTestsuite.attrib['errors'] = f"{errors}" + eleTestsuite.attrib['skipped'] = f"{skips}" + eleTestsuite.attrib['tests'] = f"{total}" + + result = ET.tostring(eleTestsuites) + with open(filename, 'wb') as report: + report.write(result) + + def json_report(self, filename, version="NA"): + logger.info(f"Writing JSON report {filename}") + report = {} + report["environment"] = {"os": os.name, + "zephyr_version": version, + "toolchain": self.env.toolchain + } + suites = [] + + for instance in self.instances.values(): + suite = {} + handler_log = os.path.join(instance.build_dir, "handler.log") + build_log = os.path.join(instance.build_dir, "build.log") + device_log = os.path.join(instance.build_dir, "device.log") + + handler_time = instance.metrics.get('handler_time', 0) + ram_size = instance.metrics.get ("ram_size", 0) + rom_size = instance.metrics.get("rom_size",0) + suite = { + "name": instance.testsuite.name, + "arch": instance.platform.arch, + "platform": instance.platform.name, + } + if instance.run_id: + suite['run_id'] = instance.run_id + + suite["runnable"] = False + if instance.status != 'filtered': + suite["runnable"] = instance.run + + if ram_size: + suite["ram_size"] = ram_size + if rom_size: + suite["rom_size"] = rom_size + + if instance.status in ["error", "failed"]: + suite['status'] = instance.status + suite["reason"] = instance.reason + # FIXME + if os.path.exists(handler_log): + suite["log"] = self.process_log(handler_log) + elif os.path.exists(device_log): + suite["log"] = self.process_log(device_log) + else: + suite["log"] = self.process_log(build_log) + elif instance.status == 'filtered': + suite["status"] = "filtered" + suite["reason"] = instance.reason + elif instance.status == 'passed': + suite["status"] = "passed" + elif instance.status == 'skipped': + suite["status"] = "skipped" + suite["reason"] = instance.reason + + if instance.status is not None: + suite["execution_time"] = f"{float(handler_time):.2f}" + + testcases = [] + + if len(instance.testcases) == 1: + single_case_duration = f"{float(handler_time):.2f}" + else: + single_case_duration = 0 + + for case in instance.testcases: + # freeform was set when no sub testcases were parsed, however, + # if we discover those at runtime, the fallback testcase wont be + # needed anymore and can be removed from the output, it does + # not have a status and would otherwise be reported as skipped. + if case.freeform and case.status is None and len(instance.testcases) > 1: + continue + testcase = {} + testcase['identifier'] = case.name + if instance.status: + if single_case_duration: + testcase['execution_time'] = single_case_duration + else: + testcase['execution_time'] = f"{float(case.duration):.2f}" + + if case.output != "": + testcase['log'] = case.output + + if case.status == "skipped": + if instance.status == "filtered": + testcase["status"] = "filtered" + else: + testcase["status"] = "skipped" + testcase["reason"] = case.reason or instance.reason + else: + testcase["status"] = case.status + if case.reason: + testcase["reason"] = case.reason + + testcases.append(testcase) + + suite['testcases'] = testcases + suites.append(suite) + + report["testsuites"] = suites + with open(filename, "wt") as json_file: + json.dump(report, json_file, indent=4, separators=(',',':')) + + + def compare_metrics(self, filename): + # name, datatype, lower results better + interesting_metrics = [("ram_size", int, True), + ("rom_size", int, True)] + + if not os.path.exists(filename): + logger.error("Cannot compare metrics, %s not found" % filename) + return [] + + results = [] + saved_metrics = {} + with open(filename) as fp: + jt = json.load(fp) + for ts in jt.get("testsuites", []): + d = {} + for m, _, _ in interesting_metrics: + d[m] = ts.get(m, 0) + ts_name = ts.get('name') + ts_platform = ts.get('platform') + saved_metrics[(ts_name, ts_platform)] = d + + for instance in self.instances.values(): + mkey = (instance.testsuite.name, instance.platform.name) + if mkey not in saved_metrics: + continue + sm = saved_metrics[mkey] + for metric, mtype, lower_better in interesting_metrics: + if metric not in instance.metrics: + continue + if sm[metric] == "": + continue + delta = instance.metrics.get(metric, 0) - mtype(sm[metric]) + if delta == 0: + continue + results.append((instance, metric, instance.metrics.get(metric, 0), delta, + lower_better)) + return results + + def footprint_reports(self, report, show_footprint, all_deltas, + footprint_threshold, last_metrics): + if not report: + return + + logger.debug("running footprint_reports") + deltas = self.compare_metrics(report) + warnings = 0 + if deltas and show_footprint: + for i, metric, value, delta, lower_better in deltas: + if not all_deltas and ((delta < 0 and lower_better) or + (delta > 0 and not lower_better)): + continue + + percentage = 0 + if value > delta: + percentage = (float(delta) / float(value - delta)) + + if not all_deltas and (percentage < (footprint_threshold / 100.0)): + continue + + logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( + i.platform.name, i.testsuite.name, Fore.YELLOW, + "INFO" if all_deltas else "WARNING", Fore.RESET, + metric, delta, value, percentage)) + warnings += 1 + + if warnings: + logger.warning("Deltas based on metrics from last %s" % + ("release" if not last_metrics else "run")) + + def summary(self, results, unrecognized_sections, duration): + failed = 0 + run = 0 + for instance in self.instances.values(): + if instance.status == "failed": + failed += 1 + elif instance.metrics.get("unrecognized") and not unrecognized_sections: + logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" % + (Fore.RED, Fore.RESET, instance.name, + str(instance.metrics.get("unrecognized", [])))) + failed += 1 + + # FIXME: need a better way to identify executed tests + handler_time = instance.metrics.get('handler_time', 0) + if float(handler_time) > 0: + run += 1 + + if results.total and results.total != results.skipped_configs: + pass_rate = (float(results.passed) / float(results.total - results.skipped_configs)) + else: + pass_rate = 0 + + logger.info( + "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format( + Fore.RED if failed else Fore.GREEN, + results.passed, + results.total, + Fore.RESET, + pass_rate, + Fore.RED if results.failed else Fore.RESET, + results.failed + results.error, + Fore.RESET, + results.skipped_configs, + Fore.YELLOW if self.plan.warnings else Fore.RESET, + self.plan.warnings, + Fore.RESET, + duration)) + + total_platforms = len(self.platforms) + # if we are only building, do not report about tests being executed. + if self.platforms and not self.env.options.build_only: + logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format( + results.cases - results.skipped_cases, + results.skipped_cases, + len(self.filtered_platforms), + total_platforms, + (100 * len(self.filtered_platforms) / len(self.platforms)) + )) + + built_only = results.total - run - results.skipped_configs + logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \ +{Fore.RED}{built_only}{Fore.RESET} test configurations were only built.") + + def save_reports(self, name, suffix, report_dir, no_update, platform_reports): + if not self.instances: + return + + logger.info("Saving reports...") + if name: + report_name = name + else: + report_name = "twister" + + if report_dir: + os.makedirs(report_dir, exist_ok=True) + filename = os.path.join(report_dir, report_name) + outdir = report_dir + else: + outdir = self.outdir + filename = os.path.join(outdir, report_name) + + if suffix: + filename = "{}_{}".format(filename, suffix) + + if not no_update: + json_file = filename + ".json" + self.json_report(json_file, version=self.env.version) + self.xunit_report(json_file, filename + ".xml", full_report=False) + self.xunit_report(json_file, filename + "_report.xml", full_report=True) + self.xunit_report_suites(json_file, filename + "_suite_report.xml") + + if platform_reports: + self.target_report(json_file, outdir, suffix) + + + def target_report(self, json_file, outdir, suffix): + platforms = {inst.platform.name for _, inst in self.instances.items()} + for platform in platforms: + if suffix: + filename = os.path.join(outdir,"{}_{}.xml".format(platform, suffix)) + else: + filename = os.path.join(outdir,"{}.xml".format(platform)) + self.xunit_report(json_file, filename, platform, full_report=True) + diff --git a/scripts/pylib/twister/twisterlib.py b/scripts/pylib/twister/twisterlib.py index e13fa892779..785c2cc4ce6 100755 --- a/scripts/pylib/twister/twisterlib.py +++ b/scripts/pylib/twister/twisterlib.py @@ -38,6 +38,7 @@ from typing import List from cmakecache import CMakeCache from testsuite import TestCase, TestSuite +from error import TwisterRuntimeError try: # Use the C LibYAML parser if available, rather than the Python parser. @@ -2474,11 +2475,9 @@ class TestPlan: self.enable_ubsan = False self.enable_lsan = False self.enable_asan = False - self.detailed_skipped_report = False self.enable_valgrind = False self.extra_args = [] self.inline_logs = False - self.enable_sizes_report = False self.west_flash = None self.west_runner = None self.generator = None @@ -2502,9 +2501,7 @@ class TestPlan: self.load_errors = 0 self.instances = dict() - self.total_platforms = 0 self.start_time = 0 - self.duration = 0 self.warnings = 0 # hardcoded for now @@ -2521,7 +2518,6 @@ class TestPlan: self.modules = [] - self.timestamp = datetime.now().isoformat() def get_platform_instances(self, platform): filtered_dict = {k:v for k,v in self.instances.items() if k.startswith(platform + os.sep)} @@ -2549,169 +2545,6 @@ class TestPlan: results.error += 1 results.done += 1 - def compare_metrics(self, filename): - # name, datatype, lower results better - interesting_metrics = [("ram_size", int, True), - ("rom_size", int, True)] - - if not os.path.exists(filename): - logger.error("Cannot compare metrics, %s not found" % filename) - return [] - - results = [] - saved_metrics = {} - with open(filename) as fp: - jt = json.load(fp) - for ts in jt.get("testsuites", []): - d = {} - for m, _, _ in interesting_metrics: - d[m] = ts.get(m, 0) - ts_name = ts.get('name') - ts_platform = ts.get('platform') - saved_metrics[(ts_name, ts_platform)] = d - - for instance in self.instances.values(): - mkey = (instance.testsuite.name, instance.platform.name) - if mkey not in saved_metrics: - continue - sm = saved_metrics[mkey] - for metric, mtype, lower_better in interesting_metrics: - if metric not in instance.metrics: - continue - if sm[metric] == "": - continue - delta = instance.metrics.get(metric, 0) - mtype(sm[metric]) - if delta == 0: - continue - results.append((instance, metric, instance.metrics.get(metric, 0), delta, - lower_better)) - return results - - def footprint_reports(self, report, show_footprint, all_deltas, - footprint_threshold, last_metrics): - if not report: - return - - logger.debug("running footprint_reports") - deltas = self.compare_metrics(report) - warnings = 0 - if deltas and show_footprint: - for i, metric, value, delta, lower_better in deltas: - if not all_deltas and ((delta < 0 and lower_better) or - (delta > 0 and not lower_better)): - continue - - percentage = 0 - if value > delta: - percentage = (float(delta) / float(value - delta)) - - if not all_deltas and (percentage < (footprint_threshold / 100.0)): - continue - - logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( - i.platform.name, i.testsuite.name, Fore.YELLOW, - "INFO" if all_deltas else "WARNING", Fore.RESET, - metric, delta, value, percentage)) - warnings += 1 - - if warnings: - logger.warning("Deltas based on metrics from last %s" % - ("release" if not last_metrics else "run")) - - def summary(self, results, unrecognized_sections): - failed = 0 - run = 0 - for instance in self.instances.values(): - if instance.status == "failed": - failed += 1 - elif instance.metrics.get("unrecognized") and not unrecognized_sections: - logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" % - (Fore.RED, Fore.RESET, instance.name, - str(instance.metrics.get("unrecognized", [])))) - failed += 1 - - # FIXME: need a better way to identify executed tests - handler_time = instance.metrics.get('handler_time', 0) - if float(handler_time) > 0: - run += 1 - - if results.total and results.total != results.skipped_configs: - pass_rate = (float(results.passed) / float(results.total - results.skipped_configs)) - else: - pass_rate = 0 - - logger.info( - "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format( - Fore.RED if failed else Fore.GREEN, - results.passed, - results.total, - Fore.RESET, - pass_rate, - Fore.RED if results.failed else Fore.RESET, - results.failed + results.error, - Fore.RESET, - results.skipped_configs, - Fore.YELLOW if self.warnings else Fore.RESET, - self.warnings, - Fore.RESET, - self.duration)) - - self.total_platforms = len(self.platforms) - # if we are only building, do not report about tests being executed. - if self.platforms and not self.build_only: - logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format( - results.cases - results.skipped_cases, - results.skipped_cases, - len(self.filtered_platforms), - self.total_platforms, - (100 * len(self.filtered_platforms) / len(self.platforms)) - )) - - built_only = results.total - run - results.skipped_configs - logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \ -{Fore.RED}{built_only}{Fore.RESET} test configurations were only built.") - - def save_reports(self, name, suffix, report_dir, no_update, platform_reports): - if not self.instances: - return - - logger.info("Saving reports...") - if name: - report_name = name - else: - report_name = "twister" - - if report_dir: - os.makedirs(report_dir, exist_ok=True) - filename = os.path.join(report_dir, report_name) - outdir = report_dir - else: - filename = os.path.join(self.outdir, report_name) - outdir = self.outdir - - if suffix: - filename = "{}_{}".format(filename, suffix) - - if not no_update: - json_file = filename + ".json" - self.json_report(json_file, version=self.env.version) - self.xunit_report(json_file, filename + ".xml", full_report=False) - self.xunit_report(json_file, filename + "_report.xml", full_report=True) - self.xunit_report_suites(json_file, filename + "_suite_report.xml") - - if platform_reports: - self.target_report(json_file, outdir, suffix) - - - def target_report(self, json_file, outdir, suffix): - platforms = {inst.platform.name for _, inst in self.instances.items()} - for platform in platforms: - if suffix: - filename = os.path.join(outdir,"{}_{}.xml".format(platform, suffix)) - else: - filename = os.path.join(outdir,"{}.xml".format(platform)) - self.xunit_report(json_file, filename, platform, full_report=True) - def add_configurations(self): @@ -3519,313 +3352,6 @@ class TestPlan: return results - @staticmethod - def process_log(log_file): - filtered_string = "" - if os.path.exists(log_file): - with open(log_file, "rb") as f: - log = f.read().decode("utf-8") - filtered_string = ''.join(filter(lambda x: x in string.printable, log)) - - return filtered_string - - - @staticmethod - def xunit_testcase(eleTestsuite, name, classname, status, ts_status, reason, duration, runnable, stats, log, build_only_as_skip): - fails, passes, errors, skips = stats - - if status in ['skipped', 'filtered']: - duration = 0 - - eleTestcase = ET.SubElement( - eleTestsuite, "testcase", - classname=classname, - name=f"{name}", - time=f"{duration}") - - if status in ['skipped', 'filtered']: - skips += 1 - # temporarily add build_only_as_skip to restore existing CI report behaviour - if ts_status == "passed" and not runnable: - tc_type = "build" - else: - tc_type = status - ET.SubElement(eleTestcase, 'skipped', type=f"{tc_type}", message=f"{reason}") - elif status in ["failed", "blocked"]: - fails += 1 - el = ET.SubElement(eleTestcase, 'failure', type="failure", message=f"{reason}") - if log: - el.text = log - elif status == "error": - errors += 1 - el = ET.SubElement(eleTestcase, 'error', type="failure", message=f"{reason}") - if log: - el.text = log - elif status == 'passed': - if not runnable and build_only_as_skip: - ET.SubElement(eleTestcase, 'skipped', type="build", message="built only") - skips += 1 - else: - passes += 1 - else: - if not status: - logger.debug(f"{name}: No status") - ET.SubElement(eleTestcase, 'skipped', type=f"untested", message="No results captured, testsuite misconfiguration?") - else: - logger.error(f"{name}: Unknown status '{status}'") - - return (fails, passes, errors, skips) - - # Generate a report with all testsuites instead of doing this per platform - def xunit_report_suites(self, json_file, filename): - - json_data = {} - with open(json_file, "r") as json_results: - json_data = json.load(json_results) - - - env = json_data.get('environment', {}) - version = env.get('zephyr_version', None) - - eleTestsuites = ET.Element('testsuites') - all_suites = json_data.get("testsuites", []) - - suites_to_report = all_suites - # do not create entry if everything is filtered out - if not self.detailed_skipped_report: - suites_to_report = list(filter(lambda d: d.get('status') != "filtered", all_suites)) - - for suite in suites_to_report: - duration = 0 - eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', - name=suite.get("name"), time="0", - timestamp = self.timestamp, - tests="0", - failures="0", - errors="0", skipped="0") - eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') - # Multiple 'property' can be added to 'properties' - # differing by name and value - ET.SubElement(eleTSPropetries, 'property', name="version", value=version) - ET.SubElement(eleTSPropetries, 'property', name="platform", value=suite.get("platform")) - ET.SubElement(eleTSPropetries, 'property', name="architecture", value=suite.get("arch")) - - total = 0 - fails = passes = errors = skips = 0 - handler_time = suite.get('execution_time', 0) - runnable = suite.get('runnable', 0) - duration += float(handler_time) - ts_status = suite.get('status') - for tc in suite.get("testcases", []): - status = tc.get('status') - reason = tc.get('reason', suite.get('reason', 'Unknown')) - log = tc.get("log", suite.get("log")) - - tc_duration = tc.get('execution_time', handler_time) - name = tc.get("identifier") - classname = ".".join(name.split(".")[:2]) - fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, - name, classname, status, ts_status, reason, tc_duration, runnable, - (fails, passes, errors, skips), log, True) - - total = (errors + passes + fails + skips) - - eleTestsuite.attrib['time'] = f"{duration}" - eleTestsuite.attrib['failures'] = f"{fails}" - eleTestsuite.attrib['errors'] = f"{errors}" - eleTestsuite.attrib['skipped'] = f"{skips}" - eleTestsuite.attrib['tests'] = f"{total}" - - result = ET.tostring(eleTestsuites) - with open(filename, 'wb') as report: - report.write(result) - - def xunit_report(self, json_file, filename, selected_platform=None, full_report=False): - if selected_platform: - selected = [selected_platform] - logger.info(f"Writing target report for {selected_platform}...") - else: - logger.info(f"Writing xunit report {filename}...") - selected = self.selected_platforms - - json_data = {} - with open(json_file, "r") as json_results: - json_data = json.load(json_results) - - - env = json_data.get('environment', {}) - version = env.get('zephyr_version', None) - - eleTestsuites = ET.Element('testsuites') - all_suites = json_data.get("testsuites", []) - - for platform in selected: - suites = list(filter(lambda d: d['platform'] == platform, all_suites)) - # do not create entry if everything is filtered out - if not self.detailed_skipped_report: - non_filtered = list(filter(lambda d: d.get('status') != "filtered", suites)) - if not non_filtered: - continue - - duration = 0 - eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', - name=platform, - timestamp = self.timestamp, - time="0", - tests="0", - failures="0", - errors="0", skipped="0") - eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') - # Multiple 'property' can be added to 'properties' - # differing by name and value - ET.SubElement(eleTSPropetries, 'property', name="version", value=version) - - total = 0 - fails = passes = errors = skips = 0 - for ts in suites: - handler_time = ts.get('execution_time', 0) - runnable = ts.get('runnable', 0) - duration += float(handler_time) - - ts_status = ts.get('status') - # Do not report filtered testcases - if ts_status == 'filtered' and not self.detailed_skipped_report: - continue - if full_report: - for tc in ts.get("testcases", []): - status = tc.get('status') - reason = tc.get('reason', ts.get('reason', 'Unknown')) - log = tc.get("log", ts.get("log")) - - tc_duration = tc.get('execution_time', handler_time) - name = tc.get("identifier") - classname = ".".join(name.split(".")[:2]) - fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, - name, classname, status, ts_status, reason, tc_duration, runnable, - (fails, passes, errors, skips), log, True) - else: - reason = ts.get('reason', 'Unknown') - name = ts.get("name") - classname = f"{platform}:{name}" - log = ts.get("log") - fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, - name, classname, ts_status, ts_status, reason, duration, runnable, - (fails, passes, errors, skips), log, False) - - total = (errors + passes + fails + skips) - - eleTestsuite.attrib['time'] = f"{duration}" - eleTestsuite.attrib['failures'] = f"{fails}" - eleTestsuite.attrib['errors'] = f"{errors}" - eleTestsuite.attrib['skipped'] = f"{skips}" - eleTestsuite.attrib['tests'] = f"{total}" - - result = ET.tostring(eleTestsuites) - with open(filename, 'wb') as report: - report.write(result) - - def json_report(self, filename, version="NA"): - logger.info(f"Writing JSON report {filename}") - report = {} - report["environment"] = {"os": os.name, - "zephyr_version": version, - "toolchain": self.env.toolchain - } - suites = [] - - for instance in self.instances.values(): - suite = {} - handler_log = os.path.join(instance.build_dir, "handler.log") - build_log = os.path.join(instance.build_dir, "build.log") - device_log = os.path.join(instance.build_dir, "device.log") - - handler_time = instance.metrics.get('handler_time', 0) - ram_size = instance.metrics.get ("ram_size", 0) - rom_size = instance.metrics.get("rom_size",0) - suite = { - "name": instance.testsuite.name, - "arch": instance.platform.arch, - "platform": instance.platform.name, - } - if instance.run_id: - suite['run_id'] = instance.run_id - - suite["runnable"] = False - if instance.status != 'filtered': - suite["runnable"] = instance.run - - if ram_size: - suite["ram_size"] = ram_size - if rom_size: - suite["rom_size"] = rom_size - - if instance.status in ["error", "failed"]: - suite['status'] = instance.status - suite["reason"] = instance.reason - # FIXME - if os.path.exists(handler_log): - suite["log"] = self.process_log(handler_log) - elif os.path.exists(device_log): - suite["log"] = self.process_log(device_log) - else: - suite["log"] = self.process_log(build_log) - elif instance.status == 'filtered': - suite["status"] = "filtered" - suite["reason"] = instance.reason - elif instance.status == 'passed': - suite["status"] = "passed" - elif instance.status == 'skipped': - suite["status"] = "skipped" - suite["reason"] = instance.reason - - if instance.status is not None: - suite["execution_time"] = f"{float(handler_time):.2f}" - - testcases = [] - - if len(instance.testcases) == 1: - single_case_duration = f"{float(handler_time):.2f}" - else: - single_case_duration = 0 - - for case in instance.testcases: - # freeform was set when no sub testcases were parsed, however, - # if we discover those at runtime, the fallback testcase wont be - # needed anymore and can be removed from the output, it does - # not have a status and would otherwise be reported as skipped. - if case.freeform and case.status is None and len(instance.testcases) > 1: - continue - testcase = {} - testcase['identifier'] = case.name - if instance.status: - if single_case_duration: - testcase['execution_time'] = single_case_duration - else: - testcase['execution_time'] = f"{float(case.duration):.2f}" - - if case.output != "": - testcase['log'] = case.output - - if case.status == "skipped": - if instance.status == "filtered": - testcase["status"] = "filtered" - else: - testcase["status"] = "skipped" - testcase["reason"] = case.reason or instance.reason - else: - testcase["status"] = case.status - if case.reason: - testcase["reason"] = case.reason - - testcases.append(testcase) - - suite['testcases'] = testcases - suites.append(suite) - - report["testsuites"] = suites - with open(filename, "wt") as json_file: - json.dump(report, json_file, indent=4, separators=(',',':')) - def get_testsuite(self, identifier): results = [] for _, ts in self.testsuites.items(): diff --git a/scripts/twister b/scripts/twister index b372dfdc249..5fd9ef7da03 100755 --- a/scripts/twister +++ b/scripts/twister @@ -208,6 +208,7 @@ sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister")) import twisterlib from twisterlib import HardwareMap, TestPlan, SizeCalculator, CoverageTool, ExecutionCounter from enviornment import TwisterEnv, canonical_zephyr_base +from reports import Reporting logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) @@ -911,7 +912,8 @@ def main(): logger.error("You have provided a wrong subset value: %s." % options.subset) return - env = TwisterEnv() + env = TwisterEnv(options) + env.discover() tplan = TestPlan(options.board_root, options.testsuite_root, env, options.outdir) # Set testplan options from command line. @@ -940,7 +942,6 @@ def main(): tplan.overflow_as_errors = options.overflow_as_errors tplan.suite_name_check = not options.disable_suite_name_check tplan.seed = options.seed - tplan.detailed_skipped_report = options.detailed_skipped_report # get all enabled west projects west_proj = west_projects() @@ -1241,11 +1242,11 @@ def main(): tplan.instances.update(skipped) tplan.instances.update(errors) - - tplan.json_report(os.path.join(options.outdir, "testplan.json")) + report = Reporting(tplan, env) + report.json_report(os.path.join(options.outdir, "testplan.json")) if options.save_tests: - tplan.json_report(options.save_tests) + report.json_report(options.save_tests) return if options.device_testing and not options.build_only: @@ -1319,17 +1320,17 @@ def main(): elif options.last_metrics: report_to_use = previous_results_file - tplan.footprint_reports(report_to_use, + report.footprint_reports(report_to_use, options.show_footprint, options.all_deltas, options.footprint_threshold, options.last_metrics) - tplan.duration = time.time() - start_time + duration = time.time() - start_time results.summary() - tplan.summary(results, options.disable_unrecognized_section_test) + report.summary(results, options.disable_unrecognized_section_test, duration) if options.coverage: if not options.gcov_tool: @@ -1365,7 +1366,7 @@ def main(): table.append(row) print(tabulate(table, headers=header, tablefmt="github")) - tplan.save_reports(options.report_name, + report.save_reports(options.report_name, options.report_suffix, options.report_dir, options.no_update,