debug/coredump: add a primitive coredump mechanism

This adds a very primitive coredump mechanism under subsys/debug
where during fatal error, register and memory content can be
dumped to coredump backend. One such backend utilizing log
module for output is included. Once the coredump log is converted
to a binary file, it can be used with the ELF output file as
inputs to an overly simplified implementation of a GDB server.
This GDB server can be attached via the target remote command of
GDB and will be serving register and memory content. This allows
using GDB to examine stack and memory where the fatal error
occurred.

Signed-off-by: Daniel Leung <daniel.leung@intel.com>
This commit is contained in:
Daniel Leung 2020-08-07 10:47:37 -07:00 committed by Anas Nashif
commit 49206a86ff
23 changed files with 1692 additions and 0 deletions

View file

@ -0,0 +1,132 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
import argparse
import logging
import os
import socket
import sys
from parser.log_parser import CoredumpLogFile
from parser.elf_parser import CoredumpElfFile
import gdbstubs
# Format string applied to the root logging configuration
LOGGING_FORMAT = "[%(levelname)s][%(name)s] %(message)s"

# Only bind to local host: the coredump GDB server speaks an
# unauthenticated protocol, so it must not listen on all interfaces.
# (An empty host string would bind to every interface.)
GDBSERVER_HOST = "localhost"
def parse_args():
    """Parse the command line.

    Positional arguments are the Zephyr ELF binary and the coredump
    binary log; optional flags control verbosity and the server port.
    """
    cmdline = argparse.ArgumentParser()

    cmdline.add_argument("elffile", help="Zephyr ELF binary")
    cmdline.add_argument("logfile", help="Coredump binary log file")
    cmdline.add_argument("--debug", action="store_true",
                         help="Print extra debugging information")
    cmdline.add_argument("--port", type=int, default=1234,
                         help="GDB server port")
    cmdline.add_argument("-v", "--verbose", action="store_true",
                         help="Print more information")

    return cmdline.parse_args()
def main():
    """Entry point: parse the coredump log and ELF file, then serve
    register/memory content to GDB over a TCP socket."""
    args = parse_args()

    # Setup logging
    logging.basicConfig(format=LOGGING_FORMAT)

    # Setup logging for "parser" (coredump log / ELF file parsing)
    parser_logger = logging.getLogger("parser")
    if args.debug:
        parser_logger.setLevel(logging.DEBUG)
    elif args.verbose:
        parser_logger.setLevel(logging.INFO)
    else:
        parser_logger.setLevel(logging.WARNING)

    # Setup logging for the code below.
    #
    # Keep this in its own variable: the original code re-assigned the
    # same "logger" name to the "gdbstub" logger (WARNING by default),
    # which silently suppressed the INFO messages emitted below.
    logger = logging.getLogger("gdbserver")
    if args.debug:
        logger.setLevel(logging.DEBUG)
    else:
        # Use INFO as default since we need to let user
        # know what is going on
        logger.setLevel(logging.INFO)

    # Setup logging for "gdbstub"
    gdbstub_logger = logging.getLogger("gdbstub")
    if args.debug:
        gdbstub_logger.setLevel(logging.DEBUG)
    elif args.verbose:
        gdbstub_logger.setLevel(logging.INFO)
    else:
        gdbstub_logger.setLevel(logging.WARNING)

    if not os.path.isfile(args.elffile):
        logger.error(f"Cannot find file {args.elffile}, exiting...")
        sys.exit(1)

    if not os.path.isfile(args.logfile):
        logger.error(f"Cannot find file {args.logfile}, exiting...")
        sys.exit(1)

    logger.info(f"Log file: {args.logfile}")
    logger.info(f"ELF file: {args.elffile}")

    # Parse the coredump binary log file
    logf = CoredumpLogFile(args.logfile)
    logf.open()
    if not logf.parse():
        logger.error("Cannot parse log file, exiting...")
        logf.close()
        sys.exit(1)

    # Parse ELF file for code and read-only data
    elff = CoredumpElfFile(args.elffile)
    elff.open()
    if not elff.parse():
        logger.error("Cannot parse ELF file, exiting...")
        elff.close()
        logf.close()
        sys.exit(1)

    gdbstub = gdbstubs.get_gdbstub(logf, elff)
    if gdbstub is None:
        # get_gdbstub() returns None for targets it does not recognize;
        # bail out cleanly instead of crashing on gdbstub.run() below.
        logger.error("Cannot find GDB stub for target, exiting...")
        elff.close()
        logf.close()
        sys.exit(1)

    # Start a GDB server
    gdbserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Reuse address so we don't have to wait for socket to be
    # closed before we can bind to the port again
    gdbserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    gdbserver.bind((GDBSERVER_HOST, args.port))
    gdbserver.listen(1)

    logger.info(f"Waiting GDB connection on port {args.port}...")

    conn, remote = gdbserver.accept()

    if conn:
        logger.info(f"Accepted GDB connection from {remote}")

        try:
            gdbstub.run(conn)
        finally:
            # Always release the client socket, even if the stub raises
            conn.close()

    gdbserver.close()
    logger.info("GDB session finished.")

    elff.close()
    logf.close()


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,99 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
import argparse
import binascii
import sys
# Markers emitted by the Zephyr coredump log backend: every coredump
# line in the serial log starts with "#CD:", and the payload is
# bracketed by BEGIN#/END# (or aborted with the ERROR marker).
COREDUMP_PREFIX_STR = "#CD:"
COREDUMP_BEGIN_STR = COREDUMP_PREFIX_STR + "BEGIN#"
COREDUMP_END_STR = COREDUMP_PREFIX_STR + "END#"
COREDUMP_ERROR_STR = COREDUMP_PREFIX_STR + "ERROR CANNOT DUMP#"
def parse_args():
    """Parse the command line: serial log in, binary coredump out."""
    cmdline = argparse.ArgumentParser()

    cmdline.add_argument("infile", help="Serial Log File")
    cmdline.add_argument("outfile",
                         help="Output file for use with coredump GDB server")

    return cmdline.parse_args()
