tests: Bluetooth: Classic: Add uuid128 service test

add one sdp service that contains uuid128, improve test to save bumble
hci log.

Signed-off-by: Mark Wang <yichang.wang@nxp.com>
This commit is contained in:
Mark Wang 2025-06-04 09:01:03 +08:00 committed by Benjamin Cabé
commit 263cc93827
2 changed files with 285 additions and 164 deletions

View file

@ -9,13 +9,19 @@ from bumble.core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
BT_L2CAP_PROTOCOL_ID, BT_L2CAP_PROTOCOL_ID,
BT_OBEX_PROTOCOL_ID, BT_OBEX_PROTOCOL_ID,
BT_SERIAL_PORT_SERVICE,
CommandTimeoutError, CommandTimeoutError,
DeviceClass, DeviceClass,
) )
from bumble.device import Device from bumble.device import Device
from bumble.hci import Address from bumble.hci import Address
from bumble.sdp import SDP_ALL_ATTRIBUTES_RANGE, SDP_PUBLIC_BROWSE_ROOT from bumble.sdp import (
SDP_ALL_ATTRIBUTES_RANGE,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_PUBLIC_BROWSE_ROOT,
)
from bumble.sdp import Client as SDP_Client from bumble.sdp import Client as SDP_Client
from bumble.snoop import BtSnooper
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from twister_harness import DeviceAdapter, Shell from twister_harness import DeviceAdapter, Shell
@ -52,13 +58,13 @@ class discovery_listener(Device.Listener):
logger.info(f'>>> {address}:') logger.info(f'>>> {address}:')
logger.info(f' Device Class (raw): {class_of_device:06X}') logger.info(f' Device Class (raw): {class_of_device:06X}')
major_class_name = DeviceClass.major_device_class_name(major_device_class) major_class_name = DeviceClass.major_device_class_name(major_device_class)
logger.info(' Device Major Class: ' f'{major_class_name}') logger.info(f' Device Major Class: {major_class_name}')
minor_class_name = DeviceClass.minor_device_class_name( minor_class_name = DeviceClass.minor_device_class_name(
major_device_class, minor_device_class major_device_class, minor_device_class
) )
logger.info(' Device Minor Class: ' f'{minor_class_name}') logger.info(f' Device Minor Class: {minor_class_name}')
logger.info( logger.info(
' Device Services: ' f'{", ".join(DeviceClass.service_class_labels(service_classes))}' f' Device Services: {", ".join(DeviceClass.service_class_labels(service_classes))}'
) )
logger.info(f' RSSI: {rssi}') logger.info(f' RSSI: {rssi}')
if data.ad_structures: if data.ad_structures:
@ -92,187 +98,221 @@ async def br_connect(hci_port, shell, address) -> None:
hci_transport.source, hci_transport.source,
hci_transport.sink, hci_transport.sink,
) )
device.classic_enabled = True
device.le_enabled = False
await device_power_on(device)
target_address = address.split(" ")[0] with open("bumble_hci_sdp_s_discover.log", "wb") as snoop_file:
logger.info(f'=== Connecting to {target_address}...') device.host.snooper = BtSnooper(snoop_file)
try: device.classic_enabled = True
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) device.le_enabled = False
logger.info(f'=== Connected to {connection.peer_address}!') await device_power_on(device)
except CommandTimeoutError as e:
logger.info('!!! Connection timed out')
raise e
# Connect to the SDP Server target_address = address.split(" ")[0]
sdp_client = SDP_Client(connection) logger.info(f'=== Connecting to {target_address}...')
await sdp_client.connect() try:
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
logger.info(f'=== Connected to {connection.peer_address}!')
except CommandTimeoutError as e:
logger.info('!!! Connection timed out')
raise e
# List all services in the root browse group # Connect to the SDP Server
logger.info("<<< 1 List all services in the root browse group") sdp_client = SDP_Client(connection)
service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT]) await sdp_client.connect()
logger.info(f'SERVICES: {service_record_handles}')
# For each service in the root browse group, get all its attributes # List all services in the root browse group
assert len(service_record_handles) == 0 logger.info("<<< 1 List all services in the root browse group")
for service_record_handle in service_record_handles: service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])
attributes = await sdp_client.get_attributes( logger.info(f'SERVICES: {service_record_handles}')
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
# For each service in the root browse group, get all its attributes
assert len(service_record_handles) == 0
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
for attribute in attributes:
logger.info(f' {attribute}')
# Install SDP Record 0
shell.exec_command("sdp_server register_sdp 0")
# List all services in the root browse group
logger.info("<<< 2 List all services in the root browse group")
service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])
logger.info(f'SERVICES: {service_record_handles}')
# For each service in the root browse group, get all its attributes
assert len(service_record_handles) != 0
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
for attribute in attributes:
logger.info(f' {attribute}')
logger.info("<<< 3 List all attributes with L2CAP protocol")
search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE]
) )
logger.info(f'SERVICE {service_record_handle:04X} attributes:') assert len(search_result) != 0
for attribute in attributes: logger.info('SEARCH RESULTS:')
logger.info(f' {attribute}') for attribute_list in search_result:
logger.info('SERVICE:')
logger.info(
' ' + '\n '.join([attribute.to_string() for attribute in attribute_list])
)
# Install SDP Record 0 logger.info("<<< 4 List all attributes with OBEX protocol")
shell.exec_command("sdp_server register_sdp 0") search_result = await sdp_client.search_attributes(
[BT_OBEX_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE]
# List all services in the root browse group
logger.info("<<< 2 List all services in the root browse group")
service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT])
logger.info(f'SERVICES: {service_record_handles}')
# For each service in the root browse group, get all its attributes
assert len(service_record_handles) != 0
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
) )
logger.info(f'SERVICE {service_record_handle:04X} attributes:') logger.info(f'SEARCH RESULTS:{search_result}')
for attribute in attributes: assert len(search_result) == 0
logger.info(f' {attribute}')
logger.info("<<< 3 List all attributes with L2CAP protocol") logger.info("<<< 5 List all attributes with L2CAP protocol (0x1000~0xFFFF)")
search_result = await sdp_client.search_attributes( search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] [BT_L2CAP_PROTOCOL_ID], [(0x1000, 0xFFFF)]
)
assert len(search_result) != 0
logger.info('SEARCH RESULTS:')
for attribute_list in search_result:
logger.info('SERVICE:')
logger.info(' ' + '\n '.join([attribute.to_string() for attribute in attribute_list]))
logger.info("<<< 4 List all attributes with OBEX protocol")
search_result = await sdp_client.search_attributes(
[BT_OBEX_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SEARCH RESULTS:{search_result}')
assert len(search_result) == 0
logger.info("<<< 5 List all attributes with L2CAP protocol (0x1000~0xFFFF)")
search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID], [(0x1000, 0xFFFF)]
)
logger.info(f'SEARCH RESULTS:{search_result}')
assert len(search_result) == 0
# Install SDP Record 1
shell.exec_command("sdp_server register_sdp 1")
# List all services with L2CAP
logger.info("<<< 6 List all services with L2CAP protocol")
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
logger.info(f'SERVICES: {service_record_handles}')
assert len(service_record_handles) != 0
# For each service in the root browse group, get all its attributes
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
) )
logger.info(f'SERVICE {service_record_handle:04X} attributes:') logger.info(f'SEARCH RESULTS:{search_result}')
for attribute in attributes: assert len(search_result) == 0
logger.info(f' {attribute}')
# Install SDP Record 2 # Install SDP Record 1
shell.exec_command("sdp_server register_sdp 2") shell.exec_command("sdp_server register_sdp 1")
# List all services with L2CAP # List all services with L2CAP
logger.info("<<< 7 List all services with L2CAP protocol") logger.info("<<< 6 List all services with L2CAP protocol")
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
logger.info(f'SERVICES: {service_record_handles}') logger.info(f'SERVICES: {service_record_handles}')
assert len(service_record_handles) != 0 assert len(service_record_handles) != 0
# For each service in the root browse group, get all its attributes # For each service in the root browse group, get all its attributes
for service_record_handle in service_record_handles: for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes( attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
for attribute in attributes:
logger.info(f' {attribute}')
# Install SDP Record 2
shell.exec_command("sdp_server register_sdp 2")
# List all services with L2CAP
logger.info("<<< 7 List all services with L2CAP protocol")
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
logger.info(f'SERVICES: {service_record_handles}')
assert len(service_record_handles) != 0
# For each service in the root browse group, get all its attributes
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
for attribute in attributes:
logger.info(f' {attribute}')
# Install SDP Record all
shell.exec_command("sdp_server register_sdp_all")
# List all services with L2CAP
logger.info("<<< 8 List all services with L2CAP protocol")
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
logger.info(f'SERVICES: {service_record_handles}')
assert len(service_record_handles) != 0
# For each service in the root browse group, get all its attributes
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
for attribute in attributes:
logger.info(f' {attribute}')
logger.info("<<< 9 List all attributes with L2CAP protocol")
search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE]
) )
logger.info(f'SERVICE {service_record_handle:04X} attributes:') assert len(search_result) != 0
for attribute in attributes: logger.info('SEARCH RESULTS:')
logger.info(f' {attribute}') for attribute_list in search_result:
logger.info('SERVICE:')
logger.info(
' ' + '\n '.join([attribute.to_string() for attribute in attribute_list])
)
# Install SDP Record all # Install SDP large Record
shell.exec_command("sdp_server register_sdp_all") shell.exec_command("sdp_server register_sdp_large")
# List all services with L2CAP logger.info("<<< 10 List all services with L2CAP protocol")
logger.info("<<< 8 List all services with L2CAP protocol") service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) logger.info(f'SERVICES: {service_record_handles}')
logger.info(f'SERVICES: {service_record_handles}')
assert len(service_record_handles) != 0 assert len(service_record_handles) != 0
# For each service in the root browse group, get all its attributes # For each service in the root browse group, get all its attributes
for service_record_handle in service_record_handles: for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes( attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
) )
logger.info(f'SERVICE {service_record_handle:04X} attributes:') logger.info(f'SERVICE {service_record_handle:04X} attributes:')
for attribute in attributes: service_name_found = False
logger.info(f' {attribute}') for attribute in attributes:
logger.info(f' {attribute}')
if attribute.id == 0x100:
service_name_found = True
if service_record_handle == service_record_handles[-1]:
assert service_name_found is False
logger.info("<<< 9 List all attributes with L2CAP protocol") # Install SDP large Record
search_result = await sdp_client.search_attributes( shell.exec_command("sdp_server register_sdp_large_valid")
[BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE]
)
assert len(search_result) != 0
logger.info('SEARCH RESULTS:')
for attribute_list in search_result:
logger.info('SERVICE:')
logger.info(' ' + '\n '.join([attribute.to_string() for attribute in attribute_list]))
# Install SDP large Record logger.info("<<< 11 List all services with L2CAP protocol with valid service name")
shell.exec_command("sdp_server register_sdp_large") service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
logger.info(f'SERVICES: {service_record_handles}')
logger.info("<<< 10 List all services with L2CAP protocol") assert len(service_record_handles) != 0
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) # For each service in the root browse group, get all its attributes
logger.info(f'SERVICES: {service_record_handles}') for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
service_name_found = False
for attribute in attributes:
logger.info(f' {attribute}')
if attribute.id == 0x100:
service_name_found = True
if service_record_handle == service_record_handles[-1]:
assert service_name_found is True
assert len(service_record_handles) != 0 # Install SDP uuid128 Record
# For each service in the root browse group, get all its attributes shell.exec_command("sdp_server register_sdp_uuid128")
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
logger.info(f'SERVICE {service_record_handle:04X} attributes:')
service_name_found = False
for attribute in attributes:
logger.info(f' {attribute}')
if attribute.id == 0x100:
service_name_found = True
if service_record_handle == service_record_handles[-1]:
assert service_name_found is False
# Install SDP large Record logger.info("<<< 12 List all services with L2CAP protocol")
shell.exec_command("sdp_server register_sdp_large_valid") service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID])
logger.info(f'SERVICES: {service_record_handles}')
logger.info("<<< 11 List all services with L2CAP protocol with valid service name") assert len(service_record_handles) != 0
service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) # For each service in the root browse group, get all its attributes
logger.info(f'SERVICES: {service_record_handles}') for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
assert len(service_record_handles) != 0 service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
# For each service in the root browse group, get all its attributes )
for service_record_handle in service_record_handles: logger.info(f'SERVICE {service_record_handle:04X} attributes:')
attributes = await sdp_client.get_attributes( profile_found = False
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] for attribute in attributes:
) logger.info(f' {attribute}')
logger.info(f'SERVICE {service_record_handle:04X} attributes:') # The new added service is the last one.
service_name_found = False if (
for attribute in attributes: service_record_handle == service_record_handles[-1]
logger.info(f' {attribute}') and attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
if attribute.id == 0x100: and attribute.is_uuid_in_value(BT_SERIAL_PORT_SERVICE, attribute.value)
service_name_found = True ):
if service_record_handle == service_record_handles[-1]: profile_found = True
assert service_name_found is True if service_record_handle == service_record_handles[-1]:
assert profile_found is True
class TestSdpServer: class TestSdpServer:

View file

@ -154,7 +154,7 @@ static struct bt_sdp_attribute spp_attrs_large_valid[] = {
static struct bt_sdp_record spp_rec_large_valid = BT_SDP_RECORD(spp_attrs_large_valid); static struct bt_sdp_record spp_rec_large_valid = BT_SDP_RECORD(spp_attrs_large_valid);
#define MAX_SDP_RECORD_COUNT 8 #define MAX_SDP_RECORD_COUNT 7
#define _SDP_ATTRS_DEFINE(index, sdp_attrs_name) \ #define _SDP_ATTRS_DEFINE(index, sdp_attrs_name) \
static struct bt_sdp_attribute sdp_attrs_name##index[] = { \ static struct bt_sdp_attribute sdp_attrs_name##index[] = { \
@ -296,11 +296,92 @@ static int cmd_register_sdp_large_valid(const struct shell *sh, size_t argc, cha
return 0; return 0;
} }
uint8_t serial_port_svclass_uuid128[16] = {0x00, 0x00, 0x11, 0x01, 0x00, 0x00, 0x10, 0x00,
0x80, 0x00, 0x00, 0x80, 0x5F, 0x9B, 0x34, 0xFB};
static struct bt_sdp_attribute spp_attrs_uuid128[] = {
BT_SDP_NEW_SERVICE,
BT_SDP_LIST(
BT_SDP_ATTR_SVCLASS_ID_LIST,
BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 17),
BT_SDP_DATA_ELEM_LIST(
{
BT_SDP_TYPE_SIZE(BT_SDP_UUID128),
serial_port_svclass_uuid128
},
)
),
BT_SDP_LIST(
BT_SDP_ATTR_PROTO_DESC_LIST,
BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 12),
BT_SDP_DATA_ELEM_LIST(
{
BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 3),
BT_SDP_DATA_ELEM_LIST(
{
BT_SDP_TYPE_SIZE(BT_SDP_UUID16),
BT_SDP_ARRAY_16(BT_SDP_PROTO_L2CAP)
},
)
},
{
BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 5),
BT_SDP_DATA_ELEM_LIST(
{
BT_SDP_TYPE_SIZE(BT_SDP_UUID16),
BT_SDP_ARRAY_16(BT_SDP_PROTO_RFCOMM)
},
{
BT_SDP_TYPE_SIZE(BT_SDP_UINT8),
BT_SDP_ARRAY_8(BT_RFCOMM_CHAN_SPP)
},
)
},
)
),
BT_SDP_LIST(
BT_SDP_ATTR_PROFILE_DESC_LIST,
BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 22),
BT_SDP_DATA_ELEM_LIST(
{
BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 20),
BT_SDP_DATA_ELEM_LIST(
{
BT_SDP_TYPE_SIZE(BT_SDP_UUID128),
serial_port_svclass_uuid128
},
{
BT_SDP_TYPE_SIZE(BT_SDP_UINT16),
BT_SDP_ARRAY_16(0x0102)
},
)
},
)
),
BT_SDP_SERVICE_NAME("sdp_uuid128"),
};
static struct bt_sdp_record spp_rec_uuid128 = BT_SDP_RECORD(spp_attrs_uuid128);
static int cmd_register_sdp_uuid128(const struct shell *sh, size_t argc, char *argv[])
{
int err;
sh = sh;
err = bt_sdp_register_service(&spp_rec_uuid128);
if (err) {
shell_error(sh, "Register SDP uuid128 failed (err %d)", err);
}
return 0;
}
SHELL_STATIC_SUBCMD_SET_CREATE(sdp_server_cmds, SHELL_STATIC_SUBCMD_SET_CREATE(sdp_server_cmds,
SHELL_CMD_ARG(register_sdp, NULL, "<SDP Record Index>", cmd_register_sdp, 2, 0), SHELL_CMD_ARG(register_sdp, NULL, "<SDP Record Index>", cmd_register_sdp, 2, 0),
SHELL_CMD_ARG(register_sdp_all, NULL, "", cmd_register_sdp_all, 1, 0), SHELL_CMD_ARG(register_sdp_all, NULL, "", cmd_register_sdp_all, 1, 0),
SHELL_CMD_ARG(register_sdp_large, NULL, "", cmd_register_sdp_large, 1, 0), SHELL_CMD_ARG(register_sdp_large, NULL, "", cmd_register_sdp_large, 1, 0),
SHELL_CMD_ARG(register_sdp_large_valid, NULL, "", cmd_register_sdp_large_valid, 1, 0), SHELL_CMD_ARG(register_sdp_large_valid, NULL, "", cmd_register_sdp_large_valid, 1, 0),
SHELL_CMD_ARG(register_sdp_uuid128, NULL, "", cmd_register_sdp_uuid128, 1, 0),
SHELL_SUBCMD_SET_END SHELL_SUBCMD_SET_END
); );