sanitycheck: pylint issues

run through pylint and fix.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2019-12-12 09:58:28 -05:00
commit d988238970

View file

@ -182,7 +182,6 @@ import concurrent.futures
from threading import BoundedSemaphore
import queue
import time
import datetime
import csv
import yaml
import glob
@ -194,6 +193,7 @@ from collections import OrderedDict
from itertools import islice
from pathlib import Path
from distutils.spawn import find_executable
try:
from anytree import Node, RenderTree, find
except ImportError:
@ -214,8 +214,6 @@ import edtlib
hw_map_local = threading.Lock()
report_lock = threading.Lock()
# Use this for internal comparisons; that's what canonicalization is
# for. Don't use it when invoking other components of the build system
# to avoid confusing and hard to trace inconsistencies in error messages
@ -229,7 +227,6 @@ sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
from sanity_chk import scl
from sanity_chk import expr_parser
VERBOSE = 0
RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk",
@ -248,14 +245,13 @@ else:
COLOR_GREEN = ""
COLOR_YELLOW = ""
logger = logging.getLogger('sanitycheck')
#coloredlogs.install(level='INFO', logger=logger, fmt="%(levelname)s %(message)s")
# coloredlogs.install(level='INFO', logger=logger, fmt="%(levelname)s %(message)s")
logger.setLevel(logging.DEBUG)
#log_format = "%(levelname)s %(message)s"
#logging.basicConfig(format=log_format, level=logging.INFO)
# log_format = "%(levelname)s %(message)s"
# logging.basicConfig(format=log_format, level=logging.INFO)
class CMakeCacheEntry:
'''Represents a CMake cache entry.
@ -334,7 +330,7 @@ class CMakeCacheEntry:
except ValueError as exc:
args = exc.args + ('on line {}: {}'.format(line_no, line),)
raise ValueError(args) from exc
elif type_ in ['STRING','INTERNAL']:
elif type_ in ['STRING', 'INTERNAL']:
# If the value is a CMake list (i.e. is a string which
# contains a ';'), convert to a Python list.
if ';' in value:
@ -512,7 +508,6 @@ class BinaryHandler(Handler):
self.asan = False
self.coverage = False
def try_kill_process_by_pid(self):
if self.pid_fn:
pid = int(open(self.pid_fn).read())
@ -542,8 +537,8 @@ class BinaryHandler(Handler):
harness.handle(line.decode('utf-8').rstrip())
if harness.state:
try:
#POSIX arch based ztests end on their own,
#so let's give it up to 100ms to do so
# POSIX arch based ztests end on their own,
# so let's give it up to 100ms to do so
proc.wait(0.1)
except subprocess.TimeoutExpired:
self.terminate(proc)
@ -567,8 +562,8 @@ class BinaryHandler(Handler):
if self.valgrind and shutil.which("valgrind"):
command = ["valgrind", "--error-exitcode=2",
"--leak-check=full",
"--suppressions="+ZEPHYR_BASE+"/scripts/valgrind.supp",
"--log-file="+self.build_dir+"/valgrind.log"
"--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
"--log-file=" + self.build_dir + "/valgrind.log"
] + command
run_valgrind = True
@ -587,7 +582,7 @@ class BinaryHandler(Handler):
with subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
t = threading.Thread(target=self._output_reader, args=(proc, harness, ), daemon=True)
t = threading.Thread(target=self._output_reader, args=(proc, harness,), daemon=True)
t.start()
t.join(self.timeout)
if t.is_alive():
@ -611,8 +606,8 @@ class BinaryHandler(Handler):
self.instance.results = harness.tests
if not self.terminated and self.returncode != 0:
#When a process is killed, the default handler returns 128 + SIGTERM
#so in that case the return code itself is not meaningful
# When a process is killed, the default handler returns 128 + SIGTERM
# so in that case the return code itself is not meaningful
self.set_state("failed", handler_time)
self.instance.reason = "Handler Error"
elif run_valgrind and self.returncode == 2:
@ -722,7 +717,6 @@ class DeviceHandler(Handler):
else:
command = [get_generator()[0], "-C", self.build_dir, "flash"]
while not self.device_is_available(self.instance.platform.name):
time.sleep(1)
@ -745,13 +739,13 @@ class DeviceHandler(Handler):
elif runner == "openocd" and product == "STM32 STLink":
command.append('--')
command.append("--cmd-pre-init")
command.append("hla_serial %s" %(board_id))
command.append("hla_serial %s" % (board_id))
elif runner == "openocd" and product == "EDBG CMSIS-DAP":
command.append('--')
command.append("--cmd-pre-init")
command.append("cmsis_dap_serial %s" %(board_id))
command.append("cmsis_dap_serial %s" % (board_id))
elif runner == "jlink":
command.append("--tool-opt=-SelectEmuBySN %s" %(board_id))
command.append("--tool-opt=-SelectEmuBySN %s" % (board_id))
serial_device = hardware['serial']
@ -766,7 +760,7 @@ class DeviceHandler(Handler):
)
except serial.SerialException as e:
self.set_state("failed", 0)
logger.error("Serial device error: %s" %(str(e)))
logger.error("Serial device error: %s" % (str(e)))
self.make_device_available(serial_device)
return
@ -841,7 +835,6 @@ class QEMUHandler(Handler):
for these to collect whether the test passed or failed.
"""
def __init__(self, instance, type_str):
"""Constructor
@ -853,7 +846,6 @@ class QEMUHandler(Handler):
self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
@staticmethod
def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness):
fifo_in = fifo_fn + ".in"
@ -925,7 +917,7 @@ class QEMUHandler(Handler):
# coverage is enabled since dumping this information can
# take some time.
if not timeout_extended or harness.capture_coverage:
timeout_extended= True
timeout_extended = True
if harness.capture_coverage:
timeout_time = time.time() + 30
else:
@ -984,7 +976,7 @@ class QEMUHandler(Handler):
self.thread.start()
subprocess.call(["stty", "sane"])
logger.debug("Running %s (%s)" %(self.name, self.type_str))
logger.debug("Running %s (%s)" % (self.name, self.type_str))
command = [get_generator()[0]]
command += ["-C", self.build_dir, "run"]
@ -1002,7 +994,6 @@ class QEMUHandler(Handler):
class SizeCalculator:
alloc_sections = [
"bss",
"noinit",
@ -1216,7 +1207,7 @@ class SizeCalculator:
platform_valid_keys = {
"supported_toolchains": {"type": "list", "default": []},
"env": {"type": "list", "default": []}
}
}
testcase_valid_keys = {"tags": {"type": "set", "required": False},
"type": {"type": "str", "default": "integration"},
@ -1266,7 +1257,6 @@ class SanityConfigParser:
if 'common' in self.data:
self.common = self.data['common']
def _cast_value(self, value, typestr):
if isinstance(value, str):
v = value.strip()
@ -1388,7 +1378,7 @@ class Platform:
Maps directly to BOARD when building"""
platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE,
"scripts","sanity_chk","platform-schema.yaml"))
"scripts", "sanity_chk", "platform-schema.yaml"))
def __init__(self):
"""Constructor.
@ -1503,7 +1493,6 @@ class TestCase(object):
self.min_flash = None
self.extra_sections = None
@staticmethod
def get_unique(testcase_root, workdir, name):
@ -1554,9 +1543,10 @@ class TestCase(object):
with open(inf_name) as inf:
if os.name == 'nt':
mmap_args = {'fileno':inf.fileno(), 'length':0, 'access':mmap.ACCESS_READ}
mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ}
else:
mmap_args = {'fileno':inf.fileno(), 'length':0, 'flags':mmap.MAP_PRIVATE, 'prot':mmap.PROT_READ, 'offset':0}
mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ,
'offset': 0}
with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
# contextlib makes pylint think main_c isn't subscriptable
@ -1581,7 +1571,7 @@ class TestCase(object):
_matches = re.findall(
stc_regex,
main_c[suite_regex_match.end():suite_run_match.start()])
matches = [ match.decode().replace("test_", "") for match in _matches ]
matches = [match.decode().replace("test_", "") for match in _matches]
return matches, warnings
def scan_path(self, path):
@ -1674,7 +1664,7 @@ class TestInstance:
self.run = False
return
runnable =bool(self.testcase.type == "unit" or \
runnable = bool(self.testcase.type == "unit" or \
self.platform.type == "native" or \
self.platform.simulation in ["nsim", "renode", "qemu"] or \
device_testing)
@ -1693,8 +1683,8 @@ class TestInstance:
# if we have a fixture that is also being supplied on the
# command-line, then we need to run the test, not just build it.
if "fixture" in self.testcase.harness_config:
fixture = self.testcase.harness_config['fixture']
if fixture in fixture:
fixture_cfg = self.testcase.harness_config['fixture']
if fixture_cfg in fixture:
_build_only = False
else:
_build_only = True
@ -1755,7 +1745,6 @@ class TestInstance:
class CMake():
config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
@ -1801,7 +1790,7 @@ class CMake():
results = {}
if p.returncode == 0:
msg = "Finished building %s for %s" %(self.source_dir, self.platform.name)
msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
self.instance.status = "passed"
results = {'msg': msg, "returncode": p.returncode, "instance": self.instance}
@ -1842,7 +1831,7 @@ class CMake():
def run_cmake(self, args=[]):
ldflags = "-Wl,--fatal-warnings"
logger.debug("Running cmake on %s for %s" %(self.source_dir, self.platform.name))
logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
# fixme: add additional cflags based on options
cmake_args = [
@ -1880,7 +1869,7 @@ class CMake():
if p.returncode == 0:
filter_results = self.parse_generated()
msg = "Finished building %s for %s" %(self.source_dir, self.platform.name)
msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
results = {'msg': msg, 'filter': filter_results}
@ -1984,7 +1973,8 @@ class ProjectBuilder(FilterBuilder):
self.coverage = kwargs.get('coverage', False)
self.inline_logs = kwargs.get('inline_logs', False)
def log_info(self, filename, inline_logs):
@staticmethod
def log_info(filename, inline_logs):
filename = os.path.relpath(os.path.realpath(filename))
if inline_logs:
logger.info("{:-^100}".format(filename))
@ -2078,7 +2068,7 @@ class ProjectBuilder(FilterBuilder):
pipeline.put({"op": "build", "test": self.instance})
elif op == "build":
logger.debug("build test: %s" %self.instance.name)
logger.debug("build test: %s" % self.instance.name)
results = self.build()
if results.get('returncode', 1) > 0:
@ -2090,7 +2080,7 @@ class ProjectBuilder(FilterBuilder):
pipeline.put({"op": "report", "test": self.instance})
# Run the generated binary using one of the supported handlers
elif op == "run":
logger.debug("run test: %s" %self.instance.name)
logger.debug("run test: %s" % self.instance.name)
self.run()
self.instance.status, _ = self.instance.handler.get_state()
pipeline.put({
@ -2151,7 +2141,7 @@ class ProjectBuilder(FilterBuilder):
instance.testcase.name, status, more_info))
if instance.status in ["failed", "timeout"]:
self.log_info_file(inline_logs)
self.log_info_file(self.inline_logs)
else:
sys.stdout.write("\rINFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s" % (
COLOR_GREEN,
@ -2190,7 +2180,7 @@ class ProjectBuilder(FilterBuilder):
if (self.testcase.extra_configs or self.coverage or
self.asan):
args.append("OVERLAY_CONFIG=\"%s %s\"" %(overlays,
args.append("OVERLAY_CONFIG=\"%s %s\"" % (overlays,
os.path.join(instance.build_dir,
"sanitycheck", "testcase_extra.conf")))
@ -2223,6 +2213,7 @@ class BoundedExecutor(concurrent.futures.ThreadPoolExecutor):
:param bound: Integer - the maximum number of items in the work queue
:param max_workers: Integer - the size of the thread pool
"""
def __init__(self, bound, max_workers, **kwargs):
super().__init__(max_workers)
# self.executor = ThreadPoolExecutor(max_workers=max_workers)
@ -2341,7 +2332,7 @@ class TestSuite:
delta = instance.metrics.get(metric, 0) - mtype(sm[metric])
if delta == 0:
continue
results.append((instance, metric, instance.metrics.get(metric, 0 ), delta,
results.append((instance, metric, instance.metrics.get(metric, 0), delta,
lower_better))
return results
@ -2364,7 +2355,7 @@ class TestSuite:
(footprint_threshold / 100.0)):
continue
info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
i.platform.name, i.testcase.name, COLOR_YELLOW,
"INFO" if all_deltas else "WARNING", COLOR_NORMAL,
metric, delta, value, percentage))
@ -2386,11 +2377,13 @@ class TestSuite:
failed += 1
if self.total_tests and self.total_tests != self.total_skipped:
pass_rate = (float(self.total_tests - self.total_failed - self.total_skipped)/ float(self.total_tests - self.total_skipped))
pass_rate = (float(self.total_tests - self.total_failed - self.total_skipped) / float(
self.total_tests - self.total_skipped))
else:
pass_rate = 0
logger.info("{}{} of {}{} tests passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
logger.info(
"{}{} of {}{} tests passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
COLOR_RED if failed else COLOR_GREEN,
self.total_tests - self.total_failed - self.total_skipped,
self.total_tests,
@ -2463,7 +2456,6 @@ class TestSuite:
logger.error("E: %s: can't load: %s" % (file, e))
self.load_errors += 1
def get_all_tests(self):
tests = []
for _, tc in self.testcases.items():
@ -2494,7 +2486,7 @@ class TestSuite:
for root in self.roots:
root = os.path.abspath(root)
logger.debug("Reading test case configuration files under %s..." %root)
logger.debug("Reading test case configuration files under %s..." % root)
for dirpath, dirnames, filenames in os.walk(root, topdown=True):
logger.debug("scanning %s" % dirpath)
@ -2576,7 +2568,7 @@ class TestSuite:
last_run = os.path.join(self.outdir, "sanitycheck.csv")
try:
if not os.path.exists(last_run):
raise SanityRuntimeError("Couldn't find last sanitycheck run.: %s" %last_run)
raise SanityRuntimeError("Couldn't find last sanitycheck run.: %s" % last_run)
except Exception as e:
print(str(e))
sys.exit(2)
@ -2603,7 +2595,7 @@ class TestSuite:
self.add_instances(instance_list)
tests_to_run = len(self.instances)
logger.info("%d tests passed already, retrying %d tests" %(total_tests - tests_to_run, tests_to_run))
logger.info("%d tests passed already, retrying %d tests" % (total_tests - tests_to_run, tests_to_run))
def load_from_file(self, file):
try:
@ -2784,12 +2776,12 @@ class TestSuite:
b = set(tc.platform_whitelist)
c = a.intersection(b)
if c:
aa = list( filter( lambda tc: tc.platform.name in c, instance_list))
aa = list(filter(lambda tc: tc.platform.name in c, instance_list))
self.add_instances(aa)
else:
self.add_instances(instance_list[:1])
else:
instances = list( filter( lambda tc: tc.platform.default, instance_list))
instances = list(filter(lambda tc: tc.platform.default, instance_list))
self.add_instances(instances)
for instance in list(filter(lambda tc: not tc.platform.default, instance_list)):
@ -2860,14 +2852,14 @@ class TestSuite:
# Start the load operation and mark the future with its URL
pb = ProjectBuilder(self,
test,
lsan = self.enable_lsan,
asan = self.enable_asan,
coverage = self.enable_coverage,
extra_args = self.extra_args,
device_testing = self.device_testing,
cmake_only = self.cmake_only,
valgrind = self.enable_valgrind,
inline_logs = self.inline_logs
lsan=self.enable_lsan,
asan=self.enable_asan,
coverage=self.enable_coverage,
extra_args=self.extra_args,
device_testing=self.device_testing,
cmake_only=self.cmake_only,
valgrind=self.enable_valgrind,
inline_logs=self.inline_logs
)
future_to_test[executor.submit(pb.process, message)] = test.name
@ -2923,7 +2915,7 @@ class TestSuite:
run = "Sanitycheck"
eleTestsuite = None
platforms = {inst.platform.name for _,inst in self.instances.items()}
platforms = {inst.platform.name for _, inst in self.instances.items()}
for platform in platforms:
errors = 0
passes = 0
@ -2951,7 +2943,7 @@ class TestSuite:
name=run, time="%f" % duration,
tests="%d" % (errors + passes + fails),
failures="%d" % fails,
errors="%d" % errors, skipped="%d" %skips)
errors="%d" % errors, skipped="%d" % skips)
handler_time = 0
@ -2962,8 +2954,9 @@ class TestSuite:
handler_time = instance.metrics.get('handler_time', 0)
for k in instance.results.keys():
eleTestcase = ET.SubElement(
eleTestsuite, 'testcase', classname="%s:%s" %(instance.platform.name, os.path.basename(instance.testcase.name)),
name="%s" % (k), time="%f" %handler_time)
eleTestsuite, 'testcase',
classname="%s:%s" % (instance.platform.name, os.path.basename(instance.testcase.name)),
name="%s" % (k), time="%f" % handler_time)
if instance.results[k] in ['FAIL', 'BLOCK']:
el = None
@ -2995,7 +2988,6 @@ class TestSuite:
type="skipped",
message="Skipped")
result = ET.tostring(eleTestsuites)
with open(os.path.join(outdir, platform + ".xml"), 'wb') as f:
f.write(result)
@ -3035,7 +3027,7 @@ class TestSuite:
name=run, time="%f" % duration,
tests="%d" % (errors + passes + fails + skips),
failures="%d" % fails,
errors="%d" %(errors), skip="%s" %(skips))
errors="%d" % (errors), skip="%s" % (skips))
for instance in self.instances.values():
@ -3050,10 +3042,13 @@ class TestSuite:
if instance.status != "failed" and instance.handler:
handler_time = instance.metrics.get("handler_time", 0)
eleTestcase = ET.SubElement(
eleTestsuite, 'testcase', classname="%s:%s" %
(instance.platform.name, instance.testcase.name), name="%s" %
(instance.testcase.name), time="%f" %handler_time)
eleTestsuite,
'testcase',
classname="%s:%s" % (instance.platform.name, instance.testcase.name),
name="%s" % (instance.testcase.name),
time="%f" % handler_time)
if instance.status == "failed":
failure = ET.SubElement(
@ -3078,7 +3073,7 @@ class TestSuite:
failure.text = filtered_string
f.close()
elif instance.status == "skipped":
ET.SubElement( eleTestcase, 'skipped', type="skipped", message="Skipped")
ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped")
result = ET.tostring(eleTestsuites)
with open(filename, 'wb') as report:
@ -3111,7 +3106,6 @@ class TestSuite:
rowdict["rom_size"] = rom_size
cw.writerow(rowdict)
def get_testcase(self, identifier):
results = []
for _, tc in self.testcases.items():
@ -3122,7 +3116,6 @@ class TestSuite:
def parse_arguments():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
@ -3260,7 +3253,6 @@ Artificially long but functional example:
metavar="FILENAME",
help="Export tests case meta-data to a file in CSV format.")
parser.add_argument("--timestamps",
action="store_true",
help="Print all messages with time stamps")
@ -3345,7 +3337,7 @@ Artificially long but functional example:
"--footprint-threshold=0")
parser.add_argument(
"-O", "--outdir",
default=os.path.join(os.getcwd(),"sanity-out"),
default=os.path.join(os.getcwd(), "sanity-out"),
help="Output directory for logs and binaries. "
"Default is 'sanity-out' in the current directory. "
"This directory will be deleted unless '--no-clean' is set.")
@ -3503,6 +3495,7 @@ structure in the main Zephyr tree: boards/<arch>/<board_name>/""")
return parser.parse_args()
def size_report(sc):
logger.info(sc.filename)
logger.info("SECTION NAME VMA LMA SIZE HEX SZ TYPE")
@ -3517,6 +3510,7 @@ def size_report(sc):
(sc.rom_size, sc.ram_size))
logger.info("")
class CoverageTool:
""" Base class for every supported coverage tool
"""
@ -3535,7 +3529,7 @@ class CoverageTool:
@staticmethod
def retrieve_gcov_data(intput_file):
if VERBOSE:
logger.debug("Working on %s" %intput_file)
logger.debug("Working on %s" % intput_file)
extracted_coverage_info = {}
capture_data = False
capture_complete = False
@ -3561,7 +3555,7 @@ class CoverageTool:
continue
else:
continue
extracted_coverage_info.update({file_name:hex_dump})
extracted_coverage_info.update({file_name: hex_dump})
if not capture_data:
capture_complete = True
return {'complete': capture_complete, 'data': extracted_coverage_info}
@ -3574,7 +3568,7 @@ class CoverageTool:
# if kobject_hash is given for coverage gcovr fails
# hence skipping it problem only in gcovr v4.1
if "kobject_hash" in filename:
filename = (filename[:-4]) +"gcno"
filename = (filename[:-4]) + "gcno"
try:
os.remove(filename)
except Exception:
@ -3584,7 +3578,6 @@ class CoverageTool:
with open(filename, 'wb') as fp:
fp.write(bytes.fromhex(hexdump_val))
def generate(self, outdir):
for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True):
gcov_data = self.__class__.retrieve_gcov_data(filename)
@ -3744,9 +3737,9 @@ def native_and_unit_first(a, b):
return -1
if b[0].startswith('native_posix'):
return 1
if a[0].split("/",1)[0].endswith("_bsim"):
if a[0].split("/", 1)[0].endswith("_bsim"):
return -1
if b[0].split("/",1)[0].endswith("_bsim"):
if b[0].split("/", 1)[0].endswith("_bsim"):
return 1
return (a > b) - (a < b)
@ -3819,7 +3812,7 @@ class HardwareMap:
s_dev['serial'] = d.device
s_dev['product'] = d.product
s_dev['runner'] = 'unknown'
for runner,_ in self.runner_mapping.items():
for runner, _ in self.runner_mapping.items():
products = self.runner_mapping.get(runner)
if d.product in products:
s_dev['runner'] = runner
@ -3833,7 +3826,7 @@ class HardwareMap:
s_dev['connected'] = True
self.detected.append(s_dev)
else:
logger.warning("Unsupported device (%s): %s" %(d.manufacturer, d))
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
def write_map(self, hwm_file):
# use existing map
@ -3859,12 +3852,11 @@ class HardwareMap:
print("")
table = []
header = ["Platform", "ID", "Serial device"]
for p in sorted(hwm, key = lambda i: i['platform']):
for p in sorted(hwm, key=lambda i: i['platform']):
platform = p.get('platform')
table.append([platform, p.get('id', None), p.get('serial')])
print(tabulate(table, headers=header, tablefmt="github"))
with open(hwm_file, 'w') as yaml_file:
yaml.dump(hwm, yaml_file, default_flow_style=False)
@ -3873,8 +3865,10 @@ class HardwareMap:
with open(hwm_file, 'w') as yaml_file:
yaml.dump(self.detected, yaml_file, default_flow_style=False)
options = None
def main():
start_time = time.time()
global VERBOSE
@ -3887,7 +3881,7 @@ def main():
if os.path.exists(options.outdir):
logger.info("Keeping artifacts untouched")
elif os.path.exists(options.outdir):
for i in range(1,100):
for i in range(1, 100):
new_out = options.outdir + ".{}".format(i)
if not os.path.exists(new_out):
logger.info("Renaming output directory to {}".format(new_out))
@ -3913,7 +3907,6 @@ def main():
else:
ch.setLevel(logging.INFO)
# create formatter and add it to the handlers
if options.timestamps:
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
@ -3967,7 +3960,6 @@ def main():
size_report(SizeCalculator(fn, []))
sys.exit(0)
if options.subset:
subset, sets = options.subset.split("/")
if int(subset) > 0 and int(sets) >= int(subset):
@ -3976,7 +3968,6 @@ def main():
logger.error("You have provided a wrong subset value: %s." % options.subset)
return
if not options.testcase_root:
options.testcase_root = [os.path.join(ZEPHYR_BASE, "tests"),
os.path.join(ZEPHYR_BASE, "samples")]
@ -4023,14 +4014,13 @@ def main():
if platform['connected']:
options.platform.append(platform['platform'])
elif options.device_serial: #back-ward compatibility
elif options.device_serial: # back-ward compatibility
if options.platform and len(options.platform) == 1:
hwm.load_device_from_cmdline(options.device_serial, options.platform[0])
else:
logger.error("""When --device-testing is used with --device-serial, only one
platform is allowed""")
if suite.load_errors:
sys.exit(1)
@ -4125,7 +4115,6 @@ def main():
for pre, _, node in RenderTree(testsuite):
print("%s%s" % (pre, node.name))
return
discards = []
@ -4173,7 +4162,7 @@ def main():
if options.report_excluded:
all_tests = suite.get_all_tests()
to_be_run = set()
for i,p in suite.instances.items():
for i, p in suite.instances.items():
to_be_run.update(p.testcase.cases)
if all_tests - to_be_run:
@ -4184,7 +4173,7 @@ def main():
return
if options.subset:
#suite.instances = OrderedDict(sorted(suite.instances.items(),
# suite.instances = OrderedDict(sorted(suite.instances.items(),
# key=cmp_to_key(native_and_unit_first)))
subset, sets = options.subset.split("/")
total = len(suite.instances)
@ -4198,7 +4187,6 @@ def main():
sliced_instances = islice(suite.instances.items(), start, end)
suite.instances = OrderedDict(sliced_instances)
if options.save_tests:
suite.csv_report(options.save_tests)
return
@ -4232,7 +4220,7 @@ def main():
completed += 1
if completed > 1:
logger.info("%d Iteration:" %(completed ))
logger.info("%d Iteration:" % (completed))
time.sleep(60) # waiting for the system to settle down
suite.total_done = suite.total_tests - suite.total_failed
suite.total_failed = 0
@ -4282,7 +4270,6 @@ def main():
table.append(row)
print(tabulate(table, headers=header, tablefmt="github"))
suite.save_reports(options.report_name,
options.report_dir,
options.no_update,
@ -4292,5 +4279,6 @@ def main():
if suite.total_failed or (suite.warnings and options.warnings_as_errors):
sys.exit(1)
if __name__ == "__main__":
main()