def main():
    """Extract the coredump payload from a serial console log.

    Collects the hex payload between the BEGIN#/END# markers, strips
    the "#CD:" prefix from each line, and writes the decoded binary
    to the output file for use with the coredump GDB server.
    """
    args = parse_args()

    # open() raises on failure and never returns a falsy object, so the
    # original "if not infile:" checks were dead code and the error
    # messages below could never be printed. Use try/except instead.
    try:
        infile = open(args.infile, "r")
    except OSError:
        print(f"ERROR: Cannot open input file: {args.infile}, exiting...")
        sys.exit(1)

    try:
        outfile = open(args.outfile, "wb")
    except OSError:
        infile.close()
        print(f"ERROR: Cannot open output file for write: {args.outfile}, exiting...")
        sys.exit(1)

    print(f"Input file {args.infile}")
    print(f"Output file {args.outfile}")

    has_begin = False
    has_end = False
    has_error = False
    go_parse_line = False
    bytes_written = 0

    try:
        for line in infile:
            if COREDUMP_BEGIN_STR in line:
                # Found "BEGIN#" - beginning of log
                has_begin = True
                go_parse_line = True
                continue

            if COREDUMP_END_STR in line:
                # Found "END#" - end of log
                has_end = True
                go_parse_line = False
                break

            if COREDUMP_ERROR_STR in line:
                # Error was encountered during dumping:
                # log is not usable
                has_error = True
                go_parse_line = False
                break

            if not go_parse_line:
                continue

            prefix_idx = line.find(COREDUMP_PREFIX_STR)
            if prefix_idx < 0:
                continue

            prefix_idx += len(COREDUMP_PREFIX_STR)
            hex_str = line[prefix_idx:].strip()

            binary_data = binascii.unhexlify(hex_str)
            outfile.write(binary_data)
            bytes_written += len(binary_data)
    finally:
        # Close both files even if unhexlify() raises on a corrupt line
        infile.close()
        outfile.close()

    if not has_begin:
        print("ERROR: Beginning of log not found!")
    elif has_error:
        # Check the error marker before the missing-END case: an error
        # dump breaks out of the loop before END# is ever seen, so
        # checking has_end first would mask the error with a mere WARN.
        print("ERROR: log has error.")
    elif not has_end:
        print("WARN: End of log not found! Is log complete?")
    else:
        print(f"Bytes written {bytes_written}")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,15 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
