Twister: Add power measurements support for Twister.

Add support for powerShield of stm32l562e_dk board.

Signed-off-by: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
This commit is contained in:
Arkadiusz Cholewinski 2025-02-17 16:03:46 +01:00 committed by Benjamin Cabé
commit 35b13e1629
11 changed files with 1006 additions and 1 deletions

View file

@ -0,0 +1,40 @@
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
import string
from abc import ABC, abstractmethod
class PowerMonitor(ABC):
@abstractmethod
def init(self, device_id: string):
"""
Abstract method to initialize the power monitor.
Agr:
string: Address of the power monitor
Return:
bool: True of False.
"""
@abstractmethod
def measure(self, duration: int):
"""
Abstract method to measure current with specified measurement time.
Args:
duration (int): The duration of the measurement in seconds.
"""
@abstractmethod
def get_data(self, duration: int) -> list[float]:
"""
Measure current with specified measurement time.
Args:
duration (int): The duration of the measurement in seconds.
Returns:
List[float]: An array of measured current values in amperes.
"""

View file

@ -0,0 +1,61 @@
# Copyright: (c) 2025, Intel Corporation
import json
import logging
import pytest
from twister_harness import DeviceAdapter
def pytest_addoption(parser):
parser.addoption('--testdata')
@pytest.fixture
def probe_class(request, probe_path):
path = probe_path # Get path of power device
if request.param == 'stm_powershield':
from stm32l562e_dk.PowerShield import PowerShield
probe = PowerShield() # Instantiate the power monitor probe
probe.connect(path)
probe.init()
yield probe
if request.param == 'stm_powershield':
probe.disconnect()
@pytest.fixture(name='probe_path', scope='session')
def fixture_probe_path(request, dut: DeviceAdapter):
for fixture in dut.device_config.fixtures:
if fixture.startswith('pm_probe'):
probe_port = fixture.split(':')[1]
return probe_port
@pytest.fixture(name='test_data', scope='session')
def fixture_test_data(request):
# Get test data from the configuration and parse it into a dictionary
measurements = request.config.getoption("--testdata")
measurements = measurements.replace("'", '"') # Ensure the data is properly formatted as JSON
measurements_dict = json.loads(measurements)
# Data validation
required_keys = [
'elements_to_trim',
'min_peak_distance',
'min_peak_height',
'peak_padding',
'measurement_duration',
'num_of_transitions',
'expected_rms_values',
'tolerance_percentage',
]
for key in required_keys:
if key not in measurements_dict:
logging.error(f"Missing required test data key: {key}")
pytest.fail(f"Missing required test data key: {key}")
return measurements_dict

View file

