scripts: Fix twisterlib for ruff - SIM115

This fixes ruff linting error SIM115,
where files were opened without the use of
a context handler.

Signed-off-by: Lukasz Mrugala <lukaszx.mrugala@intel.com>
Lukasz Mrugala, 2024-11-27 15:18:08 +00:00, committed by Carles Cufí
commit 22a875e490
4 changed files with 196 additions and 308 deletions
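
For context: SIM115 (open-file-with-context-handler) flags `open()` calls whose result is not managed by a `with` statement, since an exception between `open()` and `close()` leaks the file handle. A minimal sketch of the before/after pattern this commit applies across twisterlib (the helper names and the path argument are illustrative, not taken from the diff):

    # Flagged by ruff SIM115: the handle is closed manually, so any exception
    # raised before close() (e.g. int() on garbage) leaks the file descriptor.
    def read_pid_unsafe(pid_path: str) -> int:
        fp = open(pid_path)
        pid = int(fp.read())
        fp.close()
        return pid

    # Preferred form: the context manager closes the file on every exit path.
    def read_pid_safe(pid_path: str) -> int:
        with open(pid_path) as fp:
            return int(fp.read())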

@@ -748,7 +748,6 @@
]
"./scripts/pylib/twister/expr_parser.py" = [
"SIM103", # https://docs.astral.sh/ruff/rules/needless-bool
"SIM115", # https://docs.astral.sh/ruff/rules/open-file-with-context-handler
"UP031", # https://docs.astral.sh/ruff/rules/printf-string-formatting
]
"./scripts/pylib/twister/scl.py" = [
@@ -778,7 +777,6 @@
"./scripts/pylib/twister/twisterlib/handlers.py" = [
"E501", # https://docs.astral.sh/ruff/rules/line-too-long
"F541", # https://docs.astral.sh/ruff/rules/f-string-missing-placeholders
"SIM115", # https://docs.astral.sh/ruff/rules/open-file-with-context-handler
"UP007", # https://docs.astral.sh/ruff/rules/non-pep604-annotation
"UP030", # https://docs.astral.sh/ruff/rules/format-literals
"UP031", # https://docs.astral.sh/ruff/rules/printf-string-formatting
@@ -812,7 +810,6 @@
"./scripts/pylib/twister/twisterlib/runner.py" = [
"E501", # https://docs.astral.sh/ruff/rules/line-too-long
"F541", # https://docs.astral.sh/ruff/rules/f-string-missing-placeholders
"SIM115", # https://docs.astral.sh/ruff/rules/open-file-with-context-handler
"UP031", # https://docs.astral.sh/ruff/rules/printf-string-formatting
"UP032", # https://docs.astral.sh/ruff/rules/f-string
]

@@ -189,7 +189,8 @@ class BinaryHandler(Handler):
def try_kill_process_by_pid(self):
if self.pid_fn:
pid = int(open(self.pid_fn).read())
with open(self.pid_fn) as pid_file:
pid = int(pid_file.read())
os.unlink(self.pid_fn)
self.pid_fn = None # clear so we don't try to kill the binary twice
with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
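
The surrounding context already uses `contextlib.suppress`, and the new handler code below leans on it further to replace try/except/pass blocks. A small, self-contained sketch of that equivalence (the helper and its PID argument are illustrative only; the real code also suppresses `psutil.NoSuchProcess`):

    import contextlib
    import os
    import signal

    def terminate_if_running(pid: int) -> None:
        """Send SIGTERM, ignoring the error if the process is already gone.

        Equivalent to:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                pass
        """
        with contextlib.suppress(ProcessLookupError):
            os.kill(pid, signal.SIGTERM)
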
@@ -396,8 +397,6 @@ class DeviceHandler(Handler):
return timeout
def monitor_serial(self, ser, halt_event, harness):
log_out_fp = open(self.log, "wb")
if self.options.enable_coverage:
# Set capture_coverage to True to indicate that right after
# test results we should get coverage data, otherwise we exit
@@ -411,57 +410,51 @@ class DeviceHandler(Handler):
# Clear serial leftover.
ser.reset_input_buffer()
while ser.isOpen():
if halt_event.is_set():
logger.debug('halted')
ser.close()
break
with open(self.log, "wb") as log_out_fp:
while ser.isOpen():
if halt_event.is_set():
logger.debug('halted')
ser.close()
break
try:
if not ser.in_waiting:
# no incoming bytes are waiting to be read from
# the serial input buffer, let other threads run
try:
if not ser.in_waiting:
# no incoming bytes are waiting to be read from
# the serial input buffer, let other threads run
time.sleep(0.001)
continue
# maybe the serial port is still in reset
# check status may cause error
# wait for more time
except OSError:
time.sleep(0.001)
continue
# maybe the serial port is still in reset
# check status may cause error
# wait for more time
except OSError:
time.sleep(0.001)
continue
except TypeError:
# This exception happens if the serial port was closed and
# its file descriptor cleared in between of ser.isOpen()
# and ser.in_waiting checks.
logger.debug("Serial port is already closed, stop reading.")
break
except TypeError:
# This exception happens if the serial port was closed and
# its file descriptor cleared in between of ser.isOpen()
# and ser.in_waiting checks.
logger.debug("Serial port is already closed, stop reading.")
break
serial_line = None
try:
serial_line = ser.readline()
except TypeError:
pass
# ignore SerialException which may happen during the serial device
# power off/on process.
except serial.SerialException:
pass
serial_line = None
# SerialException may happen during the serial device power off/on process.
with contextlib.suppress(TypeError, serial.SerialException):
serial_line = ser.readline()
# Just because ser_fileno has data doesn't mean an entire line
# is available yet.
if serial_line:
# can be more lines in serial_line so split them before handling
for sl in serial_line.decode('utf-8', 'ignore').splitlines(keepends=True):
log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8'))
log_out_fp.flush()
if sl := sl.strip():
logger.debug("DEVICE: {0}".format(sl))
harness.handle(sl)
# Just because ser_fileno has data doesn't mean an entire line
# is available yet.
if serial_line:
# can be more lines in serial_line so split them before handling
for sl in serial_line.decode('utf-8', 'ignore').splitlines(keepends=True):
log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8'))
log_out_fp.flush()
if sl := sl.strip():
logger.debug(f"DEVICE: {sl}")
harness.handle(sl)
if harness.status != TwisterStatus.NONE and not harness.capture_coverage:
ser.close()
break
log_out_fp.close()
if harness.status != TwisterStatus.NONE and not harness.capture_coverage:
ser.close()
break
@staticmethod
@contextmanager
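
In the monitor_serial change above, the whole read loop now lives inside `with open(self.log, "wb") as log_out_fp:`, so the log is closed on every exit path (halt event, harness status, serial errors) rather than relying on the trailing `log_out_fp.close()`. A reduced sketch of that shape, with made-up `read_chunk`/`should_stop` callables standing in for the serial handling:

    def capture_log(log_path, read_chunk, should_stop):
        """Write chunks to log_path until should_stop() returns True.

        The with-block guarantees the log is closed even if read_chunk()
        raises or the loop breaks early, which is the point of the change.
        """
        with open(log_path, "wb") as log_out_fp:
            while not should_stop():
                chunk = read_chunk()
                if not chunk:
                    continue
                log_out_fp.write(chunk)
                log_out_fp.flush()
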
@@ -878,42 +871,6 @@ class QEMUHandler(Handler):
return fifo_in, fifo_out
@staticmethod
def _thread_open_files(fifo_in, fifo_out, logfile):
# These in/out nodes are named from QEMU's perspective, not ours
if os.path.exists(fifo_in):
os.unlink(fifo_in)
os.mkfifo(fifo_in)
if os.path.exists(fifo_out):
os.unlink(fifo_out)
os.mkfifo(fifo_out)
# We don't do anything with out_fp but we need to open it for
# writing so that QEMU doesn't block, due to the way pipes work
out_fp = open(fifo_in, "wb")
# Disable internal buffering, we don't
# want read() or poll() to ever block if there is data in there
in_fp = open(fifo_out, "rb", buffering=0)
log_out_fp = open(logfile, "w")
return out_fp, in_fp, log_out_fp
@staticmethod
def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp):
log_out_fp.close()
out_fp.close()
in_fp.close()
if pid:
try:
if pid:
os.kill(pid, signal.SIGTERM)
except (ProcessLookupError, psutil.NoSuchProcess):
# Oh well, as long as it's dead! User probably sent Ctrl-C
pass
os.unlink(fifo_in)
os.unlink(fifo_out)
@staticmethod
def _thread_update_instance_info(handler, handler_time, status, reason):
handler.instance.execution_time = handler_time
@@ -928,108 +885,130 @@ class QEMUHandler(Handler):
harness, ignore_unexpected_eof=False):
fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files(
fifo_in,
fifo_out,
logfile
)
# These in/out nodes are named from QEMU's perspective, not ours
if os.path.exists(fifo_in):
os.unlink(fifo_in)
os.mkfifo(fifo_in)
if os.path.exists(fifo_out):
os.unlink(fifo_out)
os.mkfifo(fifo_out)
start_time = time.time()
timeout_time = start_time + timeout
p = select.poll()
p.register(in_fp, select.POLLIN)
_status = TwisterStatus.NONE
_reason = None
with (
# We don't do anything with out_fp but we need to open it for
# writing so that QEMU doesn't block, due to the way pipes work
open(fifo_in, "wb") as _,
# Disable internal buffering, we don't
# want read() or poll() to ever block if there is data in there
open(fifo_out, "rb", buffering=0) as in_fp,
open(logfile, "w") as log_out_fp
):
start_time = time.time()
timeout_time = start_time + timeout
p = select.poll()
p.register(in_fp, select.POLLIN)
_status = TwisterStatus.NONE
_reason = None
line = ""
timeout_extended = False
line = ""
timeout_extended = False
pid = 0
if os.path.exists(pid_fn):
pid = int(open(pid_fn).read())
pid = 0
if os.path.exists(pid_fn):
with open(pid_fn) as pid_file:
pid = int(pid_file.read())
while True:
this_timeout = int((timeout_time - time.time()) * 1000)
if timeout_extended:
# Quit early after timeout extension if no more data is being received
this_timeout = min(this_timeout, 1000)
if this_timeout < 0 or not p.poll(this_timeout):
try:
if pid and this_timeout > 0:
# there's possibility we polled nothing because
# of not enough CPU time scheduled by host for
# QEMU process during p.poll(this_timeout)
cpu_time = QEMUHandler._get_cpu_time(pid)
if cpu_time < timeout and _status == TwisterStatus.NONE:
timeout_time = time.time() + (timeout - cpu_time)
continue
except psutil.NoSuchProcess:
pass
except ProcessLookupError:
_status = TwisterStatus.FAIL
_reason = "Execution error"
while True:
this_timeout = int((timeout_time - time.time()) * 1000)
if timeout_extended:
# Quit early after timeout extension if no more data is being received
this_timeout = min(this_timeout, 1000)
if this_timeout < 0 or not p.poll(this_timeout):
try:
if pid and this_timeout > 0:
# there's possibility we polled nothing because
# of not enough CPU time scheduled by host for
# QEMU process during p.poll(this_timeout)
cpu_time = QEMUHandler._get_cpu_time(pid)
if cpu_time < timeout and _status == TwisterStatus.NONE:
timeout_time = time.time() + (timeout - cpu_time)
continue
except psutil.NoSuchProcess:
pass
except ProcessLookupError:
_status = TwisterStatus.FAIL
_reason = "Execution error"
break
if _status == TwisterStatus.NONE:
_status = TwisterStatus.FAIL
_reason = "timeout"
break
if _status == TwisterStatus.NONE:
if pid == 0 and os.path.exists(pid_fn):
with open(pid_fn) as pid_file:
pid = int(pid_file.read())
try:
c = in_fp.read(1).decode("utf-8")
except UnicodeDecodeError:
# Test is writing something weird, fail
_status = TwisterStatus.FAIL
_reason = "timeout"
break
_reason = "unexpected byte"
break
if pid == 0 and os.path.exists(pid_fn):
pid = int(open(pid_fn).read())
if c == "":
# EOF, this shouldn't happen unless QEMU crashes
if not ignore_unexpected_eof:
_status = TwisterStatus.FAIL
_reason = "unexpected eof"
break
line = line + c
if c != "\n":
continue
try:
c = in_fp.read(1).decode("utf-8")
except UnicodeDecodeError:
# Test is writing something weird, fail
_status = TwisterStatus.FAIL
_reason = "unexpected byte"
break
# line contains a full line of data output from QEMU
log_out_fp.write(strip_ansi_sequences(line))
log_out_fp.flush()
line = line.rstrip()
logger.debug(f"QEMU ({pid}): {line}")
if c == "":
# EOF, this shouldn't happen unless QEMU crashes
if not ignore_unexpected_eof:
_status = TwisterStatus.FAIL
_reason = "unexpected eof"
break
line = line + c
if c != "\n":
continue
harness.handle(line)
if harness.status != TwisterStatus.NONE:
# if we have registered a fail make sure the status is not
# overridden by a false success message coming from the
# testsuite
if _status != TwisterStatus.FAIL:
_status = harness.status
_reason = harness.reason
# line contains a full line of data output from QEMU
log_out_fp.write(strip_ansi_sequences(line))
log_out_fp.flush()
line = line.rstrip()
logger.debug(f"QEMU ({pid}): {line}")
# if we get some status, that means test is doing well, we reset
# the timeout and wait for 2 more seconds to catch anything
# printed late. We wait much longer if code
# coverage is enabled since dumping this information can
# take some time.
if not timeout_extended or harness.capture_coverage:
timeout_extended = True
if harness.capture_coverage:
timeout_time = time.time() + 30
else:
timeout_time = time.time() + 2
line = ""
harness.handle(line)
if harness.status != TwisterStatus.NONE:
# if we have registered a fail make sure the status is not
# overridden by a false success message coming from the
# testsuite
if _status != TwisterStatus.FAIL:
_status = harness.status
_reason = harness.reason
handler_time = time.time() - start_time
logger.debug(
f"QEMU ({pid}) complete with {_status} ({_reason}) after {handler_time} seconds"
)
# if we get some status, that means test is doing well, we reset
# the timeout and wait for 2 more seconds to catch anything
# printed late. We wait much longer if code
# coverage is enabled since dumping this information can
# take some time.
if not timeout_extended or harness.capture_coverage:
timeout_extended = True
if harness.capture_coverage:
timeout_time = time.time() + 30
else:
timeout_time = time.time() + 2
line = ""
QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason)
handler_time = time.time() - start_time
logger.debug(f"QEMU ({pid}) complete with {_status} ({_reason}) after {handler_time} seconds")
if pid:
# Oh well, as long as it's dead! User probably sent Ctrl-C
with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
if pid:
os.kill(pid, signal.SIGTERM)
QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason)
QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp)
os.unlink(fifo_in)
os.unlink(fifo_out)
def _set_qemu_filenames(self, sysbuild_build_dir):
# We pass this to QEMU which looks for fifos with .in and .out suffixes.
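
The rewritten `_thread` above folds the old `_thread_open_files`/`_thread_close_files` helpers into one parenthesized `with` statement (a form available since Python 3.10), so all three streams are closed together even when the polling loop raises or breaks. A runnable sketch of that shape, using ordinary temporary files instead of QEMU's named fifos:

    import os
    import tempfile
    from pathlib import Path

    # Throwaway files so the sketch runs anywhere; the real handler opens
    # QEMU's .in/.out fifos and its log file here instead.
    tmp = tempfile.mkdtemp()
    fifo_in, fifo_out, logfile = (os.path.join(tmp, n) for n in ("fifo.in", "fifo.out", "log"))
    for path in (fifo_in, fifo_out, logfile):
        Path(path).touch()

    with (
        open(fifo_in, "wb") as out_fp,               # in the real code: held open only so QEMU never blocks
        open(fifo_out, "rb", buffering=0) as in_fp,  # unbuffered, so read()/poll() never stall on cached data
        open(logfile, "w") as log_out_fp,
    ):
        log_out_fp.write(in_fp.read().decode("utf-8", "ignore"))
    # All three handles are closed here, even if the body had raised.
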
@@ -1105,7 +1084,8 @@ class QEMUHandler(Handler):
self.returncode = proc.returncode
else:
if os.path.exists(self.pid_fn):
qemu_pid = int(open(self.pid_fn).read())
with open(self.pid_fn) as pid_file:
qemu_pid = int(pid_file.read())
logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
self.returncode = proc.returncode
# Need to wait for harness to finish processing
@@ -1116,7 +1096,8 @@ class QEMUHandler(Handler):
logger.debug("Timed out while monitoring QEMU output")
if os.path.exists(self.pid_fn):
qemu_pid = int(open(self.pid_fn).read())
with open(self.pid_fn) as pid_file:
qemu_pid = int(pid_file.read())
os.unlink(self.pid_fn)
logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
@@ -1281,8 +1262,11 @@ class QEMUWinHandler(Handler):
if self.pid == 0 and os.path.exists(pid_fn):
# pid file probably not contains pid yet, continue
with contextlib.suppress(ValueError):
self.pid = int(open(pid_fn).read())
with (
contextlib.suppress(ValueError),
open(pid_fn) as pid_file
):
self.pid = int(pid_file.read())
try:
c = queue.get_nowait()
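
The QEMUWinHandler hunk above stacks `contextlib.suppress(ValueError)` and the `open()` call in a single parenthesized `with`, so a pid file that exists but is still empty just skips the assignment instead of raising, and the file is closed either way. A self-contained sketch of that behaviour (the temporary pid file is illustrative):

    import contextlib
    import os
    import tempfile

    # Create an empty stand-in pid file; in the handler the path comes from QEMU.
    fd, pid_fn = tempfile.mkstemp()
    os.close(fd)

    pid = 0
    with (
        contextlib.suppress(ValueError),  # int("") raises ValueError while the file is still empty
        open(pid_fn) as pid_file,
    ):
        pid = int(pid_file.read())

    print(pid)  # still 0: the ValueError was suppressed and the file was closed anyway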

@@ -1121,33 +1121,37 @@ class ProjectBuilder(FilterBuilder):
yaml_testsuite_name = self.instance.testsuite.id
logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}")
elf_file = self.instance.get_elf_file()
elf = ELFFile(open(elf_file, "rb"))
logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.")
new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
detected_cases = []
for section in elf.iter_sections():
if isinstance(section, SymbolTableSection):
for sym in section.iter_symbols():
# It is only meant for new ztest fx because only new ztest fx exposes test functions
# precisely.
m_ = new_ztest_unit_test_regex.search(sym.name)
if not m_:
continue
# Demangle C++ symbols
m_ = new_ztest_unit_test_regex.search(self.demangle(sym.name))
if not m_:
continue
# The 1st capture group is new ztest suite name.
# The 2nd capture group is new ztest unit test name.
new_ztest_suite = m_[1]
if new_ztest_suite not in self.instance.testsuite.ztest_suite_names:
logger.warning(f"Unexpected Ztest suite '{new_ztest_suite}' "
f"not present in: {self.instance.testsuite.ztest_suite_names}")
test_func_name = m_[2].replace("test_", "", 1)
testcase_id = f"{yaml_testsuite_name}.{new_ztest_suite}.{test_func_name}"
detected_cases.append(testcase_id)
elf_file = self.instance.get_elf_file()
with open(elf_file, "rb") as elf_fp:
elf = ELFFile(elf_fp)
for section in elf.iter_sections():
if isinstance(section, SymbolTableSection):
for sym in section.iter_symbols():
# It is only meant for new ztest fx
# because only new ztest fx exposes test functions precisely.
m_ = new_ztest_unit_test_regex.search(sym.name)
if not m_:
continue
# Demangle C++ symbols
m_ = new_ztest_unit_test_regex.search(self.demangle(sym.name))
if not m_:
continue
# The 1st capture group is new ztest suite name.
# The 2nd capture group is new ztest unit test name.
new_ztest_suite = m_[1]
if new_ztest_suite not in self.instance.testsuite.ztest_suite_names:
logger.warning(
f"Unexpected Ztest suite '{new_ztest_suite}' "
f"not present in: {self.instance.testsuite.ztest_suite_names}"
)
test_func_name = m_[2].replace("test_", "", 1)
testcase_id = f"{yaml_testsuite_name}.{new_ztest_suite}.{test_func_name}"
detected_cases.append(testcase_id)
if detected_cases:
logger.debug(f"Detected Ztest cases: [{', '.join(detected_cases)}] in {elf_file}")

@@ -1740,98 +1740,6 @@ def test_qemuhandler_thread_get_fifo_names():
assert fifo_in == 'dummy.in'
assert fifo_out == 'dummy.out'
TESTDATA_22 = [
(False, False),
(False, True),
(True, False),
(True, True),
]
@pytest.mark.parametrize(
'fifo_in_exists, fifo_out_exists',
TESTDATA_22,
ids=['both missing', 'out exists', 'in exists', 'both exist']
)
def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists):
def mock_exists(path):
if path == 'fifo.in':
return fifo_in_exists
elif path == 'fifo.out':
return fifo_out_exists
else:
raise ValueError('Unexpected path in mock of os.path.exists')
unlink_mock = mock.Mock()
exists_mock = mock.Mock(side_effect=mock_exists)
mkfifo_mock = mock.Mock()
fifo_in = 'fifo.in'
fifo_out = 'fifo.out'
logfile = 'log.file'
with mock.patch('os.unlink', unlink_mock), \
mock.patch('os.mkfifo', mkfifo_mock), \
mock.patch('os.path.exists', exists_mock), \
mock.patch('builtins.open', mock.mock_open()) as open_mock:
_, _, _ = QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile)
open_mock.assert_has_calls([
mock.call('fifo.in', 'wb'),
mock.call('fifo.out', 'rb', buffering=0),
mock.call('log.file', 'w'),
])
if fifo_in_exists:
unlink_mock.assert_any_call('fifo.in')
if fifo_out_exists:
unlink_mock.assert_any_call('fifo.out')
TESTDATA_23 = [
(False, False),
(True, True),
(True, False)
]
@pytest.mark.parametrize(
'is_pid, is_lookup_error',
TESTDATA_23,
ids=['pid missing', 'pid lookup error', 'pid ok']
)
def test_qemuhandler_thread_close_files(is_pid, is_lookup_error):
is_process_killed = {}
def mock_kill(pid, sig):
if is_lookup_error:
raise ProcessLookupError(f'Couldn\'t find pid: {pid}.')
elif sig == signal.SIGTERM:
is_process_killed[pid] = True
unlink_mock = mock.Mock()
kill_mock = mock.Mock(side_effect=mock_kill)
fifo_in = 'fifo.in'
fifo_out = 'fifo.out'
pid = 12345 if is_pid else None
out_fp = mock.Mock()
in_fp = mock.Mock()
log_out_fp = mock.Mock()
with mock.patch('os.unlink', unlink_mock), \
mock.patch('os.kill', kill_mock):
QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp,
in_fp, log_out_fp)
out_fp.close.assert_called_once()
in_fp.close.assert_called_once()
log_out_fp.close.assert_called_once()
unlink_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')])
if is_pid and not is_lookup_error:
assert is_process_killed[pid]
TESTDATA_24 = [
(TwisterStatus.FAIL, 'timeout', TwisterStatus.FAIL, 'timeout'),
@@ -1983,6 +1891,8 @@ def test_qemuhandler_thread(
handler.pid_fn = 'pid_fn'
handler.fifo_fn = 'fifo_fn'
file_objs = {}
def mocked_open(filename, *args, **kwargs):
if filename == handler.pid_fn:
contents = str(pid).encode('utf-8')
@@ -1993,6 +1903,9 @@ def test_qemuhandler_thread(
file_object = mock.mock_open(read_data=contents).return_value
file_object.__iter__.return_value = contents.splitlines(True)
if file_object not in file_objs:
file_objs[filename] = file_object
return file_object
harness = mock.Mock(capture_coverage=capture_coverage, handle=print)
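
With the open/close helper methods gone, the test patches `builtins.open` directly and keeps every `mock_open` file object in the `file_objs` dict keyed by filename, so later assertions can reach whichever handle the code under test opened (see the last hunk below). A simplified sketch of that technique against a hypothetical function under test:

    from unittest import mock

    def read_pid(pid_fn):
        # hypothetical code under test, using the with-open pattern from this commit
        with open(pid_fn) as pid_file:
            return int(pid_file.read())

    def test_read_pid_records_file_objects():
        file_objs = {}

        def mocked_open(filename, *args, **kwargs):
            # hand out one mock_open handle per filename and remember it
            file_object = mock.mock_open(read_data="1234").return_value
            file_objs.setdefault(filename, file_object)
            return file_object

        with mock.patch('builtins.open', new=mocked_open):
            assert read_pid('pid_fn') == 1234

        # the handle the code opened is now reachable for assertions
        file_objs['pid_fn'].read.assert_called_once()
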
@@ -2006,27 +1919,19 @@ def test_qemuhandler_thread(
mock_thread_get_fifo_names = mock.Mock(
return_value=('fifo_fn.in', 'fifo_fn.out')
)
log_fp_mock = mock.Mock()
in_fp_mock = mocked_open('fifo_fn.out')
out_fp_mock = mock.Mock()
mock_thread_open_files = mock.Mock(
return_value=(out_fp_mock, in_fp_mock, log_fp_mock)
)
mock_thread_close_files = mock.Mock()
mock_thread_update_instance_info = mock.Mock()
with mock.patch('time.time', side_effect=faux_timer.time), \
mock.patch('builtins.open', new=mocked_open), \
mock.patch('select.poll', return_value=p), \
mock.patch('os.path.exists', return_value=True), \
mock.patch('os.unlink', mock.Mock()), \
mock.patch('os.mkfifo', mock.Mock()), \
mock.patch('os.kill', mock.Mock()), \
mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time',
mock_cputime), \
mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names',
mock_thread_get_fifo_names), \
mock.patch('twisterlib.handlers.QEMUHandler._thread_open_files',
mock_thread_open_files), \
mock.patch('twisterlib.handlers.QEMUHandler._thread_close_files',
mock_thread_close_files), \
mock.patch('twisterlib.handlers.QEMUHandler.' \
'_thread_update_instance_info',
mock_thread_update_instance_info):
@@ -2041,8 +1946,6 @@ def test_qemuhandler_thread(
handler.ignore_unexpected_eof
)
print(mock_thread_update_instance_info.call_args_list)
mock_thread_update_instance_info.assert_called_once_with(
handler,
mock.ANY,
@@ -2050,7 +1953,7 @@ def test_qemuhandler_thread(
mock.ANY
)
log_fp_mock.write.assert_has_calls(expected_log_calls)
file_objs[handler.log].write.assert_has_calls(expected_log_calls)
TESTDATA_26 = [