twister: move runner classes out

Move runners into their own file.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>

parent 1084869634
commit 96b9ff6b5e
3 changed files with 816 additions and 794 deletions
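For consumers of these classes, the practical effect of the move is a new import path. A minimal sketch, using only names the hunks below introduce (it assumes scripts/pylib/twister is on sys.path, as the twister script arranges):

# After this commit the runner classes live in twister/runner.py and are
# imported from there instead of from twisterlib:
from twister.runner import ExecutionCounter, ProjectBuilder

counter = ExecutionCounter(total=0)   # multiprocessing-safe result counters
counter.summary()                     # logs the per-status totals at DEBUG level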
810
scripts/pylib/twister/twister/runner.py
Normal file
@@ -0,0 +1,810 @@
# vim: set syntax=python ts=4 :
|
||||
#
|
||||
# Copyright (c) 2018-2022 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import re
|
||||
import sys
|
||||
import subprocess
|
||||
import pickle
|
||||
import logging
|
||||
from colorama import Fore
|
||||
from multiprocessing import Lock, Process, Value
|
||||
|
||||
from twister.cmakecache import CMakeCache
|
||||
from twister.handlers import BinaryHandler, QEMUHandler, DeviceHandler
|
||||
|
||||
logger = logging.getLogger('twister')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
import expr_parser
|
||||
|
||||
class ExecutionCounter(object):
|
||||
def __init__(self, total=0):
|
||||
self._done = Value('i', 0)
|
||||
self._passed = Value('i', 0)
|
||||
self._skipped_configs = Value('i', 0)
|
||||
self._skipped_runtime = Value('i', 0)
|
||||
self._skipped_filter = Value('i', 0)
|
||||
self._skipped_cases = Value('i', 0)
|
||||
self._error = Value('i', 0)
|
||||
self._failed = Value('i', 0)
|
||||
self._total = Value('i', total)
|
||||
self._cases = Value('i', 0)
|
||||
|
||||
|
||||
self.lock = Lock()
|
||||
|
||||
|
||||
def summary(self):
|
||||
logger.debug("--------------------------------")
|
||||
logger.debug(f"Total Test suites: {self.total}")
|
||||
logger.debug(f"Total Test cases: {self.cases}")
|
||||
logger.debug(f"Skipped test cases: {self.skipped_cases}")
|
||||
logger.debug(f"Completed Testsuites: {self.done}")
|
||||
logger.debug(f"Passing Testsuites: {self.passed}")
|
||||
logger.debug(f"Failing Testsuites: {self.failed}")
|
||||
logger.debug(f"Skipped Testsuites: {self.skipped_configs}")
|
||||
logger.debug(f"Skipped Testsuites (runtime): {self.skipped_runtime}")
|
||||
logger.debug(f"Skipped Testsuites (filter): {self.skipped_filter}")
|
||||
logger.debug(f"Errors: {self.error}")
|
||||
logger.debug("--------------------------------")
|
||||
|
||||
@property
|
||||
def cases(self):
|
||||
with self._cases.get_lock():
|
||||
return self._cases.value
|
||||
|
||||
@cases.setter
|
||||
def cases(self, value):
|
||||
with self._cases.get_lock():
|
||||
self._cases.value = value
|
||||
|
||||
@property
|
||||
def skipped_cases(self):
|
||||
with self._skipped_cases.get_lock():
|
||||
return self._skipped_cases.value
|
||||
|
||||
@skipped_cases.setter
|
||||
def skipped_cases(self, value):
|
||||
with self._skipped_cases.get_lock():
|
||||
self._skipped_cases.value = value
|
||||
|
||||
@property
|
||||
def error(self):
|
||||
with self._error.get_lock():
|
||||
return self._error.value
|
||||
|
||||
@error.setter
|
||||
def error(self, value):
|
||||
with self._error.get_lock():
|
||||
self._error.value = value
|
||||
|
||||
@property
|
||||
def done(self):
|
||||
with self._done.get_lock():
|
||||
return self._done.value
|
||||
|
||||
@done.setter
|
||||
def done(self, value):
|
||||
with self._done.get_lock():
|
||||
self._done.value = value
|
||||
|
||||
@property
|
||||
def passed(self):
|
||||
with self._passed.get_lock():
|
||||
return self._passed.value
|
||||
|
||||
@passed.setter
|
||||
def passed(self, value):
|
||||
with self._passed.get_lock():
|
||||
self._passed.value = value
|
||||
|
||||
@property
|
||||
def skipped_configs(self):
|
||||
with self._skipped_configs.get_lock():
|
||||
return self._skipped_configs.value
|
||||
|
||||
@skipped_configs.setter
|
||||
def skipped_configs(self, value):
|
||||
with self._skipped_configs.get_lock():
|
||||
self._skipped_configs.value = value
|
||||
|
||||
@property
|
||||
def skipped_filter(self):
|
||||
with self._skipped_filter.get_lock():
|
||||
return self._skipped_filter.value
|
||||
|
||||
@skipped_filter.setter
|
||||
def skipped_filter(self, value):
|
||||
with self._skipped_filter.get_lock():
|
||||
self._skipped_filter.value = value
|
||||
|
||||
@property
|
||||
def skipped_runtime(self):
|
||||
with self._skipped_runtime.get_lock():
|
||||
return self._skipped_runtime.value
|
||||
|
||||
@skipped_runtime.setter
|
||||
def skipped_runtime(self, value):
|
||||
with self._skipped_runtime.get_lock():
|
||||
self._skipped_runtime.value = value
|
||||
|
||||
@property
|
||||
def failed(self):
|
||||
with self._failed.get_lock():
|
||||
return self._failed.value
|
||||
|
||||
@failed.setter
|
||||
def failed(self, value):
|
||||
with self._failed.get_lock():
|
||||
self._failed.value = value
|
||||
|
||||
@property
|
||||
def total(self):
|
||||
with self._total.get_lock():
|
||||
return self._total.value
|
||||
|
||||
class CMake:
|
||||
config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
|
||||
dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
|
||||
|
||||
def __init__(self, testsuite, platform, source_dir, build_dir):
|
||||
|
||||
self.cwd = None
|
||||
self.capture_output = True
|
||||
|
||||
self.defconfig = {}
|
||||
self.cmake_cache = {}
|
||||
|
||||
self.instance = None
|
||||
self.testsuite = testsuite
|
||||
self.platform = platform
|
||||
self.source_dir = source_dir
|
||||
self.build_dir = build_dir
|
||||
self.log = "build.log"
|
||||
self.generator = None
|
||||
self.generator_cmd = None
|
||||
|
||||
self.default_encoding = sys.getdefaultencoding()
|
||||
|
||||
def parse_generated(self):
|
||||
self.defconfig = {}
|
||||
return {}
|
||||
|
||||
def run_build(self, args=[]):
|
||||
|
||||
logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))
|
||||
|
||||
cmake_args = []
|
||||
cmake_args.extend(args)
|
||||
cmake = shutil.which('cmake')
|
||||
cmd = [cmake] + cmake_args
|
||||
kwargs = dict()
|
||||
|
||||
if self.capture_output:
|
||||
kwargs['stdout'] = subprocess.PIPE
|
||||
# CMake sends the output of message() to stderr unless it's STATUS
|
||||
kwargs['stderr'] = subprocess.STDOUT
|
||||
|
||||
if self.cwd:
|
||||
kwargs['cwd'] = self.cwd
|
||||
|
||||
p = subprocess.Popen(cmd, **kwargs)
|
||||
out, _ = p.communicate()
|
||||
|
||||
results = {}
|
||||
if p.returncode == 0:
|
||||
msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
|
||||
|
||||
self.instance.status = "passed"
|
||||
if not self.instance.run:
|
||||
self.instance.add_missing_case_status("skipped", "Test was built only")
|
||||
results = {'msg': msg, "returncode": p.returncode, "instance": self.instance}
|
||||
|
||||
if out:
|
||||
log_msg = out.decode(self.default_encoding)
|
||||
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
|
||||
log.write(log_msg)
|
||||
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
# A real error occurred, raise an exception
|
||||
log_msg = ""
|
||||
if out:
|
||||
log_msg = out.decode(self.default_encoding)
|
||||
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
|
||||
log.write(log_msg)
|
||||
|
||||
if log_msg:
|
||||
overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM)' overflowed by", log_msg)
|
||||
if overflow_found and not self.overflow_as_errors:
|
||||
logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
|
||||
self.instance.status = "skipped"
|
||||
self.instance.reason = "{} overflow".format(overflow_found[0])
|
||||
else:
|
||||
self.instance.status = "error"
|
||||
self.instance.reason = "Build failure"
|
||||
|
||||
results = {
|
||||
"returncode": p.returncode,
|
||||
"instance": self.instance,
|
||||
}
|
||||
|
||||
return results
|
||||
|
||||
def run_cmake(self, args=[]):
|
||||
|
||||
if self.warnings_as_errors:
|
||||
ldflags = "-Wl,--fatal-warnings"
|
||||
cflags = "-Werror"
|
||||
aflags = "-Werror -Wa,--fatal-warnings"
|
||||
gen_defines_args = "--edtlib-Werror"
|
||||
else:
|
||||
ldflags = cflags = aflags = ""
|
||||
gen_defines_args = ""
|
||||
|
||||
logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
|
||||
cmake_args = [
|
||||
f'-B{self.build_dir}',
|
||||
f'-S{self.source_dir}',
|
||||
f'-DTC_RUNID={self.instance.run_id}',
|
||||
f'-DEXTRA_CFLAGS={cflags}',
|
||||
f'-DEXTRA_AFLAGS={aflags}',
|
||||
f'-DEXTRA_LDFLAGS={ldflags}',
|
||||
f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
|
||||
f'-G{self.generator}'
|
||||
]
|
||||
|
||||
args = ["-D{}".format(a.replace('"', '')) for a in args]
|
||||
cmake_args.extend(args)
|
||||
|
||||
cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
|
||||
cmake_args.extend(cmake_opts)
|
||||
|
||||
|
||||
logger.debug("Calling cmake with arguments: {}".format(cmake_args))
|
||||
cmake = shutil.which('cmake')
|
||||
cmd = [cmake] + cmake_args
|
||||
kwargs = dict()
|
||||
|
||||
if self.capture_output:
|
||||
kwargs['stdout'] = subprocess.PIPE
|
||||
# CMake sends the output of message() to stderr unless it's STATUS
|
||||
kwargs['stderr'] = subprocess.STDOUT
|
||||
|
||||
if self.cwd:
|
||||
kwargs['cwd'] = self.cwd
|
||||
|
||||
p = subprocess.Popen(cmd, **kwargs)
|
||||
out, _ = p.communicate()
|
||||
|
||||
if p.returncode == 0:
|
||||
filter_results = self.parse_generated()
|
||||
msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
|
||||
logger.debug(msg)
|
||||
results = {'msg': msg, 'filter': filter_results}
|
||||
|
||||
else:
|
||||
self.instance.status = "error"
|
||||
self.instance.reason = "Cmake build failure"
|
||||
|
||||
for tc in self.instance.testcases:
|
||||
tc.status = self.instance.status
|
||||
|
||||
logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
|
||||
results = {"returncode": p.returncode}
|
||||
|
||||
if out:
|
||||
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
|
||||
log_msg = out.decode(self.default_encoding)
|
||||
log.write(log_msg)
|
||||
|
||||
return results
|
||||
|
||||
class FilterBuilder(CMake):
|
||||
|
||||
def __init__(self, testsuite, platform, source_dir, build_dir):
|
||||
super().__init__(testsuite, platform, source_dir, build_dir)
|
||||
|
||||
self.log = "config-twister.log"
|
||||
|
||||
def parse_generated(self):
|
||||
|
||||
if self.platform.name == "unit_testing":
|
||||
return {}
|
||||
|
||||
cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
|
||||
defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
|
||||
|
||||
with open(defconfig_path, "r") as fp:
|
||||
defconfig = {}
|
||||
for line in fp.readlines():
|
||||
m = self.config_re.match(line)
|
||||
if not m:
|
||||
if line.strip() and not line.startswith("#"):
|
||||
sys.stderr.write("Unrecognized line %s\n" % line)
|
||||
continue
|
||||
defconfig[m.group(1)] = m.group(2).strip()
|
||||
|
||||
self.defconfig = defconfig
|
||||
|
||||
cmake_conf = {}
|
||||
try:
|
||||
cache = CMakeCache.from_file(cmake_cache_path)
|
||||
except FileNotFoundError:
|
||||
cache = {}
|
||||
|
||||
for k in iter(cache):
|
||||
cmake_conf[k.name] = k.value
|
||||
|
||||
self.cmake_cache = cmake_conf
|
||||
|
||||
filter_data = {
|
||||
"ARCH": self.platform.arch,
|
||||
"PLATFORM": self.platform.name
|
||||
}
|
||||
filter_data.update(os.environ)
|
||||
filter_data.update(self.defconfig)
|
||||
filter_data.update(self.cmake_cache)
|
||||
|
||||
edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
|
||||
if self.testsuite and self.testsuite.ts_filter:
|
||||
try:
|
||||
if os.path.exists(edt_pickle):
|
||||
with open(edt_pickle, 'rb') as f:
|
||||
edt = pickle.load(f)
|
||||
else:
|
||||
edt = None
|
||||
res = expr_parser.parse(self.testsuite.ts_filter, filter_data, edt)
|
||||
|
||||
except (ValueError, SyntaxError) as se:
|
||||
sys.stderr.write(
|
||||
"Failed processing %s\n" % self.testsuite.yamlfile)
|
||||
raise se
|
||||
|
||||
if not res:
|
||||
return {os.path.join(self.platform.name, self.testsuite.name): True}
|
||||
else:
|
||||
return {os.path.join(self.platform.name, self.testsuite.name): False}
|
||||
else:
|
||||
self.platform.filter_data = filter_data
|
||||
return filter_data
|
||||
|
||||
|
||||
class ProjectBuilder(FilterBuilder):
|
||||
|
||||
def __init__(self, tplan, instance, **kwargs):
|
||||
super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir)
|
||||
|
||||
self.log = "build.log"
|
||||
self.instance = instance
|
||||
self.testplan = tplan
|
||||
self.filtered_tests = 0
|
||||
|
||||
self.lsan = kwargs.get('lsan', False)
|
||||
self.asan = kwargs.get('asan', False)
|
||||
self.ubsan = kwargs.get('ubsan', False)
|
||||
self.valgrind = kwargs.get('valgrind', False)
|
||||
self.extra_args = kwargs.get('extra_args', [])
|
||||
self.device_testing = kwargs.get('device_testing', False)
|
||||
self.cmake_only = kwargs.get('cmake_only', False)
|
||||
self.cleanup = kwargs.get('cleanup', False)
|
||||
self.coverage = kwargs.get('coverage', False)
|
||||
self.inline_logs = kwargs.get('inline_logs', False)
|
||||
self.generator = kwargs.get('generator', None)
|
||||
self.generator_cmd = kwargs.get('generator_cmd', None)
|
||||
self.verbose = kwargs.get('verbose', None)
|
||||
self.warnings_as_errors = kwargs.get('warnings_as_errors', True)
|
||||
self.overflow_as_errors = kwargs.get('overflow_as_errors', False)
|
||||
self.suite_name_check = kwargs.get('suite_name_check', True)
|
||||
self.seed = kwargs.get('seed', 0)
|
||||
|
||||
@staticmethod
|
||||
def log_info(filename, inline_logs):
|
||||
filename = os.path.abspath(os.path.realpath(filename))
|
||||
if inline_logs:
|
||||
logger.info("{:-^100}".format(filename))
|
||||
|
||||
try:
|
||||
with open(filename) as fp:
|
||||
data = fp.read()
|
||||
except Exception as e:
|
||||
data = "Unable to read log data (%s)\n" % (str(e))
|
||||
|
||||
logger.error(data)
|
||||
|
||||
logger.info("{:-^100}".format(filename))
|
||||
else:
|
||||
logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
|
||||
|
||||
def log_info_file(self, inline_logs):
|
||||
build_dir = self.instance.build_dir
|
||||
h_log = "{}/handler.log".format(build_dir)
|
||||
b_log = "{}/build.log".format(build_dir)
|
||||
v_log = "{}/valgrind.log".format(build_dir)
|
||||
d_log = "{}/device.log".format(build_dir)
|
||||
|
||||
if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
|
||||
self.log_info("{}".format(v_log), inline_logs)
|
||||
elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
|
||||
self.log_info("{}".format(h_log), inline_logs)
|
||||
elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
|
||||
self.log_info("{}".format(d_log), inline_logs)
|
||||
else:
|
||||
self.log_info("{}".format(b_log), inline_logs)
|
||||
|
||||
def setup_handler(self):
|
||||
|
||||
instance = self.instance
|
||||
args = []
|
||||
|
||||
# FIXME: Needs simplification
|
||||
if instance.platform.simulation == "qemu":
|
||||
instance.handler = QEMUHandler(instance, "qemu")
|
||||
args.append("QEMU_PIPE=%s" % instance.handler.get_fifo())
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.testsuite.type == "unit":
|
||||
instance.handler = BinaryHandler(instance, "unit")
|
||||
instance.handler.binary = os.path.join(instance.build_dir, "testbinary")
|
||||
if self.coverage:
|
||||
args.append("COVERAGE=1")
|
||||
elif instance.platform.type == "native":
|
||||
handler = BinaryHandler(instance, "native")
|
||||
|
||||
handler.asan = self.asan
|
||||
handler.valgrind = self.valgrind
|
||||
handler.lsan = self.lsan
|
||||
handler.ubsan = self.ubsan
|
||||
handler.coverage = self.coverage
|
||||
|
||||
handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe")
|
||||
instance.handler = handler
|
||||
elif instance.platform.simulation == "renode":
|
||||
if find_executable("renode"):
|
||||
instance.handler = BinaryHandler(instance, "renode")
|
||||
instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "tsim":
|
||||
instance.handler = BinaryHandler(instance, "tsim")
|
||||
instance.handler.call_make_run = True
|
||||
elif self.device_testing:
|
||||
instance.handler = DeviceHandler(instance, "device")
|
||||
instance.handler.coverage = self.coverage
|
||||
elif instance.platform.simulation == "nsim":
|
||||
if find_executable("nsimdrv"):
|
||||
instance.handler = BinaryHandler(instance, "nsim")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "mdb-nsim":
|
||||
if find_executable("mdb"):
|
||||
instance.handler = BinaryHandler(instance, "nsim")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "armfvp":
|
||||
instance.handler = BinaryHandler(instance, "armfvp")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "xt-sim":
|
||||
instance.handler = BinaryHandler(instance, "xt-sim")
|
||||
instance.handler.call_make_run = True
|
||||
|
||||
if instance.handler:
|
||||
instance.handler.args = args
|
||||
instance.handler.generator_cmd = self.generator_cmd
|
||||
instance.handler.generator = self.generator
|
||||
instance.handler.suite_name_check = self.suite_name_check
|
||||
|
||||
def process(self, pipeline, done, message, lock, results):
|
||||
op = message.get('op')
|
||||
|
||||
if not self.instance.handler:
|
||||
self.setup_handler()
|
||||
|
||||
# The build process, call cmake and build with configured generator
|
||||
if op == "cmake":
|
||||
res = self.cmake()
|
||||
if self.instance.status in ["failed", "error"]:
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
elif self.cmake_only:
|
||||
if self.instance.status is None:
|
||||
self.instance.status = "passed"
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
# Here we check the runtime filter results coming from running cmake
|
||||
if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
|
||||
logger.debug("filtering %s" % self.instance.name)
|
||||
self.instance.status = "filtered"
|
||||
self.instance.reason = "runtime filter"
|
||||
results.skipped_runtime += 1
|
||||
self.instance.add_missing_case_status("skipped")
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
pipeline.put({"op": "build", "test": self.instance})
|
||||
|
||||
elif op == "build":
|
||||
logger.debug("build test: %s" % self.instance.name)
|
||||
res = self.build()
|
||||
if not res:
|
||||
self.instance.status = "error"
|
||||
self.instance.reason = "Build Failure"
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
# Count skipped cases during build, for example
|
||||
# due to ram/rom overflow.
|
||||
if self.instance.status == "skipped":
|
||||
results.skipped_runtime += 1
|
||||
self.instance.add_missing_case_status("skipped", self.instance.reason)
|
||||
|
||||
if res.get('returncode', 1) > 0:
|
||||
self.instance.add_missing_case_status("blocked", self.instance.reason)
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
pipeline.put({"op": "gather_metrics", "test": self.instance})
|
||||
|
||||
elif op == "gather_metrics":
|
||||
self.gather_metrics(self.instance)
|
||||
if self.instance.run and self.instance.handler:
|
||||
pipeline.put({"op": "run", "test": self.instance})
|
||||
else:
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
|
||||
# Run the generated binary using one of the supported handlers
|
||||
elif op == "run":
|
||||
logger.debug("run test: %s" % self.instance.name)
|
||||
self.run()
|
||||
logger.debug(f"run status: {self.instance.name} {self.instance.status}")
|
||||
|
||||
# to make it work with pickle
|
||||
self.instance.handler.thread = None
|
||||
self.instance.handler.testplan = None
|
||||
pipeline.put({
|
||||
"op": "report",
|
||||
"test": self.instance,
|
||||
"status": self.instance.status,
|
||||
"reason": self.instance.reason
|
||||
}
|
||||
)
|
||||
|
||||
# Report results and output progress to screen
|
||||
elif op == "report":
|
||||
with lock:
|
||||
done.put(self.instance)
|
||||
self.report_out(results)
|
||||
|
||||
if self.cleanup and not self.coverage and self.instance.status == "passed":
|
||||
pipeline.put({
|
||||
"op": "cleanup",
|
||||
"test": self.instance
|
||||
})
|
||||
|
||||
elif op == "cleanup":
|
||||
if self.device_testing:
|
||||
self.cleanup_device_testing_artifacts()
|
||||
else:
|
||||
self.cleanup_artifacts()
|
||||
|
||||
def cleanup_artifacts(self, additional_keep=[]):
|
||||
logger.debug("Cleaning up {}".format(self.instance.build_dir))
|
||||
allow = [
|
||||
'zephyr/.config',
|
||||
'handler.log',
|
||||
'build.log',
|
||||
'device.log',
|
||||
'recording.csv',
|
||||
# below ones are needed to make --test-only work as well
|
||||
'Makefile',
|
||||
'CMakeCache.txt',
|
||||
'build.ninja',
|
||||
'CMakeFiles/rules.ninja'
|
||||
]
|
||||
|
||||
allow += additional_keep
|
||||
|
||||
allow = [os.path.join(self.instance.build_dir, file) for file in allow]
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
|
||||
for name in filenames:
|
||||
path = os.path.join(dirpath, name)
|
||||
if path not in allow:
|
||||
os.remove(path)
|
||||
# Remove empty directories and symbolic links to directories
|
||||
for dir in dirnames:
|
||||
path = os.path.join(dirpath, dir)
|
||||
if os.path.islink(path):
|
||||
os.remove(path)
|
||||
elif not os.listdir(path):
|
||||
os.rmdir(path)
|
||||
|
||||
def cleanup_device_testing_artifacts(self):
|
||||
logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
|
||||
|
||||
sanitizelist = [
|
||||
'CMakeCache.txt',
|
||||
'zephyr/runners.yaml',
|
||||
]
|
||||
keep = [
|
||||
'zephyr/zephyr.hex',
|
||||
'zephyr/zephyr.bin',
|
||||
'zephyr/zephyr.elf',
|
||||
]
|
||||
|
||||
keep += sanitizelist
|
||||
|
||||
self.cleanup_artifacts(keep)
|
||||
|
||||
# sanitize paths so files are relocatable
|
||||
for file in sanitizelist:
|
||||
file = os.path.join(self.instance.build_dir, file)
|
||||
|
||||
with open(file, "rt") as fin:
|
||||
data = fin.read()
|
||||
data = data.replace(canonical_zephyr_base+"/", "")
|
||||
|
||||
with open(file, "wt") as fin:
|
||||
fin.write(data)
|
||||
|
||||
def report_out(self, results):
|
||||
total_to_do = results.total
|
||||
total_tests_width = len(str(total_to_do))
|
||||
results.done += 1
|
||||
instance = self.instance
|
||||
|
||||
if instance.status in ["error", "failed"]:
|
||||
if instance.status == "error":
|
||||
results.error += 1
|
||||
else:
|
||||
results.failed += 1
|
||||
if self.verbose:
|
||||
status = Fore.RED + "FAILED " + Fore.RESET + instance.reason
|
||||
else:
|
||||
print("")
|
||||
logger.error(
|
||||
"{:<25} {:<50} {}FAILED{}: {}".format(
|
||||
instance.platform.name,
|
||||
instance.testsuite.name,
|
||||
Fore.RED,
|
||||
Fore.RESET,
|
||||
instance.reason))
|
||||
if not self.verbose:
|
||||
self.log_info_file(self.inline_logs)
|
||||
elif instance.status in ["skipped", "filtered"]:
|
||||
status = Fore.YELLOW + "SKIPPED" + Fore.RESET
|
||||
results.skipped_configs += 1
|
||||
results.skipped_cases += len(instance.testsuite.testcases)
|
||||
elif instance.status == "passed":
|
||||
status = Fore.GREEN + "PASSED" + Fore.RESET
|
||||
results.passed += 1
|
||||
for case in instance.testcases:
|
||||
if case.status == 'skipped':
|
||||
results.skipped_cases += 1
|
||||
else:
|
||||
logger.debug(f"Unknown status = {instance.status}")
|
||||
status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
|
||||
|
||||
if self.verbose:
|
||||
if self.cmake_only:
|
||||
more_info = "cmake"
|
||||
elif instance.status in ["skipped", "filtered"]:
|
||||
more_info = instance.reason
|
||||
else:
|
||||
if instance.handler and instance.run:
|
||||
more_info = instance.handler.type_str
|
||||
htime = instance.execution_time
|
||||
if htime:
|
||||
more_info += " {:.3f}s".format(htime)
|
||||
else:
|
||||
more_info = "build"
|
||||
|
||||
if ( instance.status in ["error", "failed", "timeout", "flash_error"]
|
||||
and hasattr(self.instance.handler, 'seed')
|
||||
and self.instance.handler.seed is not None ):
|
||||
more_info += "/seed: " + str(self.seed)
|
||||
|
||||
logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
|
||||
results.done + results.skipped_filter, total_tests_width, total_to_do , instance.platform.name,
|
||||
instance.testsuite.name, status, more_info))
|
||||
|
||||
if instance.status in ["error", "failed", "timeout"]:
|
||||
self.log_info_file(self.inline_logs)
|
||||
else:
|
||||
completed_perc = 0
|
||||
if total_to_do > 0:
|
||||
completed_perc = int((float(results.done + results.skipped_filter) / total_to_do) * 100)
|
||||
|
||||
sys.stdout.write("\rINFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s" % (
|
||||
Fore.GREEN,
|
||||
results.done + results.skipped_filter,
|
||||
total_to_do,
|
||||
Fore.RESET,
|
||||
completed_perc,
|
||||
Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
|
||||
results.skipped_filter + results.skipped_runtime,
|
||||
Fore.RESET,
|
||||
Fore.RED if results.failed > 0 else Fore.RESET,
|
||||
results.failed,
|
||||
Fore.RESET
|
||||
)
|
||||
)
|
||||
sys.stdout.flush()
|
||||
|
||||
def cmake(self):
|
||||
|
||||
instance = self.instance
|
||||
args = self.testsuite.extra_args[:]
|
||||
args += self.extra_args
|
||||
|
||||
if instance.handler:
|
||||
args += instance.handler.args
|
||||
|
||||
# merge overlay files into one variable
|
||||
def extract_overlays(args):
|
||||
re_overlay = re.compile('OVERLAY_CONFIG=(.*)')
|
||||
other_args = []
|
||||
overlays = []
|
||||
for arg in args:
|
||||
match = re_overlay.search(arg)
|
||||
if match:
|
||||
overlays.append(match.group(1).strip('\'"'))
|
||||
else:
|
||||
other_args.append(arg)
|
||||
|
||||
args[:] = other_args
|
||||
return overlays
|
||||
|
||||
overlays = extract_overlays(args)
|
||||
|
||||
if os.path.exists(os.path.join(instance.build_dir,
|
||||
"twister", "testsuite_extra.conf")):
|
||||
overlays.append(os.path.join(instance.build_dir,
|
||||
"twister", "testsuite_extra.conf"))
|
||||
|
||||
if overlays:
|
||||
args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
|
||||
|
||||
res = self.run_cmake(args)
|
||||
return res
|
||||
|
||||
def build(self):
|
||||
res = self.run_build(['--build', self.build_dir])
|
||||
return res
|
||||
|
||||
def run(self):
|
||||
|
||||
instance = self.instance
|
||||
|
||||
if instance.handler:
|
||||
if instance.handler.type_str == "device":
|
||||
instance.handler.testplan = self.testplan
|
||||
|
||||
if(self.seed is not None and instance.platform.name.startswith("native_posix")):
|
||||
self.parse_generated()
|
||||
if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
|
||||
self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
|
||||
instance.handler.seed = self.seed
|
||||
|
||||
instance.handler.handle()
|
||||
|
||||
sys.stdout.flush()
|
||||
|
||||
def gather_metrics(self, instance):
|
||||
if self.testplan.enable_size_report and not self.testplan.cmake_only:
|
||||
self.calc_one_elf_size(instance)
|
||||
else:
|
||||
instance.metrics["ram_size"] = 0
|
||||
instance.metrics["rom_size"] = 0
|
||||
instance.metrics["unrecognized"] = []
|
||||
|
||||
@staticmethod
|
||||
def calc_one_elf_size(instance):
|
||||
if instance.status not in ["error", "failed", "skipped"]:
|
||||
if instance.platform.type != "native":
|
||||
size_calc = instance.calculate_sizes()
|
||||
instance.metrics["ram_size"] = size_calc.get_ram_size()
|
||||
instance.metrics["rom_size"] = size_calc.get_rom_size()
|
||||
instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
|
||||
else:
|
||||
instance.metrics["ram_size"] = 0
|
||||
instance.metrics["rom_size"] = 0
|
||||
instance.metrics["unrecognized"] = []
|
||||
|
||||
instance.metrics["handler_time"] = instance.execution_time
|
|
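ExecutionCounter, as defined above, wraps each tally in a multiprocessing.Value and guards every read and write with get_lock(). A self-contained sketch of that pattern (stand-alone illustration, not using the twister package itself):

from multiprocessing import Process, Value

def bump(counter):
    # Same idiom as the ExecutionCounter setters: take the Value's lock,
    # then mutate the shared integer.
    with counter.get_lock():
        counter.value += 1

if __name__ == "__main__":
    passed = Value('i', 0)                      # mirrors ExecutionCounter._passed
    workers = [Process(target=bump, args=(passed,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(passed.value)                         # 4, with no lost updates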
@@ -17,8 +17,8 @@ import glob
|
|||
import logging
|
||||
from distutils.spawn import find_executable
|
||||
import colorama
|
||||
from colorama import Fore
|
||||
import pickle
|
||||
|
||||
|
||||
|
||||
import json
|
||||
from multiprocessing import Lock, Process, Value
|
||||
|
@@ -32,6 +32,7 @@ from twister.platform import Platform
|
|||
from twister.config_parser import TwisterConfigParser
|
||||
from twister.size_calc import SizeCalculator
|
||||
from twister.testinstance import TestInstance
|
||||
from twister.runner import ExecutionCounter, ProjectBuilder
|
||||
|
||||
try:
|
||||
# Use the C LibYAML parser if available, rather than the Python parser.
|
||||
|
@@ -55,139 +56,12 @@ from twister.enviornment import TwisterEnv, canonical_zephyr_base
|
|||
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
|
||||
|
||||
import scl
|
||||
import expr_parser
|
||||
|
||||
|
||||
logger = logging.getLogger('twister')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class ExecutionCounter(object):
|
||||
def __init__(self, total=0):
|
||||
self._done = Value('i', 0)
|
||||
self._passed = Value('i', 0)
|
||||
self._skipped_configs = Value('i', 0)
|
||||
self._skipped_runtime = Value('i', 0)
|
||||
self._skipped_filter = Value('i', 0)
|
||||
self._skipped_cases = Value('i', 0)
|
||||
self._error = Value('i', 0)
|
||||
self._failed = Value('i', 0)
|
||||
self._total = Value('i', total)
|
||||
self._cases = Value('i', 0)
|
||||
|
||||
|
||||
self.lock = Lock()
|
||||
|
||||
|
||||
def summary(self):
|
||||
logger.debug("--------------------------------")
|
||||
logger.debug(f"Total Test suites: {self.total}")
|
||||
logger.debug(f"Total Test cases: {self.cases}")
|
||||
logger.debug(f"Skipped test cases: {self.skipped_cases}")
|
||||
logger.debug(f"Completed Testsuites: {self.done}")
|
||||
logger.debug(f"Passing Testsuites: {self.passed}")
|
||||
logger.debug(f"Failing Testsuites: {self.failed}")
|
||||
logger.debug(f"Skipped Testsuites: {self.skipped_configs}")
|
||||
logger.debug(f"Skipped Testsuites (runtime): {self.skipped_runtime}")
|
||||
logger.debug(f"Skipped Testsuites (filter): {self.skipped_filter}")
|
||||
logger.debug(f"Errors: {self.error}")
|
||||
logger.debug("--------------------------------")
|
||||
|
||||
@property
|
||||
def cases(self):
|
||||
with self._cases.get_lock():
|
||||
return self._cases.value
|
||||
|
||||
@cases.setter
|
||||
def cases(self, value):
|
||||
with self._cases.get_lock():
|
||||
self._cases.value = value
|
||||
|
||||
@property
|
||||
def skipped_cases(self):
|
||||
with self._skipped_cases.get_lock():
|
||||
return self._skipped_cases.value
|
||||
|
||||
@skipped_cases.setter
|
||||
def skipped_cases(self, value):
|
||||
with self._skipped_cases.get_lock():
|
||||
self._skipped_cases.value = value
|
||||
|
||||
@property
|
||||
def error(self):
|
||||
with self._error.get_lock():
|
||||
return self._error.value
|
||||
|
||||
@error.setter
|
||||
def error(self, value):
|
||||
with self._error.get_lock():
|
||||
self._error.value = value
|
||||
|
||||
@property
|
||||
def done(self):
|
||||
with self._done.get_lock():
|
||||
return self._done.value
|
||||
|
||||
@done.setter
|
||||
def done(self, value):
|
||||
with self._done.get_lock():
|
||||
self._done.value = value
|
||||
|
||||
@property
|
||||
def passed(self):
|
||||
with self._passed.get_lock():
|
||||
return self._passed.value
|
||||
|
||||
@passed.setter
|
||||
def passed(self, value):
|
||||
with self._passed.get_lock():
|
||||
self._passed.value = value
|
||||
|
||||
@property
|
||||
def skipped_configs(self):
|
||||
with self._skipped_configs.get_lock():
|
||||
return self._skipped_configs.value
|
||||
|
||||
@skipped_configs.setter
|
||||
def skipped_configs(self, value):
|
||||
with self._skipped_configs.get_lock():
|
||||
self._skipped_configs.value = value
|
||||
|
||||
@property
|
||||
def skipped_filter(self):
|
||||
with self._skipped_filter.get_lock():
|
||||
return self._skipped_filter.value
|
||||
|
||||
@skipped_filter.setter
|
||||
def skipped_filter(self, value):
|
||||
with self._skipped_filter.get_lock():
|
||||
self._skipped_filter.value = value
|
||||
|
||||
@property
|
||||
def skipped_runtime(self):
|
||||
with self._skipped_runtime.get_lock():
|
||||
return self._skipped_runtime.value
|
||||
|
||||
@skipped_runtime.setter
|
||||
def skipped_runtime(self, value):
|
||||
with self._skipped_runtime.get_lock():
|
||||
self._skipped_runtime.value = value
|
||||
|
||||
@property
|
||||
def failed(self):
|
||||
with self._failed.get_lock():
|
||||
return self._failed.value
|
||||
|
||||
@failed.setter
|
||||
def failed(self, value):
|
||||
with self._failed.get_lock():
|
||||
self._failed.value = value
|
||||
|
||||
@property
|
||||
def total(self):
|
||||
with self._total.get_lock():
|
||||
return self._total.value
|
||||
|
||||
|
||||
class ScanPathResult:
|
||||
"""Result of the TestSuite.scan_path function call.
|
||||
|
||||
|
@@ -232,669 +106,6 @@ class ScanPathResult:
|
|||
(sorted(self.ztest_suite_names) ==
|
||||
sorted(other.ztest_suite_names)))
|
||||
|
||||
class CMake:
|
||||
config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
|
||||
dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
|
||||
|
||||
def __init__(self, testsuite, platform, source_dir, build_dir):
|
||||
|
||||
self.cwd = None
|
||||
self.capture_output = True
|
||||
|
||||
self.defconfig = {}
|
||||
self.cmake_cache = {}
|
||||
|
||||
self.instance = None
|
||||
self.testsuite = testsuite
|
||||
self.platform = platform
|
||||
self.source_dir = source_dir
|
||||
self.build_dir = build_dir
|
||||
self.log = "build.log"
|
||||
self.generator = None
|
||||
self.generator_cmd = None
|
||||
|
||||
self.default_encoding = sys.getdefaultencoding()
|
||||
|
||||
def parse_generated(self):
|
||||
self.defconfig = {}
|
||||
return {}
|
||||
|
||||
def run_build(self, args=[]):
|
||||
|
||||
logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))
|
||||
|
||||
cmake_args = []
|
||||
cmake_args.extend(args)
|
||||
cmake = shutil.which('cmake')
|
||||
cmd = [cmake] + cmake_args
|
||||
kwargs = dict()
|
||||
|
||||
if self.capture_output:
|
||||
kwargs['stdout'] = subprocess.PIPE
|
||||
# CMake sends the output of message() to stderr unless it's STATUS
|
||||
kwargs['stderr'] = subprocess.STDOUT
|
||||
|
||||
if self.cwd:
|
||||
kwargs['cwd'] = self.cwd
|
||||
|
||||
p = subprocess.Popen(cmd, **kwargs)
|
||||
out, _ = p.communicate()
|
||||
|
||||
results = {}
|
||||
if p.returncode == 0:
|
||||
msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
|
||||
|
||||
self.instance.status = "passed"
|
||||
if not self.instance.run:
|
||||
self.instance.add_missing_case_status("skipped", "Test was built only")
|
||||
results = {'msg': msg, "returncode": p.returncode, "instance": self.instance}
|
||||
|
||||
if out:
|
||||
log_msg = out.decode(self.default_encoding)
|
||||
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
|
||||
log.write(log_msg)
|
||||
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
# A real error occurred, raise an exception
|
||||
log_msg = ""
|
||||
if out:
|
||||
log_msg = out.decode(self.default_encoding)
|
||||
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
|
||||
log.write(log_msg)
|
||||
|
||||
if log_msg:
|
||||
overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM)' overflowed by", log_msg)
|
||||
if overflow_found and not self.overflow_as_errors:
|
||||
logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
|
||||
self.instance.status = "skipped"
|
||||
self.instance.reason = "{} overflow".format(overflow_found[0])
|
||||
else:
|
||||
self.instance.status = "error"
|
||||
self.instance.reason = "Build failure"
|
||||
|
||||
results = {
|
||||
"returncode": p.returncode,
|
||||
"instance": self.instance,
|
||||
}
|
||||
|
||||
return results
|
||||
|
||||
def run_cmake(self, args=[]):
|
||||
|
||||
if self.warnings_as_errors:
|
||||
ldflags = "-Wl,--fatal-warnings"
|
||||
cflags = "-Werror"
|
||||
aflags = "-Werror -Wa,--fatal-warnings"
|
||||
gen_defines_args = "--edtlib-Werror"
|
||||
else:
|
||||
ldflags = cflags = aflags = ""
|
||||
gen_defines_args = ""
|
||||
|
||||
logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
|
||||
cmake_args = [
|
||||
f'-B{self.build_dir}',
|
||||
f'-S{self.source_dir}',
|
||||
f'-DTC_RUNID={self.instance.run_id}',
|
||||
f'-DEXTRA_CFLAGS={cflags}',
|
||||
f'-DEXTRA_AFLAGS={aflags}',
|
||||
f'-DEXTRA_LDFLAGS={ldflags}',
|
||||
f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
|
||||
f'-G{self.generator}'
|
||||
]
|
||||
|
||||
args = ["-D{}".format(a.replace('"', '')) for a in args]
|
||||
cmake_args.extend(args)
|
||||
|
||||
cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
|
||||
cmake_args.extend(cmake_opts)
|
||||
|
||||
|
||||
logger.debug("Calling cmake with arguments: {}".format(cmake_args))
|
||||
cmake = shutil.which('cmake')
|
||||
cmd = [cmake] + cmake_args
|
||||
kwargs = dict()
|
||||
|
||||
if self.capture_output:
|
||||
kwargs['stdout'] = subprocess.PIPE
|
||||
# CMake sends the output of message() to stderr unless it's STATUS
|
||||
kwargs['stderr'] = subprocess.STDOUT
|
||||
|
||||
if self.cwd:
|
||||
kwargs['cwd'] = self.cwd
|
||||
|
||||
p = subprocess.Popen(cmd, **kwargs)
|
||||
out, _ = p.communicate()
|
||||
|
||||
if p.returncode == 0:
|
||||
filter_results = self.parse_generated()
|
||||
msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
|
||||
logger.debug(msg)
|
||||
results = {'msg': msg, 'filter': filter_results}
|
||||
|
||||
else:
|
||||
self.instance.status = "error"
|
||||
self.instance.reason = "Cmake build failure"
|
||||
|
||||
for tc in self.instance.testcases:
|
||||
tc.status = self.instance.status
|
||||
|
||||
logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
|
||||
results = {"returncode": p.returncode}
|
||||
|
||||
if out:
|
||||
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
|
||||
log_msg = out.decode(self.default_encoding)
|
||||
log.write(log_msg)
|
||||
|
||||
return results
|
||||
|
||||
class FilterBuilder(CMake):
|
||||
|
||||
def __init__(self, testsuite, platform, source_dir, build_dir):
|
||||
super().__init__(testsuite, platform, source_dir, build_dir)
|
||||
|
||||
self.log = "config-twister.log"
|
||||
|
||||
def parse_generated(self):
|
||||
|
||||
if self.platform.name == "unit_testing":
|
||||
return {}
|
||||
|
||||
cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
|
||||
defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
|
||||
|
||||
with open(defconfig_path, "r") as fp:
|
||||
defconfig = {}
|
||||
for line in fp.readlines():
|
||||
m = self.config_re.match(line)
|
||||
if not m:
|
||||
if line.strip() and not line.startswith("#"):
|
||||
sys.stderr.write("Unrecognized line %s\n" % line)
|
||||
continue
|
||||
defconfig[m.group(1)] = m.group(2).strip()
|
||||
|
||||
self.defconfig = defconfig
|
||||
|
||||
cmake_conf = {}
|
||||
try:
|
||||
cache = CMakeCache.from_file(cmake_cache_path)
|
||||
except FileNotFoundError:
|
||||
cache = {}
|
||||
|
||||
for k in iter(cache):
|
||||
cmake_conf[k.name] = k.value
|
||||
|
||||
self.cmake_cache = cmake_conf
|
||||
|
||||
filter_data = {
|
||||
"ARCH": self.platform.arch,
|
||||
"PLATFORM": self.platform.name
|
||||
}
|
||||
filter_data.update(os.environ)
|
||||
filter_data.update(self.defconfig)
|
||||
filter_data.update(self.cmake_cache)
|
||||
|
||||
edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
|
||||
if self.testsuite and self.testsuite.ts_filter:
|
||||
try:
|
||||
if os.path.exists(edt_pickle):
|
||||
with open(edt_pickle, 'rb') as f:
|
||||
edt = pickle.load(f)
|
||||
else:
|
||||
edt = None
|
||||
res = expr_parser.parse(self.testsuite.ts_filter, filter_data, edt)
|
||||
|
||||
except (ValueError, SyntaxError) as se:
|
||||
sys.stderr.write(
|
||||
"Failed processing %s\n" % self.testsuite.yamlfile)
|
||||
raise se
|
||||
|
||||
if not res:
|
||||
return {os.path.join(self.platform.name, self.testsuite.name): True}
|
||||
else:
|
||||
return {os.path.join(self.platform.name, self.testsuite.name): False}
|
||||
else:
|
||||
self.platform.filter_data = filter_data
|
||||
return filter_data
|
||||
|
||||
|
||||
class ProjectBuilder(FilterBuilder):
|
||||
|
||||
def __init__(self, tplan, instance, **kwargs):
|
||||
super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir)
|
||||
|
||||
self.log = "build.log"
|
||||
self.instance = instance
|
||||
self.testplan = tplan
|
||||
self.filtered_tests = 0
|
||||
|
||||
self.lsan = kwargs.get('lsan', False)
|
||||
self.asan = kwargs.get('asan', False)
|
||||
self.ubsan = kwargs.get('ubsan', False)
|
||||
self.valgrind = kwargs.get('valgrind', False)
|
||||
self.extra_args = kwargs.get('extra_args', [])
|
||||
self.device_testing = kwargs.get('device_testing', False)
|
||||
self.cmake_only = kwargs.get('cmake_only', False)
|
||||
self.cleanup = kwargs.get('cleanup', False)
|
||||
self.coverage = kwargs.get('coverage', False)
|
||||
self.inline_logs = kwargs.get('inline_logs', False)
|
||||
self.generator = kwargs.get('generator', None)
|
||||
self.generator_cmd = kwargs.get('generator_cmd', None)
|
||||
self.verbose = kwargs.get('verbose', None)
|
||||
self.warnings_as_errors = kwargs.get('warnings_as_errors', True)
|
||||
self.overflow_as_errors = kwargs.get('overflow_as_errors', False)
|
||||
self.suite_name_check = kwargs.get('suite_name_check', True)
|
||||
self.seed = kwargs.get('seed', 0)
|
||||
|
||||
@staticmethod
|
||||
def log_info(filename, inline_logs):
|
||||
filename = os.path.abspath(os.path.realpath(filename))
|
||||
if inline_logs:
|
||||
logger.info("{:-^100}".format(filename))
|
||||
|
||||
try:
|
||||
with open(filename) as fp:
|
||||
data = fp.read()
|
||||
except Exception as e:
|
||||
data = "Unable to read log data (%s)\n" % (str(e))
|
||||
|
||||
logger.error(data)
|
||||
|
||||
logger.info("{:-^100}".format(filename))
|
||||
else:
|
||||
logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
|
||||
|
||||
def log_info_file(self, inline_logs):
|
||||
build_dir = self.instance.build_dir
|
||||
h_log = "{}/handler.log".format(build_dir)
|
||||
b_log = "{}/build.log".format(build_dir)
|
||||
v_log = "{}/valgrind.log".format(build_dir)
|
||||
d_log = "{}/device.log".format(build_dir)
|
||||
|
||||
if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
|
||||
self.log_info("{}".format(v_log), inline_logs)
|
||||
elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
|
||||
self.log_info("{}".format(h_log), inline_logs)
|
||||
elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
|
||||
self.log_info("{}".format(d_log), inline_logs)
|
||||
else:
|
||||
self.log_info("{}".format(b_log), inline_logs)
|
||||
|
||||
def setup_handler(self):
|
||||
|
||||
instance = self.instance
|
||||
args = []
|
||||
|
||||
# FIXME: Needs simplification
|
||||
if instance.platform.simulation == "qemu":
|
||||
instance.handler = QEMUHandler(instance, "qemu")
|
||||
args.append("QEMU_PIPE=%s" % instance.handler.get_fifo())
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.testsuite.type == "unit":
|
||||
instance.handler = BinaryHandler(instance, "unit")
|
||||
instance.handler.binary = os.path.join(instance.build_dir, "testbinary")
|
||||
if self.coverage:
|
||||
args.append("COVERAGE=1")
|
||||
elif instance.platform.type == "native":
|
||||
handler = BinaryHandler(instance, "native")
|
||||
|
||||
handler.asan = self.asan
|
||||
handler.valgrind = self.valgrind
|
||||
handler.lsan = self.lsan
|
||||
handler.ubsan = self.ubsan
|
||||
handler.coverage = self.coverage
|
||||
|
||||
handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe")
|
||||
instance.handler = handler
|
||||
elif instance.platform.simulation == "renode":
|
||||
if find_executable("renode"):
|
||||
instance.handler = BinaryHandler(instance, "renode")
|
||||
instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "tsim":
|
||||
instance.handler = BinaryHandler(instance, "tsim")
|
||||
instance.handler.call_make_run = True
|
||||
elif self.device_testing:
|
||||
instance.handler = DeviceHandler(instance, "device")
|
||||
instance.handler.coverage = self.coverage
|
||||
elif instance.platform.simulation == "nsim":
|
||||
if find_executable("nsimdrv"):
|
||||
instance.handler = BinaryHandler(instance, "nsim")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "mdb-nsim":
|
||||
if find_executable("mdb"):
|
||||
instance.handler = BinaryHandler(instance, "nsim")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "armfvp":
|
||||
instance.handler = BinaryHandler(instance, "armfvp")
|
||||
instance.handler.call_make_run = True
|
||||
elif instance.platform.simulation == "xt-sim":
|
||||
instance.handler = BinaryHandler(instance, "xt-sim")
|
||||
instance.handler.call_make_run = True
|
||||
|
||||
if instance.handler:
|
||||
instance.handler.args = args
|
||||
instance.handler.generator_cmd = self.generator_cmd
|
||||
instance.handler.generator = self.generator
|
||||
instance.handler.suite_name_check = self.suite_name_check
|
||||
|
||||
def process(self, pipeline, done, message, lock, results):
|
||||
op = message.get('op')
|
||||
|
||||
if not self.instance.handler:
|
||||
self.setup_handler()
|
||||
|
||||
# The build process, call cmake and build with configured generator
|
||||
if op == "cmake":
|
||||
res = self.cmake()
|
||||
if self.instance.status in ["failed", "error"]:
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
elif self.cmake_only:
|
||||
if self.instance.status is None:
|
||||
self.instance.status = "passed"
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
# Here we check the runtime filter results coming from running cmake
|
||||
if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
|
||||
logger.debug("filtering %s" % self.instance.name)
|
||||
self.instance.status = "filtered"
|
||||
self.instance.reason = "runtime filter"
|
||||
results.skipped_runtime += 1
|
||||
self.instance.add_missing_case_status("skipped")
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
pipeline.put({"op": "build", "test": self.instance})
|
||||
|
||||
elif op == "build":
|
||||
logger.debug("build test: %s" % self.instance.name)
|
||||
res = self.build()
|
||||
if not res:
|
||||
self.instance.status = "error"
|
||||
self.instance.reason = "Build Failure"
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
# Count skipped cases during build, for example
|
||||
# due to ram/rom overflow.
|
||||
if self.instance.status == "skipped":
|
||||
results.skipped_runtime += 1
|
||||
self.instance.add_missing_case_status("skipped", self.instance.reason)
|
||||
|
||||
if res.get('returncode', 1) > 0:
|
||||
self.instance.add_missing_case_status("blocked", self.instance.reason)
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
else:
|
||||
pipeline.put({"op": "gather_metrics", "test": self.instance})
|
||||
|
||||
elif op == "gather_metrics":
|
||||
self.gather_metrics(self.instance)
|
||||
if self.instance.run and self.instance.handler:
|
||||
pipeline.put({"op": "run", "test": self.instance})
|
||||
else:
|
||||
pipeline.put({"op": "report", "test": self.instance})
|
||||
|
||||
# Run the generated binary using one of the supported handlers
|
||||
elif op == "run":
|
||||
logger.debug("run test: %s" % self.instance.name)
|
||||
self.run()
|
||||
logger.debug(f"run status: {self.instance.name} {self.instance.status}")
|
||||
|
||||
# to make it work with pickle
|
||||
self.instance.handler.thread = None
|
||||
self.instance.handler.testplan = None
|
||||
pipeline.put({
|
||||
"op": "report",
|
||||
"test": self.instance,
|
||||
"status": self.instance.status,
|
||||
"reason": self.instance.reason
|
||||
}
|
||||
)
|
||||
|
||||
# Report results and output progress to screen
|
||||
elif op == "report":
|
||||
with lock:
|
||||
done.put(self.instance)
|
||||
self.report_out(results)
|
||||
|
||||
if self.cleanup and not self.coverage and self.instance.status == "passed":
|
||||
pipeline.put({
|
||||
"op": "cleanup",
|
||||
"test": self.instance
|
||||
})
|
||||
|
||||
elif op == "cleanup":
|
||||
if self.device_testing:
|
||||
self.cleanup_device_testing_artifacts()
|
||||
else:
|
||||
self.cleanup_artifacts()
|
||||
|
||||
def cleanup_artifacts(self, additional_keep=[]):
|
||||
logger.debug("Cleaning up {}".format(self.instance.build_dir))
|
||||
allow = [
|
||||
'zephyr/.config',
|
||||
'handler.log',
|
||||
'build.log',
|
||||
'device.log',
|
||||
'recording.csv',
|
||||
# below ones are needed to make --test-only work as well
|
||||
'Makefile',
|
||||
'CMakeCache.txt',
|
||||
'build.ninja',
|
||||
'CMakeFiles/rules.ninja'
|
||||
]
|
||||
|
||||
allow += additional_keep
|
||||
|
||||
allow = [os.path.join(self.instance.build_dir, file) for file in allow]
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
|
||||
for name in filenames:
|
||||
path = os.path.join(dirpath, name)
|
||||
if path not in allow:
|
||||
os.remove(path)
|
||||
# Remove empty directories and symbolic links to directories
|
||||
for dir in dirnames:
|
||||
path = os.path.join(dirpath, dir)
|
||||
if os.path.islink(path):
|
||||
os.remove(path)
|
||||
elif not os.listdir(path):
|
||||
os.rmdir(path)
|
||||
|
||||
def cleanup_device_testing_artifacts(self):
|
||||
logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
|
||||
|
||||
sanitizelist = [
|
||||
'CMakeCache.txt',
|
||||
'zephyr/runners.yaml',
|
||||
]
|
||||
keep = [
|
||||
'zephyr/zephyr.hex',
|
||||
'zephyr/zephyr.bin',
|
||||
'zephyr/zephyr.elf',
|
||||
]
|
||||
|
||||
keep += sanitizelist
|
||||
|
||||
self.cleanup_artifacts(keep)
|
||||
|
||||
# sanitize paths so files are relocatable
|
||||
for file in sanitizelist:
|
||||
file = os.path.join(self.instance.build_dir, file)
|
||||
|
||||
with open(file, "rt") as fin:
|
||||
data = fin.read()
|
||||
data = data.replace(canonical_zephyr_base+"/", "")
|
||||
|
||||
with open(file, "wt") as fin:
|
||||
fin.write(data)
|
||||
|
||||
def report_out(self, results):
|
||||
total_to_do = results.total
|
||||
total_tests_width = len(str(total_to_do))
|
||||
results.done += 1
|
||||
instance = self.instance
|
||||
|
||||
if instance.status in ["error", "failed"]:
|
||||
if instance.status == "error":
|
||||
results.error += 1
|
||||
else:
|
||||
results.failed += 1
|
||||
if self.verbose:
|
||||
status = Fore.RED + "FAILED " + Fore.RESET + instance.reason
|
||||
else:
|
||||
print("")
|
||||
logger.error(
|
||||
"{:<25} {:<50} {}FAILED{}: {}".format(
|
||||
instance.platform.name,
|
||||
instance.testsuite.name,
|
||||
Fore.RED,
|
||||
Fore.RESET,
|
||||
instance.reason))
|
||||
if not self.verbose:
|
||||
self.log_info_file(self.inline_logs)
|
||||
elif instance.status in ["skipped", "filtered"]:
|
||||
status = Fore.YELLOW + "SKIPPED" + Fore.RESET
|
||||
results.skipped_configs += 1
|
||||
results.skipped_cases += len(instance.testsuite.testcases)
|
||||
elif instance.status == "passed":
|
||||
status = Fore.GREEN + "PASSED" + Fore.RESET
|
||||
results.passed += 1
|
||||
for case in instance.testcases:
|
||||
if case.status == 'skipped':
|
||||
results.skipped_cases += 1
|
||||
else:
|
||||
logger.debug(f"Unknown status = {instance.status}")
|
||||
status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
|
||||
|
||||
if self.verbose:
|
||||
if self.cmake_only:
|
||||
more_info = "cmake"
|
||||
elif instance.status in ["skipped", "filtered"]:
|
||||
more_info = instance.reason
|
||||
else:
|
||||
if instance.handler and instance.run:
|
||||
more_info = instance.handler.type_str
|
||||
htime = instance.execution_time
|
||||
if htime:
|
||||
more_info += " {:.3f}s".format(htime)
|
||||
else:
|
||||
more_info = "build"
|
||||
|
||||
if ( instance.status in ["error", "failed", "timeout", "flash_error"]
|
||||
and hasattr(self.instance.handler, 'seed')
|
||||
and self.instance.handler.seed is not None ):
|
||||
more_info += "/seed: " + str(self.seed)
|
||||
|
||||
logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
|
||||
results.done + results.skipped_filter, total_tests_width, total_to_do , instance.platform.name,
|
||||
instance.testsuite.name, status, more_info))
|
||||
|
||||
if instance.status in ["error", "failed", "timeout"]:
|
||||
self.log_info_file(self.inline_logs)
|
||||
else:
|
||||
completed_perc = 0
|
||||
if total_to_do > 0:
|
||||
completed_perc = int((float(results.done + results.skipped_filter) / total_to_do) * 100)
|
||||
|
||||
sys.stdout.write("\rINFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s" % (
|
||||
Fore.GREEN,
|
||||
results.done + results.skipped_filter,
|
||||
total_to_do,
|
||||
Fore.RESET,
|
||||
completed_perc,
|
||||
Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
|
||||
results.skipped_filter + results.skipped_runtime,
|
||||
Fore.RESET,
|
||||
Fore.RED if results.failed > 0 else Fore.RESET,
|
||||
results.failed,
|
||||
Fore.RESET
|
||||
)
|
||||
)
|
||||
sys.stdout.flush()
|
||||
|
||||
def cmake(self):
|
||||
|
||||
instance = self.instance
|
||||
args = self.testsuite.extra_args[:]
|
||||
args += self.extra_args
|
||||
|
||||
if instance.handler:
|
||||
args += instance.handler.args
|
||||
|
||||
# merge overlay files into one variable
|
||||
def extract_overlays(args):
|
||||
re_overlay = re.compile('OVERLAY_CONFIG=(.*)')
|
||||
other_args = []
|
||||
overlays = []
|
||||
for arg in args:
|
||||
match = re_overlay.search(arg)
|
||||
if match:
|
||||
overlays.append(match.group(1).strip('\'"'))
|
||||
else:
|
||||
other_args.append(arg)
|
||||
|
||||
args[:] = other_args
|
||||
return overlays
|
||||
|
||||
overlays = extract_overlays(args)
|
||||
|
||||
if os.path.exists(os.path.join(instance.build_dir,
|
||||
"twister", "testsuite_extra.conf")):
|
||||
overlays.append(os.path.join(instance.build_dir,
|
||||
"twister", "testsuite_extra.conf"))
|
||||
|
||||
if overlays:
|
||||
args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
|
||||
|
||||
res = self.run_cmake(args)
|
||||
return res
|
||||
|
||||
def build(self):
|
||||
res = self.run_build(['--build', self.build_dir])
|
||||
return res
|
||||
|
||||
def run(self):
|
||||
|
||||
instance = self.instance
|
||||
|
||||
if instance.handler:
|
||||
if instance.handler.type_str == "device":
|
||||
instance.handler.testplan = self.testplan
|
||||
|
||||
if(self.seed is not None and instance.platform.name.startswith("native_posix")):
|
||||
self.parse_generated()
|
||||
if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
|
||||
self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
|
||||
instance.handler.seed = self.seed
|
||||
|
||||
instance.handler.handle()
|
||||
|
||||
sys.stdout.flush()
|
||||
|
||||
def gather_metrics(self, instance):
|
||||
if self.testplan.enable_size_report and not self.testplan.cmake_only:
|
||||
self.calc_one_elf_size(instance)
|
||||
else:
|
||||
instance.metrics["ram_size"] = 0
|
||||
instance.metrics["rom_size"] = 0
|
||||
instance.metrics["unrecognized"] = []
|
||||
|
||||
@staticmethod
|
||||
def calc_one_elf_size(instance):
|
||||
if instance.status not in ["error", "failed", "skipped"]:
|
||||
if instance.platform.type != "native":
|
||||
size_calc = instance.calculate_sizes()
|
||||
instance.metrics["ram_size"] = size_calc.get_ram_size()
|
||||
instance.metrics["rom_size"] = size_calc.get_rom_size()
|
||||
instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
|
||||
else:
|
||||
instance.metrics["ram_size"] = 0
|
||||
instance.metrics["rom_size"] = 0
|
||||
instance.metrics["unrecognized"] = []
|
||||
|
||||
instance.metrics["handler_time"] = instance.execution_time
|
||||
|
||||
class Filters:
|
||||
# filters provided on command line by the user/tester
|
||||
CMD_LINE = 'command line filter'
|
||||
|
|
|
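The ProjectBuilder.process() method removed above (and re-added verbatim in runner.py) is a small state machine driven by 'op' messages on a queue: on the happy path cmake → build → gather_metrics → run → report → cleanup, with error and filter branches short-circuiting to "report". A minimal, illustrative dispatch loop (the queue payloads here are placeholders, not real TestInstance objects):

from queue import Queue

pipeline = Queue()
pipeline.put({"op": "cmake", "test": "<instance placeholder>"})

# Each processed message may enqueue the next stage, just as process() does.
NEXT = {"cmake": "build", "build": "gather_metrics",
        "gather_metrics": "run", "run": "report", "report": "cleanup"}

while not pipeline.empty():
    msg = pipeline.get()
    print("handling", msg["op"])
    nxt = NEXT.get(msg["op"])
    if nxt:
        pipeline.put({"op": nxt, "test": msg["test"]})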
@@ -206,7 +206,8 @@ except ImportError:
|
|||
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
|
||||
|
||||
import twisterlib
|
||||
from twisterlib import TestPlan, ExecutionCounter
|
||||
from twisterlib import TestPlan
|
||||
from twister.runner import ExecutionCounter
|
||||
from twister.enviornment import TwisterEnv, canonical_zephyr_base
|
||||
from twister.reports import Reporting
|
||||
from twister.hardwaremap import HardwareMap
|
||||
|
|
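For reference, these are the keyword options the relocated ProjectBuilder pulls out of **kwargs, with the defaults shown in its __init__ above. This is a plain summary dict for illustration, not the real constructor call (the actual call sites pass a TestPlan and TestInstance):

project_builder_defaults = {
    "lsan": False, "asan": False, "ubsan": False, "valgrind": False,
    "extra_args": [], "device_testing": False, "cmake_only": False,
    "cleanup": False, "coverage": False, "inline_logs": False,
    "generator": None, "generator_cmd": None, "verbose": None,
    "warnings_as_errors": True, "overflow_as_errors": False,
    "suite_name_check": True, "seed": 0,
}
print(sorted(project_builder_defaults))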