@ -0,0 +1,484 @@
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
import csv
import logging
import queue
import re
import threading
import time
import utils.UtilityFunctions as UtilityFunctions
from abstract.PowerMonitor import PowerMonitor
from stm32l562e_dk.PowerShieldConfig import PowerShieldConf
from stm32l562e_dk.PowerShieldData import PowerShieldData
from stm32l562e_dk.SerialHandler import SerialHandler
class PowerShield(PowerMonitor):
def __init__(self):
"""
Initializes the PowerShield.
"""
self.handler = None
self.dataQueue = queue.Queue()
self.acqComplete = False
self.acqStart = False
self.target_voltage = None
self.target_temperature = None
self.acqTimeoutThread = None
self.power_shield_conf = PowerShieldConf()
self.power_shield_data = PowerShieldData()
def init(self):
"""
Initializes the power monitor.
"""
self.__take_control()
self.__set_voltage(self.power_shield_conf.target_voltage)
self.__set_format(self.power_shield_conf.data_format)
self.__set_func_mode(self.power_shield_conf.function_mode)
def connect(self, power_device_path: str):
"""Opens the connection using the SerialHandler."""
self.handler = SerialHandler(power_device_path, 3686400)
self.handler.open()
def disconnect(self):
"""Closes the connection using the SerialHandler."""
self.handler.close()
def __send_command(self, command: str, expected_ack: str = None, ack: bool = False) -> str:
"""
Sends a command to the device, retrieves the response,
and optionally verifies the acknowledgment.
:param command: The command to send.
:param expected_ack: The expected acknowledgment response (e.g., "ack htc").
:return: The response received from the device.
"""
if not self.handler.is_open():
logging.info(f"Error: Connection is not open. Cannot send command: {command}")
return ""
logging.debug(f"Sending command: {command}")
self.handler.send_cmd(command)
if ack:
response = self.handler.receive_cmd()
logging.debug(f"Response: {response}")
# Check if the response contains the expected acknowledgment
if expected_ack and expected_ack not in response:
logging.error(f"Error: Expected acknowledgment '{expected_ack}' not found.")
return ""
return response
return 0
def __test_communication(self):
"""
Sends a version command to the device.
"""
if not self.handler.is_open():
logging.error("Error: Connection is not open. Cannot send version command.")
return ""
command = 'version'
logging.info(f"Sending command: {command}")
self.handler.send_cmd(command)
response = self.handler.receive_cmd()
logging.info(f"Response: {response}")
return response
def __reset(self):
"""
Sends the reset command ('PSRST') to the power monitor device,
closes the connection, waits for the reset process to complete,
and repeatedly attempts to reconnect until successful.
"""
command = "psrst"
if not self.handler.is_open():
logging.error("Error: Connection is not open. Cannot reset the device.")
return
logging.info(f"Sending reset command: {command}")
self.handler.send_cmd(command)
# Close the connection
self.handler.close()
self.handler.serial_connection = None
time.sleep(5)
# Attempt to reopen the connection
try:
self.handler.open()
logging.info("Connection reopened after reset.")
except Exception as e:
logging.error(f"Failed to reopen connection after reset: {e}")
def __get_voltage_level(self) -> float:
"""
Sends the 'volt get' command and returns the voltage value as a float.
:return: The voltage level as a float, in volts (V).
"""
command = 'volt get'
response = self.__send_command(command, expected_ack="ack volt get", ack=True)
# If response contains the expected acknowledgment, extract and return the voltage
if response:
parts = response.split()
try:
if len(parts) >= 5:
# Use regex to find a string that matches the pattern, e.g., "3292-03"
match = re.search(r'(\d+)-(\d+)', parts[5])
if match:
# Extract the base (3292) and exponent (03)
base = match.group(1)
exponent = match.group(2)
# Construct the scientific notation string (e.g., 3292e-03)
voltage_str = f"{base}e-{exponent}"
# Convert the string into a float
voltage = float(voltage_str)
# Return the voltage as a float
self.target_voltage = round(voltage, 3)
return self.target_voltage
except ValueError:
logging.error("Error: Could not convert temperature value.")
return float('nan')
else:
logging.error("Error: No response for voltage command.")
return float('nan')
def __get_temperature(self, unit: str = PowerShieldConf.TemperatureUnit.CELSIUS) -> float:
"""
Sends the temperature command and returns the temperature as a float.
:param unit: The unit to request the temperature in, either 'degc' or 'degf'.
:return: The temperature value as a float, in the specified unit (°C or °F).
"""
# Send the temp command with the unit
response = self.__send_command(f"temp {unit}", expected_ack=f"ack temp {unit}", ack=True)
# If response contains the expected acknowledgment, extract the temperature
if response:
try:
# Example response format: "PowerShield > ack temp degc 28.0"
parts = response.split()
if len(parts) >= 5 and parts[5].replace('.', '', 1).isdigit():
# Extract temperature and convert to float
self.target_temetarute = float(parts[5])
logging.info(f"Temperature: {self.target_temetarute} {unit}")
return self.target_temetarute
else:
print("Error: Temperature value not found in response.")
return None
except ValueError:
logging.error("Error: Could not convert temperature value.")
return None
else:
logging.error("Error: No response for temp command.")
return None
def __take_control(self) -> str:
"""
Sends the 'htc' command and verifies the acknowledgment.
:return: The acknowledgment response or error message.
"""
return self.__send_command("htc", expected_ack="ack htc", ack=True)
def __set_format(self, data_format: str = PowerShieldConf.DataFormat.ASCII_DEC):
"""
Sets the measurement data format.
The format can be either ASCII (decimal) or Binary (hexadecimal).
:param data_format: The data format to set.
Options are 'ascii_dec' or 'bin_hexa'.
:return: None
"""
# Validate the input format
if data_format not in vars(PowerShieldConf.DataFormat).values():
logging.error(
f"Error: Invalid format '{data_format}'. "
"Valid options are 'ascii_dec' or 'bin_hexa'."
)
return
command = f"format {data_format}"
response = self.__send_command(command, expected_ack=f"ack format {data_format}", ack=True)
# If response contains the expected acknowledgment, the format was set successfully
if response:
logging.info(f"Data format set to {data_format}.")
else:
logging.error(f"Error: Failed to set data format to {data_format}.")
def __set_frequency(self, frequency: enumerate):
"""
Sets the sampling frequency for the measurement.
The frequency can be any valid value from the list.
:param frequency: The sampling frequency to set.
Valid options include:
{100k, 50k, 20k, 10k, 5k, 2k, 1k, 500, 200, 100, 50, 20, 10, 5, 2, 1}.
:return: None
"""
# Validate the input frequency
if frequency not in vars(PowerShieldConf.SamplingFrequency).values():
logging.error(
f"Error: Invalid frequency '{frequency}'."
"Valid options are:"
"100k, 50k, 20k, 10k, 5k, 2k, 1k, 500, 200, 100, 50, 20, 10, 5, 2, 1."
)
return
command = f"freq {frequency}"
response = self.__send_command(command, expected_ack=f"ack freq {frequency}", ack=True)
if response:
logging.info(f"Sampling frequency set to {frequency}.")
else:
logging.error(f"Error: Failed to set sampling frequency to {frequency}.")
def __set_acquisition_time(self, acquisition_time: str = '0'):
command = f"acqtime {acquisition_time}"
response = self.__send_command(
command, expected_ack=f"ack acqtime {acquisition_time}", ack=True
)
if response:
logging.info(f"Acquisition time set to {acquisition_time}.")
else:
logging.error(f"Error: Failed to set acquisition time to {acquisition_time}.")
def __set_voltage(self, voltage: enumerate):
command = f"volt {voltage}"
response = self.__send_command(command, expected_ack=f"ack volt {voltage}", ack=True)
if response:
logging.info(f"Voltage set to {voltage}.")
else:
logging.error(f"Error: Failed to set voltage to {voltage}.")
def __set_func_mode(self, function_mode: str = PowerShieldConf.FunctionMode.HIGH):
"""
Sets the acquisition mode for current measurement.
The function_mode can be either 'optim' or 'high'.
- 'optim': Priority on current resolution (100 nA - 10 mA) with max freq at 100 kHz.
- 'high': High current (30 µA - 10 mA), high frequency (50-100 kHz), high resolution.
:param mode: The acquisition mode. Must be either 'optim' or 'high'.
:return: None
"""
# Validate the input format
if function_mode not in vars(PowerShieldConf.FunctionMode).values():
logging.error(
f"Error: Invalid format '{function_mode}'."
"Valid options are 'ascii_dec' or 'bin_hexa'."
)
return
command = f"funcmode {function_mode}"
response = self.__send_command(
command, expected_ack=f"ack funcmode {function_mode}", ack=True
)
if response:
logging.info(f"Data format set to {function_mode}.")
else:
logging.error(f"Error: Failed to set data format to {function_mode}.")
def __acq_data(self):
"""
Continuously reads data from the serial port and puts it
into a queue until acquisition is complete.
"""
logging.info("Started data acquisition...")
while True:
# Read the first byte
first_byte = self.handler.read_bytes(1)
if len(first_byte) < 1 or self.acqComplete: # Exit conditions
logging.info("Stopping data acquisition...")
return
# Check if it's metadata
if first_byte == b'\xf0': # Metadata marker
second_byte = self.handler.read_bytes(1)
# Handle metadata types
metadata_type = second_byte[0]
self.__handle_metadata(metadata_type)
else:
# Not metadata, treat as data
if self.acqStart:
second_byte = self.handler.read_bytes(1)
data = []
data.append(first_byte)
if len(second_byte) < 1 or self.acqComplete:
logging.info("Stopping data acquisition...")
return
data.append(second_byte)
amps = UtilityFunctions.convert_to_amps(
UtilityFunctions.bytes_to_twobyte_values(data)
)
self.dataQueue.put([amps])
def __handle_metadata(self, metadata_type):
if metadata_type == 0xF1:
logging.info("Received Metadata: ASCII error message.")
# self.handle_metadata_error()
elif metadata_type == 0xF2:
logging.info("Received Metadata: ASCII information message.")
# self.handle_metadata_info()
elif metadata_type == 0xF3:
logging.info("Received Metadata: Timestamp message.")
self.__handle_metadata_timestamp()
self.acqStart = True
elif metadata_type == 0xF4:
logging.info("Received Metadata: End of acquisition tag.")
self.__handle_metadata_end()
self.__handle_summary()
elif metadata_type == 0xF5:
logging.info("Received Metadata: Overcurrent detected.")
# self.handle_metadata_overcurrent()
else:
logging.error(f"Error: Unknown Metadata Type: {metadata_type:#04x}")
def __handle_summary(self):
s = ""
while True:
# Read the first byte
x = self.handler.read_bytes(1)
if len(x) < 1 or x == 0xF0:
self.acqComplete = True
return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n")
s += str(x, encoding='ascii', errors='ignore')
def __handle_metadata_end(self):
"""
Handle metadata end of acquisition message.
"""
# Read the next 2 bytes
metadata_bytes = self.handler.read_bytes(2)
if len(metadata_bytes) < 2:
logging.error("Error: Incomplete end of acquisition metadata reveived.")
return
# Check for end tags (last 2 bytes)
end_tag_1 = metadata_bytes[0]
end_tag_2 = metadata_bytes[1]
if end_tag_1 != 0xFF or end_tag_2 != 0xFF:
logging.error("Error: Invalid metadata end tags received.")
return
def __handle_metadata_timestamp(self):
"""
Handle metadata timestamp message. Parses and displays the timestamp and buffer load.
"""
# Read the next 7 bytes (timestamp + buffer load + end tags)
metadata_bytes = self.handler.read_bytes(7)
if len(metadata_bytes) < 7:
logging.error("Error: Incomplete timestamp metadata received.")
return
# Parse the timestamp (4 bytes, big-endian)
timestamp_ms = int.from_bytes(metadata_bytes[0:4], byteorder='big', signed=False)
# Parse the buffer Tx load value (1 byte)
buffer_load = metadata_bytes[4]
# Check for end tags (last 2 bytes)
end_tag_1 = metadata_bytes[5]
end_tag_2 = metadata_bytes[6]
if end_tag_1 != 0xFF or end_tag_2 != 0xFF:
logging.error("Error: Invalid metadata end tags received.")
return
# Display parsed values
logging.info(f"Metadata Timestamp: {timestamp_ms} ms")
logging.info(f"Buffer Tx Load: {buffer_load}%")
def __start_measurement(self):
"""
Starts the measurement by sending the 'start' command. Once the measurement starts,
data can be received continuously until the 'stop' command is sent.
:return: None
"""
command = "start"
self.acqComplete = False
self.__send_command(command)
raw_to_file_Thread = threading.Thread(
target=self.__raw_to_file, args=(self.power_shield_conf.output_file,)
)
raw_to_file_Thread.start()
logging.info("Measurement started. Receiving data...")
self.__acq_data()
raw_to_file_Thread.join()
def __raw_to_file(self, outputFilePath: str):
# Open a CSV file for writing
with open(outputFilePath, 'w', newline='') as outputFile:
writer = csv.writer(outputFile)
while True:
if self.dataQueue.empty() and bool(self.acqComplete):
outputFile.close()
break
if not self.dataQueue.empty():
data = self.dataQueue.get()
writer.writerow(data)
outputFile.flush()
else:
time.sleep(0.1)
def measure(self, time: int, freq: str = None, reset: bool = False):
self.power_shield_conf.acquisition_time = time
_time, self.power_shield_conf.acquisition_time_unit = UtilityFunctions.convert_acq_time(
time
)
if reset:
self.__reset()
self.__take_control()
self.__set_format(self.power_shield_conf.data_format)
if freq is not None:
self.__set_frequency(freq)
else:
self.__set_frequency(self.power_shield_conf.sampling_frequency)
self.__set_acquisition_time(
UtilityFunctions.convert_to_scientific_notation(
time=_time, unit=self.power_shield_conf.acquisition_time_unit
)
)
self.__start_measurement()
def get_data(self, unit: str = PowerShieldConf.MeasureUnit.RAW_DATA):
if self.acqComplete:
# Open the CSV file
with open(self.power_shield_conf.output_file) as file:
csv_reader = csv.reader(file)
for row in csv_reader:
self.power_shield_data.data.append(row[0])
if unit == PowerShieldConf.MeasureUnit.CURRENT_RMS:
self.power_shield_data.current_RMS = UtilityFunctions.calculate_rms(
self.power_shield_data.data
)
return self.power_shield_data.current_RMS
elif unit == PowerShieldConf.MeasureUnit.POWER:
_delta_time = self.power_shield_conf.acquisition_time
self.power_shield_data.power = 0
for data in self.power_shield_data.data:
self.power_shield_data.power += float(
float(data) * float(_delta_time) * float(self.target_voltage)
)
return self.power_shield_data.power
elif unit == PowerShieldConf.MeasureUnit.RAW_DATA:
return self.power_shield_data.data
else:
logging.error("Error: Unknown unit of requested data")
else:
logging.info("Acquisition not complete.")
return None

