twister: rework testplan and main twister script

Move all testplan related code to the testplan class and cleanup main
twister script.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2022-06-11 16:46:33 -04:00
commit f85d1eb4e1
2 changed files with 267 additions and 190 deletions

View file

@ -16,15 +16,25 @@ import logging
from distutils.spawn import find_executable
import colorama
import json
from multiprocessing import Lock, Process, Value
import collections
from typing import List
from winreg import QueryValue
from collections import OrderedDict
from itertools import islice
try:
from anytree import RenderTree, Node, find
except ImportError:
print("Install the anytree module to use the --test-tree option")
from twister.testsuite import TestSuite
from twister.error import TwisterRuntimeError
from twister.platform import Platform
from twister.config_parser import TwisterConfigParser
from twister.testinstance import TestInstance
from twister.runner import ProjectBuilder
from zephyr_module import west_projects, parse_modules
try:
# Use the C LibYAML parser if available, rather than the Python parser.
@ -186,6 +196,251 @@ class TestPlan:
self.link_dir_counter = 0
self.modules = []
self.run_individual_testsuite = []
def find_subtests(self):
sub_tests = self.options.sub_test
if sub_tests:
for subtest in sub_tests:
_subtests = self.get_testcase(subtest)
for _subtest in _subtests:
self.run_individual_testsuite.append(_subtest.name)
if self.run_individual_testsuite:
logger.info("Running the following tests:")
for test in self.run_individual_testsuite:
print(" - {}".format(test))
else:
raise TwisterRuntimeError("Tests not found")
def discover(self):
self.handle_modules()
if self.options.test:
self.run_individual_testsuite = self.options.test
num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite)
if num == 0:
raise TwisterRuntimeError("No test cases found at the specified location...")
self.find_subtests()
self.add_configurations()
# handle quarantine
ql = self.options.quarantine_list
if ql:
self.load_quarantine(ql)
qv = self.options.quarantine_verify
if qv:
if not ql:
logger.error("No quarantine list given to be verified")
raise TwisterRuntimeError("No quarantine list given to be verified")
if self.options.subset:
subset, sets = self.options.subset.split("/")
subset = int(subset)
self.generate_subset(subset, sets)
def load(self):
if self.options.report_suffix:
last_run = os.path.join(self.options.outdir, "twister_{}.json".format(self.options.report_suffix))
else:
last_run = os.path.join(self.options.outdir, "twister.json")
if self.options.only_failed:
self.load_from_file(last_run)
self.selected_platforms = set(p.platform.name for p in self.instances.values())
elif self.options.load_tests:
self.load_from_file(self.options.load_tests)
self.selected_platforms = set(p.platform.name for p in self.instances.values())
elif self.test_only:
# Get list of connected hardware and filter tests to only be run on connected hardware
# in cases where no platform was specified when running the tests.
# If the platform does not exist in the hardware map, just skip it.
connected_list = []
if not self.options.platform:
for connected in hwm.duts:
if connected['connected']:
connected_list.append(connected['platform'])
self.load_from_file(last_run, filter_platform=connected_list)
self.selected_platforms = set(p.platform.name for p in self.instances.values())
else:
self.apply_filters(
enable_slow=options.enable_slow,
platform=options.platform,
exclude_platform=options.exclude_platform,
arch=options.arch,
tag=options.tag,
exclude_tag=options.exclude_tag,
force_toolchain=options.force_toolchain,
all=options.all,
emulation_only=options.emulation_only,
runnable=(options.device_testing or options.filter == 'runnable'),
force_platform=options.force_platform
)
def generate_subset(self, subset, sets):
# Test instances are sorted depending on the context. For CI runs
# the execution order is: "plat1-testA, plat1-testB, ...,
# plat1-testZ, plat2-testA, ...". For hardware tests
# (device_testing), were multiple physical platforms can run the tests
# in parallel, it is more efficient to run in the order:
# "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
if self.options.device_testing:
self.instances = OrderedDict(sorted(self.instances.items(),
key=lambda x: x[0][x[0].find("/") + 1:]))
else:
self.instances = OrderedDict(sorted(self.instances.items()))
# Do calculation based on what is actually going to be run and evaluated
# at runtime, ignore the cases we already know going to be skipped.
# This fixes an issue where some sets would get majority of skips and
# basically run nothing beside filtering.
to_run = {k : v for k,v in self.instances.items() if v.status is None}
total = len(to_run)
per_set = int(total / sets)
num_extra_sets = total - (per_set * sets)
# Try and be more fair for rounding error with integer division
# so the last subset doesn't get overloaded, we add 1 extra to
# subsets 1..num_extra_sets.
if subset <= num_extra_sets:
start = (subset - 1) * (per_set + 1)
end = start + per_set + 1
else:
base = num_extra_sets * (per_set + 1)
start = ((subset - num_extra_sets - 1) * per_set) + base
end = start + per_set
sliced_instances = islice(to_run.items(), start, end)
skipped = {k : v for k,v in self.instances.items() if v.status == 'skipped'}
errors = {k : v for k,v in self.instances.items() if v.status == 'error'}
self.instances = OrderedDict(sliced_instances)
if subset == 1:
# add all pre-filtered tests that are skipped or got error status
# to the first set to allow for better distribution among all sets.
self.instances.update(skipped)
self.instances.update(errors)
def handle_modules(self):
# get all enabled west projects
west_proj = west_projects()
modules_meta = parse_modules(ZEPHYR_BASE,
[p.posixpath for p in west_proj['projects']]
if west_proj else None, None)
self.modules = [module.meta.get('name') for module in modules_meta]
def report(self):
if self.options.list_test_duplicates:
self.report_duplicates()
return 0
elif self.options.test_tree:
self.report_test_tree()
return 0
elif self.options.list_tests:
self.report_test_list()
return 0
elif self.options.list_tags:
self.report_tag_list()
return 0
return 1
def report_duplicates(self):
all_tests = self.get_all_tests()
dupes = [item for item, count in collections.Counter(all_tests).items() if count > 1]
if dupes:
print("Tests with duplicate identifiers:")
for dupe in dupes:
print("- {}".format(dupe))
for dc in self.get_testsuite(dupe):
print(" - {}".format(dc))
else:
print("No duplicates found.")
def report_tag_list(self):
tags = set()
for _, tc in self.testsuites.items():
tags = tags.union(tc.tags)
for t in tags:
print("- {}".format(t))
def report_test_tree(self):
all_tests = self.get_all_tests()
testsuite = Node("Testsuite")
samples = Node("Samples", parent=testsuite)
tests = Node("Tests", parent=testsuite)
for test in sorted(all_tests):
if test.startswith("sample."):
sec = test.split(".")
area = find(samples, lambda node: node.name == sec[1] and node.parent == samples)
if not area:
area = Node(sec[1], parent=samples)
t = Node(test, parent=area)
else:
sec = test.split(".")
area = find(tests, lambda node: node.name == sec[0] and node.parent == tests)
if not area:
area = Node(sec[0], parent=tests)
if area and len(sec) > 2:
subarea = find(area, lambda node: node.name == sec[1] and node.parent == area)
if not subarea:
subarea = Node(sec[1], parent=area)
t = Node(test, parent=subarea)
for pre, _, node in RenderTree(testsuite):
print("%s%s" % (pre, node.name))
def report_test_list(self):
cnt = 0
all_tests = self.get_all_tests()
for test in sorted(all_tests):
cnt = cnt + 1
print(" - {}".format(test))
print("{} total.".format(cnt))
def report_excluded_tests(self):
all_tests = self.get_all_tests()
to_be_run = set()
for i, p in self.instances.items():
to_be_run.update(p.testsuite.cases)
if all_tests - to_be_run:
print("Tests that never build or run:")
for not_run in all_tests - to_be_run:
print("- {}".format(not_run))
return
def report_platform_tests(self, platforms=[]):
if len(platforms) > 1:
raise TwisterRuntimeError("When exporting tests, only one platform "
"should be specified.")
for p in platforms:
inst = self.get_platform_instances(p)
count = 0
for i in inst.values():
for c in i.testsuite.cases:
print(f"- {c}")
count += 1
print(f"Tests found: {count}")
return
def get_platform_instances(self, platform):
filtered_dict = {k:v for k,v in self.instances.items() if k.startswith(platform + os.sep)}
@ -690,7 +945,7 @@ class TestPlan:
platform_filter = kwargs.get('platform')
exclude_platform = kwargs.get('exclude_platform', [])
testsuite_filter = kwargs.get('run_individual_tests', [])
testsuite_filter = self.run_individual_testsuite
arch_filter = kwargs.get('arch')
tag_filter = kwargs.get('tag')
exclude_tag = kwargs.get('exclude_tag')

