net: mqtt: Add MQTT 5.0 support for SUBSCRIBE/UNSUBSCRIBE

Add support for SUBSCRIBE/UNSUBSCRIBE messages specified in MQTT 5.0.

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2024-12-19 16:09:09 +01:00 committed by Benjamin Cabé
commit efd795b066
5 changed files with 149 additions and 8 deletions

View file

@ -493,7 +493,7 @@ struct mqtt_publish_param {
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/** @brief List of topics in a subscription request. */
/** @brief Parameters for subscribe/unsubscribe message. */
struct mqtt_subscription_list {
/** Array containing topics along with QoS for each. */
struct mqtt_topic *list;
@ -503,6 +503,19 @@ struct mqtt_subscription_list {
/** Message id used to identify subscription request. */
uint16_t message_id;
#if defined(CONFIG_MQTT_VERSION_5_0)
/** MQTT 5.0 properties. */
struct {
/** MQTT 5.0, chapter 3.8.2.1.3 / 3.10.2.1.2 User Property. */
struct mqtt_utf8_pair user_prop[CONFIG_MQTT_USER_PROPERTIES_MAX];
/** MQTT 5.0, chapter 3.8.2.1.2 Subscription Identifier.
* Ignored for UNSUBSCRIBE requests.
*/
uint32_t subscription_identifier;
} prop;
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/**

View file

@ -495,7 +495,7 @@ int mqtt_subscribe(struct mqtt_client *client,
goto error;
}
err_code = subscribe_encode(param, &packet);
err_code = subscribe_encode(client, param, &packet);
if (err_code < 0) {
goto error;
}
@ -529,7 +529,7 @@ int mqtt_unsubscribe(struct mqtt_client *client,
goto error;
}
err_code = unsubscribe_encode(param, &packet);
err_code = unsubscribe_encode(client, param, &packet);
if (err_code < 0) {
goto error;
}

View file

@ -383,6 +383,32 @@ static int encode_uint32_property(uint8_t prop, uint32_t value,
return pack_uint32(value, buf);
}
static size_t var_int_property_length(uint32_t value)
{
if (value == 0) {
return 0;
}
return sizeof(uint8_t) + (size_t)pack_variable_int(value, NULL);
}
static int encode_var_int_property(uint8_t prop, uint32_t value,
struct buf_ctx *buf)
{
int err;
if (value == 0) {
return 0;
}
err = pack_uint8(prop, buf);
if (err < 0) {
return err;
}
return pack_variable_int(value, buf);
}
static size_t string_property_length(const struct mqtt_utf8 *str)
{
if (str->size == 0) {
@ -1164,7 +1190,54 @@ int disconnect_encode(struct buf_ctx *buf)
return 0;
}
int subscribe_encode(const struct mqtt_subscription_list *param,
#if defined(CONFIG_MQTT_VERSION_5_0)
static uint32_t subscribe_properties_length(
const struct mqtt_subscription_list *param)
{
return var_int_property_length(param->prop.subscription_identifier) +
user_properties_length(param->prop.user_prop);
}
static int subscribe_properties_encode(const struct mqtt_subscription_list *param,
struct buf_ctx *buf)
{
uint32_t properties_len;
int err;
/* Precalculate total properties length */
properties_len = subscribe_properties_length(param);
err = pack_variable_int(properties_len, buf);
if (err < 0) {
return err;
}
err = encode_var_int_property(MQTT_PROP_SUBSCRIPTION_IDENTIFIER,
param->prop.subscription_identifier,
buf);
if (err < 0) {
return err;
}
err = encode_user_properties(param->prop.user_prop, buf);
if (err < 0) {
return err;
}
return 0;
}
#else
static int subscribe_properties_encode(const struct mqtt_subscription_list *param,
struct buf_ctx *buf)
{
ARG_UNUSED(param);
ARG_UNUSED(buf);
return -ENOTSUP;
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
int subscribe_encode(const struct mqtt_client *client,
const struct mqtt_subscription_list *param,
struct buf_ctx *buf)
{
const uint8_t message_type = MQTT_MESSAGES_OPTIONS(
@ -1186,6 +1259,13 @@ int subscribe_encode(const struct mqtt_subscription_list *param,
return err_code;
}
if (mqtt_is_version_5_0(client)) {
err_code = subscribe_properties_encode(param, buf);
if (err_code != 0) {
return err_code;
}
}
for (i = 0; i < param->list_count; i++) {
err_code = pack_utf8_str(&param->list[i].topic, buf);
if (err_code != 0) {
@ -1201,7 +1281,46 @@ int subscribe_encode(const struct mqtt_subscription_list *param,
return mqtt_encode_fixed_header(message_type, start, buf);
}
int unsubscribe_encode(const struct mqtt_subscription_list *param,
#if defined(CONFIG_MQTT_VERSION_5_0)
static uint32_t unsubscribe_properties_length(
const struct mqtt_subscription_list *param)
{
return user_properties_length(param->prop.user_prop);
}
static int unsubscribe_properties_encode(
const struct mqtt_subscription_list *param, struct buf_ctx *buf)
{
uint32_t properties_len;
int err;
/* Precalculate total properties length */
properties_len = unsubscribe_properties_length(param);
err = pack_variable_int(properties_len, buf);
if (err < 0) {
return err;
}
err = encode_user_properties(param->prop.user_prop, buf);
if (err < 0) {
return err;
}
return 0;
}
#else
static int unsubscribe_properties_encode(
const struct mqtt_subscription_list *param, struct buf_ctx *buf)
{
ARG_UNUSED(param);
ARG_UNUSED(buf);
return -ENOTSUP;
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
int unsubscribe_encode(const struct mqtt_client *client,
const struct mqtt_subscription_list *param,
struct buf_ctx *buf)
{
const uint8_t message_type = MQTT_MESSAGES_OPTIONS(
@ -1218,6 +1337,13 @@ int unsubscribe_encode(const struct mqtt_subscription_list *param,
return err_code;
}
if (mqtt_is_version_5_0(client)) {
err_code = unsubscribe_properties_encode(param, buf);
if (err_code != 0) {
return err_code;
}
}
for (i = 0; i < param->list_count; i++) {
err_code = pack_utf8_str(&param->list[i].topic, buf);
if (err_code != 0) {

View file

@ -305,7 +305,8 @@ int disconnect_encode(struct buf_ctx *buf);
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int subscribe_encode(const struct mqtt_subscription_list *param,
int subscribe_encode(const struct mqtt_client *client,
const struct mqtt_subscription_list *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Unsubscribe packet.
@ -318,7 +319,8 @@ int subscribe_encode(const struct mqtt_subscription_list *param,
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int unsubscribe_encode(const struct mqtt_subscription_list *param,
int unsubscribe_encode(const struct mqtt_client *client,
const struct mqtt_subscription_list *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Ping Request packet.

View file

@ -846,7 +846,7 @@ static int eval_msg_subscribe(struct mqtt_test *mqtt_test)
buf.cur = client.tx_buf;
buf.end = client.tx_buf + client.tx_buf_size;
rc = subscribe_encode(param, &buf);
rc = subscribe_encode(&client, param, &buf);
/**TESTPOINT: Check subscribe_encode function*/
zassert_false(rc, "subscribe_encode failed");