From 263cc93827d989fd219efcd14d535d0585a00d20 Mon Sep 17 00:00:00 2001 From: Mark Wang Date: Wed, 4 Jun 2025 09:01:03 +0800 Subject: [PATCH] tests: Bluetooth: Classic: Add uuid128 service test add one sdp service that contains uuid128, improve test to save bumble hci log. Signed-off-by: Mark Wang --- .../classic/sdp_s/pytest/test_sdp.py | 366 ++++++++++-------- .../bluetooth/classic/sdp_s/src/sdp_server.c | 83 +++- 2 files changed, 285 insertions(+), 164 deletions(-) diff --git a/tests/bluetooth/classic/sdp_s/pytest/test_sdp.py b/tests/bluetooth/classic/sdp_s/pytest/test_sdp.py index 92ed24eaef6..2c10440727a 100644 --- a/tests/bluetooth/classic/sdp_s/pytest/test_sdp.py +++ b/tests/bluetooth/classic/sdp_s/pytest/test_sdp.py @@ -9,13 +9,19 @@ from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_L2CAP_PROTOCOL_ID, BT_OBEX_PROTOCOL_ID, + BT_SERIAL_PORT_SERVICE, CommandTimeoutError, DeviceClass, ) from bumble.device import Device from bumble.hci import Address -from bumble.sdp import SDP_ALL_ATTRIBUTES_RANGE, SDP_PUBLIC_BROWSE_ROOT +from bumble.sdp import ( + SDP_ALL_ATTRIBUTES_RANGE, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_PUBLIC_BROWSE_ROOT, +) from bumble.sdp import Client as SDP_Client +from bumble.snoop import BtSnooper from bumble.transport import open_transport_or_link from twister_harness import DeviceAdapter, Shell @@ -52,13 +58,13 @@ class discovery_listener(Device.Listener): logger.info(f'>>> {address}:') logger.info(f' Device Class (raw): {class_of_device:06X}') major_class_name = DeviceClass.major_device_class_name(major_device_class) - logger.info(' Device Major Class: ' f'{major_class_name}') + logger.info(f' Device Major Class: {major_class_name}') minor_class_name = DeviceClass.minor_device_class_name( major_device_class, minor_device_class ) - logger.info(' Device Minor Class: ' f'{minor_class_name}') + logger.info(f' Device Minor Class: {minor_class_name}') logger.info( - ' Device Services: ' f'{", ".join(DeviceClass.service_class_labels(service_classes))}' + f' Device Services: {", ".join(DeviceClass.service_class_labels(service_classes))}' ) logger.info(f' RSSI: {rssi}') if data.ad_structures: @@ -92,187 +98,221 @@ async def br_connect(hci_port, shell, address) -> None: hci_transport.source, hci_transport.sink, ) - device.classic_enabled = True - device.le_enabled = False - await device_power_on(device) - target_address = address.split(" ")[0] - logger.info(f'=== Connecting to {target_address}...') - try: - connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) - logger.info(f'=== Connected to {connection.peer_address}!') - except CommandTimeoutError as e: - logger.info('!!! Connection timed out') - raise e + with open("bumble_hci_sdp_s_discover.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + device.classic_enabled = True + device.le_enabled = False + await device_power_on(device) - # Connect to the SDP Server - sdp_client = SDP_Client(connection) - await sdp_client.connect() + target_address = address.split(" ")[0] + logger.info(f'=== Connecting to {target_address}...') + try: + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) + logger.info(f'=== Connected to {connection.peer_address}!') + except CommandTimeoutError as e: + logger.info('!!! Connection timed out') + raise e - # List all services in the root browse group - logger.info("<<< 1 List all services in the root browse group") - service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT]) - logger.info(f'SERVICES: {service_record_handles}') + # Connect to the SDP Server + sdp_client = SDP_Client(connection) + await sdp_client.connect() - # For each service in the root browse group, get all its attributes - assert len(service_record_handles) == 0 - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + # List all services in the root browse group + logger.info("<<< 1 List all services in the root browse group") + service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT]) + logger.info(f'SERVICES: {service_record_handles}') + + # For each service in the root browse group, get all its attributes + assert len(service_record_handles) == 0 + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + for attribute in attributes: + logger.info(f' {attribute}') + + # Install SDP Record 0 + shell.exec_command("sdp_server register_sdp 0") + + # List all services in the root browse group + logger.info("<<< 2 List all services in the root browse group") + service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT]) + logger.info(f'SERVICES: {service_record_handles}') + + # For each service in the root browse group, get all its attributes + assert len(service_record_handles) != 0 + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + for attribute in attributes: + logger.info(f' {attribute}') + + logger.info("<<< 3 List all attributes with L2CAP protocol") + search_result = await sdp_client.search_attributes( + [BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - for attribute in attributes: - logger.info(f' {attribute}') + assert len(search_result) != 0 + logger.info('SEARCH RESULTS:') + for attribute_list in search_result: + logger.info('SERVICE:') + logger.info( + ' ' + '\n '.join([attribute.to_string() for attribute in attribute_list]) + ) - # Install SDP Record 0 - shell.exec_command("sdp_server register_sdp 0") - - # List all services in the root browse group - logger.info("<<< 2 List all services in the root browse group") - service_record_handles = await sdp_client.search_services([SDP_PUBLIC_BROWSE_ROOT]) - logger.info(f'SERVICES: {service_record_handles}') - - # For each service in the root browse group, get all its attributes - assert len(service_record_handles) != 0 - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + logger.info("<<< 4 List all attributes with OBEX protocol") + search_result = await sdp_client.search_attributes( + [BT_OBEX_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - for attribute in attributes: - logger.info(f' {attribute}') + logger.info(f'SEARCH RESULTS:{search_result}') + assert len(search_result) == 0 - logger.info("<<< 3 List all attributes with L2CAP protocol") - search_result = await sdp_client.search_attributes( - [BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] - ) - assert len(search_result) != 0 - logger.info('SEARCH RESULTS:') - for attribute_list in search_result: - logger.info('SERVICE:') - logger.info(' ' + '\n '.join([attribute.to_string() for attribute in attribute_list])) - - logger.info("<<< 4 List all attributes with OBEX protocol") - search_result = await sdp_client.search_attributes( - [BT_OBEX_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] - ) - logger.info(f'SEARCH RESULTS:{search_result}') - assert len(search_result) == 0 - - logger.info("<<< 5 List all attributes with L2CAP protocol (0x1000~0xFFFF)") - search_result = await sdp_client.search_attributes( - [BT_L2CAP_PROTOCOL_ID], [(0x1000, 0xFFFF)] - ) - logger.info(f'SEARCH RESULTS:{search_result}') - assert len(search_result) == 0 - - # Install SDP Record 1 - shell.exec_command("sdp_server register_sdp 1") - - # List all services with L2CAP - logger.info("<<< 6 List all services with L2CAP protocol") - service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) - logger.info(f'SERVICES: {service_record_handles}') - - assert len(service_record_handles) != 0 - # For each service in the root browse group, get all its attributes - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + logger.info("<<< 5 List all attributes with L2CAP protocol (0x1000~0xFFFF)") + search_result = await sdp_client.search_attributes( + [BT_L2CAP_PROTOCOL_ID], [(0x1000, 0xFFFF)] ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - for attribute in attributes: - logger.info(f' {attribute}') + logger.info(f'SEARCH RESULTS:{search_result}') + assert len(search_result) == 0 - # Install SDP Record 2 - shell.exec_command("sdp_server register_sdp 2") + # Install SDP Record 1 + shell.exec_command("sdp_server register_sdp 1") - # List all services with L2CAP - logger.info("<<< 7 List all services with L2CAP protocol") - service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) - logger.info(f'SERVICES: {service_record_handles}') + # List all services with L2CAP + logger.info("<<< 6 List all services with L2CAP protocol") + service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) + logger.info(f'SERVICES: {service_record_handles}') - assert len(service_record_handles) != 0 - # For each service in the root browse group, get all its attributes - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + assert len(service_record_handles) != 0 + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + for attribute in attributes: + logger.info(f' {attribute}') + + # Install SDP Record 2 + shell.exec_command("sdp_server register_sdp 2") + + # List all services with L2CAP + logger.info("<<< 7 List all services with L2CAP protocol") + service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) + logger.info(f'SERVICES: {service_record_handles}') + + assert len(service_record_handles) != 0 + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + for attribute in attributes: + logger.info(f' {attribute}') + + # Install SDP Record all + shell.exec_command("sdp_server register_sdp_all") + + # List all services with L2CAP + logger.info("<<< 8 List all services with L2CAP protocol") + service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) + logger.info(f'SERVICES: {service_record_handles}') + + assert len(service_record_handles) != 0 + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + for attribute in attributes: + logger.info(f' {attribute}') + + logger.info("<<< 9 List all attributes with L2CAP protocol") + search_result = await sdp_client.search_attributes( + [BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - for attribute in attributes: - logger.info(f' {attribute}') + assert len(search_result) != 0 + logger.info('SEARCH RESULTS:') + for attribute_list in search_result: + logger.info('SERVICE:') + logger.info( + ' ' + '\n '.join([attribute.to_string() for attribute in attribute_list]) + ) - # Install SDP Record all - shell.exec_command("sdp_server register_sdp_all") + # Install SDP large Record + shell.exec_command("sdp_server register_sdp_large") - # List all services with L2CAP - logger.info("<<< 8 List all services with L2CAP protocol") - service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) - logger.info(f'SERVICES: {service_record_handles}') + logger.info("<<< 10 List all services with L2CAP protocol") + service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) + logger.info(f'SERVICES: {service_record_handles}') - assert len(service_record_handles) != 0 - # For each service in the root browse group, get all its attributes - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] - ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - for attribute in attributes: - logger.info(f' {attribute}') + assert len(service_record_handles) != 0 + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + service_name_found = False + for attribute in attributes: + logger.info(f' {attribute}') + if attribute.id == 0x100: + service_name_found = True + if service_record_handle == service_record_handles[-1]: + assert service_name_found is False - logger.info("<<< 9 List all attributes with L2CAP protocol") - search_result = await sdp_client.search_attributes( - [BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] - ) - assert len(search_result) != 0 - logger.info('SEARCH RESULTS:') - for attribute_list in search_result: - logger.info('SERVICE:') - logger.info(' ' + '\n '.join([attribute.to_string() for attribute in attribute_list])) + # Install SDP large Record + shell.exec_command("sdp_server register_sdp_large_valid") - # Install SDP large Record - shell.exec_command("sdp_server register_sdp_large") + logger.info("<<< 11 List all services with L2CAP protocol with valid service name") + service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) + logger.info(f'SERVICES: {service_record_handles}') - logger.info("<<< 10 List all services with L2CAP protocol") - service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) - logger.info(f'SERVICES: {service_record_handles}') + assert len(service_record_handles) != 0 + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + service_name_found = False + for attribute in attributes: + logger.info(f' {attribute}') + if attribute.id == 0x100: + service_name_found = True + if service_record_handle == service_record_handles[-1]: + assert service_name_found is True - assert len(service_record_handles) != 0 - # For each service in the root browse group, get all its attributes - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] - ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - service_name_found = False - for attribute in attributes: - logger.info(f' {attribute}') - if attribute.id == 0x100: - service_name_found = True - if service_record_handle == service_record_handles[-1]: - assert service_name_found is False + # Install SDP uuid128 Record + shell.exec_command("sdp_server register_sdp_uuid128") - # Install SDP large Record - shell.exec_command("sdp_server register_sdp_large_valid") + logger.info("<<< 12 List all services with L2CAP protocol") + service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) + logger.info(f'SERVICES: {service_record_handles}') - logger.info("<<< 11 List all services with L2CAP protocol with valid service name") - service_record_handles = await sdp_client.search_services([BT_L2CAP_PROTOCOL_ID]) - logger.info(f'SERVICES: {service_record_handles}') - - assert len(service_record_handles) != 0 - # For each service in the root browse group, get all its attributes - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] - ) - logger.info(f'SERVICE {service_record_handle:04X} attributes:') - service_name_found = False - for attribute in attributes: - logger.info(f' {attribute}') - if attribute.id == 0x100: - service_name_found = True - if service_record_handle == service_record_handles[-1]: - assert service_name_found is True + assert len(service_record_handles) != 0 + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + logger.info(f'SERVICE {service_record_handle:04X} attributes:') + profile_found = False + for attribute in attributes: + logger.info(f' {attribute}') + # The new added service is the last one. + if ( + service_record_handle == service_record_handles[-1] + and attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + and attribute.is_uuid_in_value(BT_SERIAL_PORT_SERVICE, attribute.value) + ): + profile_found = True + if service_record_handle == service_record_handles[-1]: + assert profile_found is True class TestSdpServer: diff --git a/tests/bluetooth/classic/sdp_s/src/sdp_server.c b/tests/bluetooth/classic/sdp_s/src/sdp_server.c index 557ae207e18..dc502393d75 100644 --- a/tests/bluetooth/classic/sdp_s/src/sdp_server.c +++ b/tests/bluetooth/classic/sdp_s/src/sdp_server.c @@ -154,7 +154,7 @@ static struct bt_sdp_attribute spp_attrs_large_valid[] = { static struct bt_sdp_record spp_rec_large_valid = BT_SDP_RECORD(spp_attrs_large_valid); -#define MAX_SDP_RECORD_COUNT 8 +#define MAX_SDP_RECORD_COUNT 7 #define _SDP_ATTRS_DEFINE(index, sdp_attrs_name) \ static struct bt_sdp_attribute sdp_attrs_name##index[] = { \ @@ -296,11 +296,92 @@ static int cmd_register_sdp_large_valid(const struct shell *sh, size_t argc, cha return 0; } +uint8_t serial_port_svclass_uuid128[16] = {0x00, 0x00, 0x11, 0x01, 0x00, 0x00, 0x10, 0x00, + 0x80, 0x00, 0x00, 0x80, 0x5F, 0x9B, 0x34, 0xFB}; + +static struct bt_sdp_attribute spp_attrs_uuid128[] = { + BT_SDP_NEW_SERVICE, + BT_SDP_LIST( + BT_SDP_ATTR_SVCLASS_ID_LIST, + BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 17), + BT_SDP_DATA_ELEM_LIST( + { + BT_SDP_TYPE_SIZE(BT_SDP_UUID128), + serial_port_svclass_uuid128 + }, + ) + ), + BT_SDP_LIST( + BT_SDP_ATTR_PROTO_DESC_LIST, + BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 12), + BT_SDP_DATA_ELEM_LIST( + { + BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 3), + BT_SDP_DATA_ELEM_LIST( + { + BT_SDP_TYPE_SIZE(BT_SDP_UUID16), + BT_SDP_ARRAY_16(BT_SDP_PROTO_L2CAP) + }, + ) + }, + { + BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 5), + BT_SDP_DATA_ELEM_LIST( + { + BT_SDP_TYPE_SIZE(BT_SDP_UUID16), + BT_SDP_ARRAY_16(BT_SDP_PROTO_RFCOMM) + }, + { + BT_SDP_TYPE_SIZE(BT_SDP_UINT8), + BT_SDP_ARRAY_8(BT_RFCOMM_CHAN_SPP) + }, + ) + }, + ) + ), + BT_SDP_LIST( + BT_SDP_ATTR_PROFILE_DESC_LIST, + BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 22), + BT_SDP_DATA_ELEM_LIST( + { + BT_SDP_TYPE_SIZE_VAR(BT_SDP_SEQ8, 20), + BT_SDP_DATA_ELEM_LIST( + { + BT_SDP_TYPE_SIZE(BT_SDP_UUID128), + serial_port_svclass_uuid128 + }, + { + BT_SDP_TYPE_SIZE(BT_SDP_UINT16), + BT_SDP_ARRAY_16(0x0102) + }, + ) + }, + ) + ), + BT_SDP_SERVICE_NAME("sdp_uuid128"), +}; + +static struct bt_sdp_record spp_rec_uuid128 = BT_SDP_RECORD(spp_attrs_uuid128); + +static int cmd_register_sdp_uuid128(const struct shell *sh, size_t argc, char *argv[]) +{ + int err; + + sh = sh; + + err = bt_sdp_register_service(&spp_rec_uuid128); + if (err) { + shell_error(sh, "Register SDP uuid128 failed (err %d)", err); + } + return 0; +} + SHELL_STATIC_SUBCMD_SET_CREATE(sdp_server_cmds, SHELL_CMD_ARG(register_sdp, NULL, "", cmd_register_sdp, 2, 0), SHELL_CMD_ARG(register_sdp_all, NULL, "", cmd_register_sdp_all, 1, 0), SHELL_CMD_ARG(register_sdp_large, NULL, "", cmd_register_sdp_large, 1, 0), SHELL_CMD_ARG(register_sdp_large_valid, NULL, "", cmd_register_sdp_large_valid, 1, 0), + SHELL_CMD_ARG(register_sdp_uuid128, NULL, "", cmd_register_sdp_uuid128, 1, 0), SHELL_SUBCMD_SET_END );