View file

@ -0,0 +1,79 @@
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
from dataclasses import dataclass, field
@dataclass
class PowerShieldConf:
class PowerMode:
"""
Class representing power mode
"""
AUTO = "auto" # Power-on when acquisition start
ON = "on" # Power-on manually
OFF = "off" # Power-off manually
class MeasureUnit:
"""
Class representing measure units.
"""
VOLTAGE = "voltage" # Target Volatege
CURRENT_RMS = "current_rms" # Current RMS value
POWER = "power" # Total power consumption
RAW_DATA = "rawdata" # Get Raw Data (current probes)
class TemperatureUnit:
"""
Class representing temperature units.
"""
CELSIUS = "degc" # Celsius temperature unit
FAHRENHEIT = "degf" # Fahrenheit temperature unit
class FunctionMode:
"""
Class representing functional modes of a power monitor.
"""
OPTIM = "optim" # Optimized mode for lower power or efficiency
HIGH = "high" # High performance mode
class DataFormat:
"""
Class representing different data formats for representation.
"""
ASCII_DEC = "ascii_dec" # ASCII encoded decimal format
BIN_HEXA = "bin_hexa" # Binary/hexadecimal format
class SamplingFrequency:
"""
Class representing various sampling frequencies.
"""
FREQ_100K = "100k" # 100 kHz frequency
FREQ_50K = "50k" # 50 kHz frequency
FREQ_20K = "20k" # 20 kHz frequency
FREQ_10K = "10k" # 10 kHz frequency
FREQ_5K = "5k" # 5 kHz frequency
FREQ_2K = "2k" # 2 kHz frequency
FREQ_1K = "1k" # 1 kHz frequency
FREQ_500 = "500" # 500 Hz frequency
FREQ_200 = "200" # 200 Hz frequency
FREQ_100 = "100" # 100 Hz frequency
FREQ_50 = "50" # 50 Hz frequency
FREQ_20 = "20" # 20 Hz frequency
FREQ_10 = "10" # 10 Hz frequency
FREQ_5 = "5" # 5 Hz frequency
FREQ_2 = "2" # 2 Hz frequency
FREQ_1 = "1" # 1 Hz frequency
output_file: str = "rawData.csv"
sampling_frequency: str = SamplingFrequency.FREQ_1K
data_format: str = DataFormat.BIN_HEXA
function_mode: str = FunctionMode.HIGH
target_voltage: str = "3300m"
temperature_unit: str = TemperatureUnit.CELSIUS
acquisition_time: str = field(default=None, init=False)