class TgtCode:
    # Target (architecture) codes as recorded in the coredump log
    # header's 'tgt_code' field; used to select an arch-specific stub.
    # Only the unknown/unsupported code is defined so far.
    UNKNOWN = 0
def get_gdbstub(logfile, elffile):
    """Select an architecture-specific GDB stub for the coredump.

    The choice is driven by the target code recorded in the coredump
    log header. No architecture stubs are registered yet, so this
    currently always returns None.
    """
    tgt_code = logfile.log_hdr['tgt_code']

    # Placeholder: no architecture-specific stubs implemented yet.
    stub = None

    return stub

View file

@ -0,0 +1,5 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0

View file

@ -0,0 +1,205 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
import abc
import binascii
import logging
logger = logging.getLogger("gdbstub")


class GdbStub(abc.ABC):
    """Architecture-independent part of the coredump GDB stub.

    Implements the GDB remote serial protocol framing
    ('$<payload>#<2-hex-digit checksum>') and the generic packet
    handlers. Memory reads are served from regions gathered from both
    the coredump log and the ELF file. Architecture-specific subclasses
    must implement the 'g' (register group read) packet.
    """

    def __init__(self, logfile, elffile):
        self.logfile = logfile
        self.elffile = elffile
        self.socket = None
        # Signal number reported to GDB; set by the arch subclass
        self.gdb_signal = None

        # Memory content comes from the coredump log (RAM) and the
        # ELF file (text / read-only data).
        mem_regions = list()

        for r in logfile.get_memory_regions():
            mem_regions.append(r)

        for r in elffile.get_memory_regions():
            mem_regions.append(r)

        self.mem_regions = mem_regions

    def get_gdb_packet(self):
        """Receive one GDB packet and verify its checksum.

        Returns the payload bytes on success, or None when the
        checksum does not match (a NACK is sent so GDB retransmits).
        Raises ConnectionResetError if the peer closes the connection.
        """
        socket = self.socket
        if socket is None:
            return None

        data = b''
        checksum = 0
        # Wait for '$'
        while True:
            ch = socket.recv(1)
            if not ch:
                # recv() returning b'' means the peer closed the
                # connection; without this check the loop spins forever.
                raise ConnectionResetError("GDB connection closed")
            if ch == b'$':
                break

        # Get a full packet
        while True:
            ch = socket.recv(1)
            if not ch:
                raise ConnectionResetError("GDB connection closed")
            if ch == b'#':
                # End of packet
                break

            checksum += ord(ch)
            data += ch

        # Get checksum (2-bytes)
        ch = socket.recv(2)
        in_chksum = ord(binascii.unhexlify(ch))

        logger.debug(f"Received GDB packet: {data}")

        # Compare modulo-256 sum of payload bytes against the received
        # checksum and ACK/NACK accordingly
        if (checksum % 256) == in_chksum:
            # ACK
            logger.debug("ACK")
            socket.send(b'+')

            return data
        else:
            # NACK
            logger.debug(f"NACK (checksum {in_chksum} != {checksum})")
            socket.send(b'-')

            return None

    def put_gdb_packet(self, data):
        """Frame and send one GDB packet: '$' + data + '#' + checksum."""
        socket = self.socket
        if socket is None:
            return

        checksum = 0
        for d in data:
            checksum += d

        pkt = b'$' + data + b'#'

        checksum = checksum % 256
        pkt += format(checksum, "02X").encode()

        logger.debug(f"Sending GDB packet: {pkt}")

        socket.send(pkt)

    def handle_signal_query_packet(self):
        # the '?' packet
        pkt = b'S'
        pkt += format(self.gdb_signal, "02X").encode()

        self.put_gdb_packet(pkt)

    @abc.abstractmethod
    def handle_register_group_read_packet(self):
        # the 'g' packet for reading a group of registers
        pass

    def handle_register_group_write_packet(self):
        # the 'G' packet for writing to a group of registers
        #
        # We don't support writing so return error
        self.put_gdb_packet(b"E01")

    def handle_register_single_read_packet(self, pkt):
        # the 'p' packet for reading a single register
        self.put_gdb_packet(b"E01")

    def handle_register_single_write_packet(self, pkt):
        # the 'P' packet for writing to registers
        #
        # We don't support writing so return error
        self.put_gdb_packet(b"E01")

    def handle_memory_read_packet(self, pkt):
        # the 'm' packet for reading memory: m<addr>,<len>

        def get_mem_region(addr):
            for r in self.mem_regions:
                if r['start'] <= addr <= r['end']:
                    return r

            return None

        # extract address and length from packet
        # and convert them into usable integer values
        str_addr, str_length = pkt[1:].split(b',')
        s_addr = int(b'0x' + str_addr, 16)
        length = int(b'0x' + str_length, 16)

        # FIXME: Need more efficient way of extracting memory content
        remaining = length
        addr = s_addr
        barray = b''
        r = get_mem_region(addr)
        while remaining > 0:
            if r is None:
                # Address is not inside any known region (including the
                # very first lookup, which the original code never
                # checked before dereferencing r['end']): report error.
                barray = None
                break

            if addr > r['end']:
                # Crossed the end of the current region; look up the
                # region containing the next address and re-validate.
                r = get_mem_region(addr)
                continue

            offset = addr - r['start']
            barray += r['data'][offset:offset+1]

            addr += 1
            remaining -= 1

        if barray is not None:
            pkt = binascii.hexlify(barray)
            self.put_gdb_packet(pkt)
        else:
            self.put_gdb_packet(b"E01")

    def handle_memory_write_packet(self, pkt):
        # the 'M' packet for writing to memory
        #
        # We don't support writing so return error
        self.put_gdb_packet(b"E02")

    def handle_general_query_packet(self, pkt):
        # the 'q' packets; none are supported, reply empty
        self.put_gdb_packet(b'')

    def run(self, socket):
        """Serve the GDB remote protocol on an accepted connection
        until GDB quits ('k') or the peer disconnects."""
        self.socket = socket

        while True:
            try:
                pkt = self.get_gdb_packet()
            except ConnectionResetError:
                # Peer went away; end the session instead of looping
                logger.debug("GDB connection closed by peer")
                break

            if pkt is None:
                continue

            pkt_type = pkt[0:1]
            logger.debug(f"Got packet type: {pkt_type}")

            if pkt_type == b'?':
                self.handle_signal_query_packet()
            elif pkt_type in (b'C', b'S'):
                # Continue/stepping execution, which is not supported.
                # So signal exception again
                self.handle_signal_query_packet()
            elif pkt_type == b'g':
                self.handle_register_group_read_packet()
            elif pkt_type == b'G':
                self.handle_register_group_write_packet()
            elif pkt_type == b'p':
                self.handle_register_single_read_packet(pkt)
            elif pkt_type == b'P':
                self.handle_register_single_write_packet(pkt)
            elif pkt_type == b'm':
                self.handle_memory_read_packet(pkt)
            elif pkt_type == b'M':
                self.handle_memory_write_packet(pkt)
            elif pkt_type == b'q':
                self.handle_general_query_packet(pkt)
            elif pkt_type == b'k':
                # GDB quits
                break
            else:
                self.put_gdb_packet(b'')

