scripts: west: add CANopen flash runner
Add west flash runner for program download via CANopen. Signed-off-by: Henrik Brix Andersen <hebad@vestas.com>
This commit is contained in:
parent
ed9f2fc28a
commit
3c2984d4a9
8 changed files with 389 additions and 2 deletions
2
.github/workflows/doc-build.yml
vendored
2
.github/workflows/doc-build.yml
vendored
|
@ -33,7 +33,7 @@ jobs:
|
|||
pip3 install 'breathe>=4.9.1,<4.15.0' 'docutils>=0.14' \
|
||||
'sphinx>=1.7.5,<3.0' sphinx_rtd_theme sphinx-tabs \
|
||||
sphinxcontrib-svg2pdfconverter 'west>=0.6.2'
|
||||
pip3 install pyelftools
|
||||
pip3 install pyelftools canopen progress
|
||||
|
||||
- name: west setup
|
||||
run: |
|
||||
|
|
2
.github/workflows/west_cmds.yml
vendored
2
.github/workflows/west_cmds.yml
vendored
|
@ -54,7 +54,7 @@ jobs:
|
|||
${{ runner.os }}-pip-${{ matrix.python-version }}
|
||||
- name: install pytest
|
||||
run: |
|
||||
pip3 install pytest west pyelftools
|
||||
pip3 install pytest west pyelftools canopen progress
|
||||
- name: run pytest-win
|
||||
if: runner.os == 'Windows'
|
||||
run: |
|
||||
|
|
4
boards/common/canopen.board.cmake
Normal file
4
boards/common/canopen.board.cmake
Normal file
|
@ -0,0 +1,4 @@
|
|||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
board_set_flasher_ifnset(canopen)
|
||||
board_finalize_runner_args(canopen)
|
|
@ -11,7 +11,9 @@ pyelftools>=0.24
|
|||
PyYAML>=5.1
|
||||
|
||||
# used by west_commands
|
||||
canopen
|
||||
packaging
|
||||
progress
|
||||
|
||||
# intelhex used by mergehex.py
|
||||
intelhex
|
||||
|
|
|
@ -13,6 +13,7 @@ from runners.core import ZephyrBinaryRunner, MissingProgram
|
|||
# Keep this list sorted by runner name.
|
||||
from runners import blackmagicprobe
|
||||
from runners import bossac
|
||||
from runners import canopen_program
|
||||
from runners import dediprog
|
||||
from runners import dfu
|
||||
from runners import esp32
|
||||
|
|
274
scripts/west_commands/runners/canopen_program.py
Normal file
274
scripts/west_commands/runners/canopen_program.py
Normal file
|
@ -0,0 +1,274 @@
|
|||
# Copyright (c) 2020 Vestas Wind Systems A/S
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
'''Runner for performing program download over CANopen (DSP 302-3).'''
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
import canopen
|
||||
from progress.bar import Bar
|
||||
|
||||
from runners.core import ZephyrBinaryRunner, RunnerCaps
|
||||
|
||||
# Default Python-CAN context to use, see python-can documentation for details
|
||||
DEFAULT_CAN_CONTEXT = 'default'
|
||||
|
||||
# Object dictionary indexes
|
||||
H1F50_PROGRAM_DATA = 0x1F50
|
||||
H1F51_PROGRAM_CTRL = 0x1F51
|
||||
H1F56_PROGRAM_SWID = 0x1F56
|
||||
H1F57_FLASH_STATUS = 0x1F57
|
||||
|
||||
# Program control commands
|
||||
PROGRAM_CTRL_STOP = 0x00
|
||||
PROGRAM_CTRL_START = 0x01
|
||||
PROGRAM_CTRL_RESET = 0x02
|
||||
PROGRAM_CTRL_CLEAR = 0x03
|
||||
PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80
|
||||
|
||||
class ToggleAction(argparse.Action):
|
||||
'''Toggle argument parser'''
|
||||
def __call__(self, parser, namespace, values, option_string=None):
|
||||
setattr(namespace, self.dest, not option_string.startswith('--no-'))
|
||||
|
||||
class CANopenBinaryRunner(ZephyrBinaryRunner):
|
||||
'''Runner front-end for CANopen.'''
|
||||
def __init__(self, cfg, node_id, can_context=DEFAULT_CAN_CONTEXT,
|
||||
program_number=1, confirm=True,
|
||||
confirm_only=True, timeout=10):
|
||||
super(CANopenBinaryRunner, self).__init__(cfg)
|
||||
self.bin_file = cfg.bin_file
|
||||
self.confirm = confirm
|
||||
self.confirm_only = confirm_only
|
||||
self.timeout = timeout
|
||||
self.downloader = CANopenProgramDownloader(logger=self.logger,
|
||||
node_id=node_id,
|
||||
can_context=can_context,
|
||||
program_number=program_number)
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
return 'canopen'
|
||||
|
||||
@classmethod
|
||||
def capabilities(cls):
|
||||
return RunnerCaps(commands={'flash'}, flash_addr=False)
|
||||
|
||||
@classmethod
|
||||
def do_add_parser(cls, parser):
|
||||
# Required:
|
||||
parser.add_argument('--node-id', required=True, help='Node ID')
|
||||
|
||||
# Optional:
|
||||
parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
|
||||
help='Custom Python-CAN context to use')
|
||||
parser.add_argument('--program-number', default=1,
|
||||
help='program number, default is 1')
|
||||
parser.add_argument('--confirm', '--no-confirm',
|
||||
dest='confirm', nargs=0,
|
||||
action=ToggleAction,
|
||||
help='confirm after starting? (default: yes)')
|
||||
parser.add_argument('--confirm-only', default=False, action='store_true',
|
||||
help='confirm only, no program download (default: no)')
|
||||
parser.add_argument('--timeout', default=10,
|
||||
help='boot-up timeout, default is 10 seconds')
|
||||
|
||||
parser.set_defaults(confirm=True)
|
||||
|
||||
@classmethod
|
||||
def create(cls, cfg, args):
|
||||
return CANopenBinaryRunner(cfg, int(args.node_id),
|
||||
can_context=args.can_context,
|
||||
program_number=int(args.program_number),
|
||||
confirm=args.confirm,
|
||||
confirm_only=args.confirm_only,
|
||||
timeout=int(args.timeout))
|
||||
|
||||
def do_run(self, command, **kwargs):
|
||||
if command == 'flash':
|
||||
self.flash(**kwargs)
|
||||
|
||||
def flash(self, **kwargs):
|
||||
'''Download program to flash over CANopen'''
|
||||
self.logger.info('Using Node ID %d, program number %d',
|
||||
self.downloader.node_id,
|
||||
self.downloader.program_number)
|
||||
|
||||
self.downloader.connect()
|
||||
status = self.downloader.flash_status()
|
||||
if status == 0:
|
||||
self.downloader.swid()
|
||||
else:
|
||||
self.logger.warning('Flash status 0x{:02x}, '
|
||||
'skipping software identification'.format(status))
|
||||
|
||||
self.downloader.enter_pre_operational()
|
||||
|
||||
if self.confirm_only:
|
||||
self.downloader.zephyr_confirm_program()
|
||||
self.downloader.disconnect()
|
||||
return
|
||||
|
||||
if self.bin_file is None:
|
||||
raise ValueError('Cannot download program; bin_file is missing')
|
||||
|
||||
self.downloader.stop_program()
|
||||
self.downloader.clear_program()
|
||||
self.downloader.download(self.bin_file)
|
||||
|
||||
status = self.downloader.flash_status()
|
||||
if status != 0:
|
||||
raise ValueError('Program download failed: '
|
||||
'flash status 0x{:02x}'.format(status))
|
||||
|
||||
self.downloader.start_program()
|
||||
self.downloader.wait_for_bootup(self.timeout)
|
||||
self.downloader.swid()
|
||||
|
||||
if self.confirm:
|
||||
self.downloader.enter_pre_operational()
|
||||
self.downloader.zephyr_confirm_program()
|
||||
|
||||
self.downloader.disconnect()
|
||||
|
||||
class CANopenProgramDownloader(object):
|
||||
'''CANopen program downloader'''
|
||||
def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
|
||||
program_number=1):
|
||||
super(CANopenProgramDownloader, self).__init__()
|
||||
self.logger = logger
|
||||
self.node_id = node_id
|
||||
self.can_context = can_context
|
||||
self.program_number = program_number
|
||||
self.network = canopen.Network()
|
||||
self.node = self.network.add_node(self.node_id,
|
||||
self.create_object_dictionary())
|
||||
self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
|
||||
self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
|
||||
self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
|
||||
self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
|
||||
|
||||
def connect(self):
|
||||
'''Connect to CAN network'''
|
||||
try:
|
||||
self.network.connect(context=self.can_context)
|
||||
except:
|
||||
raise ValueError('Unable to connect to CAN network')
|
||||
|
||||
def disconnect(self):
|
||||
'''Disconnect from CAN network'''
|
||||
self.network.disconnect()
|
||||
|
||||
def enter_pre_operational(self):
|
||||
'''Enter pre-operational NMT state'''
|
||||
self.logger.info("Entering pre-operational mode")
|
||||
try:
|
||||
self.node.nmt.state = 'PRE-OPERATIONAL'
|
||||
except:
|
||||
raise ValueError('Failed to enter pre-operational mode')
|
||||
|
||||
def _ctrl_program(self, cmd):
|
||||
'''Write program control command to CANopen object dictionary (0x1f51)'''
|
||||
try:
|
||||
self.ctrl_sdo.raw = cmd
|
||||
except:
|
||||
raise ValueError('Unable to write control command 0x{:02x}'.format(cmd))
|
||||
|
||||
def stop_program(self):
|
||||
'''Write stop control command to CANopen object dictionary (0x1f51)'''
|
||||
self.logger.info('Stopping program')
|
||||
self._ctrl_program(PROGRAM_CTRL_STOP)
|
||||
|
||||
def start_program(self):
|
||||
'''Write start control command to CANopen object dictionary (0x1f51)'''
|
||||
self.logger.info('Starting program')
|
||||
self._ctrl_program(PROGRAM_CTRL_START)
|
||||
|
||||
def clear_program(self):
|
||||
'''Write clear control command to CANopen object dictionary (0x1f51)'''
|
||||
self.logger.info('Clearing program')
|
||||
self._ctrl_program(PROGRAM_CTRL_CLEAR)
|
||||
|
||||
def zephyr_confirm_program(self):
|
||||
'''Write confirm control command to CANopen object dictionary (0x1f51)'''
|
||||
self.logger.info('Confirming program')
|
||||
self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)
|
||||
|
||||
def swid(self):
|
||||
'''Read software identification from CANopen object dictionary (0x1f56)'''
|
||||
try:
|
||||
swid = self.swid_sdo.raw
|
||||
except:
|
||||
raise ValueError('Failed to read software identification')
|
||||
self.logger.info('Program software identification: 0x{:08x}'.format(swid))
|
||||
return swid
|
||||
|
||||
def flash_status(self):
|
||||
'''Read flash status identification'''
|
||||
try:
|
||||
status = self.flash_sdo.raw
|
||||
except:
|
||||
raise ValueError('Failed to read flash status identification')
|
||||
return status
|
||||
|
||||
def download(self, bin_file):
|
||||
'''Download program to CANopen object dictionary (0x1f50)'''
|
||||
self.logger.info('Downloading program: %s', bin_file)
|
||||
try:
|
||||
size = os.path.getsize(bin_file)
|
||||
infile = open(bin_file, 'rb')
|
||||
outfile = self.data_sdo.open('wb', size=size)
|
||||
|
||||
progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB')
|
||||
while True:
|
||||
chunk = infile.read(1024)
|
||||
if not chunk:
|
||||
break
|
||||
outfile.write(chunk)
|
||||
progress.next(n=len(chunk))
|
||||
progress.finish()
|
||||
infile.close()
|
||||
outfile.close()
|
||||
except:
|
||||
raise ValueError('Failed to download program')
|
||||
|
||||
def wait_for_bootup(self, timeout=10):
|
||||
'''Wait for boot-up message reception'''
|
||||
self.logger.info('Waiting for boot-up message...')
|
||||
try:
|
||||
self.node.nmt.wait_for_bootup(timeout=timeout)
|
||||
except:
|
||||
raise ValueError('Timeout waiting for boot-up message')
|
||||
|
||||
@staticmethod
|
||||
def create_object_dictionary():
|
||||
'''Create a synthetic CANopen object dictionary for program download'''
|
||||
objdict = canopen.objectdictionary.ObjectDictionary()
|
||||
|
||||
array = canopen.objectdictionary.Array('Program data', 0x1f50)
|
||||
member = canopen.objectdictionary.Variable('', 0x1f50, subindex=1)
|
||||
member.data_type = canopen.objectdictionary.DOMAIN
|
||||
array.add_member(member)
|
||||
objdict.add_object(array)
|
||||
|
||||
array = canopen.objectdictionary.Array('Program control', 0x1f51)
|
||||
member = canopen.objectdictionary.Variable('', 0x1f51, subindex=1)
|
||||
member.data_type = canopen.objectdictionary.UNSIGNED8
|
||||
array.add_member(member)
|
||||
objdict.add_object(array)
|
||||
|
||||
array = canopen.objectdictionary.Array('Program sofware ID', 0x1f56)
|
||||
member = canopen.objectdictionary.Variable('', 0x1f56, subindex=1)
|
||||
member.data_type = canopen.objectdictionary.UNSIGNED32
|
||||
array.add_member(member)
|
||||
objdict.add_object(array)
|
||||
|
||||
array = canopen.objectdictionary.Array('Flash error ID', 0x1f57)
|
||||
member = canopen.objectdictionary.Variable('', 0x1f57, subindex=1)
|
||||
member.data_type = canopen.objectdictionary.UNSIGNED32
|
||||
array.add_member(member)
|
||||
objdict.add_object(array)
|
||||
|
||||
return objdict
|
105
scripts/west_commands/tests/test_canopen_program.py
Normal file
105
scripts/west_commands/tests/test_canopen_program.py
Normal file
|
@ -0,0 +1,105 @@
|
|||
# Copyright (c) 2020 Vestas Wind Systems A/S
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from unittest.mock import patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
from runners.canopen_program import CANopenBinaryRunner
|
||||
from conftest import RC_KERNEL_BIN
|
||||
|
||||
#
|
||||
# Test values
|
||||
#
|
||||
|
||||
TEST_DEF_CONTEXT = 'default'
|
||||
TEST_ALT_CONTEXT = 'alternate'
|
||||
|
||||
#
|
||||
# Test cases
|
||||
#
|
||||
|
||||
TEST_CASES = [(n, x, p, c, o, t)
|
||||
for n in range(1, 3)
|
||||
for x in (None, TEST_ALT_CONTEXT)
|
||||
for p in range(1, 3)
|
||||
for c in (False, True)
|
||||
for o in (False, True)
|
||||
for t in range(1, 3)]
|
||||
|
||||
def os_path_isfile_patch(filename):
|
||||
if filename == RC_KERNEL_BIN:
|
||||
return True
|
||||
return os.path.isfile(filename)
|
||||
|
||||
@pytest.mark.parametrize('test_case', TEST_CASES)
|
||||
@patch('runners.canopen_program.CANopenProgramDownloader')
|
||||
def test_canopen_program_create(cpd, test_case, runner_config):
|
||||
'''Test CANopen runner created from command line parameters.'''
|
||||
node_id, context, program_number, confirm, confirm_only, timeout = test_case
|
||||
|
||||
args = ['--node-id', str(node_id)]
|
||||
if context is not None:
|
||||
args.extend(['--can-context', context])
|
||||
if program_number:
|
||||
args.extend(['--program-number', str(program_number)])
|
||||
if not confirm:
|
||||
args.append('--no-confirm')
|
||||
if confirm_only:
|
||||
args.append('--confirm-only')
|
||||
if timeout:
|
||||
args.extend(['--timeout', str(timeout)])
|
||||
|
||||
mock = cpd.return_value
|
||||
mock.flash_status.return_value = 0
|
||||
mock.swid.return_value = 0
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
CANopenBinaryRunner.add_parser(parser)
|
||||
arg_namespace = parser.parse_args(args)
|
||||
runner = CANopenBinaryRunner.create(runner_config, arg_namespace)
|
||||
with patch('os.path.isfile', side_effect=os_path_isfile_patch):
|
||||
runner.run('flash')
|
||||
|
||||
cpd.assert_called_once()
|
||||
if context:
|
||||
assert cpd.call_args == call(node_id=node_id,
|
||||
can_context=context,
|
||||
logger=runner.logger,
|
||||
program_number=program_number)
|
||||
else:
|
||||
assert cpd.call_args == call(node_id=node_id,
|
||||
can_context=TEST_DEF_CONTEXT,
|
||||
logger=runner.logger,
|
||||
program_number=program_number)
|
||||
|
||||
mock.connect.assert_called_once()
|
||||
|
||||
if confirm_only:
|
||||
mock.flash_status.assert_called_once()
|
||||
mock.swid.assert_called_once()
|
||||
mock.enter_pre_operational.assert_called_once()
|
||||
mock.zephyr_confirm_program.assert_called_once()
|
||||
mock.clear_program.assert_not_called()
|
||||
mock.stop_program.assert_not_called()
|
||||
mock.download.assert_not_called()
|
||||
mock.start_program.assert_not_called()
|
||||
mock.wait_for_bootup.assert_not_called()
|
||||
else:
|
||||
mock.enter_pre_operational.assert_called()
|
||||
mock.flash_status.assert_called()
|
||||
mock.swid.assert_called()
|
||||
mock.stop_program.assert_called_once()
|
||||
mock.clear_program.assert_called_once()
|
||||
mock.download.assert_called_once_with(RC_KERNEL_BIN)
|
||||
mock.start_program.assert_called_once()
|
||||
mock.wait_for_bootup.assert_called_once_with(timeout)
|
||||
if confirm:
|
||||
mock.zephyr_confirm_program.assert_called_once()
|
||||
else:
|
||||
mock.zephyr_confirm_program.assert_not_called()
|
||||
|
||||
mock.disconnect.assert_called_once()
|
|
@ -17,6 +17,7 @@ def test_runner_imports():
|
|||
expected = set(('arc-nsim',
|
||||
'blackmagicprobe',
|
||||
'bossac',
|
||||
'canopen',
|
||||
'dediprog',
|
||||
'dfu-util',
|
||||
'esp32',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue