diff --git a/scripts/pylib/twister/twister/runner.py b/scripts/pylib/twister/twister/runner.py index f1dd4a4de1a..89d08cec531 100644 --- a/scripts/pylib/twister/twister/runner.py +++ b/scripts/pylib/twister/twister/runner.py @@ -10,8 +10,9 @@ import sys import subprocess import pickle import logging +import queue from colorama import Fore -from multiprocessing import Lock, Value +from multiprocessing import Lock, Process, Value from twister.cmakecache import CMakeCache @@ -740,3 +741,82 @@ class ProjectBuilder(FilterBuilder): instance.metrics["unrecognized"] = [] instance.metrics["handler_time"] = instance.execution_time + + + +class TwisterRunner: + + def __init__(self, instances, jobs=1, env=None) -> None: + self.pipeline = None + self.options = env.options + self.env = env + self.instances = instances + self.jobs = jobs + + def update_counting(self, results=None): + for instance in self.instances.values(): + results.cases += len(instance.testsuite.testcases) + if instance.status == 'filtered': + results.skipped_filter += 1 + results.skipped_configs += 1 + elif instance.status == 'passed': + results.passed += 1 + results.done += 1 + elif instance.status == 'error': + results.error += 1 + results.done += 1 + + + def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False): + for instance in self.instances.values(): + if build_only: + instance.run = False + + no_retry_statuses = ['passed', 'skipped', 'filtered'] + if not retry_build_errors: + no_retry_statuses.append("error") + + if instance.status not in no_retry_statuses: + logger.debug(f"adding {instance.name}") + instance.status = None + if test_only and instance.run: + pipeline.put({"op": "run", "test": instance}) + else: + pipeline.put({"op": "cmake", "test": instance}) + + def pipeline_mgr(self, pipeline, done_queue, lock, results): + while True: + try: + task = pipeline.get_nowait() + except queue.Empty: + break + else: + instance = task['test'] + pb = ProjectBuilder(self, instance, self.env) + pb.process(pipeline, done_queue, task, lock, results) + + return True + + def execute(self, pipeline, done, results): + lock = Lock() + logger.info("Adding tasks to the queue...") + self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only, + retry_build_errors=self.options.retry_build_errors) + logger.info("Added initial list of jobs to queue") + + processes = [] + for job in range(self.jobs): + logger.debug(f"Launch process {job}") + p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, )) + processes.append(p) + p.start() + + try: + for p in processes: + p.join() + except KeyboardInterrupt: + logger.info("Execution interrupted") + for p in processes: + p.terminate() + + return results diff --git a/scripts/pylib/twister/twister/testplan.py b/scripts/pylib/twister/twister/testplan.py index 60cfa851737..a80ac4ac5d3 100755 --- a/scripts/pylib/twister/twister/testplan.py +++ b/scripts/pylib/twister/twister/testplan.py @@ -161,6 +161,8 @@ class TestPlan: self.board_roots = board_root_list self.options = env.options + self.env = env + # Test Plan Options self.suite_name_check = True self.seed = 0 @@ -182,10 +184,6 @@ class TestPlan: self.duts = [] # used during creating shorter build paths self.link_dir_counter = 0 - - self.pipeline = None - self.env = env - self.modules = [] @@ -202,19 +200,6 @@ class TestPlan: sys.stdout.write(what + "\n") sys.stdout.flush() - def update_counting(self, results=None): - for instance in self.instances.values(): - results.cases += len(instance.testsuite.testcases) - if instance.status == 'filtered': - results.skipped_filter += 1 - results.skipped_configs += 1 - elif instance.status == 'passed': - results.passed += 1 - results.done += 1 - elif instance.status == 'error': - results.error += 1 - results.done += 1 - def add_configurations(self): @@ -949,59 +934,6 @@ class TestPlan: for instance in instance_list: self.instances[instance.name] = instance - def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False): - for instance in self.instances.values(): - if build_only: - instance.run = False - - no_retry_statuses = ['passed', 'skipped', 'filtered'] - if not retry_build_errors: - no_retry_statuses.append("error") - - if instance.status not in no_retry_statuses: - logger.debug(f"adding {instance.name}") - instance.status = None - if test_only and instance.run: - pipeline.put({"op": "run", "test": instance}) - else: - pipeline.put({"op": "cmake", "test": instance}) - - def pipeline_mgr(self, pipeline, done_queue, lock, results): - while True: - try: - task = pipeline.get_nowait() - except queue.Empty: - break - else: - instance = task['test'] - pb = ProjectBuilder(self, instance, self.env) - pb.process(pipeline, done_queue, task, lock, results) - - return True - - def execute(self, pipeline, done, results): - lock = Lock() - logger.info("Adding tasks to the queue...") - self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only, - retry_build_errors=self.options.retry_build_errors) - logger.info("Added initial list of jobs to queue") - - processes = [] - for job in range(self.jobs): - logger.debug(f"Launch process {job}") - p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, )) - processes.append(p) - p.start() - - try: - for p in processes: - p.join() - except KeyboardInterrupt: - logger.info("Execution interrupted") - for p in processes: - p.terminate() - - return results def get_testsuite(self, identifier): results = [] diff --git a/scripts/twister b/scripts/twister index d3c9607bae1..bfbb0e56b8b 100755 --- a/scripts/twister +++ b/scripts/twister @@ -178,6 +178,7 @@ from colorama import Fore from pathlib import Path from multiprocessing.managers import BaseManager import queue + from zephyr_module import west_projects, parse_modules ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") @@ -211,6 +212,7 @@ from twister.enviornment import TwisterEnv, canonical_zephyr_base from twister.reports import Reporting from twister.hardwaremap import HardwareMap from twister.coverage import CoverageTool +from twister.runner import TwisterRunner logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) @@ -933,15 +935,6 @@ def main(): modules = [module.meta.get('name') for module in modules_meta] tplan.modules = modules - # Set number of jobs - if options.jobs: - tplan.jobs = options.jobs - elif options.build_only: - tplan.jobs = multiprocessing.cpu_count() * 2 - else: - tplan.jobs = multiprocessing.cpu_count() - logger.info("JOBS: %d" % tplan.jobs) - run_individual_tests = [] if options.test: @@ -1248,10 +1241,21 @@ def main(): pipeline = manager.LifoQueue() done_queue = manager.LifoQueue() - tplan.update_counting(results) + # Set number of jobs + if options.jobs: + jobs = options.jobs + elif options.build_only: + jobs = multiprocessing.cpu_count() * 2 + else: + jobs = multiprocessing.cpu_count() + logger.info("JOBS: %d" % jobs) + + runner = TwisterRunner(tplan.instances, jobs, env) + + runner.update_counting(results) + logger.info("%d test scenarios (%d configurations) selected, %d configurations discarded due to filters." % (len(tplan.testsuites), len(tplan.instances), results.skipped_configs)) - tplan.start_time = start_time while True: completed += 1 @@ -1266,7 +1270,7 @@ def main(): else: results.failed = results.error - results = tplan.execute(pipeline, done_queue, results) + results = runner.execute(pipeline, done_queue, results) while True: try: