scripts: west_commands: decouple runners pkg from west

I've had some requests to be able to use code in the runners package
without having west installed.

It turns out to be pretty easy to make this happen, as west is
currently only used for west.log and some trivial helper methods:

- To replace west log, use the standard logging module
- Add an appropriate handler for each runner's logger in
  run_common.py which delegates to west.log, to keep
  output working as expected.

Signed-off-by: Marti Bolivar <marti.bolivar@nordicsemi.no>
This commit is contained in:
Marti Bolivar 2019-06-02 21:33:41 -06:00 committed by Carles Cufí
commit ddce583ca2
10 changed files with 148 additions and 97 deletions

View file

@ -6,6 +6,7 @@
''' '''
import argparse import argparse
import logging
from os import getcwd, path from os import getcwd, path
from subprocess import CalledProcessError from subprocess import CalledProcessError
import textwrap import textwrap
@ -25,6 +26,44 @@ from zephyr_ext_common import cached_runner_config
# Don't change this, or output from argparse won't match up. # Don't change this, or output from argparse won't match up.
INDENT = ' ' * 2 INDENT = ' ' * 2
if log.VERBOSE >= log.VERBOSE_NORMAL:
# Using level 1 allows sub-DEBUG levels of verbosity. The
# west.log module decides whether or not to actually print the
# message.
#
# https://docs.python.org/3.7/library/logging.html#logging-levels.
LOG_LEVEL = 1
else:
LOG_LEVEL = logging.INFO
class WestLogFormatter(logging.Formatter):
def __init__(self):
super().__init__(fmt='%(message)s')
class WestLogHandler(logging.Handler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setFormatter(WestLogFormatter())
self.setLevel(LOG_LEVEL)
def emit(self, record):
fmt = self.format(record)
lvl = record.levelno
if lvl > logging.CRITICAL:
log.die(fmt)
elif lvl >= logging.ERROR:
log.err(fmt)
elif lvl >= logging.WARNING:
log.wrn(fmt)
elif lvl >= logging.INFO:
log.inf(fmt)
elif lvl >= logging.DEBUG:
log.dbg(fmt)
else:
log.dbg(fmt, level=log.VERBOSE_EXTREME)
def add_parser_common(parser_adder, command): def add_parser_common(parser_adder, command):
parser = parser_adder.add_parser( parser = parser_adder.add_parser(
@ -201,9 +240,13 @@ def do_run_common(command, args, runner_args, cached_runner_var):
# At this point, the common options above are already parsed in # At this point, the common options above are already parsed in
# 'args', and unrecognized arguments are in 'runner_args'. # 'args', and unrecognized arguments are in 'runner_args'.
# #
# - Set up runner logging to delegate to west.
# - Pull the RunnerConfig out of the cache # - Pull the RunnerConfig out of the cache
# - Override cached values with applicable command-line options # - Override cached values with applicable command-line options
logger = logging.getLogger('runners')
logger.setLevel(LOG_LEVEL)
logger.addHandler(WestLogHandler())
cfg = cached_runner_config(build_dir, cache) cfg = cached_runner_config(build_dir, cache)
_override_config_from_namespace(cfg, args) _override_config_from_namespace(cfg, args)

View file

@ -14,19 +14,21 @@ as well as some other helpers for concrete runner classes.
import abc import abc
import argparse import argparse
import errno import errno
import logging
import os import os
import platform import platform
import shlex
import shutil import shutil
import signal import signal
import subprocess import subprocess
from west import log # Turn on to enable just logging the commands that would be run (at
from west.util import quote_sh_list # info rather than debug level), without actually running them. This
# can break runners that are expecting output or if one command
# depends on another, so it's just for debugging.
_DRY_RUN = False
# Turn on to enable just printing the commands that would be run, _logger = logging.getLogger('runners')
# without actually running them. This can break runners that are expecting
# output or if one command depends on another, so it's just for debugging.
JUST_PRINT = False
class _DebugDummyPopen: class _DebugDummyPopen:
@ -313,6 +315,17 @@ class ZephyrBinaryRunner(abc.ABC):
3. Give your runner's name to the Zephyr build system in your 3. Give your runner's name to the Zephyr build system in your
board's board.cmake. board's board.cmake.
Some advice on input and output:
- If you need to ask the user something (e.g. using input()), do it
in your create() classmethod, not do_run(). That ensures your
__init__() really has everything it needs to call do_run(), and also
avoids calling input() when not instantiating within a command line
application.
- Use self.logger to log messages using the standard library's
logging API; your logger is named "runner.<your-runner-name()>"
For command-line invocation from the Zephyr build system, runners For command-line invocation from the Zephyr build system, runners
define their own argparse-based interface through the common define their own argparse-based interface through the common
add_parser() (and runner-specific do_add_parser() it delegates add_parser() (and runner-specific do_add_parser() it delegates
@ -330,6 +343,10 @@ class ZephyrBinaryRunner(abc.ABC):
``cfg`` is a RunnerConfig instance.''' ``cfg`` is a RunnerConfig instance.'''
self.cfg = cfg self.cfg = cfg
'''RunnerConfig for this instance.'''
self.logger = logging.getLogger('runners.{}'.format(self.name()))
'''logging.Logger for this instance.'''
@staticmethod @staticmethod
def get_runners(): def get_runners():
@ -465,6 +482,13 @@ class ZephyrBinaryRunner(abc.ABC):
server_proc.terminate() server_proc.terminate()
server_proc.wait() server_proc.wait()
def _log_cmd(self, cmd):
escaped = ' '.join(shlex.quote(s) for s in cmd)
if not _DRY_RUN:
self.logger.debug(escaped)
else:
self.logger.info(escaped)
def call(self, cmd): def call(self, cmd):
'''Subclass subprocess.call() wrapper. '''Subclass subprocess.call() wrapper.
@ -472,13 +496,9 @@ class ZephyrBinaryRunner(abc.ABC):
subprocess and get its return code, rather than subprocess and get its return code, rather than
using subprocess directly, to keep accurate debug logs. using subprocess directly, to keep accurate debug logs.
''' '''
quoted = quote_sh_list(cmd) self._log_cmd(cmd)
if _DRY_RUN:
if JUST_PRINT:
log.inf(quoted)
return 0 return 0
log.dbg(quoted)
return subprocess.call(cmd) return subprocess.call(cmd)
def check_call(self, cmd): def check_call(self, cmd):
@ -488,13 +508,9 @@ class ZephyrBinaryRunner(abc.ABC):
subprocess and check that it executed correctly, rather than subprocess and check that it executed correctly, rather than
using subprocess directly, to keep accurate debug logs. using subprocess directly, to keep accurate debug logs.
''' '''
quoted = quote_sh_list(cmd) self._log_cmd(cmd)
if _DRY_RUN:
if JUST_PRINT:
log.inf(quoted)
return return
log.dbg(quoted)
try: try:
subprocess.check_call(cmd) subprocess.check_call(cmd)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
@ -507,13 +523,9 @@ class ZephyrBinaryRunner(abc.ABC):
subprocess and check that it executed correctly, rather than subprocess and check that it executed correctly, rather than
using subprocess directly, to keep accurate debug logs. using subprocess directly, to keep accurate debug logs.
''' '''
quoted = quote_sh_list(cmd) self._log_cmd(cmd)
if _DRY_RUN:
if JUST_PRINT:
log.inf(quoted)
return b'' return b''
log.dbg(quoted)
try: try:
return subprocess.check_output(cmd) return subprocess.check_output(cmd)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
@ -526,16 +538,14 @@ class ZephyrBinaryRunner(abc.ABC):
cflags = 0 cflags = 0
preexec = None preexec = None
system = platform.system() system = platform.system()
quoted = quote_sh_list(cmd)
if system == 'Windows': if system == 'Windows':
cflags |= subprocess.CREATE_NEW_PROCESS_GROUP cflags |= subprocess.CREATE_NEW_PROCESS_GROUP
elif system in {'Linux', 'Darwin'}: elif system in {'Linux', 'Darwin'}:
preexec = os.setsid preexec = os.setsid
if JUST_PRINT: self._log_cmd(cmd)
log.inf(quoted) if _DRY_RUN:
return _DebugDummyPopen() return _DebugDummyPopen()
log.dbg(quoted)
return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec) return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec)

View file

@ -8,8 +8,6 @@ from collections import namedtuple
import sys import sys
import time import time
from west import log
from runners.core import ZephyrBinaryRunner, RunnerCaps, \ from runners.core import ZephyrBinaryRunner, RunnerCaps, \
BuildConfiguration BuildConfiguration
@ -36,6 +34,7 @@ class DfuUtilBinaryRunner(ZephyrBinaryRunner):
else: else:
self.dfuse = True self.dfuse = True
self.dfuse_config = dfuse_config self.dfuse_config = dfuse_config
self.reset = False
@classmethod @classmethod
def name(cls): def name(cls):
@ -80,8 +79,17 @@ class DfuUtilBinaryRunner(ZephyrBinaryRunner):
else: else:
dcfg = None dcfg = None
return DfuUtilBinaryRunner(cfg, args.pid, args.alt, args.img, ret = DfuUtilBinaryRunner(cfg, args.pid, args.alt, args.img,
exe=args.dfu_util, dfuse_config=dcfg) exe=args.dfu_util, dfuse_config=dcfg)
ret.ensure_device()
return ret
def ensure_device(self):
if not self.find_device():
self.reset = True
print('Please reset your board to switch to DFU mode...')
while not self.find_device():
time.sleep(0.1)
def find_device(self): def find_device(self):
cmd = list(self.cmd) + ['-l'] cmd = list(self.cmd) + ['-l']
@ -91,17 +99,9 @@ class DfuUtilBinaryRunner(ZephyrBinaryRunner):
def do_run(self, command, **kwargs): def do_run(self, command, **kwargs):
self.require(self.cmd[0]) self.require(self.cmd[0])
reset = False
if not self.find_device(): if not self.find_device():
reset = True raise RuntimeError('device not found')
log.dbg('Device not found, waiting for it',
level=log.VERBOSE_EXTREME)
# Use of print() here is advised. We don't want to lose
# this information in a separate log -- this is
# interactive and requires a terminal.
print('Please reset your board to switch to DFU mode...')
while not self.find_device():
time.sleep(0.1)
cmd = list(self.cmd) cmd = list(self.cmd)
if self.dfuse: if self.dfuse:
@ -118,6 +118,6 @@ class DfuUtilBinaryRunner(ZephyrBinaryRunner):
# #
# DfuSe targets do as well, except when 'leave' is given # DfuSe targets do as well, except when 'leave' is given
# as an option. # as an option.
reset = False self.reset = False
if reset: if self.reset:
print('Now reset your board again to switch back to runtime mode.') print('Now reset your board again to switch back to runtime mode.')

View file

@ -6,8 +6,6 @@
from os import path from os import path
from west import log
from runners.core import ZephyrBinaryRunner, RunnerCaps from runners.core import ZephyrBinaryRunner, RunnerCaps
@ -95,8 +93,9 @@ class Esp32BinaryRunner(ZephyrBinaryRunner):
else: else:
cmd_flash.extend(['0x1000', bin_name]) cmd_flash.extend(['0x1000', bin_name])
log.inf("Converting ELF to BIN") self.logger.info("Converting ELF to BIN")
self.check_call(cmd_convert) self.check_call(cmd_convert)
log.inf("Flashing ESP32 on {} ({}bps)".format(self.device, self.baud)) self.logger.info("Flashing ESP32 on {} ({}bps)".
format(self.device, self.baud))
self.check_call(cmd_flash) self.check_call(cmd_flash)

View file

@ -7,7 +7,6 @@
from os import path from os import path
import time import time
import signal import signal
from west import log
from runners.core import ZephyrBinaryRunner from runners.core import ZephyrBinaryRunner
@ -84,7 +83,7 @@ class IntelS1000BinaryRunner(ZephyrBinaryRunner):
jtag_instr_file = kwargs['ocd-jtag-instr'] jtag_instr_file = kwargs['ocd-jtag-instr']
gdb_flash_file = kwargs['gdb-flash-file'] gdb_flash_file = kwargs['gdb-flash-file']
self.print_gdbserver_message(self.gdb_port) self.log_gdbserver_message(self.gdb_port)
server_cmd = [self.xt_ocd_dir, server_cmd = [self.xt_ocd_dir,
'-c', topology_file, '-c', topology_file,
'-I', jtag_instr_file] '-I', jtag_instr_file]
@ -122,7 +121,7 @@ class IntelS1000BinaryRunner(ZephyrBinaryRunner):
topology_file = kwargs['ocd-topology'] topology_file = kwargs['ocd-topology']
jtag_instr_file = kwargs['ocd-jtag-instr'] jtag_instr_file = kwargs['ocd-jtag-instr']
self.print_gdbserver_message(self.gdb_port) self.log_gdbserver_message(self.gdb_port)
server_cmd = [self.xt_ocd_dir, server_cmd = [self.xt_ocd_dir,
'-c', topology_file, '-c', topology_file,
'-I', jtag_instr_file] '-I', jtag_instr_file]
@ -152,14 +151,11 @@ class IntelS1000BinaryRunner(ZephyrBinaryRunner):
server_proc.terminate() server_proc.terminate()
server_proc.wait() server_proc.wait()
def print_gdbserver_message(self, gdb_port):
log.inf('Intel S1000 GDB server running on port {}'.format(gdb_port))
def debugserver(self, **kwargs): def debugserver(self, **kwargs):
topology_file = kwargs['ocd-topology'] topology_file = kwargs['ocd-topology']
jtag_instr_file = kwargs['ocd-jtag-instr'] jtag_instr_file = kwargs['ocd-jtag-instr']
self.print_gdbserver_message(self.gdb_port) self.log_gdbserver_message(self.gdb_port)
server_cmd = [self.xt_ocd_dir, server_cmd = [self.xt_ocd_dir,
'-c', topology_file, '-c', topology_file,
'-I', jtag_instr_file] '-I', jtag_instr_file]
@ -170,3 +166,7 @@ class IntelS1000BinaryRunner(ZephyrBinaryRunner):
time.sleep(6) time.sleep(6)
server_proc.terminate() server_proc.terminate()
self.check_call(server_cmd) self.check_call(server_cmd)
def log_gdbserver_message(self, gdb_port):
self.logger.info('Intel S1000 GDB server running on port {}'.
format(gdb_port))

View file

@ -7,11 +7,9 @@
import argparse import argparse
import os import os
import shlex import shlex
import shutil
import sys import sys
import tempfile import tempfile
from west import log
from runners.core import ZephyrBinaryRunner, RunnerCaps, \ from runners.core import ZephyrBinaryRunner, RunnerCaps, \
BuildConfiguration BuildConfiguration
@ -105,7 +103,8 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
tui=args.tui, tool_opt=args.tool_opt) tui=args.tui, tool_opt=args.tool_opt)
def print_gdbserver_message(self): def print_gdbserver_message(self):
log.inf('J-Link GDB server running on port {}'.format(self.gdb_port)) self.logger.info('J-Link GDB server running on port {}'.
format(self.gdb_port))
def do_run(self, command, **kwargs): def do_run(self, command, **kwargs):
server_cmd = ([self.gdbserver] + server_cmd = ([self.gdbserver] +
@ -162,8 +161,8 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
lines.append('g') # Start the CPU lines.append('g') # Start the CPU
lines.append('q') # Close the connection and quit lines.append('q') # Close the connection and quit
log.dbg('JLink commander script:') self.logger.debug('JLink commander script:')
log.dbg('\n'.join(lines)) self.logger.debug('\n'.join(lines))
# Don't use NamedTemporaryFile: the resulting file can't be # Don't use NamedTemporaryFile: the resulting file can't be
# opened again on Windows. # opened again on Windows.
@ -179,5 +178,5 @@ class JLinkBinaryRunner(ZephyrBinaryRunner):
'-CommanderScript', fname] + '-CommanderScript', fname] +
self.tool_opt) self.tool_opt)
log.inf('Flashing Target Device') self.logger.info('Flashing Target Device')
self.check_call(cmd) self.check_call(cmd)

