sanitycheck: move hardware map generation out of main

Move all hardware map generation/usage to a seperate class. This will
make it easier to extend the supported hardware in the future.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2019-11-25 08:19:25 -05:00
commit 5f908829b2

View file

@ -198,6 +198,10 @@ try:
except ImportError:
print("Install the anytree module to use the --test-tree option")
try:
from tabulate import tabulate
except ImportError:
print("Install tabulate python module with pip to use --device-testing option.")
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
@ -694,14 +698,14 @@ class DeviceHandler(Handler):
def device_is_available(self, device):
for i in self.suite.connected_hardware:
if i['platform'] == device and i['available'] and i['connected']:
if i['platform'] == device and i['available'] and i['serial']:
return True
return False
def get_available_device(self, device):
for i in self.suite.connected_hardware:
if i['platform'] == device and i['available']:
if i['platform'] == device and i['available'] and i['serial']:
i['available'] = False
i['counter'] += 1
return i
@ -2206,6 +2210,7 @@ class TestSuite:
# Keep track of which test cases we've filtered out and why
self.testcases = {}
self.platforms = []
self.selected_platforms = []
self.default_platforms = []
self.outdir = os.path.abspath(outdir)
self.discards = None
@ -2339,14 +2344,13 @@ class TestSuite:
COLOR_NORMAL,
self.duration))
platforms = set(p.platform for p in self.instances.values())
self.total_platforms = len(self.platforms)
if self.platforms:
info("In total {} test cases were executed on {} out of total {} platforms ({:02.2f}%)".format(
self.total_cases,
len(platforms),
len(self.selected_platforms),
self.total_platforms,
(100 * len(platforms) / len(self.platforms))
(100 * len(self.selected_platforms) / len(self.platforms))
))
def save_reports(self):
@ -2378,24 +2382,7 @@ class TestSuite:
if log_file:
log_file.close()
def load_hardware_map_from_cmdline(self, serial, platform):
device = {
"serial": serial,
"platform": platform,
"counter": 0,
"available": True,
"connected": True
}
self.connected_hardware = [device]
def load_hardware_map(self, map_file):
with open(map_file, 'r') as stream:
try:
self.connected_hardware = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
for i in self.connected_hardware:
i['counter'] = 0
def add_configurations(self):
@ -2406,7 +2393,7 @@ class TestSuite:
board_root)
for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")):
verbose("Found plaform configuration " + file)
verbose("Found platform configuration " + file)
try:
platform = Platform()
platform.load(file)
@ -2740,6 +2727,7 @@ class TestSuite:
case.create_overlay(case.platform)
self.discards = discards
self.selected_platforms = set(p.platform.name for p in self.instances.values())
return discards
@ -3718,6 +3706,121 @@ def native_and_unit_first(a, b):
return (a > b) - (a < b)
class HardwareMap:
manufacturer = [
'ARM',
'SEGGER',
'MBED',
'STMicroelectronics',
'Atmel Corp.',
'Texas Instruments',
'Silicon Labs',
'NXP Semiconductors'
]
runner_mapping = {
'pyocd': [
'DAPLink CMSIS-DAP',
'MBED CMSIS-DAP'
],
'jlink': [
'J-Link',
'J-Link OB'
],
'openocd': [
'STM32 STLink', '^XDS110.*'
]
}
def __init__(self):
self.detected = []
self.connected_hardware = []
def load_device_from_cmdline(self, serial, platform):
device = {
"serial": serial,
"platform": platform,
"counter": 0,
"available": True,
"connected": True
}
self.connected_hardware.append(device)
def load_hardware_map(self, map_file):
with open(map_file, 'r') as stream:
try:
self.connected_hardware = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
for i in self.connected_hardware:
i['counter'] = 0
def scan_hw(self):
from serial.tools import list_ports
serial_devices = list_ports.comports()
for d in serial_devices:
if d.manufacturer in self.manufacturer:
# TI XDS110 can have multiple serial devices for a single board
# assume endpoint 0 is the serial, skip all others
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
continue
s_dev = {}
s_dev['platform'] = "unknown"
s_dev['id'] = d.serial_number
s_dev['serial'] = d.device
s_dev['product'] = d.product
s_dev['runner'] = 'unknown'
for runner in self.runner_mapping.keys():
products = self.runner_mapping.get(runner)
if d.product in products:
s_dev['runner'] = runner
continue
# Try regex matching
for p in products:
if re.match(p, d.product):
s_dev['runner'] = runner
s_dev['available'] = True
s_dev['connected'] = True
self.detected.append(s_dev)
else:
print("Unsupported device (%s): %s" %(d.manufacturer, d))
def write_map(self, hwm_file):
# use existing map
if os.path.exists(hwm_file):
with open(hwm_file, 'r') as yaml_file:
hwm = yaml.load(yaml_file, Loader=yaml.FullLoader)
# disconnect everything
for h in hwm:
h['connected'] = False
h['serial'] = None
for d in self.detected:
for h in hwm:
if d['id'] == h['id'] and d['product'] == h['product']:
print("Already in map: %s (%s)" %(d['product'], d['id']))
h['connected'] = True
h['serial'] = d['serial']
d['match'] = True
new = list(filter(lambda n: not n.get('match', False), self.detected))
hwm = hwm + new
with open(hwm_file, 'w') as yaml_file:
yaml.dump(hwm, yaml_file, default_flow_style=False)
else:
# create new file
with open(hwm_file, 'w') as yaml_file:
yaml.dump(self.detected, yaml_file, default_flow_style=False)
return
run_individual_tests = None
options = None
@ -3729,74 +3832,24 @@ def main():
options = parse_arguments()
hwm = HardwareMap()
if options.generate_hardware_map:
from serial.tools import list_ports
serial_devices = list_ports.comports()
filtered = []
for d in serial_devices:
if d.manufacturer in ['ARM', 'SEGGER', 'MBED', 'STMicroelectronics',
'Atmel Corp.', 'Texas Instruments',
'Silicon Labs', 'NXP Semiconductors']:
# TI XDS110 can have multiple serial devices for a single board
# assume endpoint 0 is the serial, skip all others
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
continue
s_dev = {}
s_dev['platform'] = "unknown"
s_dev['id'] = d.serial_number
s_dev['serial'] = d.device
s_dev['product'] = d.product
if s_dev['product'] in ['DAPLink CMSIS-DAP', 'MBED CMSIS-DAP']:
s_dev['runner'] = "pyocd"
elif s_dev['product'] in ['J-Link', 'J-Link OB']:
s_dev['runner'] = "jlink"
elif s_dev['product'] in ['STM32 STLink']:
s_dev['runner'] = "openocd"
elif s_dev['product'].startswith('XDS110'):
s_dev['runner'] = "openocd"
else:
s_dev['runner'] = "unknown"
s_dev['available'] = True
s_dev['connected'] = True
filtered.append(s_dev)
else:
print("Unsupported device (%s): %s" %(d.manufacturer, d))
if os.path.exists(options.generate_hardware_map):
# use existing map
with open(options.generate_hardware_map, 'r') as yaml_file:
hwm = yaml.load(yaml_file, Loader=yaml.FullLoader)
# disconnect everything
for h in hwm:
h['connected'] = False
h['serial'] = None
for d in filtered:
for h in hwm:
if d['id'] == h['id'] and d['product'] == h['product']:
print("Already in map: %s (%s)" %(d['product'], d['id']))
h['connected'] = True
h['serial'] = d['serial']
d['match'] = True
new = list(filter(lambda n: not n.get('match', False), filtered))
hwm = hwm + new
#import pprint
#pprint.pprint(hwm)
with open(options.generate_hardware_map, 'w') as yaml_file:
yaml.dump(hwm, yaml_file, default_flow_style=False)
else:
# create new file
with open(options.generate_hardware_map, 'w') as yaml_file:
yaml.dump(filtered, yaml_file, default_flow_style=False)
hwm.scan_hw()
hwm.write_map(options.generate_hardware_map)
return
if not options.device_testing and options.hardware_map:
hwm.load_hardware_map(options.hardware_map)
print("\nAvailable devices:")
table = []
header = ["Platform", "ID", "Serial device"]
for p in hwm.connected_hardware:
platform = p.get('platform')
if p['connected']:
table.append([platform, p.get('id', None), p['serial']])
print(tabulate(table, headers=header, tablefmt="github"))
return
if options.west_runner and not options.west_flash:
error("west-runner requires west-flash to be enabled")
@ -3857,17 +3910,17 @@ def main():
if options.device_testing:
if options.hardware_map:
suite.load_hardware_map(options.hardware_map)
hwm.load_hardware_map(options.hardware_map)
suite.connected_hardware = hwm.connected_hardware
if not options.platform:
options.platform = []
for platform in suite.connected_hardware:
for platform in hwm.connected_hardware:
if platform['connected']:
options.platform.append(platform['platform'])
elif options.device_serial: #back-ward compatibility
if options.platform and len(options.platform) == 1:
suite.load_hardware_map_from_cmdline(options.device_serial,
options.platform[0])
hwm.load_device_from_cmdline(options.device_serial, options.platform[0])
else:
error("""When --device-testing is used with --device-serial, only one
platform is allowed""")
@ -3974,6 +4027,7 @@ def main():
if options.only_failed:
suite.get_last_failed()
suite.selected_platforms = set(p.platform.name for p in suite.instances.values())
elif options.load_tests:
suite.load_from_file(options.load_tests)
elif options.test_only:
@ -4037,9 +4091,14 @@ def main():
if options.device_testing:
print("\nDevice testing on:")
table = []
header = ["Platform", "ID", "Serial device"]
for p in suite.connected_hardware:
if p['connected']:
print("%s (%s) on %s" %(p['platform'], p.get('id', None), p['serial']))
platform = p.get('platform')
if p['connected'] and platform in suite.selected_platforms:
table.append([platform, p.get('id', None), p['serial']])
print(tabulate(table, headers=header, tablefmt="github"))
print("")
if options.dry_run:
duration = time.time() - start_time
@ -4098,9 +4157,14 @@ def main():
if options.device_testing:
print("\nHardware distribution summary:\n")
for p in suite.connected_hardware:
if p['connected']:
print("%s (%s): %d" %(p['platform'], p.get('id', None), p['counter']))
table = []
header = ['Board', 'ID', 'Counter']
for p in hwm.connected_hardware:
if p['connected'] and p['platform'] in suite.selected_platforms:
row = [p['platform'], p.get('id', None), p['counter']]
table.append(row)
print(tabulate(table, headers=header, tablefmt="github"))
suite.save_reports()
if suite.total_failed or (suite.warnings and options.warnings_as_errors):