View file

@ -0,0 +1,9 @@
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
class PowerShieldData:
def __init__(self):
self.data = []
self.current_RMS = None
self.power = None

View file

@ -0,0 +1,80 @@
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
import logging
import re
import serial
class SerialHandler:
def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
"""
Initializes the class for serial communication.
:param port: The serial port name (e.g., 'COM1', '/dev/ttyUSB0').
:param baudrate: The baud rate for the connection (default is 9600).
:param timeout: The timeout for read operations in seconds (default is 1.0).
"""
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.serial_connection = None
def open(self):
"""
Opens the serial connection.
"""
if self.serial_connection is None:
try:
self.serial_connection = serial.Serial(
self.port, self.baudrate, timeout=self.timeout
)
logging.info(
"Connection to %s at %d baud opened successfully.", self.port, self.baudrate
)
except serial.SerialException as e:
logging.error("Error opening serial port %s: %s", self.port, str(e))
self.serial_connection = None
raise
def close(self):
"""Closes the serial connection."""
if self.serial_connection and self.serial_connection.is_open:
self.serial_connection.close()
logging.info("Serial connection closed.")
def send_cmd(self, cmd: str):
"""
Sends a command to the serial device with a newline, and prints it.
:param cmd: The command to be sent.
"""
if self.serial_connection and self.serial_connection.is_open:
try:
self.serial_connection.write((cmd + "\r\n").encode('ascii'))
except serial.SerialException as e:
logging.error(f"Error sending command: {e}")
def read_bytes(self, count: int):
if self.serial_connection:
x = self.serial_connection.read(count)
return x
def receive_cmd(self) -> str:
"""
Reads data from the serial device until no more data is available.
:return: The processed received data as a string.
"""
output = ""
if self.serial_connection and self.serial_connection.is_open:
while True:
x = self.serial_connection.read()
if len(x) < 1 or x == 0xF0:
return re.sub(r'(\0|\r|\n\n\n)', '', output).strip()
output += str(x, encoding='ascii', errors='ignore')
def is_open(self) -> bool:
"""Checks if the connection is open."""
return self.serial_connection and self.serial_connection.is_open

