twister: fix serial connection for flash_before

The --flash-before flag allows devices to be flashed before the serial
connection is established. However, the implementation was incomplete and
only worked if the port address already existed at the start of the run.
This is incompatible with devices that implement the USB in software
(eg: USB-CDC).

This commit fixes the implementation to delay setting up the connection
until after the device is flashed, and to retry the connection for two
seconds after flashing to give the device time to enumerate as a USB
device.

Signed-off-by: Mike Szczys <mike@golioth.io>
This commit is contained in:
Mike Szczys 2024-05-19 22:28:15 -05:00 committed by Anas Nashif
commit 7eaca455fa
5 changed files with 40 additions and 5 deletions

View file

@ -14,6 +14,7 @@ import threading
import time
from datetime import datetime
from pathlib import Path
from serial import SerialException
from twister_harness.exceptions import (
TwisterHarnessException,
@ -69,14 +70,23 @@ class DeviceAdapter(abc.ABC):
if self.device_config.type != 'hardware':
self._flash_and_run()
self._device_run.set()
self._start_reader_thread()
self.connect()
return
self._device_run.set()
self._start_reader_thread()
self.connect()
if self.device_config.type == 'hardware':
if self.device_config.flash_before:
# For hardware devices with shared USB or software USB, connect after flashing.
# Retry for up to 10 seconds for USB-CDC based devices to enumerate.
self._flash_and_run()
self.connect(retry_s = 10)
else:
# On hardware, flash after connecting to COM port, otherwise some messages
# from target can be lost.
self.connect()
self._flash_and_run()
def close(self) -> None:
@ -89,7 +99,7 @@ class DeviceAdapter(abc.ABC):
self._device_run.clear()
self._join_reader_thread()
def connect(self) -> None:
def connect(self, retry_s: int = 0) -> None:
"""Connect to device - allow for output gathering."""
if self.is_device_connected():
logger.debug('Device already connected')
@ -98,7 +108,20 @@ class DeviceAdapter(abc.ABC):
msg = 'Cannot connect to not working device'
logger.error(msg)
raise TwisterHarnessException(msg)
self._connect_device()
if retry_s > 0:
retry_cycles = retry_s * 10
for i in range(retry_cycles):
try:
self._connect_device()
break
except SerialException:
if i == retry_cycles - 1:
raise
time.sleep(0.1)
else:
self._connect_device()
self._device_connected.set()
def disconnect(self) -> None:

View file

@ -84,6 +84,13 @@ def pytest_addoption(parser: pytest.Parser):
'--device-serial-pty',
help='Script for controlling pseudoterminal.'
)
twister_harness_group.addoption(
'--flash-before',
type=bool,
help='Flash device before attaching to serial port'
'This is useful for devices that share the same port for programming'
'and serial console, or use soft-USB, where flash must come first.'
)
twister_harness_group.addoption(
'--west-flash-extra-args',
help='Extend parameters for west flash. '

View file

@ -26,6 +26,7 @@ class DeviceConfig:
id: str = ''
product: str = ''
serial_pty: str = ''
flash_before: bool = False
west_flash_extra_args: list[str] = field(default_factory=list, repr=False)
name: str = ''
pre_script: Path | None = None
@ -62,6 +63,7 @@ class TwisterHarnessConfig:
id=config.option.device_id,
product=config.option.device_product,
serial_pty=config.option.device_serial_pty,
flash_before=bool(config.option.flash_before),
west_flash_extra_args=west_flash_extra_args,
pre_script=_cast_to_path(config.option.pre_script),
post_script=_cast_to_path(config.option.post_script),

View file

@ -178,7 +178,7 @@ Artificially long but functional example:
parser.add_argument("--flash-before", action="store_true", default=False,
help="""Flash device before attaching to serial port.
This is useful for devices that share the same port for programming
and serial console, where flash must come first.
and serial console, or use soft-USB, where flash must come first.
""")
test_or_build.add_argument(

View file

@ -408,6 +408,9 @@ class Pytest(Harness):
if hardware.post_script:
command.append(f'--post-script={hardware.post_script}')
if hardware.flash_before:
command.append(f'--flash-before={hardware.flash_before}')
return command
def run_command(self, cmd, timeout):