View file

@ -0,0 +1,5 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0

View file

@ -0,0 +1,93 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
import logging
import elftools
from elftools.elf.elffile import ELFFile
# ELF section flags (sh_flags bits)
SHF_WRITE = 0x1
SHF_ALLOC = 0x2
SHF_EXEC = 0x4

# Combined masks used to classify sections below
SHF_WRITE_ALLOC = SHF_WRITE | SHF_ALLOC
SHF_ALLOC_EXEC = SHF_ALLOC | SHF_EXEC

logger = logging.getLogger("parser")
class CoredumpElfFile():
    """
    Parses an ELF binary for memory content in various sections.
    Read-only sections (e.g. text and rodata) do not have to be
    dumped by the coredump mechanism since their content can be
    retrieved straight from the ELF file.
    """

    def __init__(self, elffile):
        self.elffile = elffile
        self.fd = None
        self.elf = None
        self.memory_regions = []

    def open(self):
        self.fd = open(self.elffile, "rb")
        self.elf = ELFFile(self.fd)

    def close(self):
        self.fd.close()

    def get_memory_regions(self):
        return self.memory_regions

    def parse(self):
        """Collect text and read-only data sections into memory regions."""
        if self.fd is None:
            self.open()

        for section in self.elf.iter_sections():
            # REALLY NEED to match exact type as all other sections
            # (debug, text, etc.) are descendants where
            # isinstance() would match.
            if type(section) is not elftools.elf.sections.Section: # pylint: disable=unidiomatic-typecheck
                continue

            # Only loadable program content is of interest
            if section['sh_type'] != 'SHT_PROGBITS':
                continue

            flags = section['sh_flags']
            region_start = section['sh_addr']
            region_end = region_start + section['sh_size'] - 1

            if (flags & SHF_ALLOC_EXEC) == SHF_ALLOC_EXEC:
                # Text section
                sect_desc = "text"
            elif (flags & SHF_WRITE_ALLOC) == SHF_WRITE_ALLOC:
                # Data section
                #
                # Running app changes the content so no need
                # to store
                continue
            elif (flags & SHF_ALLOC) == SHF_ALLOC:
                # Read only data section
                sect_desc = "read-only data"
            else:
                continue

            mem_region = {"start": region_start,
                          "end": region_end,
                          "data": section.data()}

            logger.info("ELF Section: 0x%x to 0x%x of size %d (%s)" %
                        (mem_region["start"],
                         mem_region["end"],
                         len(mem_region["data"]),
                         sect_desc))

            self.memory_regions.append(mem_region)

        return True

