twister: pytest: Parse report file to get testcases

Extended parsing of the report.xml file produced by the
pytest-twister-harness plugin. Testcases are now properly extracted
and added to the twister report. Added unit tests.

Signed-off-by: Grzegorz Chwierut <grzegorz.chwierut@nordicsemi.no>
Grzegorz Chwierut 2023-06-12 18:15:31 +02:00 committed by Anas Nashif
commit 4045bab1b7
5 changed files with 226 additions and 66 deletions
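For orientation before the diff: with --junit-xml, pytest writes a JUnit-style XML report, and the new _parse_report_file() below walks exactly that structure. A minimal sketch of such a report (illustrative only, not part of this commit; the tag and attribute names match what the parser reads, the values are made up):

import xml.etree.ElementTree as ET

# Illustrative report.xml content; a real run's values will differ.
REPORT = '''
<testsuites>
  <testsuite name="pytest" errors="0" failures="1" skipped="1" tests="3" time="1.234">
    <testcase name="test_1" time="0.512"/>
    <testcase name="test_2" time="0.401">
      <failure message="assert False">traceback ...</failure>
    </testcase>
    <testcase name="test_3" time="0.321">
      <skipped message="Skipped on runtime"/>
    </testcase>
  </testsuite>
</testsuites>
'''

suite = ET.fromstring(REPORT).find('testsuite')
assert suite.get('failures') == '1'               # suite-level state will be 'failed'
assert suite.find('testcase').find('*') is None   # no child element -> case passed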


@@ -40,6 +40,7 @@ logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native"]
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']
def terminate_process(proc):


@@ -1,4 +1,5 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from asyncio.log import logger
import platform
import re
@@ -13,7 +14,8 @@ import threading
import time
from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED
from twisterlib.handlers import terminate_process
from twisterlib.handlers import Handler, terminate_process, SUPPORTED_SIMS_IN_PYTEST
from twisterlib.testinstance import TestInstance
logger = logging.getLogger('twister')
@@ -21,8 +23,6 @@ logger.setLevel(logging.DEBUG)
_WINDOWS = platform.system() == 'Windows'
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']
# pylint: disable=anomalous-backslash-in-string
result_re = re.compile(".*(PASS|FAIL|SKIP) - (test_)?(.*) in (\d*[.,]?\d*) seconds")
@@ -49,7 +49,6 @@ class Harness:
        self.matches = OrderedDict()
        self.ordered = True
        self.repeat = 1
        self.testcases = []
        self.id = None
        self.fail_on_fault = True
        self.fault = False
@@ -63,7 +62,7 @@ class Harness:
        self.run_id = None
        self.matched_run_id = False
        self.run_id_exists = False
        self.instance = None
        self.instance: TestInstance | None = None
        self.testcase_output = ""
        self._match = False
@@ -224,7 +223,7 @@ class PytestHarnessException(Exception):
class Pytest(Harness):
    def configure(self, instance):
    def configure(self, instance: TestInstance):
        super(Pytest, self).configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
@@ -234,16 +233,15 @@
    def pytest_run(self, timeout):
        try:
            cmd = self.generate_command()
            if not cmd:
                logger.error('Pytest command not generated, check logs')
                return
            self.run_command(cmd, timeout)
        except PytestHarnessException as pytest_exception:
            logger.error(str(pytest_exception))
            self.state = 'failed'
            self.instance.reason = str(pytest_exception)
        finally:
            if self.reserved_serial:
                self.instance.handler.make_device_available(self.reserved_serial)
            self._apply_instance_status()
            self._update_test_status()

    def generate_command(self):
        config = self.instance.testsuite.harness_config
@@ -260,7 +258,7 @@
        ]
        command.extend(pytest_args)
        handler = self.instance.handler
        handler: Handler = self.instance.handler
        if handler.options.verbose > 1:
            command.append('--log-level=DEBUG')
@@ -277,7 +275,7 @@
            raise PytestHarnessException(f'Handling of handler {handler.type_str} not implemented yet')
        return command

    def _generate_parameters_for_hardware(self, handler):
    def _generate_parameters_for_hardware(self, handler: Handler):
        command = ['--device-type=hardware']
        hardware = handler.get_hardware()
        if not hardware:
@@ -318,53 +316,26 @@
    def run_command(self, cmd, timeout):
        cmd, env = self._update_command_with_env_dependencies(cmd)
        with subprocess.Popen(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              env=env) as proc:
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred.')
                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
                                   f'Currently set to {timeout} seconds.')
                    self.instance.reason = 'Pytest timeout'
                    self.state = 'failed'
                proc.wait(timeout)
                if self.state != 'failed':
                    tree = ET.parse(self.report_file)
                    root = tree.getroot()
                    for child in root:
                        if child.tag == 'testsuite':
                            if child.attrib['failures'] != '0':
                                self.state = "failed"
                            elif child.attrib['skipped'] != '0':
                                self.state = "skipped"
                            elif child.attrib['errors'] != '0':
                                self.state = "error"
                            else:
                                self.state = "passed"
                            self.instance.execution_time = float(child.attrib['time'])
            except subprocess.TimeoutExpired:
                self.state = 'failed'
                proc.kill()
                self.state = "failed"
            except ET.ParseError:
                self.state = "failed"
            except IOError:
                logger.warning("Can't access report.xml")
                self.state = "failed"

        tc = self.instance.get_case_or_create(self.id)
        if self.state == "passed":
            tc.status = "passed"
            logger.debug("Pytest cases passed")
        elif self.state == "skipped":
            tc.status = "skipped"
            logger.debug("Pytest cases skipped.")
        else:
            tc.status = "failed"
            logger.info("Pytest cases failed.")
    @staticmethod
    def _update_command_with_env_dependencies(cmd):
@@ -397,16 +368,52 @@ class Pytest(Harness):
            logger.debug('PYTEST: %s', line)
        proc.communicate()

    def _apply_instance_status(self):
        if self.state:
            self.instance.status = self.state
            if self.state in ["error", "failed"]:
                self.instance.reason = "Pytest failed"
        else:
            self.instance.status = "failed"
            self.instance.reason = "Pytest timeout"
        if self.instance.status in ["error", "failed"]:
            self.instance.add_missing_case_status("blocked", self.instance.reason)

    def _update_test_status(self):
        if not self.state:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.state = 'failed'
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()
        self.instance.status = self.state or 'failed'
        if self.instance.status in ['error', 'failed']:
            self.instance.reason = self.instance.reason or 'Pytest failed'
            self.instance.add_missing_case_status('blocked', self.instance.reason)

    def _parse_report_file(self, report):
        tree = ET.parse(report)
        root = tree.getroot()
        if elem_ts := root.find('testsuite'):
            if elem_ts.get('failures') != '0':
                self.state = 'failed'
            elif elem_ts.get('errors') != '0':
                self.state = 'error'
            elif elem_ts.get('skipped') == elem_ts.get('tests'):
                self.state = 'skipped'
            else:
                self.state = 'passed'
            self.instance.execution_time = float(elem_ts.get('time'))

            for elem_tc in elem_ts.findall('testcase'):
                tc = self.instance.get_case_or_create(f"{self.id}.{elem_tc.get('name')}")
                tc.duration = float(elem_tc.get('time'))
                elem = elem_tc.find('*')
                if elem is None:
                    tc.status = 'passed'
                else:
                    if elem.tag == 'skipped':
                        tc.status = 'skipped'
                    elif elem.tag == 'failure':
                        tc.status = 'failed'
                    else:
                        tc.status = 'error'
                    tc.reason = elem.get('message')
                    tc.output = elem.text
class Gtest(Harness):
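
Two details of the new suite-level logic deserve a note. First, the state is derived with a fixed precedence: failures beat errors, which beat the all-skipped case, and 'passed' is the fallback. Second, it fixes the removed run_command() logic, which marked the whole suite skipped whenever skipped != '0', i.e. as soon as a single case was skipped; the new skipped == tests check only does so when every case was skipped. A standalone sketch of the precedence (a hypothetical pure function, not in the tree, mirroring _parse_report_file's comparisons over the <testsuite> attributes):

def suite_state(attrib: dict) -> str:
    # attrib mirrors the <testsuite> XML attributes, all string-valued.
    if attrib['failures'] != '0':
        return 'failed'
    if attrib['errors'] != '0':
        return 'error'
    if attrib['skipped'] == attrib['tests']:
        return 'skipped'
    return 'passed'

assert suite_state({'failures': '1', 'errors': '0', 'skipped': '0', 'tests': '2'}) == 'failed'
assert suite_state({'failures': '0', 'errors': '0', 'skipped': '2', 'tests': '2'}) == 'skipped'
# One skip out of two no longer marks the whole suite skipped:
assert suite_state({'failures': '0', 'errors': '0', 'skipped': '1', 'tests': '2'}) == 'passed'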


@@ -3,7 +3,7 @@
# Copyright (c) 2018-2022 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import os
import hashlib
import random
@@ -11,11 +11,19 @@ import logging
import shutil
import glob
from twisterlib.testsuite import TestCase
from twisterlib.testsuite import TestCase, TestSuite
from twisterlib.platform import Platform
from twisterlib.error import BuildError
from twisterlib.size_calc import SizeCalculator
from twisterlib.handlers import Handler, SimulationHandler, BinaryHandler, QEMUHandler, DeviceHandler, SUPPORTED_SIMS
from twisterlib.harness import SUPPORTED_SIMS_IN_PYTEST
from twisterlib.handlers import (
    Handler,
    SimulationHandler,
    BinaryHandler,
    QEMUHandler,
    DeviceHandler,
    SUPPORTED_SIMS,
    SUPPORTED_SIMS_IN_PYTEST,
)
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
@@ -33,8 +41,8 @@ class TestInstance:
    def __init__(self, testsuite, platform, outdir):
        self.testsuite = testsuite
        self.platform = platform
        self.testsuite: TestSuite = testsuite
        self.platform: Platform = platform
        self.status = None
        self.reason = "Unknown"
@@ -51,7 +59,7 @@ class TestInstance:
        self.domains = None
        self.run = False
        self.testcases = []
        self.testcases: list[TestCase] = []
        self.init_cases()
        self.filters = []
        self.filter_type = None
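
A note on the from __future__ import annotations lines added to both harness.py and this file: attribute annotations such as list[TestCase] and TestInstance | None are otherwise evaluated at runtime, and the list[...] form needs Python 3.9 while X | None needs Python 3.10. With PEP 563's postponed evaluation they are stored as strings and never evaluated, so the new annotations stay compatible with older interpreters. A minimal illustration with hypothetical names:

from __future__ import annotations  # PEP 563: annotations are stored as strings

class Node:
    def __init__(self) -> None:
        # Without the __future__ import these annotations would be evaluated
        # when __init__ runs and raise on Python 3.8 (list[Node]) and
        # on 3.9 (Node | None).
        self.children: list[Node] = []
        self.parent: Node | None = None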


@@ -9,6 +9,8 @@ import os
import sys
import pytest
pytest_plugins = ["pytester"]
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts"))


@@ -0,0 +1,142 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest
import textwrap

from unittest import mock
from pathlib import Path

from twisterlib.harness import Pytest
from twisterlib.testsuite import TestSuite
from twisterlib.testinstance import TestInstance
from twisterlib.platform import Platform


@pytest.fixture
def testinstance() -> TestInstance:
    testsuite = TestSuite('.', 'samples/hello', 'unit.test')
    testsuite.harness_config = {}
    testsuite.ignore_faults = False
    platform = Platform()

    testinstance = TestInstance(testsuite, platform, 'outdir')
    testinstance.handler = mock.Mock()
    testinstance.handler.options = mock.Mock()
    testinstance.handler.options.verbose = 1
    testinstance.handler.type_str = 'native'
    return testinstance


@pytest.mark.parametrize('device_type', ['native', 'qemu'])
def test_pytest_command(testinstance: TestInstance, device_type):
    pytest_harness = Pytest()
    pytest_harness.configure(testinstance)

    testinstance.handler.type_str = device_type
    ref_command = [
        'pytest',
        'samples/hello/pytest',
        f'--build-dir={testinstance.build_dir}',
        f'--junit-xml={testinstance.build_dir}/report.xml',
        f'--device-type={device_type}'
    ]

    command = pytest_harness.generate_command()
    for c in ref_command:
        assert c in command


def test_if_report_is_parsed(pytester, testinstance: TestInstance):
    test_file_content = textwrap.dedent("""
        def test_1():
            pass
        def test_2():
            pass
    """)
    test_file = pytester.path / 'test_valid.py'
    test_file.write_text(test_file_content)
    report_file = Path('report.xml')
    result = pytester.runpytest(
        str(test_file),
        f'--junit-xml={str(report_file)}'
    )
    result.assert_outcomes(passed=2)
    assert report_file.is_file()

    pytest_harness = Pytest()
    pytest_harness.configure(testinstance)
    pytest_harness.report_file = report_file
    pytest_harness._update_test_status()

    assert pytest_harness.state == "passed"
    assert testinstance.status == "passed"
    assert len(testinstance.testcases) == 2
    for tc in testinstance.testcases:
        assert tc.status == "passed"


def test_if_report_with_error(pytester, testinstance: TestInstance):
    test_file_content = textwrap.dedent("""
        def test_exp():
            raise Exception('Test error')
        def test_err():
            assert False
    """)
    test_file = pytester.path / 'test_error.py'
    test_file.write_text(test_file_content)
    report_file = pytester.path / 'report.xml'
    result = pytester.runpytest(
        str(test_file),
        f'--junit-xml={str(report_file)}'
    )
    result.assert_outcomes(failed=2)
    assert report_file.is_file()

    pytest_harness = Pytest()
    pytest_harness.configure(testinstance)
    pytest_harness.report_file = report_file
    pytest_harness._update_test_status()

    assert pytest_harness.state == "failed"
    assert testinstance.status == "failed"
    assert len(testinstance.testcases) == 2
    for tc in testinstance.testcases:
        assert tc.status == "failed"
        assert tc.output
        assert tc.reason


def test_if_report_with_skip(pytester, testinstance: TestInstance):
    test_file_content = textwrap.dedent("""
        import pytest
        @pytest.mark.skip('Test skipped')
        def test_skip_1():
            pass
        def test_skip_2():
            pytest.skip('Skipped on runtime')
    """)
    test_file = pytester.path / 'test_skip.py'
    test_file.write_text(test_file_content)
    report_file = pytester.path / 'report.xml'
    result = pytester.runpytest(
        str(test_file),
        f'--junit-xml={str(report_file)}'
    )
    result.assert_outcomes(skipped=2)
    assert report_file.is_file()

    pytest_harness = Pytest()
    pytest_harness.configure(testinstance)
    pytest_harness.report_file = report_file
    pytest_harness._update_test_status()

    assert pytest_harness.state == "skipped"
    assert testinstance.status == "skipped"
    assert len(testinstance.testcases) == 2
    for tc in testinstance.testcases:
        assert tc.status == "skipped"
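
The three tests above all use uniform reports (all passed, all failed, all skipped). A natural follow-up, sketched here as a suggestion rather than something this commit contains, is a mixed report: the suite state collapses to 'failed' because failures are checked first, while each testcase keeps its own parsed status. Reusing the imports and fixture above:

def test_if_report_with_mixed_results(pytester, testinstance: TestInstance):
    # Hypothetical extra test, same pattern as above: one pass, one failure, one skip.
    test_file_content = textwrap.dedent("""
        import pytest
        def test_ok():
            pass
        def test_bad():
            assert False
        def test_skip():
            pytest.skip('Skipped on runtime')
    """)
    test_file = pytester.path / 'test_mixed.py'
    test_file.write_text(test_file_content)
    report_file = pytester.path / 'report.xml'
    result = pytester.runpytest(str(test_file), f'--junit-xml={str(report_file)}')
    result.assert_outcomes(passed=1, failed=1, skipped=1)

    pytest_harness = Pytest()
    pytest_harness.configure(testinstance)
    pytest_harness.report_file = report_file
    pytest_harness._update_test_status()

    # failures != '0' wins at suite level...
    assert pytest_harness.state == 'failed'
    assert testinstance.status == 'failed'
    # ...but per-case statuses are preserved.
    assert sorted(tc.status for tc in testinstance.testcases) == ['failed', 'passed', 'skipped']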