twister: move execution code out of testplan

Move all code related to execution into runner class.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2022-06-10 06:51:25 -04:00
commit 20f257a97d
3 changed files with 99 additions and 83 deletions

View file

@ -10,8 +10,9 @@ import sys
import subprocess import subprocess
import pickle import pickle
import logging import logging
import queue
from colorama import Fore from colorama import Fore
from multiprocessing import Lock, Value from multiprocessing import Lock, Process, Value
from twister.cmakecache import CMakeCache from twister.cmakecache import CMakeCache
@ -740,3 +741,82 @@ class ProjectBuilder(FilterBuilder):
instance.metrics["unrecognized"] = [] instance.metrics["unrecognized"] = []
instance.metrics["handler_time"] = instance.execution_time instance.metrics["handler_time"] = instance.execution_time
class TwisterRunner:
def __init__(self, instances, jobs=1, env=None) -> None:
self.pipeline = None
self.options = env.options
self.env = env
self.instances = instances
self.jobs = jobs
def update_counting(self, results=None):
for instance in self.instances.values():
results.cases += len(instance.testsuite.testcases)
if instance.status == 'filtered':
results.skipped_filter += 1
results.skipped_configs += 1
elif instance.status == 'passed':
results.passed += 1
results.done += 1
elif instance.status == 'error':
results.error += 1
results.done += 1
def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
for instance in self.instances.values():
if build_only:
instance.run = False
no_retry_statuses = ['passed', 'skipped', 'filtered']
if not retry_build_errors:
no_retry_statuses.append("error")
if instance.status not in no_retry_statuses:
logger.debug(f"adding {instance.name}")
instance.status = None
if test_only and instance.run:
pipeline.put({"op": "run", "test": instance})
else:
pipeline.put({"op": "cmake", "test": instance})
def pipeline_mgr(self, pipeline, done_queue, lock, results):
while True:
try:
task = pipeline.get_nowait()
except queue.Empty:
break
else:
instance = task['test']
pb = ProjectBuilder(self, instance, self.env)
pb.process(pipeline, done_queue, task, lock, results)
return True
def execute(self, pipeline, done, results):
lock = Lock()
logger.info("Adding tasks to the queue...")
self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
retry_build_errors=self.options.retry_build_errors)
logger.info("Added initial list of jobs to queue")
processes = []
for job in range(self.jobs):
logger.debug(f"Launch process {job}")
p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, ))
processes.append(p)
p.start()
try:
for p in processes:
p.join()
except KeyboardInterrupt:
logger.info("Execution interrupted")
for p in processes:
p.terminate()
return results

View file