View file

@ -4,7 +4,6 @@
'''Runner for NIOS II, based on quartus-flash.py and GDB.''' '''Runner for NIOS II, based on quartus-flash.py and GDB.'''
from west import log
from runners.core import ZephyrBinaryRunner, NetworkPortHelper from runners.core import ZephyrBinaryRunner, NetworkPortHelper
@ -65,7 +64,8 @@ class Nios2BinaryRunner(ZephyrBinaryRunner):
self.check_call(cmd) self.check_call(cmd)
def print_gdbserver_message(self, gdb_port): def print_gdbserver_message(self, gdb_port):
log.inf('Nios II GDB server running on port {}'.format(gdb_port)) self.logger.info('Nios II GDB server running on port {}'.
format(gdb_port))
def debug_debugserver(self, command, **kwargs): def debug_debugserver(self, command, **kwargs):
# Per comments in the shell script, the NIOSII GDB server # Per comments in the shell script, the NIOSII GDB server

View file

@ -7,8 +7,6 @@
import sys import sys
from west import log
from runners.core import ZephyrBinaryRunner, RunnerCaps from runners.core import ZephyrBinaryRunner, RunnerCaps
@ -46,8 +44,14 @@ class NrfJprogBinaryRunner(ZephyrBinaryRunner):
@classmethod @classmethod
def create(cls, cfg, args): def create(cls, cfg, args):
return NrfJprogBinaryRunner(cfg, args.nrf_family, args.softreset, ret = NrfJprogBinaryRunner(cfg, args.nrf_family, args.softreset,
args.snr, erase=args.erase) args.snr, erase=args.erase)
ret.ensure_snr()
return ret
def ensure_snr(self):
if not self.snr:
self.snr = self.get_board_snr_from_user()
def get_board_snr_from_user(self): def get_board_snr_from_user(self):
snrs = self.check_output(['nrfjprog', '--ids']) snrs = self.check_output(['nrfjprog', '--ids'])
@ -63,9 +67,6 @@ class NrfJprogBinaryRunner(ZephyrBinaryRunner):
'is a debugger already connected?') 'is a debugger already connected?')
return board_snr return board_snr
log.dbg("Refusing the temptation to guess a board",
level=log.VERBOSE_EXTREME)
# Use of print() here is advised. We don't want to lose # Use of print() here is advised. We don't want to lose
# this information in a separate log -- this is # this information in a separate log -- this is
# interactive and requires a terminal. # interactive and requires a terminal.
@ -91,13 +92,13 @@ class NrfJprogBinaryRunner(ZephyrBinaryRunner):
commands = [] commands = []
if self.snr is None: if self.snr is None:
board_snr = self.get_board_snr_from_user() raise ValueError("self.snr must not be None")
else: else:
board_snr = self.snr.lstrip("0") board_snr = self.snr.lstrip("0")
program_cmd = ['nrfjprog', '--program', self.hex_, '-f', self.family, program_cmd = ['nrfjprog', '--program', self.hex_, '-f', self.family,
'--snr', board_snr] '--snr', board_snr]
print('Flashing file: {}'.format(self.hex_)) self.logger.info('Flashing file: {}'.format(self.hex_))
if self.erase: if self.erase:
commands.extend([ commands.extend([
['nrfjprog', ['nrfjprog',
@ -129,5 +130,5 @@ class NrfJprogBinaryRunner(ZephyrBinaryRunner):
for cmd in commands: for cmd in commands:
self.check_call(cmd) self.check_call(cmd)
log.inf('Board with serial number {} flashed successfully.'.format( self.logger.info('Board with serial number {} flashed successfully.'.
board_snr)) format(board_snr))

View file

@ -6,8 +6,6 @@
import os import os
from west import log
from runners.core import ZephyrBinaryRunner, RunnerCaps, \ from runners.core import ZephyrBinaryRunner, RunnerCaps, \
BuildConfiguration BuildConfiguration
@ -84,19 +82,10 @@ class PyOcdBinaryRunner(ZephyrBinaryRunner):
@classmethod @classmethod
def create(cls, cfg, args): def create(cls, cfg, args):
daparg = os.environ.get('PYOCD_DAPARG')
if daparg:
log.wrn('Setting PYOCD_DAPARG in the environment is',
'deprecated; use the --daparg option instead.')
if args.daparg is None:
log.dbg('Missing --daparg set to {} from environment'.format(
daparg), level=log.VERBOSE_VERY)
args.daparg = daparg
build_conf = BuildConfiguration(cfg.build_dir) build_conf = BuildConfiguration(cfg.build_dir)
flash_addr = cls.get_flash_address(args, build_conf) flash_addr = cls.get_flash_address(args, build_conf)
return PyOcdBinaryRunner( ret = PyOcdBinaryRunner(
cfg, args.target, cfg, args.target,
pyocd=args.pyocd, pyocd=args.pyocd,
flash_addr=flash_addr, flash_opts=args.flash_opt, flash_addr=flash_addr, flash_opts=args.flash_opt,
@ -104,6 +93,14 @@ class PyOcdBinaryRunner(ZephyrBinaryRunner):
board_id=args.board_id, daparg=args.daparg, board_id=args.board_id, daparg=args.daparg,
frequency=args.frequency) frequency=args.frequency)
daparg = os.environ.get('PYOCD_DAPARG')
if not ret.daparg_args and daparg:
ret.logger.warning('PYOCD_DAPARG is deprecated; use --daparg')
ret.logger.debug('--daparg={} via PYOCD_DAPARG'.format(daparg))
ret.daparg_args = ['-da', daparg]
return ret
def port_args(self): def port_args(self):
return ['-p', str(self.gdb_port)] return ['-p', str(self.gdb_port)]
@ -135,11 +132,12 @@ class PyOcdBinaryRunner(ZephyrBinaryRunner):
self.flash_extra + self.flash_extra +
[fname]) [fname])
log.inf('Flashing Target Device') self.logger.info('Flashing Target Device')
self.check_call(cmd) self.check_call(cmd)
def print_gdbserver_message(self): def log_gdbserver_message(self):
log.inf('pyOCD GDB server running on port {}'.format(self.gdb_port)) self.logger.info('pyOCD GDB server running on port {}'.
format(self.gdb_port))
def debug_debugserver(self, command, **kwargs): def debug_debugserver(self, command, **kwargs):
server_cmd = ([self.pyocd] + server_cmd = ([self.pyocd] +
@ -151,7 +149,7 @@ class PyOcdBinaryRunner(ZephyrBinaryRunner):
self.frequency_args) self.frequency_args)
if command == 'debugserver': if command == 'debugserver':
self.print_gdbserver_message() self.log_gdbserver_message()
self.check_call(server_cmd) self.check_call(server_cmd)
else: else:
if self.gdb_cmd is None: if self.gdb_cmd is None:
@ -168,5 +166,5 @@ class PyOcdBinaryRunner(ZephyrBinaryRunner):
'-ex', 'load'] '-ex', 'load']
self.require(client_cmd[0]) self.require(client_cmd[0])
self.print_gdbserver_message() self.log_gdbserver_message()
self.run_server_and_client(server_cmd, client_cmd) self.run_server_and_client(server_cmd, client_cmd)

View file

@ -210,14 +210,15 @@ def test_nrfjprog_init(cc, get_snr, req, test_case, runner_config):
runner = NrfJprogBinaryRunner(runner_config, family, softreset, snr, runner = NrfJprogBinaryRunner(runner_config, family, softreset, snr,
erase=erase) erase=erase)
runner.run('flash')
assert req.called
assert cc.call_args_list == [call(x) for x in
expected_commands(*test_case)]
if snr is None: if snr is None:
get_snr.assert_called_once_with() with pytest.raises(ValueError) as e:
runner.run('flash')
assert 'snr must not be None' in str(e)
else: else:
runner.run('flash')
assert req.called
assert cc.call_args_list == [call(x) for x in
expected_commands(*test_case)]
get_snr.assert_not_called() get_snr.assert_not_called()