View file

@ -170,15 +170,12 @@ import sys
import logging
import time
import shutil
from collections import OrderedDict
from itertools import islice
import colorama
from colorama import Fore
from pathlib import Path
import queue
from zephyr_module import west_projects, parse_modules
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
@ -193,10 +190,6 @@ if not ZEPHYR_BASE:
print(f'ZEPHYR_BASE unset, using "{ZEPHYR_BASE}"')
try:
from anytree import RenderTree, Node, find
except ImportError:
print("Install the anytree module to use the --test-tree option")
try:
from tabulate import tabulate
@ -364,7 +357,7 @@ Artificially long but functional example:
test_xor_subtest.add_argument(
"-s", "--test", action="append",
help="Run only the specified test cases. These are named by "
help="Run only the specified testsuite scenario. These are named by "
"<path/relative/to/Zephyr/base/section.name.in.testcase.yaml>")
test_xor_subtest.add_argument(
@ -926,24 +919,11 @@ def main():
env.discover()
tplan = TestPlan(options.board_root, options.testsuite_root, env, options.outdir)
# get all enabled west projects
west_proj = west_projects()
modules_meta = parse_modules(ZEPHYR_BASE,
[p.posixpath for p in west_proj['projects']]
if west_proj else None, None)
modules = [module.meta.get('name') for module in modules_meta]
tplan.modules = modules
run_individual_tests = []
if options.test:
run_individual_tests = options.test
num = tplan.add_testsuites(testsuite_filter=run_individual_tests)
if num == 0:
logger.error("No test cases found at the specified location...")
try:
tplan.discover()
except RuntimeError as e:
logger.error(f"{e}")
sys.exit(1)
tplan.add_configurations()
if options.device_testing:
if options.hardware_map:
@ -976,7 +956,7 @@ def main():
--device-serial or --device-serial-pty,
only one platform is allowed""")
# the fixtures given by twister command explicitly should be assigned to each DUTs
# the fixtures given by twister command explicitly should be assigned to each DUT
if options.fixture:
for d in tplan.duts:
d.fixtures.extend(options.fixture)
@ -984,101 +964,14 @@ def main():
if tplan.load_errors:
sys.exit(1)
if options.list_tags:
tags = set()
for _, tc in tplan.testsuites.items():
tags = tags.union(tc.tags)
for t in tags:
print("- {}".format(t))
if tplan.report() == 0:
return
if not options.platform and (options.list_tests or options.test_tree or options.list_test_duplicates \
or options.sub_test):
cnt = 0
all_tests = tplan.get_all_tests()
if options.list_test_duplicates:
import collections
dupes = [item for item, count in collections.Counter(all_tests).items() if count > 1]
if dupes:
print("Tests with duplicate identifiers:")
for dupe in dupes:
print("- {}".format(dupe))
for dc in tplan.get_testsuite(dupe):
print(" - {}".format(dc))
else:
print("No duplicates found.")
return
if options.sub_test:
for st in options.sub_test:
subtests = tplan.get_testcase(st)
for sti in subtests:
run_individual_tests.append(sti.name)
if run_individual_tests:
logger.info("Running the following tests:")
for test in run_individual_tests:
print(" - {}".format(test))
else:
logger.info("Tests not found")
return
elif options.list_tests or options.test_tree:
if options.test_tree:
testsuite = Node("Testsuite")
samples = Node("Samples", parent=testsuite)
tests = Node("Tests", parent=testsuite)
for test in sorted(all_tests):
cnt = cnt + 1
if options.list_tests:
print(" - {}".format(test))
if options.test_tree:
if test.startswith("sample."):
sec = test.split(".")
area = find(samples, lambda node: node.name == sec[1] and node.parent == samples)
if not area:
area = Node(sec[1], parent=samples)
t = Node(test, parent=area)
else:
sec = test.split(".")
area = find(tests, lambda node: node.name == sec[0] and node.parent == tests)
if not area:
area = Node(sec[0], parent=tests)
if area and len(sec) > 2:
subarea = find(area, lambda node: node.name == sec[1] and node.parent == area)
if not subarea:
subarea = Node(sec[1], parent=area)
t = Node(test, parent=subarea)
if options.list_tests:
print("{} total.".format(cnt))
if options.test_tree:
for pre, _, node in RenderTree(testsuite):
print("%s%s" % (pre, node.name))
return
if options.report_suffix:
last_run = os.path.join(options.outdir, "twister_{}.json".format(options.report_suffix))
else:
last_run = os.path.join(options.outdir, "twister.json")
if options.quarantine_list:
tplan.load_quarantine(options.quarantine_list)
if options.quarantine_verify:
if not options.quarantine_list:
logger.error("No quarantine list given to be verified")
sys.exit(1)
tplan.quarantine_verify = options.quarantine_verify
if options.only_failed:
tplan.load_from_file(last_run)
tplan.selected_platforms = set(p.platform.name for p in tplan.instances.values())
@ -1108,27 +1001,12 @@ def main():
force_toolchain=options.force_toolchain,
all=options.all,
emulation_only=options.emulation_only,
run_individual_tests=run_individual_tests,
runnable=(options.device_testing or options.filter == 'runnable'),
force_platform=options.force_platform
)
if options.list_tests and options.platform:
if len(options.platform) > 1:
logger.error("When exporting tests, only one platform "
"should be specified.")
return
for p in options.platform:
inst = tplan.get_platform_instances(p)
count = 0
for i in inst.values():
for c in i.testsuite.cases:
print(f"- {c}")
count += 1
print(f"Tests found: {count}")
tplan.report_platform_tests(options.platform)
return
if VERBOSE > 1:
@ -1150,65 +1028,9 @@ def main():
i.reason))
if options.report_excluded:
all_tests = tplan.get_all_tests()
to_be_run = set()
for i, p in tplan.instances.items():
to_be_run.update(p.testsuite.cases)
if all_tests - to_be_run:
print("Tests that never build or run:")
for not_run in all_tests - to_be_run:
print("- {}".format(not_run))
tplan.report_excluded_tests()
return
if options.subset:
# Test instances are sorted depending on the context. For CI runs
# the execution order is: "plat1-testA, plat1-testB, ...,
# plat1-testZ, plat2-testA, ...". For hardware tests
# (device_testing), were multiple physical platforms can run the tests
# in parallel, it is more efficient to run in the order:
# "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
if options.device_testing:
tplan.instances = OrderedDict(sorted(tplan.instances.items(),
key=lambda x: x[0][x[0].find("/") + 1:]))
else:
tplan.instances = OrderedDict(sorted(tplan.instances.items()))
# Do calculation based on what is actually going to be run and evaluated
# at runtime, ignore the cases we already know going to be skipped.
# This fixes an issue where some sets would get majority of skips and
# basically run nothing beside filtering.
to_run = {k : v for k,v in tplan.instances.items() if v.status is None}
subset, sets = options.subset.split("/")
subset = int(subset)
sets = int(sets)
total = len(to_run)
per_set = int(total / sets)
num_extra_sets = total - (per_set * sets)
# Try and be more fair for rounding error with integer division
# so the last subset doesn't get overloaded, we add 1 extra to
# subsets 1..num_extra_sets.
if subset <= num_extra_sets:
start = (subset - 1) * (per_set + 1)
end = start + per_set + 1
else:
base = num_extra_sets * (per_set + 1)
start = ((subset - num_extra_sets - 1) * per_set) + base
end = start + per_set
sliced_instances = islice(to_run.items(), start, end)
skipped = {k : v for k,v in tplan.instances.items() if v.status == 'skipped'}
errors = {k : v for k,v in tplan.instances.items() if v.status == 'error'}
tplan.instances = OrderedDict(sliced_instances)
if subset == 1:
# add all pre-filtered tests that are skipped or got error status
# to the first set to allow for better distribution among all sets.
tplan.instances.update(skipped)
tplan.instances.update(errors)
report = Reporting(tplan, env)
report.json_report(os.path.join(options.outdir, "testplan.json"))