diff --git a/scripts/pylib/twister/twisterlib/environment.py b/scripts/pylib/twister/twisterlib/environment.py index adf31a4e499..e8bc0d9f54b 100644 --- a/scripts/pylib/twister/twisterlib/environment.py +++ b/scripts/pylib/twister/twisterlib/environment.py @@ -863,6 +863,9 @@ def parse_arguments(parser, args, options = None): return options +def strip_ansi_sequences(s: str) -> str: + """Remove ANSI escape sequences from a string.""" + return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', "", s) class TwisterEnv: @@ -967,8 +970,7 @@ class TwisterEnv: # for instance if twister is executed from inside a makefile. In such a # scenario it is then necessary to remove them, as otherwise the JSON decoding # will fail. - ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - out = ansi_escape.sub('', out.decode()) + out = strip_ansi_sequences(out.decode()) if p.returncode == 0: msg = "Finished running %s" % (args[0]) diff --git a/scripts/pylib/twister/twisterlib/handlers.py b/scripts/pylib/twister/twisterlib/handlers.py index ed0ee9003b1..ae283c1722e 100755 --- a/scripts/pylib/twister/twisterlib/handlers.py +++ b/scripts/pylib/twister/twisterlib/handlers.py @@ -20,7 +20,7 @@ import threading import time from queue import Queue, Empty -from twisterlib.environment import ZEPHYR_BASE +from twisterlib.environment import ZEPHYR_BASE, strip_ansi_sequences from twisterlib.error import TwisterException sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers")) from domains import Domains @@ -216,7 +216,7 @@ class BinaryHandler(Handler): else: stripped_line = line_decoded.rstrip() logger.debug("OUTPUT: %s", stripped_line) - log_out_fp.write(line_decoded) + log_out_fp.write(strip_ansi_sequences(line_decoded)) log_out_fp.flush() harness.handle(stripped_line) if harness.state: @@ -426,7 +426,7 @@ class DeviceHandler(Handler): sl = serial_line.decode('utf-8', 'ignore').lstrip() logger.debug("DEVICE: {0}".format(sl.rstrip())) - log_out_fp.write(sl.encode('utf-8')) + log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8')) log_out_fp.flush() harness.handle(sl.rstrip()) @@ -921,7 +921,7 @@ class QEMUHandler(Handler): continue # line contains a full line of data output from QEMU - log_out_fp.write(line) + log_out_fp.write(strip_ansi_sequences(line)) log_out_fp.flush() line = line.rstrip() logger.debug(f"QEMU ({pid}): {line}")