sanitycheck: fix scope and cleanup

Reorder classes to be on top of loose functions. Move a few globals into
the related classes where they are being used.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2020-03-23 21:28:38 -04:00
commit 1f6cdbe70a

View file

@ -242,6 +242,9 @@ logger = logging.getLogger('sanitycheck')
logger.setLevel(logging.DEBUG)
options = None
pipeline = queue.LifoQueue()
class CMakeCacheEntry:
'''Represents a CMake cache entry.
@ -1228,46 +1231,6 @@ class SizeCalculator:
"type": stype, "recognized": recognized})
# "list" - List of strings
# "list:<type>" - List of <type>
# "set" - Set of unordered, unique strings
# "set:<type>" - Set of <type>
# "float" - Floating point
# "int" - Integer
# "bool" - Boolean
# "str" - String
# XXX Be sure to update __doc__ if you change any of this!!
platform_valid_keys = {
"supported_toolchains": {"type": "list", "default": []},
"env": {"type": "list", "default": []}
}
testcase_valid_keys = {"tags": {"type": "set", "required": False},
"type": {"type": "str", "default": "integration"},
"extra_args": {"type": "list"},
"extra_configs": {"type": "list"},
"build_only": {"type": "bool", "default": False},
"build_on_all": {"type": "bool", "default": False},
"skip": {"type": "bool", "default": False},
"slow": {"type": "bool", "default": False},
"timeout": {"type": "int", "default": 60},
"min_ram": {"type": "int", "default": 8},
"depends_on": {"type": "set"},
"min_flash": {"type": "int", "default": 32},
"arch_whitelist": {"type": "set"},
"arch_exclude": {"type": "set"},
"extra_sections": {"type": "list", "default": []},
"platform_exclude": {"type": "set"},
"platform_whitelist": {"type": "set"},
"toolchain_exclude": {"type": "set"},
"toolchain_whitelist": {"type": "set"},
"filter": {"type": "str"},
"harness": {"type": "str"},
"harness_config": {"type": "map", "default": {}}
}
class SanityConfigParser:
"""Class to read test case files with semantic checking
@ -2296,9 +2259,6 @@ class ProjectBuilder(FilterBuilder):
sys.stdout.flush()
pipeline = queue.LifoQueue()
class BoundedExecutor(concurrent.futures.ThreadPoolExecutor):
"""BoundedExecutor behaves as a ThreadPoolExecutor which will block on
calls to submit() once the limit given as "bound" work items are queued for
@ -2332,6 +2292,31 @@ class TestSuite:
os.path.join(ZEPHYR_BASE,
"scripts", "sanity_chk", "testcase-schema.yaml"))
testcase_valid_keys = {"tags": {"type": "set", "required": False},
"type": {"type": "str", "default": "integration"},
"extra_args": {"type": "list"},
"extra_configs": {"type": "list"},
"build_only": {"type": "bool", "default": False},
"build_on_all": {"type": "bool", "default": False},
"skip": {"type": "bool", "default": False},
"slow": {"type": "bool", "default": False},
"timeout": {"type": "int", "default": 60},
"min_ram": {"type": "int", "default": 8},
"depends_on": {"type": "set"},
"min_flash": {"type": "int", "default": 32},
"arch_whitelist": {"type": "set"},
"arch_exclude": {"type": "set"},
"extra_sections": {"type": "list", "default": []},
"platform_exclude": {"type": "set"},
"platform_whitelist": {"type": "set"},
"toolchain_exclude": {"type": "set"},
"toolchain_whitelist": {"type": "set"},
"filter": {"type": "str"},
"harness": {"type": "str"},
"harness_config": {"type": "map", "default": {}}
}
def __init__(self, board_root_list, testcase_roots, outdir):
self.roots = testcase_roots
@ -2960,10 +2945,6 @@ class TestSuite:
except concurrent.futures.TimeoutError:
logger.warning("{} stuck?".format(test))
if self.enable_size_report and not self.cmake_only:
# Parallelize size calculation
executor = concurrent.futures.ThreadPoolExecutor(self.jobs)
@ -3196,6 +3177,370 @@ class TestSuite:
return results
class CoverageTool:
""" Base class for every supported coverage tool
"""
def __init__(self):
self.gcov_tool = options.gcov_tool
@staticmethod
def factory(tool):
if tool == 'lcov':
return Lcov()
if tool == 'gcovr':
return Gcovr()
logger.error("Unsupported coverage tool specified: {}".format(tool))
@staticmethod
def retrieve_gcov_data(intput_file):
if VERBOSE:
logger.debug("Working on %s" % intput_file)
extracted_coverage_info = {}
capture_data = False
capture_complete = False
with open(intput_file, 'r') as fp:
for line in fp.readlines():
if re.search("GCOV_COVERAGE_DUMP_START", line):
capture_data = True
continue
if re.search("GCOV_COVERAGE_DUMP_END", line):
capture_complete = True
break
# Loop until the coverage data is found.
if not capture_data:
continue
if line.startswith("*"):
sp = line.split("<")
if len(sp) > 1:
# Remove the leading delimiter "*"
file_name = sp[0][1:]
# Remove the trailing new line char
hex_dump = sp[1][:-1]
else:
continue
else:
continue
extracted_coverage_info.update({file_name: hex_dump})
if not capture_data:
capture_complete = True
return {'complete': capture_complete, 'data': extracted_coverage_info}
@staticmethod
def create_gcda_files(extracted_coverage_info):
if VERBOSE:
logger.debug("Generating gcda files")
for filename, hexdump_val in extracted_coverage_info.items():
# if kobject_hash is given for coverage gcovr fails
# hence skipping it problem only in gcovr v4.1
if "kobject_hash" in filename:
filename = (filename[:-4]) + "gcno"
try:
os.remove(filename)
except Exception:
pass
continue
with open(filename, 'wb') as fp:
fp.write(bytes.fromhex(hexdump_val))
def generate(self, outdir):
for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True):
gcov_data = self.__class__.retrieve_gcov_data(filename)
capture_complete = gcov_data['complete']
extracted_coverage_info = gcov_data['data']
if capture_complete:
self.__class__.create_gcda_files(extracted_coverage_info)
logger.debug("Gcov data captured: {}".format(filename))
else:
logger.error("Gcov data capture incomplete: {}".format(filename))
with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
ret = self._generate(outdir, coveragelog)
if ret == 0:
logger.info("HTML report generated: {}".format(
os.path.join(outdir, "coverage", "index.html")))
class Lcov(CoverageTool):
def __init__(self):
super().__init__()
self.ignores = []
def add_ignore_file(self, pattern):
self.ignores.append('*' + pattern + '*')
def add_ignore_directory(self, pattern):
self.ignores.append(pattern + '/*')
def _generate(self, outdir, coveragelog):
coveragefile = os.path.join(outdir, "coverage.info")
ztestfile = os.path.join(outdir, "ztest.info")
subprocess.call(["lcov", "--gcov-tool", self.gcov_tool,
"--capture", "--directory", outdir,
"--rc", "lcov_branch_coverage=1",
"--output-file", coveragefile], stdout=coveragelog)
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest
subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract",
coveragefile,
os.path.join(ZEPHYR_BASE, "tests", "ztest", "*"),
"--output-file", ztestfile,
"--rc", "lcov_branch_coverage=1"], stdout=coveragelog)
if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove",
ztestfile,
os.path.join(ZEPHYR_BASE, "tests/ztest/test/*"),
"--output-file", ztestfile,
"--rc", "lcov_branch_coverage=1"],
stdout=coveragelog)
files = [coveragefile, ztestfile]
else:
files = [coveragefile]
for i in self.ignores:
subprocess.call(
["lcov", "--gcov-tool", self.gcov_tool, "--remove",
coveragefile, i, "--output-file",
coveragefile, "--rc", "lcov_branch_coverage=1"],
stdout=coveragelog)
# The --ignore-errors source option is added to avoid it exiting due to
# samples/application_development/external_lib/
return subprocess.call(["genhtml", "--legend", "--branch-coverage",
"--ignore-errors", "source",
"-output-directory",
os.path.join(outdir, "coverage")] + files,
stdout=coveragelog)
class Gcovr(CoverageTool):
def __init__(self):
super().__init__()
self.ignores = []
def add_ignore_file(self, pattern):
self.ignores.append('.*' + pattern + '.*')
def add_ignore_directory(self, pattern):
self.ignores.append(pattern + '/.*')
@staticmethod
def _interleave_list(prefix, list):
tuple_list = [(prefix, item) for item in list]
return [item for sublist in tuple_list for item in sublist]
def _generate(self, outdir, coveragelog):
coveragefile = os.path.join(outdir, "coverage.json")
ztestfile = os.path.join(outdir, "ztest.json")
excludes = Gcovr._interleave_list("-e", self.ignores)
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest
subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--gcov-executable",
self.gcov_tool, "-e", "tests/*"] + excludes +
["--json", "-o", coveragefile, outdir],
stdout=coveragelog)
subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--gcov-executable",
self.gcov_tool, "-f", "tests/ztest", "-e",
"tests/ztest/test/*", "--json", "-o", ztestfile,
outdir], stdout=coveragelog)
if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
files = [coveragefile, ztestfile]
else:
files = [coveragefile]
subdir = os.path.join(outdir, "coverage")
os.makedirs(subdir, exist_ok=True)
tracefiles = self._interleave_list("--add-tracefile", files)
return subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--html",
"--html-details"] + tracefiles +
["-o", os.path.join(subdir, "index.html")],
stdout=coveragelog)
class HardwareMap:
schema_path = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", "hwmap-schema.yaml")
manufacturer = [
'ARM',
'SEGGER',
'MBED',
'STMicroelectronics',
'Atmel Corp.',
'Texas Instruments',
'Silicon Labs',
'NXP Semiconductors',
'Microchip Technology Inc.',
'FTDI',
'Digilent'
]
runner_mapping = {
'pyocd': [
'DAPLink CMSIS-DAP',
'MBED CMSIS-DAP'
],
'jlink': [
'J-Link',
'J-Link OB'
],
'openocd': [
'STM32 STLink', '^XDS110.*'
],
'dediprog': [
'TTL232R-3V3',
'MCP2200 USB Serial Port Emulator'
]
}
def __init__(self):
self.detected = []
self.connected_hardware = []
def load_device_from_cmdline(self, serial, platform):
device = {
"serial": serial,
"platform": platform,
"counter": 0,
"available": True,
"connected": True
}
self.connected_hardware.append(device)
def load_hardware_map(self, map_file):
hwm_schema = scl.yaml_load(self.schema_path)
self.connected_hardware = scl.yaml_load_verify(map_file, hwm_schema)
for i in self.connected_hardware:
i['counter'] = 0
def scan_hw(self):
from serial.tools import list_ports
serial_devices = list_ports.comports()
logger.info("Scanning connected hardware...")
for d in serial_devices:
if d.manufacturer in self.manufacturer:
# TI XDS110 can have multiple serial devices for a single board
# assume endpoint 0 is the serial, skip all others
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
continue
s_dev = {}
s_dev['platform'] = "unknown"
s_dev['id'] = d.serial_number
s_dev['serial'] = d.device
s_dev['product'] = d.product
s_dev['runner'] = 'unknown'
for runner, _ in self.runner_mapping.items():
products = self.runner_mapping.get(runner)
if d.product in products:
s_dev['runner'] = runner
continue
# Try regex matching
for p in products:
if re.match(p, d.product):
s_dev['runner'] = runner
s_dev['available'] = True
s_dev['connected'] = True
self.detected.append(s_dev)
else:
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
def write_map(self, hwm_file):
# use existing map
if os.path.exists(hwm_file):
with open(hwm_file, 'r') as yaml_file:
hwm = yaml.load(yaml_file, Loader=yaml.FullLoader)
# disconnect everything
for h in hwm:
h['connected'] = False
h['serial'] = None
for d in self.detected:
for h in hwm:
if d['id'] == h['id'] and d['product'] == h['product']:
h['connected'] = True
h['serial'] = d['serial']
d['match'] = True
new = list(filter(lambda n: not n.get('match', False), self.detected))
hwm = hwm + new
logger.info("Registered devices:")
self.dump(hwm)
with open(hwm_file, 'w') as yaml_file:
yaml.dump(hwm, yaml_file, default_flow_style=False)
else:
# create new file
with open(hwm_file, 'w') as yaml_file:
yaml.dump(self.detected, yaml_file, default_flow_style=False)
logger.info("Detected devices:")
self.dump(self.detected)
@staticmethod
def dump(hwmap=[], filtered=[], header=[], connected_only=False):
print("")
table = []
if not header:
header = ["Platform", "ID", "Serial device"]
for p in sorted(hwmap, key=lambda i: i['platform']):
platform = p.get('platform')
connected = p.get('connected', False)
if filtered and platform not in filtered:
continue
if not connected_only or connected:
table.append([platform, p.get('id', None), p.get('serial')])
print(tabulate(table, headers=header, tablefmt="github"))
def size_report(sc):
logger.info(sc.filename)
logger.info("SECTION NAME VMA LMA SIZE HEX SZ TYPE")
for i in range(len(sc.sections)):
v = sc.sections[i]
logger.info("%-17s 0x%08x 0x%08x %8d 0x%05x %-7s" %
(v["name"], v["virt_addr"], v["load_addr"], v["size"], v["size"],
v["type"]))
logger.info("Totals: %d bytes (ROM), %d bytes (RAM)" %
(sc.rom_size, sc.ram_size))
logger.info("")
def export_tests(filename, tests):
with open(filename, "wt") as csvfile:
fieldnames = ['section', 'subsection', 'title', 'reference']
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
for test in tests:
data = test.split(".")
if len(data) > 1:
subsec = " ".join(data[1].split("_")).title()
rowdict = {
"section": data[0].capitalize(),
"subsection": subsec,
"title": test,
"reference": test
}
cw.writerow(rowdict)
else:
logger.info("{} can't be exported".format(test))
def parse_arguments():
parser = argparse.ArgumentParser(
description=__doc__,
@ -3592,371 +3937,6 @@ structure in the main Zephyr tree: boards/<arch>/<board_name>/""")
return parser.parse_args()
def size_report(sc):
logger.info(sc.filename)
logger.info("SECTION NAME VMA LMA SIZE HEX SZ TYPE")
for i in range(len(sc.sections)):
v = sc.sections[i]
logger.info("%-17s 0x%08x 0x%08x %8d 0x%05x %-7s" %
(v["name"], v["virt_addr"], v["load_addr"], v["size"], v["size"],
v["type"]))
logger.info("Totals: %d bytes (ROM), %d bytes (RAM)" %
(sc.rom_size, sc.ram_size))
logger.info("")
class CoverageTool:
""" Base class for every supported coverage tool
"""
def __init__(self):
self.gcov_tool = options.gcov_tool
@staticmethod
def factory(tool):
if tool == 'lcov':
return Lcov()
if tool == 'gcovr':
return Gcovr()
logger.error("Unsupported coverage tool specified: {}".format(tool))
@staticmethod
def retrieve_gcov_data(intput_file):
if VERBOSE:
logger.debug("Working on %s" % intput_file)
extracted_coverage_info = {}
capture_data = False
capture_complete = False
with open(intput_file, 'r') as fp:
for line in fp.readlines():
if re.search("GCOV_COVERAGE_DUMP_START", line):
capture_data = True
continue
if re.search("GCOV_COVERAGE_DUMP_END", line):
capture_complete = True
break
# Loop until the coverage data is found.
if not capture_data:
continue
if line.startswith("*"):
sp = line.split("<")
if len(sp) > 1:
# Remove the leading delimiter "*"
file_name = sp[0][1:]
# Remove the trailing new line char
hex_dump = sp[1][:-1]
else:
continue
else:
continue
extracted_coverage_info.update({file_name: hex_dump})
if not capture_data:
capture_complete = True
return {'complete': capture_complete, 'data': extracted_coverage_info}
@staticmethod
def create_gcda_files(extracted_coverage_info):
if VERBOSE:
logger.debug("Generating gcda files")
for filename, hexdump_val in extracted_coverage_info.items():
# if kobject_hash is given for coverage gcovr fails
# hence skipping it problem only in gcovr v4.1
if "kobject_hash" in filename:
filename = (filename[:-4]) + "gcno"
try:
os.remove(filename)
except Exception:
pass
continue
with open(filename, 'wb') as fp:
fp.write(bytes.fromhex(hexdump_val))
def generate(self, outdir):
for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True):
gcov_data = self.__class__.retrieve_gcov_data(filename)
capture_complete = gcov_data['complete']
extracted_coverage_info = gcov_data['data']
if capture_complete:
self.__class__.create_gcda_files(extracted_coverage_info)
logger.debug("Gcov data captured: {}".format(filename))
else:
logger.error("Gcov data capture incomplete: {}".format(filename))
with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
ret = self._generate(outdir, coveragelog)
if ret == 0:
logger.info("HTML report generated: {}".format(
os.path.join(outdir, "coverage", "index.html")))
class Lcov(CoverageTool):
def __init__(self):
super().__init__()
self.ignores = []
def add_ignore_file(self, pattern):
self.ignores.append('*' + pattern + '*')
def add_ignore_directory(self, pattern):
self.ignores.append(pattern + '/*')
def _generate(self, outdir, coveragelog):
coveragefile = os.path.join(outdir, "coverage.info")
ztestfile = os.path.join(outdir, "ztest.info")
subprocess.call(["lcov", "--gcov-tool", self.gcov_tool,
"--capture", "--directory", outdir,
"--rc", "lcov_branch_coverage=1",
"--output-file", coveragefile], stdout=coveragelog)
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest
subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract",
coveragefile,
os.path.join(ZEPHYR_BASE, "tests", "ztest", "*"),
"--output-file", ztestfile,
"--rc", "lcov_branch_coverage=1"], stdout=coveragelog)
if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove",
ztestfile,
os.path.join(ZEPHYR_BASE, "tests/ztest/test/*"),
"--output-file", ztestfile,
"--rc", "lcov_branch_coverage=1"],
stdout=coveragelog)
files = [coveragefile, ztestfile]
else:
files = [coveragefile]
for i in self.ignores:
subprocess.call(
["lcov", "--gcov-tool", self.gcov_tool, "--remove",
coveragefile, i, "--output-file",
coveragefile, "--rc", "lcov_branch_coverage=1"],
stdout=coveragelog)
# The --ignore-errors source option is added to avoid it exiting due to
# samples/application_development/external_lib/
return subprocess.call(["genhtml", "--legend", "--branch-coverage",
"--ignore-errors", "source",
"-output-directory",
os.path.join(outdir, "coverage")] + files,
stdout=coveragelog)
class Gcovr(CoverageTool):
def __init__(self):
super().__init__()
self.ignores = []
def add_ignore_file(self, pattern):
self.ignores.append('.*' + pattern + '.*')
def add_ignore_directory(self, pattern):
self.ignores.append(pattern + '/.*')
@staticmethod
def _interleave_list(prefix, list):
tuple_list = [(prefix, item) for item in list]
return [item for sublist in tuple_list for item in sublist]
def _generate(self, outdir, coveragelog):
coveragefile = os.path.join(outdir, "coverage.json")
ztestfile = os.path.join(outdir, "ztest.json")
excludes = Gcovr._interleave_list("-e", self.ignores)
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest
subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--gcov-executable",
self.gcov_tool, "-e", "tests/*"] + excludes +
["--json", "-o", coveragefile, outdir],
stdout=coveragelog)
subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--gcov-executable",
self.gcov_tool, "-f", "tests/ztest", "-e",
"tests/ztest/test/*", "--json", "-o", ztestfile,
outdir], stdout=coveragelog)
if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
files = [coveragefile, ztestfile]
else:
files = [coveragefile]
subdir = os.path.join(outdir, "coverage")
os.makedirs(subdir, exist_ok=True)
tracefiles = self._interleave_list("--add-tracefile", files)
return subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--html",
"--html-details"] + tracefiles +
["-o", os.path.join(subdir, "index.html")],
stdout=coveragelog)
def export_tests(filename, tests):
with open(filename, "wt") as csvfile:
fieldnames = ['section', 'subsection', 'title', 'reference']
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
for test in tests:
data = test.split(".")
if len(data) > 1:
subsec = " ".join(data[1].split("_")).title()
rowdict = {
"section": data[0].capitalize(),
"subsection": subsec,
"title": test,
"reference": test
}
cw.writerow(rowdict)
else:
logger.info("{} can't be exported".format(test))
class HardwareMap:
schema_path = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", "hwmap-schema.yaml")
manufacturer = [
'ARM',
'SEGGER',
'MBED',
'STMicroelectronics',
'Atmel Corp.',
'Texas Instruments',
'Silicon Labs',
'NXP Semiconductors',
'Microchip Technology Inc.',
'FTDI',
'Digilent'
]
runner_mapping = {
'pyocd': [
'DAPLink CMSIS-DAP',
'MBED CMSIS-DAP'
],
'jlink': [
'J-Link',
'J-Link OB'
],
'openocd': [
'STM32 STLink', '^XDS110.*'
],
'dediprog': [
'TTL232R-3V3',
'MCP2200 USB Serial Port Emulator'
]
}
def __init__(self):
self.detected = []
self.connected_hardware = []
def load_device_from_cmdline(self, serial, platform):
device = {
"serial": serial,
"platform": platform,
"counter": 0,
"available": True,
"connected": True
}
self.connected_hardware.append(device)
def load_hardware_map(self, map_file):
hwm_schema = scl.yaml_load(self.schema_path)
self.connected_hardware = scl.yaml_load_verify(map_file, hwm_schema)
for i in self.connected_hardware:
i['counter'] = 0
def scan_hw(self):
from serial.tools import list_ports
serial_devices = list_ports.comports()
logger.info("Scanning connected hardware...")
for d in serial_devices:
if d.manufacturer in self.manufacturer:
# TI XDS110 can have multiple serial devices for a single board
# assume endpoint 0 is the serial, skip all others
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
continue
s_dev = {}
s_dev['platform'] = "unknown"
s_dev['id'] = d.serial_number
s_dev['serial'] = d.device
s_dev['product'] = d.product
s_dev['runner'] = 'unknown'
for runner, _ in self.runner_mapping.items():
products = self.runner_mapping.get(runner)
if d.product in products:
s_dev['runner'] = runner
continue
# Try regex matching
for p in products:
if re.match(p, d.product):
s_dev['runner'] = runner
s_dev['available'] = True
s_dev['connected'] = True
self.detected.append(s_dev)
else:
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
def write_map(self, hwm_file):
# use existing map
if os.path.exists(hwm_file):
with open(hwm_file, 'r') as yaml_file:
hwm = yaml.load(yaml_file, Loader=yaml.FullLoader)
# disconnect everything
for h in hwm:
h['connected'] = False
h['serial'] = None
for d in self.detected:
for h in hwm:
if d['id'] == h['id'] and d['product'] == h['product']:
h['connected'] = True
h['serial'] = d['serial']
d['match'] = True
new = list(filter(lambda n: not n.get('match', False), self.detected))
hwm = hwm + new
logger.info("Registered devices:")
self.dump(hwm)
with open(hwm_file, 'w') as yaml_file:
yaml.dump(hwm, yaml_file, default_flow_style=False)
else:
# create new file
with open(hwm_file, 'w') as yaml_file:
yaml.dump(self.detected, yaml_file, default_flow_style=False)
logger.info("Detected devices:")
self.dump(self.detected)
@staticmethod
def dump(hwmap=[], filtered=[], header=[], connected_only=False):
print("")
table = []
if not header:
header = ["Platform", "ID", "Serial device"]
for p in sorted(hwmap, key=lambda i: i['platform']):
platform = p.get('platform')
connected = p.get('connected', False)
if filtered and platform not in filtered:
continue
if not connected_only or connected:
table.append([platform, p.get('id', None), p.get('serial')])
print(tabulate(table, headers=header, tablefmt="github"))
options = None
def main():
start_time = time.time()
global VERBOSE