View file

@ -0,0 +1,82 @@
# Copyright: (c) 2024, Intel Corporation
import logging
import pytest
from abstract.PowerMonitor import PowerMonitor
from twister_harness import DeviceAdapter
from utils.UtilityFunctions import current_RMS
logger = logging.getLogger(__name__)
@pytest.mark.parametrize("probe_class", ['stm_powershield'], indirect=True)
def test_power_harness(probe_class: PowerMonitor, test_data, request, dut: DeviceAdapter):
"""
This test measures and validates the RMS current values from the power monitor
and compares them against expected RMS values.
Arguments:
probe_class -- The Abstract class of the power monitor.
test_data -- Fixture to prepare data.
request -- Request object that provides access to test configuration.
dut -- The Device Under Test (DUT) that the power monitor is measuring.
"""
# Initialize the probe with the provided path
probe = probe_class # Instantiate the power monitor probe
# Get test data
measurements_dict = test_data
# Start the measurement process with the provided duration
probe.measure(time=measurements_dict['measurement_duration'])
# Retrieve the measured data
data = probe.get_data()
# Calculate the RMS current values using utility functions
rms_values_measured = current_RMS(
data,
trim=measurements_dict['elements_to_trim'],
num_peaks=measurements_dict['num_of_transitions'],
peak_distance=measurements_dict['min_peak_distance'],
peak_height=measurements_dict['min_peak_height'],
padding=measurements_dict['peak_padding'],
)
# # Convert measured values from amps to milliamps for comparison
rms_values_in_milliamps = [value * 1e4 for value in rms_values_measured]
# # Log the calculated values in milliamps for debugging purposes
logger.debug(f"Measured RMS values in mA: {rms_values_in_milliamps}")
tuples = zip(measurements_dict['expected_rms_values'], rms_values_in_milliamps, strict=False)
for expected_rms_value, measured_rms_value in tuples:
assert is_within_tolerance(
measured_rms_value, expected_rms_value, measurements_dict['tolerance_percentage']
)
def is_within_tolerance(measured_rms_value, expected_rms_value, tolerance_percentage) -> bool:
"""
Checks if the measured RMS value is within the acceptable tolerance.
Arguments:
measured_rms_value -- The measured RMS current value in milliamps.
expected_rms_value -- The expected RMS current value in milliamps.
tolerance_percentage -- The allowed tolerance as a percentage of the expected value.
Returns:
bool -- True if the measured value is within the tolerance, False otherwise.
"""
# Calculate tolerance as a percentage of the expected value
tolerance = (tolerance_percentage / 100) * expected_rms_value
# Log the values for debugging purposes
logger.debug(f"Expected RMS: {expected_rms_value:.2f} mA")
logger.debug(f"Tolerance: {tolerance:.2f} mA")
logger.debug(f"Measured RMS: {measured_rms_value:.2f} mA")
# Check if the measured value is within the range of expected ± tolerance
return (expected_rms_value - tolerance) < measured_rms_value < (expected_rms_value + tolerance)