View file

@ -0,0 +1,167 @@
#!/usr/bin/env python3
#
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
import logging
import struct
# Note: keep sync with C code

# Coredump log header: magic ID ('Z','E'), header version, target
# code, pointer size, flags, fatal error reason
Z_COREDUMP_HDR_ID = b'ZE'
Z_COREDUMP_HDR_VER = 1
LOG_HDR_STRUCT = "<ccHHBBI"
LOG_HDR_SIZE = struct.calcsize(LOG_HDR_STRUCT)

# Architecture-specific section: ID ('A'), header version, payload size
Z_COREDUMP_ARCH_HDR_ID = b'A'
LOG_ARCH_HDR_STRUCT = "<cHH"
LOG_ARCH_HDR_SIZE = struct.calcsize(LOG_ARCH_HDR_STRUCT)

# Memory dump section: ID ('M'), header version; start/end addresses
# (width depends on target pointer size) and raw content follow
Z_COREDUMP_MEM_HDR_ID = b'M'
Z_COREDUMP_MEM_HDR_VER = 1
LOG_MEM_HDR_STRUCT = "<cH"
LOG_MEM_HDR_SIZE = struct.calcsize(LOG_MEM_HDR_STRUCT)

logger = logging.getLogger("parser")
def reason_string(reason):
    """Map a fatal error reason code to its kernel enum name."""
    # Keep sync with "enum k_fatal_error_reason"
    names = {
        0: "K_ERR_CPU_EXCEPTION",
        1: "K_ERR_SPURIOUS_IRQ",
        2: "K_ERR_STACK_CHK_FAIL",
        3: "K_ERR_KERNEL_OOPS",
        4: "K_ERR_KERNEL_PANIC",
    }

    return names.get(reason, "(Unknown)")
