soc: intel_adsp: tools: add shell support to cavstool.py

Create a pseudo-terminal to access Zephyr shell on the audio DSP.
The shell terminal is enabled with "-p" command-line option.

Signed-off-by: Kai Vehmanen <kai.vehmanen@linux.intel.com>
This commit is contained in:
Kai Vehmanen 2024-05-14 12:01:43 +03:00 committed by Henrik Brix Andersen
commit 5a7600bec6

View file

@ -11,6 +11,7 @@ import subprocess
import ctypes
import mmap
import argparse
import pty
start_output = True
@ -25,7 +26,7 @@ HUGEPAGE_FILE = "/dev/hugepages/cavs-fw-dma.tmp."
#
# Window 0 is the FW_STATUS area, and 4k after that the IPC "outbox"
# Window 1 is the IPC "inbox" (host-writable memory, just 384 bytes currently)
# Window 2 is debug, divided into multiple slots, unused by this script
# Window 2 is used for debug slots (Zephyr shell is one user)
# Window 3 is winstream-formatted log output
WINDOW_BASE = 0x80000
@ -34,6 +35,11 @@ WINDOW_STRIDE = 0x20000
WINDOW_BASE_ACE = 0x180000
WINDOW_STRIDE_ACE = 0x8000
DEBUG_SLOT_SIZE = 4096
DEBUG_SLOT_SHELL = 0
SHELL_RX_SIZE = 256
SHELL_MAX_VALID_SLOT_SIZE = 16777216
# pylint: disable=duplicate-code
# ADSPCS bits
@ -650,24 +656,29 @@ def winstream_offset():
# This SHOULD be just "mem[start:start+length]", but slicing an mmap
# array seems to be unreliable on one of my machines (python 3.6.9 on
# Ubuntu 18.04). Read out bytes individually.
def win_read(start, length):
def win_read(base, start, length):
try:
return b''.join(bar4_mmap[winstream_offset() + x].to_bytes(1, 'little')
return b''.join(bar4_mmap[base + x].to_bytes(1, 'little')
for x in range(start, start + length))
except IndexError as ie:
# A FW in a bad state may cause winstream garbage
log.error("IndexError in bar4_mmap[%d + %d]", winstream_offset(), start)
log.error("IndexError in bar4_mmap[%d + %d]", base, start)
log.error("bar4_mmap.size()=%d", bar4_mmap.size())
raise ie
def win_hdr():
return struct.unpack("<IIII", win_read(0, 16))
def win_hdr(base):
return struct.unpack("<IIII", win_read(base, 0, 16))
# Python implementation of the same algorithm in sys_winstream_read(),
# see there for details.
def winstream_read(last_seq):
def winstream_read(base, last_seq):
while True:
(wlen, start, end, seq) = win_hdr()
(wlen, start, end, seq) = win_hdr(base)
if wlen > SHELL_MAX_VALID_SLOT_SIZE:
log.debug("DSP powered off at winstream_read")
return (seq, "")
if wlen == 0:
return (seq, "")
if last_seq == 0:
last_seq = seq if args.no_history else (seq - ((end - start) % wlen))
if seq == last_seq or start == end:
@ -677,15 +688,87 @@ def winstream_read(last_seq):
return (seq, "")
copy = (end - behind) % wlen
suffix = min(behind, wlen - copy)
result = win_read(16 + copy, suffix)
result = win_read(base, 16 + copy, suffix)
if suffix < behind:
result += win_read(16, behind - suffix)
(wlen, start1, end, seq1) = win_hdr()
result += win_read(base, 16, behind - suffix)
(wlen, start1, end, seq1) = win_hdr(base)
if start1 == start and seq1 == seq:
# Best effort attempt at decoding, replacing unusable characters
# Found to be useful when it really goes wrong
return (seq, result.decode("utf-8", "replace"))
def idx_mod(wlen, idx):
if idx >= wlen:
return idx - wlen
return idx
def idx_sub(wlen, a, b):
return idx_mod(wlen, a + (wlen - b))
# Python implementation of the same algorithm in sys_winstream_write(),
# see there for details.
def winstream_write(base, msg):
(wlen, start, end, seq) = win_hdr(base)
if wlen > SHELL_MAX_VALID_SLOT_SIZE:
log.debug("DSP powered off at winstream_write")
return
if wlen == 0:
return
lenmsg = len(msg)
lenmsg0 = lenmsg
if len(msg) > wlen + 1:
start = end
lenmsg = wlen - 1
lenmsg = min(lenmsg, wlen)
if seq != 0:
avail = (wlen - 1) - idx_sub(wlen, end, start)
if lenmsg > avail:
start = idx_mod(wlen, start + (lenmsg - avail))
if lenmsg < lenmsg0:
start = end
drop = lenmsg0 - lenmsg
msg = msg[drop : lenmsg - drop]
suffix = min(lenmsg, wlen - end)
for c in range(0, suffix):
bar4_mmap[base + 16 + end + c] = msg[c]
if lenmsg > suffix:
for c in range(0, lenmsg - suffix):
bar4_mmap[base + 16 + c] = msg[suffix + c]
end = idx_mod(wlen, end + lenmsg)
seq += lenmsg0
# write back updated fields as 32bit writes
update_hdr = struct.pack("<III", start, end, seq)
dst = base + 4 # skip wlen
for c in range(0, 3):
src = c * 4
bar4_mmap[dst : dst + 4] = update_hdr[src : src + 4]
dst += 4
def debug_offset():
( base, stride ) = adsp_mem_window_config()
return base + stride * 2
def shell_base_offset():
return debug_offset() + DEBUG_SLOT_SIZE * (1 + DEBUG_SLOT_SHELL)
def read_from_shell_memwindow_winstream(last_seq):
offset = shell_base_offset() + SHELL_RX_SIZE
(last_seq, output) = winstream_read(offset, last_seq)
if output:
os.write(shell_client_port, output.encode("utf-8"))
return last_seq
def write_to_shell_memwindow_winstream():
msg = os.read(shell_client_port, 1)
if len(msg) > 0:
winstream_write(shell_base_offset(), msg)
def create_shell_pty():
global shell_client_port
(shell_client_port, user_port) = pty.openpty()
name = os.ttyname(user_port)
log.info(f"shell PTY at: {name}")
asyncio.get_event_loop().add_reader(shell_client_port, write_to_shell_memwindow_winstream)
async def ipc_delay_done():
await asyncio.sleep(0.1)
@ -867,12 +950,18 @@ async def main():
if not args.quiet:
sys.stdout.write("--\n")
if args.shell_pty:
create_shell_pty()
hda_streams = dict()
last_seq = 0
last_seq_shell = 0
while start_output is True:
await asyncio.sleep(0.03)
(last_seq, output) = winstream_read(last_seq)
if args.shell_pty:
last_seq_shell = read_from_shell_memwindow_winstream(last_seq_shell)
(last_seq, output) = winstream_read(winstream_offset(), last_seq)
if output:
sys.stdout.write(output)
sys.stdout.flush()
@ -887,6 +976,8 @@ ap.add_argument("-v", "--verbose", action="store_true",
help="More loader output, DEBUG logging level")
ap.add_argument("-l", "--log-only", action="store_true",
help="Don't load firmware, just show log output")
ap.add_argument("-p", "--shell-pty", action="store_true",
help="Create a Zephyr shell pty if enabled in firmware")
ap.add_argument("-n", "--no-history", action="store_true",
help="No current log buffer at start, just new output")
ap.add_argument("fw_file", nargs="?", help="Firmware file")