View file

@ -0,0 +1,154 @@
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
import numpy as np
from scipy import signal
def convert_acq_time(value):
"""
Converts an acquisition time value to a more readable format with units.
- Converts values to m (milli), u (micro), or leaves them as is for whole numbers.
:param value: The numeric value to convert.
:return: A tuple with the value and the unit as separate elements.
"""
if value < 1e-3:
# If the value is smaller than 1 millisecond (10^-3), express in micro (u)
return f"{value * 1e6:.0f}", "us"
elif value < 1:
# If the value is smaller than 1 but larger than or equal to 1 milli (10^-3)
return f"{value * 1e3:.0f}", "ms"
else:
# If the value is 1 or larger, express in seconds (s)
return f"{value:.0f}", "s"
def calculate_rms(data):
"""
Calculate the Root Mean Square (RMS) of a given data array.
:param data: List or numpy array containing the data
:return: RMS value
"""
# Convert to a numpy array for easier mathematical operations
data_array = np.array(data, dtype=np.float64) # Convert to float64 to avoid type issues
# Calculate the RMS value
rms = np.sqrt(np.mean(np.square(data_array)))
return rms
def bytes_to_twobyte_values(data):
value = int.from_bytes(data[0], 'big') << 8 | int.from_bytes(data[1], 'big')
return value
def convert_to_amps(value):
"""
Convert amps to watts
"""
amps = (value & 4095) * (16 ** (0 - (value >> 12)))
return amps
def convert_to_scientific_notation(time: int, unit: str) -> str:
"""
Converts time to scientific notation based on the provided unit.
:param time: The time value to convert.
:param unit: The unit of the time ('us', 'ms', or 's').
:return: A string representing the time in scientific notation.
"""
if unit == 'us': # microseconds
return f"{time}-6"
elif unit == 'ms': # milliseconds
return f"{time}-3"
elif unit == 's': # seconds
return f"{time}"
else:
raise ValueError("Invalid unit. Use 'us', 'ms', or 's'.")
def current_RMS(data, trim=100, num_peaks=1, peak_height=0.008, peak_distance=40, padding=40):
"""
Function to process a given data array, find peaks, split data into chunks,
and then compute the Root Mean Square (RMS) value for each chunk. The function
allows for excluding the first `trim` number of data points, specifies the
number of peaks to consider for chunking, and allows for configurable peak
height and peak distance for detecting peaks.
Args:
- data (list or numpy array): The input data array for RMS calculation.
- trim (int): The number of initial elements to exclude
from the data before processing (default is 100).
- num_peaks (int): The number of peaks to consider for splitting
the data into chunks (default is 1).
- peak_height (float): The minimum height of the peaks to consider
when detecting them (default is 0.008).
- peak_distance (int): The minimum distance (in samples) between
consecutive peaks (default is 40).
- padding (int): The padding to add around the detected peaks when
chunking the data (default is 40).
Returns:
- rms_values (list): A list of RMS values calculated
from the data chunks based on the detected peaks.
The function will exclude the first `trim` elements of the data and look
for peaks that meet the specified criteria (`peak_height` and `peak_distance`).
The data will be split into chunks around the detected peaks, and RMS values
will be computed for each chunk.
"""
# Optionally exclude the first x elements of the data
data = data[trim:]
# Convert the data to a list of floats for consistency
data = [float(x) for x in data]
# Find the peaks in the data using the `find_peaks` function
peaks = signal.find_peaks(data, distance=peak_distance, height=peak_height)[0]
# Check if we have enough peaks, otherwise raise an exception
if len(peaks) < num_peaks:
raise ValueError(
f"Not enough peaks detected. Expected at least {num_peaks}, but found {len(peaks)}."
)
# Limit the number of peaks based on the `num_peaks` parameter
peaks = peaks[:num_peaks]
# Add the start (index 0) and end (index of the last data point) to the list of peak indices
indices = np.concatenate(([0], np.array(peaks), [len(data)]))
# Split the data into chunks based on the peak indices
# with padding of 'padding' elements at both ends
split_data = []
for i in range(len(indices) - 1):
start_idx = indices[i] + padding
end_idx = indices[i + 1] - padding
split_data.append(data[start_idx:end_idx])
# Function to calculate RMS for a given list of data chunks
def calculate_rms(chunks):
"""
Helper function to compute RMS values for each data chunk.
Args:
- chunks (list): A list of data chunks.
Returns:
- rms (list): A list of RMS values, one for each data chunk.
"""
rms = []
for chunk in chunks:
# Calculate RMS by taking the square root of the mean of squared values
rms_value = np.sqrt(np.mean(np.square(chunk)))
rms.append(rms_value)
return rms
# Calculate RMS for each chunk of data
rms_values = calculate_rms(split_data)
# Return the calculated RMS values
return rms_values

