twister: split hardwaremap class out
Split hardware map class into own file. Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
parent
d5424d5116
commit
2b213d3231
3 changed files with 302 additions and 292 deletions
300
scripts/pylib/twister/hardwaremap.py
Normal file
300
scripts/pylib/twister/hardwaremap.py
Normal file
|
@ -0,0 +1,300 @@
|
|||
#!/usr/bin/env python3
|
||||
# vim: set syntax=python ts=4 :
|
||||
#
|
||||
# Copyright (c) 2022 Intel Corporation
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import os
|
||||
from multiprocessing import Lock, Value
|
||||
from enviornment import ZEPHYR_BASE
|
||||
|
||||
class DUT(object):
|
||||
def __init__(self,
|
||||
id=None,
|
||||
serial=None,
|
||||
serial_baud=None,
|
||||
platform=None,
|
||||
product=None,
|
||||
serial_pty=None,
|
||||
connected=False,
|
||||
runner_params=None,
|
||||
pre_script=None,
|
||||
post_script=None,
|
||||
post_flash_script=None,
|
||||
runner=None):
|
||||
|
||||
self.serial = serial
|
||||
self.baud = serial_baud or 115200
|
||||
self.platform = platform
|
||||
self.serial_pty = serial_pty
|
||||
self._counter = Value("i", 0)
|
||||
self._available = Value("i", 1)
|
||||
self.connected = connected
|
||||
self.pre_script = pre_script
|
||||
self.id = id
|
||||
self.product = product
|
||||
self.runner = runner
|
||||
self.runner_params = runner_params
|
||||
self.fixtures = []
|
||||
self.post_flash_script = post_flash_script
|
||||
self.post_script = post_script
|
||||
self.pre_script = pre_script
|
||||
self.probe_id = None
|
||||
self.notes = None
|
||||
self.lock = Lock()
|
||||
self.match = False
|
||||
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
with self._available.get_lock():
|
||||
return self._available.value
|
||||
|
||||
@available.setter
|
||||
def available(self, value):
|
||||
with self._available.get_lock():
|
||||
self._available.value = value
|
||||
|
||||
@property
|
||||
def counter(self):
|
||||
with self._counter.get_lock():
|
||||
return self._counter.value
|
||||
|
||||
@counter.setter
|
||||
def counter(self, value):
|
||||
with self._counter.get_lock():
|
||||
self._counter.value = value
|
||||
|
||||
def to_dict(self):
|
||||
d = {}
|
||||
exclude = ['_available', '_counter', 'match']
|
||||
v = vars(self)
|
||||
for k in v.keys():
|
||||
if k not in exclude and v[k]:
|
||||
d[k] = v[k]
|
||||
return d
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{self.platform} ({self.product}) on {self.serial}>"
|
||||
|
||||
class HardwareMap:
|
||||
schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml")
|
||||
|
||||
manufacturer = [
|
||||
'ARM',
|
||||
'SEGGER',
|
||||
'MBED',
|
||||
'STMicroelectronics',
|
||||
'Atmel Corp.',
|
||||
'Texas Instruments',
|
||||
'Silicon Labs',
|
||||
'NXP Semiconductors',
|
||||
'Microchip Technology Inc.',
|
||||
'FTDI',
|
||||
'Digilent'
|
||||
]
|
||||
|
||||
runner_mapping = {
|
||||
'pyocd': [
|
||||
'DAPLink CMSIS-DAP',
|
||||
'MBED CMSIS-DAP'
|
||||
],
|
||||
'jlink': [
|
||||
'J-Link',
|
||||
'J-Link OB'
|
||||
],
|
||||
'openocd': [
|
||||
'STM32 STLink', '^XDS110.*', 'STLINK-V3'
|
||||
],
|
||||
'dediprog': [
|
||||
'TTL232R-3V3',
|
||||
'MCP2200 USB Serial Port Emulator'
|
||||
]
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.detected = []
|
||||
self.duts = []
|
||||
|
||||
def add_device(self, serial, platform, pre_script, is_pty, baud=None):
|
||||
device = DUT(platform=platform, connected=True, pre_script=pre_script, serial_baud=baud)
|
||||
|
||||
if is_pty:
|
||||
device.serial_pty = serial
|
||||
else:
|
||||
device.serial = serial
|
||||
|
||||
self.duts.append(device)
|
||||
|
||||
def load(self, map_file):
|
||||
hwm_schema = scl.yaml_load(self.schema_path)
|
||||
duts = scl.yaml_load_verify(map_file, hwm_schema)
|
||||
for dut in duts:
|
||||
pre_script = dut.get('pre_script')
|
||||
post_script = dut.get('post_script')
|
||||
post_flash_script = dut.get('post_flash_script')
|
||||
platform = dut.get('platform')
|
||||
id = dut.get('id')
|
||||
runner = dut.get('runner')
|
||||
runner_params = dut.get('runner_params')
|
||||
serial_pty = dut.get('serial_pty')
|
||||
serial = dut.get('serial')
|
||||
baud = dut.get('baud', None)
|
||||
product = dut.get('product')
|
||||
fixtures = dut.get('fixtures', [])
|
||||
connected= dut.get('connected') and ((serial or serial_pty) is not None)
|
||||
new_dut = DUT(platform=platform,
|
||||
product=product,
|
||||
runner=runner,
|
||||
runner_params=runner_params,
|
||||
id=id,
|
||||
serial_pty=serial_pty,
|
||||
serial=serial,
|
||||
serial_baud=baud,
|
||||
connected=connected,
|
||||
pre_script=pre_script,
|
||||
post_script=post_script,
|
||||
post_flash_script=post_flash_script)
|
||||
new_dut.fixtures = fixtures
|
||||
new_dut.counter = 0
|
||||
self.duts.append(new_dut)
|
||||
|
||||
def scan(self, persistent=False):
|
||||
from serial.tools import list_ports
|
||||
|
||||
if persistent and platform.system() == 'Linux':
|
||||
# On Linux, /dev/serial/by-id provides symlinks to
|
||||
# '/dev/ttyACMx' nodes using names which are unique as
|
||||
# long as manufacturers fill out USB metadata nicely.
|
||||
#
|
||||
# This creates a map from '/dev/ttyACMx' device nodes
|
||||
# to '/dev/serial/by-id/usb-...' symlinks. The symlinks
|
||||
# go into the hardware map because they stay the same
|
||||
# even when the user unplugs / replugs the device.
|
||||
#
|
||||
# Some inexpensive USB/serial adapters don't result
|
||||
# in unique names here, though, so use of this feature
|
||||
# requires explicitly setting persistent=True.
|
||||
by_id = Path('/dev/serial/by-id')
|
||||
def readlink(link):
|
||||
return str((by_id / link).resolve())
|
||||
|
||||
persistent_map = {readlink(link): str(link)
|
||||
for link in by_id.iterdir()}
|
||||
else:
|
||||
persistent_map = {}
|
||||
|
||||
serial_devices = list_ports.comports()
|
||||
logger.info("Scanning connected hardware...")
|
||||
for d in serial_devices:
|
||||
if d.manufacturer in self.manufacturer:
|
||||
|
||||
# TI XDS110 can have multiple serial devices for a single board
|
||||
# assume endpoint 0 is the serial, skip all others
|
||||
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
|
||||
continue
|
||||
s_dev = DUT(platform="unknown",
|
||||
id=d.serial_number,
|
||||
serial=persistent_map.get(d.device, d.device),
|
||||
product=d.product,
|
||||
runner='unknown',
|
||||
connected=True)
|
||||
|
||||
for runner, _ in self.runner_mapping.items():
|
||||
products = self.runner_mapping.get(runner)
|
||||
if d.product in products:
|
||||
s_dev.runner = runner
|
||||
continue
|
||||
# Try regex matching
|
||||
for p in products:
|
||||
if re.match(p, d.product):
|
||||
s_dev.runner = runner
|
||||
|
||||
s_dev.connected = True
|
||||
s_dev.lock = None
|
||||
self.detected.append(s_dev)
|
||||
else:
|
||||
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
|
||||
|
||||
def save(self, hwm_file):
|
||||
# use existing map
|
||||
self.detected.sort(key=lambda x: x.serial or '')
|
||||
if os.path.exists(hwm_file):
|
||||
with open(hwm_file, 'r') as yaml_file:
|
||||
hwm = yaml.load(yaml_file, Loader=SafeLoader)
|
||||
if hwm:
|
||||
hwm.sort(key=lambda x: x.get('id', ''))
|
||||
|
||||
# disconnect everything
|
||||
for h in hwm:
|
||||
h['connected'] = False
|
||||
h['serial'] = None
|
||||
|
||||
for _detected in self.detected:
|
||||
for h in hwm:
|
||||
if _detected.id == h['id'] and _detected.product == h['product'] and not _detected.match:
|
||||
h['connected'] = True
|
||||
h['serial'] = _detected.serial
|
||||
_detected.match = True
|
||||
|
||||
new_duts = list(filter(lambda d: not d.match, self.detected))
|
||||
new = []
|
||||
for d in new_duts:
|
||||
new.append(d.to_dict())
|
||||
|
||||
if hwm:
|
||||
hwm = hwm + new
|
||||
else:
|
||||
hwm = new
|
||||
|
||||
with open(hwm_file, 'w') as yaml_file:
|
||||
yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False)
|
||||
|
||||
self.load(hwm_file)
|
||||
logger.info("Registered devices:")
|
||||
self.dump()
|
||||
|
||||
else:
|
||||
# create new file
|
||||
dl = []
|
||||
for _connected in self.detected:
|
||||
platform = _connected.platform
|
||||
id = _connected.id
|
||||
runner = _connected.runner
|
||||
serial = _connected.serial
|
||||
product = _connected.product
|
||||
d = {
|
||||
'platform': platform,
|
||||
'id': id,
|
||||
'runner': runner,
|
||||
'serial': serial,
|
||||
'product': product,
|
||||
'connected': _connected.connected
|
||||
}
|
||||
dl.append(d)
|
||||
with open(hwm_file, 'w') as yaml_file:
|
||||
yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False)
|
||||
logger.info("Detected devices:")
|
||||
self.dump(detected=True)
|
||||
|
||||
def dump(self, filtered=[], header=[], connected_only=False, detected=False):
|
||||
print("")
|
||||
table = []
|
||||
if detected:
|
||||
to_show = self.detected
|
||||
else:
|
||||
to_show = self.duts
|
||||
|
||||
if not header:
|
||||
header = ["Platform", "ID", "Serial device"]
|
||||
for p in to_show:
|
||||
platform = p.platform
|
||||
connected = p.connected
|
||||
if filtered and platform not in filtered:
|
||||
continue
|
||||
|
||||
if not connected_only or connected:
|
||||
table.append([platform, p.id, p.serial])
|
||||
|
||||
print(tabulate(table, headers=header, tablefmt="github"))
|
|
@ -3613,296 +3613,5 @@ class Gcovr(CoverageTool):
|
|||
stdout=coveragelog)
|
||||
|
||||
|
||||
class DUT(object):
|
||||
def __init__(self,
|
||||
id=None,
|
||||
serial=None,
|
||||
serial_baud=None,
|
||||
platform=None,
|
||||
product=None,
|
||||
serial_pty=None,
|
||||
connected=False,
|
||||
runner_params=None,
|
||||
pre_script=None,
|
||||
post_script=None,
|
||||
post_flash_script=None,
|
||||
runner=None):
|
||||
|
||||
self.serial = serial
|
||||
self.baud = serial_baud or 115200
|
||||
self.platform = platform
|
||||
self.serial_pty = serial_pty
|
||||
self._counter = Value("i", 0)
|
||||
self._available = Value("i", 1)
|
||||
self.connected = connected
|
||||
self.pre_script = pre_script
|
||||
self.id = id
|
||||
self.product = product
|
||||
self.runner = runner
|
||||
self.runner_params = runner_params
|
||||
self.fixtures = []
|
||||
self.post_flash_script = post_flash_script
|
||||
self.post_script = post_script
|
||||
self.pre_script = pre_script
|
||||
self.probe_id = None
|
||||
self.notes = None
|
||||
self.lock = Lock()
|
||||
self.match = False
|
||||
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
with self._available.get_lock():
|
||||
return self._available.value
|
||||
|
||||
@available.setter
|
||||
def available(self, value):
|
||||
with self._available.get_lock():
|
||||
self._available.value = value
|
||||
|
||||
@property
|
||||
def counter(self):
|
||||
with self._counter.get_lock():
|
||||
return self._counter.value
|
||||
|
||||
@counter.setter
|
||||
def counter(self, value):
|
||||
with self._counter.get_lock():
|
||||
self._counter.value = value
|
||||
|
||||
def to_dict(self):
|
||||
d = {}
|
||||
exclude = ['_available', '_counter', 'match']
|
||||
v = vars(self)
|
||||
for k in v.keys():
|
||||
if k not in exclude and v[k]:
|
||||
d[k] = v[k]
|
||||
return d
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{self.platform} ({self.product}) on {self.serial}>"
|
||||
|
||||
class HardwareMap:
|
||||
schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml")
|
||||
|
||||
manufacturer = [
|
||||
'ARM',
|
||||
'SEGGER',
|
||||
'MBED',
|
||||
'STMicroelectronics',
|
||||
'Atmel Corp.',
|
||||
'Texas Instruments',
|
||||
'Silicon Labs',
|
||||
'NXP Semiconductors',
|
||||
'Microchip Technology Inc.',
|
||||
'FTDI',
|
||||
'Digilent'
|
||||
]
|
||||
|
||||
runner_mapping = {
|
||||
'pyocd': [
|
||||
'DAPLink CMSIS-DAP',
|
||||
'MBED CMSIS-DAP'
|
||||
],
|
||||
'jlink': [
|
||||
'J-Link',
|
||||
'J-Link OB'
|
||||
],
|
||||
'openocd': [
|
||||
'STM32 STLink', '^XDS110.*', 'STLINK-V3'
|
||||
],
|
||||
'dediprog': [
|
||||
'TTL232R-3V3',
|
||||
'MCP2200 USB Serial Port Emulator'
|
||||
]
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.detected = []
|
||||
self.duts = []
|
||||
|
||||
def add_device(self, serial, platform, pre_script, is_pty, baud=None):
|
||||
device = DUT(platform=platform, connected=True, pre_script=pre_script, serial_baud=baud)
|
||||
|
||||
if is_pty:
|
||||
device.serial_pty = serial
|
||||
else:
|
||||
device.serial = serial
|
||||
|
||||
self.duts.append(device)
|
||||
|
||||
def load(self, map_file):
|
||||
hwm_schema = scl.yaml_load(self.schema_path)
|
||||
duts = scl.yaml_load_verify(map_file, hwm_schema)
|
||||
for dut in duts:
|
||||
pre_script = dut.get('pre_script')
|
||||
post_script = dut.get('post_script')
|
||||
post_flash_script = dut.get('post_flash_script')
|
||||
platform = dut.get('platform')
|
||||
id = dut.get('id')
|
||||
runner = dut.get('runner')
|
||||
runner_params = dut.get('runner_params')
|
||||
serial_pty = dut.get('serial_pty')
|
||||
serial = dut.get('serial')
|
||||
baud = dut.get('baud', None)
|
||||
product = dut.get('product')
|
||||
fixtures = dut.get('fixtures', [])
|
||||
connected= dut.get('connected') and ((serial or serial_pty) is not None)
|
||||
new_dut = DUT(platform=platform,
|
||||
product=product,
|
||||
runner=runner,
|
||||
runner_params=runner_params,
|
||||
id=id,
|
||||
serial_pty=serial_pty,
|
||||
serial=serial,
|
||||
serial_baud=baud,
|
||||
connected=connected,
|
||||
pre_script=pre_script,
|
||||
post_script=post_script,
|
||||
post_flash_script=post_flash_script)
|
||||
new_dut.fixtures = fixtures
|
||||
new_dut.counter = 0
|
||||
self.duts.append(new_dut)
|
||||
|
||||
def scan(self, persistent=False):
|
||||
from serial.tools import list_ports
|
||||
|
||||
if persistent and platform.system() == 'Linux':
|
||||
# On Linux, /dev/serial/by-id provides symlinks to
|
||||
# '/dev/ttyACMx' nodes using names which are unique as
|
||||
# long as manufacturers fill out USB metadata nicely.
|
||||
#
|
||||
# This creates a map from '/dev/ttyACMx' device nodes
|
||||
# to '/dev/serial/by-id/usb-...' symlinks. The symlinks
|
||||
# go into the hardware map because they stay the same
|
||||
# even when the user unplugs / replugs the device.
|
||||
#
|
||||
# Some inexpensive USB/serial adapters don't result
|
||||
# in unique names here, though, so use of this feature
|
||||
# requires explicitly setting persistent=True.
|
||||
by_id = Path('/dev/serial/by-id')
|
||||
def readlink(link):
|
||||
return str((by_id / link).resolve())
|
||||
|
||||
persistent_map = {readlink(link): str(link)
|
||||
for link in by_id.iterdir()}
|
||||
else:
|
||||
persistent_map = {}
|
||||
|
||||
serial_devices = list_ports.comports()
|
||||
logger.info("Scanning connected hardware...")
|
||||
for d in serial_devices:
|
||||
if d.manufacturer in self.manufacturer:
|
||||
|
||||
# TI XDS110 can have multiple serial devices for a single board
|
||||
# assume endpoint 0 is the serial, skip all others
|
||||
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
|
||||
continue
|
||||
s_dev = DUT(platform="unknown",
|
||||
id=d.serial_number,
|
||||
serial=persistent_map.get(d.device, d.device),
|
||||
product=d.product,
|
||||
runner='unknown',
|
||||
connected=True)
|
||||
|
||||
for runner, _ in self.runner_mapping.items():
|
||||
products = self.runner_mapping.get(runner)
|
||||
if d.product in products:
|
||||
s_dev.runner = runner
|
||||
continue
|
||||
# Try regex matching
|
||||
for p in products:
|
||||
if re.match(p, d.product):
|
||||
s_dev.runner = runner
|
||||
|
||||
s_dev.connected = True
|
||||
s_dev.lock = None
|
||||
self.detected.append(s_dev)
|
||||
else:
|
||||
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
|
||||
|
||||
def save(self, hwm_file):
|
||||
# use existing map
|
||||
self.detected.sort(key=lambda x: x.serial or '')
|
||||
if os.path.exists(hwm_file):
|
||||
with open(hwm_file, 'r') as yaml_file:
|
||||
hwm = yaml.load(yaml_file, Loader=SafeLoader)
|
||||
if hwm:
|
||||
hwm.sort(key=lambda x: x.get('id', ''))
|
||||
|
||||
# disconnect everything
|
||||
for h in hwm:
|
||||
h['connected'] = False
|
||||
h['serial'] = None
|
||||
|
||||
for _detected in self.detected:
|
||||
for h in hwm:
|
||||
if _detected.id == h['id'] and _detected.product == h['product'] and not _detected.match:
|
||||
h['connected'] = True
|
||||
h['serial'] = _detected.serial
|
||||
_detected.match = True
|
||||
|
||||
new_duts = list(filter(lambda d: not d.match, self.detected))
|
||||
new = []
|
||||
for d in new_duts:
|
||||
new.append(d.to_dict())
|
||||
|
||||
if hwm:
|
||||
hwm = hwm + new
|
||||
else:
|
||||
hwm = new
|
||||
|
||||
with open(hwm_file, 'w') as yaml_file:
|
||||
yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False)
|
||||
|
||||
self.load(hwm_file)
|
||||
logger.info("Registered devices:")
|
||||
self.dump()
|
||||
|
||||
else:
|
||||
# create new file
|
||||
dl = []
|
||||
for _connected in self.detected:
|
||||
platform = _connected.platform
|
||||
id = _connected.id
|
||||
runner = _connected.runner
|
||||
serial = _connected.serial
|
||||
product = _connected.product
|
||||
d = {
|
||||
'platform': platform,
|
||||
'id': id,
|
||||
'runner': runner,
|
||||
'serial': serial,
|
||||
'product': product,
|
||||
'connected': _connected.connected
|
||||
}
|
||||
dl.append(d)
|
||||
with open(hwm_file, 'w') as yaml_file:
|
||||
yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False)
|
||||
logger.info("Detected devices:")
|
||||
self.dump(detected=True)
|
||||
|
||||
def dump(self, filtered=[], header=[], connected_only=False, detected=False):
|
||||
print("")
|
||||
table = []
|
||||
if detected:
|
||||
to_show = self.detected
|
||||
else:
|
||||
to_show = self.duts
|
||||
|
||||
if not header:
|
||||
header = ["Platform", "ID", "Serial device"]
|
||||
for p in to_show:
|
||||
platform = p.platform
|
||||
connected = p.connected
|
||||
if filtered and platform not in filtered:
|
||||
continue
|
||||
|
||||
if not connected_only or connected:
|
||||
table.append([platform, p.id, p.serial])
|
||||
|
||||
print(tabulate(table, headers=header, tablefmt="github"))
|
||||
|
||||
def init(colorama_strip):
|
||||
colorama.init(strip=colorama_strip)
|
||||
|
|
|
@ -206,9 +206,10 @@ except ImportError:
|
|||
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
|
||||
|
||||
import twisterlib
|
||||
from twisterlib import HardwareMap, TestPlan, SizeCalculator, CoverageTool, ExecutionCounter
|
||||
from twisterlib import TestPlan, SizeCalculator, CoverageTool, ExecutionCounter
|
||||
from enviornment import TwisterEnv, canonical_zephyr_base
|
||||
from reports import Reporting
|
||||
from hardwaremap import HardwareMap
|
||||
|
||||
logger = logging.getLogger('twister')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue