diff --git a/scripts/ci/twister_ignore.txt b/scripts/ci/twister_ignore.txt index abcdc774ed4..96dfdfd83d5 100644 --- a/scripts/ci/twister_ignore.txt +++ b/scripts/ci/twister_ignore.txt @@ -43,6 +43,7 @@ scripts/ci/errno.py scripts/ci/upload_test_results_es.py scripts/ci/what_changed.py scripts/ci/version_mgr.py +scripts/ci/twister_report_analyzer.py scripts/ci/stats/* scripts/requirements* scripts/checkpatch/* diff --git a/scripts/ci/twister_report_analyzer.py b/scripts/ci/twister_report_analyzer.py new file mode 100755 index 00000000000..74ee9862c11 --- /dev/null +++ b/scripts/ci/twister_report_analyzer.py @@ -0,0 +1,383 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Nordic Semiconductor NA +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import argparse +import csv +import json +import logging +import os +import textwrap +from dataclasses import asdict, dataclass, field, is_dataclass + +logger = logging.getLogger(__name__) + + +def create_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + allow_abbrev=False, + description='Analyzes Twister JSON reports', + epilog=( + textwrap.dedent(""" + Example usage: + To analyze errors with predefined CMake and Build error patterns, run: + > python %(prog)s twister_reports/*.json --long-summary + The summary will be saved to twister_report_summary.json file unless --output option is used. + To save error summary to CSV file, use --output-csv option (number of test files is limited to 100): + > python %(prog)s twister_reports/*.json --output-csv twister_report_summary.csv + One can use --error-patterns option to provide custom error patterns file: + > python %(prog)s **/twister.json --error-patterns error_patterns.txt + """) # noqa E501 + ), + ) + parser.add_argument('inputs', type=str, nargs="+", help='twister.json files to read') + parser.add_argument( + '--error-patterns', + type=str, + help='text file with custom error patterns, ' 'each entry must be separated by newlines', + ) + parser.add_argument( + '--output', + type=str, + default='twister_report_summary.json', + help='output json file name, default: twister_report_summary.json', + ) + parser.add_argument('--output-csv', type=str, help='output csv file name') + parser.add_argument( + '--output-md', type=str, help='output markdown file name to store table with errors' + ) + parser.add_argument( + '--status', + action='store_true', + help='add statuses of testsuites and testcases to the summary', + ) + parser.add_argument( + '--platforms', + action='store_true', + help='add errors per platform to the summary', + ) + parser.add_argument( + '--long-summary', + action='store_true', + help='show all matched errors grouped by reason, otherwise show only most common errors', + ) + + parser.add_argument( + '-ll', + '--log-level', + type=str.upper, + default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], + ) + return parser + + +@dataclass +class Counters: + counters: dict[str, TestCollection] = field(default_factory=dict) + + def add_counter(self, key: str, test: str = '') -> None: + self.counters[key] = self.counters.get(key, TestCollection()) + self.counters[key].append(test) + + def print_counters(self, indent: int = 0): + for key, value in self.counters.items(): + print(f'{" " * indent}{value.quantity:4} {key}') + if value.subcounters.counters: + value.subcounters.print_counters(indent + 4) + + def sort_by_quantity(self): + self.counters = dict( + sorted(self.counters.items(), key=lambda item: item[1].quantity, reverse=True) + ) + for value in self.counters.values(): + if value.subcounters.counters: + value.subcounters.sort_by_quantity() + + def get_next_entry(self, depth: int = 0, max_depth: int = 10): + for key, value in self.counters.items(): + # limit number of test files to 100 to not exceed CSV cell limit + yield depth, value.quantity, key, ', '.join(value.tests[0:100]) + if value.subcounters.counters and depth < max_depth: + yield from value.subcounters.get_next_entry(depth + 1, max_depth) + + def _flatten(self): + """ + Yield all deepest counters in a flat structure. + Deepest counters refer to those counters which + do not contain any further nested subcounters. + """ + for key, value in self.counters.items(): + if value.subcounters.counters: + yield from value.subcounters._flatten() + else: + yield key, value + + def get_most_common(self, n: int = 10): + return dict(sorted(self._flatten(), key=lambda item: item[1].quantity, reverse=True)[:n]) + + +@dataclass +class TestCollection: + quantity: int = 0 + tests: list[str] = field(default_factory=list) + subcounters: Counters = field(default_factory=Counters) + + def append(self, test: str = ''): + self.quantity += 1 + if test: + self.tests.append(test) + + +class TwisterReports: + def __init__(self): + self.status: Counters = Counters() + self.errors: Counters = Counters() + self.platforms: Counters = Counters() + + def parse_report(self, json_filename): + logger.info(f'Process {json_filename}') + with open(json_filename) as json_results: + json_data = json.load(json_results) + + for ts in json_data.get('testsuites', []): + self.parse_statuses(ts) + + for ts in json_data.get('testsuites', []): + self.parse_testsuite(ts) + + def parse_statuses(self, testsuite): + ts_status = testsuite.get('status', 'no status in testsuite') + self.status.add_counter(ts_status) + # Process testcases + for tc in testsuite.get('testcases', []): + tc_status = tc.get('status') + self.status.counters[ts_status].subcounters.add_counter(tc_status) + + def parse_testsuite(self, testsuite): + ts_status = testsuite.get('status') or 'no status in testsuite' + if ts_status not in ('error', 'failed'): + return + + ts_reason = testsuite.get('reason') or 'Unknown reason' + self.errors.add_counter(ts_reason) + ts_platform = testsuite.get('platform') or 'Unknown platform' + self.platforms.add_counter(ts_platform) + ts_log = testsuite.get('log') + test_identifier = f'{testsuite.get("platform")}:{testsuite.get("name")}' + + matched = self._parse_ts_error_log( + self.errors.counters[ts_reason].subcounters, ts_reason, ts_log, test_identifier + ) + + # Process testcases + for tc in testsuite.get('testcases', []): + tc_reason = tc.get('reason') + tc_log = tc.get('log') + if tc_reason and tc_log: + self.errors.counters[ts_reason].subcounters.add_counter(tc_reason, test_identifier) + matched = True + + if not matched: + self.errors.counters[ts_reason].tests.append(test_identifier) + + def _parse_ts_error_log( + self, counters: Counters, reason: str, log: str, test: str = '' + ) -> bool: + if reason == 'CMake build failure': + if error_key := self._parse_cmake_build_failure(log): + counters.add_counter(error_key, test) + return True + elif reason == 'Build failure': # noqa SIM102 + if error_key := self._parse_build_failure(log): + counters.add_counter(error_key, test) + return True + return False + + def _parse_cmake_build_failure(self, log: str) -> str | None: + last_warning = 'no warning found' + lines = log.splitlines() + for i, line in enumerate(lines): + if "warning: " in line: + last_warning = line + elif "devicetree error: " in line: + return "devicetree error" + elif "fatal error: " in line: + return line[line.index('fatal error: ') :].strip() + elif "error: " in line: # error: Aborting due to Kconfig warnings + if "undefined symbol" in last_warning: + return last_warning[last_warning.index('undefined symbol') :].strip() + return last_warning + elif "CMake Error at" in line: + for next_line in lines[i + 1 :]: + if next_line.strip(): + return line + ' ' + next_line + return line + return "No matching CMake error pattern found" + + def _parse_build_failure(self, log: str) -> str | None: + last_warning = '' + lines = log.splitlines() + for i, line in enumerate(lines): + if "undefined reference" in line: + return line[line.index('undefined reference') :].strip() + elif "error: ld returned" in line: + if last_warning: + return last_warning + elif "overflowed by" in lines[i - 1]: + return "ld.bfd: region overflowed" + elif "ld.bfd: warning: " in lines[i - 1]: + return "ld.bfd:" + lines[i - 1].split("ld.bfd:", 1)[-1] + return line + elif "error: " in line: + return line[line.index('error: ') :].strip() + elif ": in function " in line: + last_warning = line[line.index('in function') :].strip() + return "No matching build error pattern found" + + def sort_counters(self): + self.status.sort_by_quantity() + self.platforms.sort_by_quantity() + self.errors.sort_by_quantity() + + +class TwisterReportsWithPatterns(TwisterReports): + def __init__(self, error_patterns_file): + super().__init__() + self.error_patterns = [] + self.add_error_patterns(error_patterns_file) + + def add_error_patterns(self, filename): + with open(filename) as f: + self.error_patterns = [ + line + for line in f.read().splitlines() + if line.strip() and not line.strip().startswith('#') + ] + logger.info(f'Loaded {len(self.error_patterns)} error patterns from {filename}') + + def parse_testsuite(self, testsuite): + ts_status = testsuite.get('status') or 'no status in testsuite' + if ts_status not in ('error', 'failed'): + return + + ts_reason = testsuite.get('reason') or 'Unknown reason' + self.errors.add_counter(ts_reason) + ts_log = testsuite.get('log') + test_identifier = f'{testsuite.get("platform")}:{testsuite.get("name")}' + self._parse_log_with_error_paterns( + self.errors.counters[ts_reason].subcounters, ts_log, test_identifier + ) + # Process testcases + for tc in testsuite.get('testcases', []): + tc_reason = tc.get('reason') + tc_log = tc.get('log') + if tc_reason and tc_log: + self.errors.counters[ts_reason].subcounters.add_counter(tc_reason) + self._parse_log_with_error_paterns( + self.errors.counters[ts_reason].subcounters.counters[tc_reason].subcounters, + tc_log, + test_identifier, + ) + + def _parse_log_with_error_paterns(self, counters: Counters, log: str, test: str = ''): + for line in log.splitlines(): + for error_pattern in self.error_patterns: + if error_pattern in line: + logger.debug(f'Matched: {error_pattern} in {line}') + counters.add_counter(error_pattern, test) + return + + +class EnhancedJSONEncoder(json.JSONEncoder): + def default(self, o): + if is_dataclass(o): + return asdict(o) + return super().default(o) + + +def dump_to_json(filename, data): + with open(filename, 'w') as f: + json.dump(data, f, indent=4, cls=EnhancedJSONEncoder) + logger.info(f'Data saved to {filename}') + + +def dump_to_csv(filename, data: Counters): + with open(filename, 'w', newline='') as csvfile: + csvwriter = csv.writer(csvfile) + # Write headers + csvwriter.writerow(['Depth', 'Counter', 'Key', 'Tests']) + # Write error data + for csv_data in data.get_next_entry(): + csvwriter.writerow(csv_data) + logger.info(f'Data saved to {filename}') + + +def dump_markdown_table(filename, data: Counters, max_depth=2): + with open(filename, 'w', newline='') as md: + for depth, quantity, key, _ in data.get_next_entry(max_depth=max_depth): + if depth == 0: + md.write('\n') + md.write(f'| {quantity:4} | {key} |\n') + if depth == 0: + md.write('|-------|------|\n') + logger.info(f'Markdown table saved to {filename}') + + +def summary_with_most_common_errors(errors: Counters, limit: int = 15): + print('\nMost common errors summary:') + for key, value in errors.get_most_common(n=limit).items(): + print(f'{value.quantity:4} {key}') + + +def main(): + parser = create_parser() + args = parser.parse_args() + + logging.basicConfig(level=args.log_level, format='%(levelname)-8s: %(message)s') + logger = logging.getLogger() + + if args.error_patterns: + reports = TwisterReportsWithPatterns(args.error_patterns) + else: + reports = TwisterReports() + + for filename in args.inputs: + if os.path.exists(filename): + reports.parse_report(filename) + else: + logger.warning(f'File not found: {filename}') + + reports.sort_counters() + dump_to_json( + args.output, + {'status': reports.status, 'platforms': reports.platforms, 'errors': reports.errors}, + ) + + if args.status: + print('\nTestsuites and testcases status summary:') + reports.status.print_counters() + + if not reports.errors.counters: + return + + if args.platforms: + print('\nErrors per platform:') + reports.platforms.print_counters() + + if args.long_summary: + print('\nErrors summary:') + reports.errors.print_counters() + else: + summary_with_most_common_errors(reports.errors) + + if args.output_csv: + dump_to_csv(args.output_csv, reports.errors) + if args.output_md: + dump_markdown_table(args.output_md, reports.errors, max_depth=2) + + +if __name__ == '__main__': + main()