View file

@ -657,6 +657,18 @@ class Shell(Pytest):
return test_shell_file return test_shell_file
return None return None
class Power(Pytest):
def generate_command(self):
config = self.instance.testsuite.harness_config
pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'power-twister-harness')]
config['pytest_root'] = pytest_root
command = super().generate_command()
if self.instance.testsuite.harness == 'power':
measurements = config.get('power_measurements')
command.append(f'--testdata={measurements}')
return command
class Gtest(Harness): class Gtest(Harness):
ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

View file

@ -222,6 +222,7 @@ class TestInstance:
'console', 'console',
'ztest', 'ztest',
'pytest', 'pytest',
'power',
'test', 'test',
'gtest', 'gtest',
'robot', 'robot',
@ -313,7 +314,7 @@ class TestInstance:
device_testing) device_testing)
# check if test is runnable in pytest # check if test is runnable in pytest
if self.testsuite.harness in ['pytest', 'shell']: if self.testsuite.harness in ['pytest', 'shell', 'power']:
target_ready = bool( target_ready = bool(
filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST
) )

View file

@ -107,6 +107,9 @@ schema;scenario-schema:
type: map type: map
required: false required: false
mapping: mapping:
"power_measurements":
type: any
required: false
"shell_commands_file": "shell_commands_file":
type: str type: str
required: false required: false