class CoredumpLogFile:
    """
    Process the binary coredump file for register block
    and memory blocks.
    """

    def __init__(self, logfile):
        # Path to the binary coredump file
        self.logfile = logfile
        self.fd = None

        # Parsed log header fields (dict); populated by parse()
        self.log_hdr = None

        # Architecture-specific register block.
        # NOTE(review): initialized as a list here, but
        # parse_arch_section() replaces it with a dict
        # {"hdr_ver", "data"} — confirm what callers expect.
        self.arch_data = list()
        self.memory_regions = list()

    def open(self):
        # Open the coredump file for binary reading
        self.fd = open(self.logfile, "rb")

    def close(self):
        self.fd.close()

    def get_arch_data(self):
        return self.arch_data

    def get_memory_regions(self):
        return self.memory_regions

    def parse_arch_section(self):
        # Parse one architecture-specific section: fixed header
        # followed by a raw register blob that the architecture's
        # GDB stub interprets.
        hdr = self.fd.read(LOG_ARCH_HDR_SIZE)
        _, hdr_ver, num_bytes = struct.unpack(LOG_ARCH_HDR_STRUCT, hdr)

        arch_data = self.fd.read(num_bytes)
        self.arch_data = {"hdr_ver" : hdr_ver, "data" : arch_data}

        return True

    def parse_memory_section(self):
        # Parse one memory dump section: fixed header, then the
        # start/end addresses (width depends on the target pointer
        # size from the log header), then the raw memory content.
        hdr = self.fd.read(LOG_MEM_HDR_SIZE)
        _, hdr_ver = struct.unpack(LOG_MEM_HDR_STRUCT, hdr)

        if hdr_ver != Z_COREDUMP_MEM_HDR_VER:
            logger.error(f"Memory block version: {hdr_ver}, expected {Z_COREDUMP_MEM_HDR_VER}!")
            return False

        # Figure out how to read the start and end addresses
        ptr_fmt = None
        if self.log_hdr["ptr_size"] == 64:
            ptr_fmt = "QQ"
        elif self.log_hdr["ptr_size"] == 32:
            ptr_fmt = "II"
        else:
            # Unsupported pointer size
            return False

        data = self.fd.read(struct.calcsize(ptr_fmt))
        saddr, eaddr = struct.unpack(ptr_fmt, data)

        # NOTE(review): size = eaddr - saddr treats eaddr as exclusive,
        # while consumers compare addresses with start <= addr <= end —
        # confirm against the C dump format for a possible off-by-one.
        size = eaddr - saddr

        data = self.fd.read(size)

        mem = {"start": saddr, "end": eaddr, "data": data}
        self.memory_regions.append(mem)

        logger.info("Memory: 0x%x to 0x%x of size %d" %
                    (saddr, eaddr, size))

        return True

    def parse(self):
        """Parse the coredump file: validate the header, then read
        architecture and memory sections until end of file."""
        if self.fd is None:
            self.open()

        hdr = self.fd.read(LOG_HDR_SIZE)
        id1, id2, hdr_ver, tgt_code, ptr_size, flags, reason = struct.unpack(LOG_HDR_STRUCT, hdr)

        if (id1 + id2) != Z_COREDUMP_HDR_ID:
            # ID in header does not match
            logger.error("Log header ID not found...")
            return False

        if hdr_ver != Z_COREDUMP_HDR_VER:
            logger.error(f"Log version: {hdr_ver}, expected: {Z_COREDUMP_HDR_VER}!")
            return False

        # Header stores the pointer size as a power of two
        # (e.g. 5 -> 32 bits, 6 -> 64 bits)
        ptr_size = 2 ** ptr_size

        self.log_hdr = {
            "hdr_version": hdr_ver,
            "tgt_code": tgt_code,
            "ptr_size": ptr_size,
            "flags": flags,
            "reason": reason,
        }

        logger.info("Reason: {0}".format(reason_string(reason)))
        logger.info(f"Pointer size {ptr_size}")

        # Header fields are all captured in self.log_hdr above
        del id1, id2, hdr_ver, tgt_code, ptr_size, flags, reason

        while True:
            # Peek at the one-byte section ID to dispatch the right
            # section parser; each parser re-reads its full header.
            section_id = self.fd.read(1)
            if not section_id:
                # no more data to read
                break

            self.fd.seek(-1, 1) # go back 1 byte

            if section_id == Z_COREDUMP_ARCH_HDR_ID:
                if not self.parse_arch_section():
                    logger.error("Cannot parse architecture section")
                    return False
            elif section_id == Z_COREDUMP_MEM_HDR_ID:
                if not self.parse_memory_section():
                    logger.error("Cannot parse memory section")
                    return False
            else:
                # Unknown section in log file
                logger.error(f"Unknown section in log file with ID {section_id}")
                return False

        return True