@ -161,6 +161,8 @@ class TestPlan:
self.board_roots = board_root_list self.board_roots = board_root_list
self.options = env.options self.options = env.options
self.env = env
# Test Plan Options # Test Plan Options
self.suite_name_check = True self.suite_name_check = True
self.seed = 0 self.seed = 0
@ -182,10 +184,6 @@ class TestPlan:
self.duts = [] self.duts = []
# used during creating shorter build paths # used during creating shorter build paths
self.link_dir_counter = 0 self.link_dir_counter = 0
self.pipeline = None
self.env = env
self.modules = [] self.modules = []
@ -202,19 +200,6 @@ class TestPlan:
sys.stdout.write(what + "\n") sys.stdout.write(what + "\n")
sys.stdout.flush() sys.stdout.flush()
def update_counting(self, results=None):
for instance in self.instances.values():
results.cases += len(instance.testsuite.testcases)
if instance.status == 'filtered':
results.skipped_filter += 1
results.skipped_configs += 1
elif instance.status == 'passed':
results.passed += 1
results.done += 1
elif instance.status == 'error':
results.error += 1
results.done += 1
def add_configurations(self): def add_configurations(self):
@ -949,59 +934,6 @@ class TestPlan:
for instance in instance_list: for instance in instance_list:
self.instances[instance.name] = instance self.instances[instance.name] = instance
def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
for instance in self.instances.values():
if build_only:
instance.run = False
no_retry_statuses = ['passed', 'skipped', 'filtered']
if not retry_build_errors:
no_retry_statuses.append("error")
if instance.status not in no_retry_statuses:
logger.debug(f"adding {instance.name}")
instance.status = None
if test_only and instance.run:
pipeline.put({"op": "run", "test": instance})
else:
pipeline.put({"op": "cmake", "test": instance})
def pipeline_mgr(self, pipeline, done_queue, lock, results):
while True:
try:
task = pipeline.get_nowait()
except queue.Empty:
break
else:
instance = task['test']
pb = ProjectBuilder(self, instance, self.env)
pb.process(pipeline, done_queue, task, lock, results)
return True
def execute(self, pipeline, done, results):
lock = Lock()
logger.info("Adding tasks to the queue...")
self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
retry_build_errors=self.options.retry_build_errors)
logger.info("Added initial list of jobs to queue")
processes = []
for job in range(self.jobs):
logger.debug(f"Launch process {job}")
p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, ))
processes.append(p)
p.start()
try:
for p in processes:
p.join()
except KeyboardInterrupt:
logger.info("Execution interrupted")
for p in processes:
p.terminate()
return results
def get_testsuite(self, identifier): def get_testsuite(self, identifier):
results = [] results = []

View file

@ -178,6 +178,7 @@ from colorama import Fore
from pathlib import Path from pathlib import Path
from multiprocessing.managers import BaseManager from multiprocessing.managers import BaseManager
import queue import queue
from zephyr_module import west_projects, parse_modules from zephyr_module import west_projects, parse_modules
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
@ -211,6 +212,7 @@ from twister.enviornment import TwisterEnv, canonical_zephyr_base
from twister.reports import Reporting from twister.reports import Reporting
from twister.hardwaremap import HardwareMap from twister.hardwaremap import HardwareMap
from twister.coverage import CoverageTool from twister.coverage import CoverageTool
from twister.runner import TwisterRunner
logger = logging.getLogger('twister') logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -933,15 +935,6 @@ def main():
modules = [module.meta.get('name') for module in modules_meta] modules = [module.meta.get('name') for module in modules_meta]
tplan.modules = modules tplan.modules = modules
# Set number of jobs
if options.jobs:
tplan.jobs = options.jobs
elif options.build_only:
tplan.jobs = multiprocessing.cpu_count() * 2
else:
tplan.jobs = multiprocessing.cpu_count()
logger.info("JOBS: %d" % tplan.jobs)
run_individual_tests = [] run_individual_tests = []
if options.test: if options.test:
@ -1248,10 +1241,21 @@ def main():
pipeline = manager.LifoQueue() pipeline = manager.LifoQueue()
done_queue = manager.LifoQueue() done_queue = manager.LifoQueue()
tplan.update_counting(results) # Set number of jobs
if options.jobs:
jobs = options.jobs
elif options.build_only:
jobs = multiprocessing.cpu_count() * 2
else:
jobs = multiprocessing.cpu_count()
logger.info("JOBS: %d" % jobs)
runner = TwisterRunner(tplan.instances, jobs, env)
runner.update_counting(results)
logger.info("%d test scenarios (%d configurations) selected, %d configurations discarded due to filters." % logger.info("%d test scenarios (%d configurations) selected, %d configurations discarded due to filters." %
(len(tplan.testsuites), len(tplan.instances), results.skipped_configs)) (len(tplan.testsuites), len(tplan.instances), results.skipped_configs))
tplan.start_time = start_time
while True: while True:
completed += 1 completed += 1
@ -1266,7 +1270,7 @@ def main():
else: else:
results.failed = results.error results.failed = results.error
results = tplan.execute(pipeline, done_queue, results) results = runner.execute(pipeline, done_queue, results)
while True: while True:
try: try: