net: mqtt: Add MQTT 5.0 support for PUBLISH

Add support for PUBLISH message specified in MQTT 5.0. The message
encoder and decoder were updated to support MQTT properties.

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2024-12-19 16:09:07 +01:00 committed by Benjamin Cabé
commit 15ad90aceb
8 changed files with 332 additions and 24 deletions

View file

@ -393,6 +393,55 @@ struct mqtt_publish_param {
* by the broker. * by the broker.
*/ */
uint8_t retain_flag : 1; uint8_t retain_flag : 1;
#if defined(CONFIG_MQTT_VERSION_5_0)
/** MQTT 5.0 properties. */
struct {
/** MQTT 5.0, chapter 3.3.2.3.7 User Property. */
struct mqtt_utf8_pair user_prop[CONFIG_MQTT_USER_PROPERTIES_MAX];
/** MQTT 5.0, chapter 3.3.2.3.5 Response Topic. */
struct mqtt_utf8 response_topic;
/** MQTT 5.0, chapter 3.3.2.3.6 Correlation Data. */
struct mqtt_binstr correlation_data;
/** MQTT 5.0, chapter 3.3.2.3.9 Content Type. */
struct mqtt_utf8 content_type;
/** MQTT 5.0, chapter 3.3.2.3.8 Subscription Identifier. */
uint32_t subscription_identifier[CONFIG_MQTT_SUBSCRIPTION_ID_PROPERTIES_MAX];
/** MQTT 5.0, chapter 3.3.2.3.3 Message Expiry Interval. */
uint32_t message_expiry_interval;
/** MQTT 5.0, chapter 3.3.2.3.4 Topic Alias. */
uint16_t topic_alias;
/** MQTT 5.0, chapter 3.3.2.3.2 Payload Format Indicator. */
uint8_t payload_format_indicator;
/** Flags indicating whether given property was present in received packet. */
struct {
/** Payload Format Indicator property was present. */
bool has_payload_format_indicator;
/** Message Expiry Interval property was present. */
bool has_message_expiry_interval;
/** Topic Alias property was present. */
bool has_topic_alias;
/** Response Topic property was present. */
bool has_response_topic;
/** Correlation Data property was present. */
bool has_correlation_data;
/** User Property property was present. */
bool has_user_prop;
/** Subscription Identifier property was present. */
bool has_subscription_identifier;
/** Content Type property was present. */
bool has_content_type;
} rx;
} prop;
#endif /* CONFIG_MQTT_VERSION_5_0 */
}; };
/** @brief List of topics in a subscription request. */ /** @brief List of topics in a subscription request. */

View file

@ -91,6 +91,14 @@ config MQTT_USER_PROPERTIES_MAX
Maximum number of user properties that the client can include in a Maximum number of user properties that the client can include in a
packet or parse on input. packet or parse on input.
config MQTT_SUBSCRIPTION_ID_PROPERTIES_MAX
int "Maximum number of Subscription ID properties in a Publish packet"
default 1
range 1 32
help
Maximum number of Subscription ID properties that the client can
parse when processing Publish message.
config MQTT_TOPIC_ALIAS_MAX config MQTT_TOPIC_ALIAS_MAX
int "Maximum number of supported topic aliases" int "Maximum number of supported topic aliases"
default 5 default 5

View file

@ -262,7 +262,7 @@ int mqtt_publish(struct mqtt_client *client,
goto error; goto error;
} }
err_code = publish_encode(param, &packet); err_code = publish_encode(client, param, &packet);
if (err_code < 0) { if (err_code < 0) {
goto error; goto error;
} }

View file

@ -155,19 +155,7 @@ static int unpack_raw_data(uint32_t length, struct buf_ctx *buf,
return 0; return 0;
} }
/** int unpack_variable_int(struct buf_ctx *buf, uint32_t *val)
* @brief Unpacks variable length integer from the buffer from the offset
* requested.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] val Memory where the value is to be unpacked.
*
* @retval Number of bytes parsed if the procedure is successful.
* @retval -EINVAL if the length decoding would use more that 4 bytes.
* @retval -EAGAIN if the buffer would be exceeded during the read.
*/
static int unpack_variable_int(struct buf_ctx *buf, uint32_t *val)
{ {
uint8_t shift = 0U; uint8_t shift = 0U;
int bytes = 0; int bytes = 0;
@ -435,6 +423,43 @@ int decode_user_property(struct property_decoder *prop,
return 0; return 0;
} }
int decode_sub_id_property(struct property_decoder *prop,
uint32_t *remaining_len,
struct buf_ctx *buf)
{
uint32_t *sub_id_array = prop->data;
uint32_t *chosen = NULL;
uint32_t value;
int bytes;
bytes = unpack_variable_int(buf, &value);
if (bytes < 0) {
return -EINVAL;
}
if (*remaining_len < bytes) {
return -EINVAL;
}
*remaining_len -= bytes;
*prop->found = true;
for (int i = 0; i < CONFIG_MQTT_SUBSCRIPTION_ID_PROPERTIES_MAX; i++) {
if (sub_id_array[i] == 0) {
chosen = &sub_id_array[i];
break;
}
}
if (chosen == NULL) {
NET_DBG("Cannot parse all subscription id properties, ignore excess");
} else {
*chosen = value;
}
return 0;
}
static int properties_decode(struct property_decoder *prop, uint8_t cnt, static int properties_decode(struct property_decoder *prop, uint8_t cnt,
struct buf_ctx *buf) struct buf_ctx *buf)
{ {
@ -479,12 +504,14 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
switch (type) { switch (type) {
case MQTT_PROP_SESSION_EXPIRY_INTERVAL: case MQTT_PROP_SESSION_EXPIRY_INTERVAL:
case MQTT_PROP_MAXIMUM_PACKET_SIZE: case MQTT_PROP_MAXIMUM_PACKET_SIZE:
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
err = decode_uint32_property(current_prop, err = decode_uint32_property(current_prop,
&properties_len, buf); &properties_len, buf);
break; break;
case MQTT_PROP_RECEIVE_MAXIMUM: case MQTT_PROP_RECEIVE_MAXIMUM:
case MQTT_PROP_TOPIC_ALIAS_MAXIMUM: case MQTT_PROP_TOPIC_ALIAS_MAXIMUM:
case MQTT_PROP_SERVER_KEEP_ALIVE: case MQTT_PROP_SERVER_KEEP_ALIVE:
case MQTT_PROP_TOPIC_ALIAS:
err = decode_uint16_property(current_prop, err = decode_uint16_property(current_prop,
&properties_len, buf); &properties_len, buf);
break; break;
@ -493,6 +520,7 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
case MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE: case MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE:
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE: case MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE:
case MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE: case MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE:
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
err = decode_uint8_property(current_prop, err = decode_uint8_property(current_prop,
&properties_len, buf); &properties_len, buf);
break; break;
@ -501,6 +529,8 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
case MQTT_PROP_RESPONSE_INFORMATION: case MQTT_PROP_RESPONSE_INFORMATION:
case MQTT_PROP_SERVER_REFERENCE: case MQTT_PROP_SERVER_REFERENCE:
case MQTT_PROP_AUTHENTICATION_METHOD: case MQTT_PROP_AUTHENTICATION_METHOD:
case MQTT_PROP_RESPONSE_TOPIC:
case MQTT_PROP_CONTENT_TYPE:
err = decode_string_property(current_prop, err = decode_string_property(current_prop,
&properties_len, buf); &properties_len, buf);
break; break;
@ -509,9 +539,14 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
&properties_len, buf); &properties_len, buf);
break; break;
case MQTT_PROP_AUTHENTICATION_DATA: case MQTT_PROP_AUTHENTICATION_DATA:
case MQTT_PROP_CORRELATION_DATA:
err = decode_binary_property(current_prop, err = decode_binary_property(current_prop,
&properties_len, buf); &properties_len, buf);
break; break;
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
err = decode_sub_id_property(current_prop,
&properties_len, buf);
break;
default: default:
err = -ENOTSUP; err = -ENOTSUP;
} }
@ -665,7 +700,68 @@ out:
return 0; return 0;
} }
int publish_decode(uint8_t flags, uint32_t var_length, struct buf_ctx *buf, #if defined(CONFIG_MQTT_VERSION_5_0)
static int publish_properties_decode(struct buf_ctx *buf,
struct mqtt_publish_param *param)
{
struct property_decoder prop[] = {
{
&param->prop.payload_format_indicator,
&param->prop.rx.has_payload_format_indicator,
MQTT_PROP_PAYLOAD_FORMAT_INDICATOR
},
{
&param->prop.message_expiry_interval,
&param->prop.rx.has_message_expiry_interval,
MQTT_PROP_MESSAGE_EXPIRY_INTERVAL
},
{
&param->prop.topic_alias,
&param->prop.rx.has_topic_alias,
MQTT_PROP_TOPIC_ALIAS
},
{
&param->prop.response_topic,
&param->prop.rx.has_response_topic,
MQTT_PROP_RESPONSE_TOPIC
},
{
&param->prop.correlation_data,
&param->prop.rx.has_correlation_data,
MQTT_PROP_CORRELATION_DATA
},
{
&param->prop.user_prop,
&param->prop.rx.has_user_prop,
MQTT_PROP_USER_PROPERTY
},
{
&param->prop.subscription_identifier,
&param->prop.rx.has_subscription_identifier,
MQTT_PROP_SUBSCRIPTION_IDENTIFIER
},
{
&param->prop.content_type,
&param->prop.rx.has_content_type,
MQTT_PROP_CONTENT_TYPE
}
};
return properties_decode(prop, ARRAY_SIZE(prop), buf);
}
#else
static int publish_properties_decode(struct buf_ctx *buf,
struct mqtt_publish_param *param)
{
ARG_UNUSED(param);
ARG_UNUSED(buf);
return -ENOTSUP;
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
int publish_decode(const struct mqtt_client *client, uint8_t flags,
uint32_t var_length, struct buf_ctx *buf,
struct mqtt_publish_param *param) struct mqtt_publish_param *param)
{ {
int err_code; int err_code;
@ -691,6 +787,16 @@ int publish_decode(uint8_t flags, uint32_t var_length, struct buf_ctx *buf,
var_header_length += sizeof(uint16_t); var_header_length += sizeof(uint16_t);
} }
if (mqtt_is_version_5_0(client)) {
err_code = publish_properties_decode(buf, param);
if (err_code < 0) {
return err_code;
}
/* Add parsed properties length */
var_header_length += err_code;
}
if (var_length < var_header_length) { if (var_length < var_header_length) {
NET_ERR("Corrupted PUBLISH message, header length (%u) larger " NET_ERR("Corrupted PUBLISH message, header length (%u) larger "
"than total length (%u)", var_header_length, "than total length (%u)", var_header_length,

View file

@ -862,7 +862,92 @@ int connect_request_encode(const struct mqtt_client *client,
return mqtt_encode_fixed_header(message_type, start, buf); return mqtt_encode_fixed_header(message_type, start, buf);
} }
int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf) #if defined(CONFIG_MQTT_VERSION_5_0)
static uint32_t publish_properties_length(const struct mqtt_publish_param *param)
{
return uint8_property_length(MQTT_PROP_PAYLOAD_FORMAT_INDICATOR,
param->prop.payload_format_indicator) +
uint32_property_length(param->prop.message_expiry_interval) +
uint16_property_length(param->prop.topic_alias) +
string_property_length(&param->prop.response_topic) +
binary_property_length(&param->prop.correlation_data) +
user_properties_length(param->prop.user_prop) +
/* Client does not include Subscription Identifier in any case. */
string_property_length(&param->prop.content_type);
}
static int publish_properties_encode(const struct mqtt_publish_param *param,
struct buf_ctx *buf)
{
uint32_t properties_len;
int err;
/* Precalculate total properties length */
properties_len = publish_properties_length(param);
err = pack_variable_int(properties_len, buf);
if (err < 0) {
return err;
}
err = encode_uint8_property(MQTT_PROP_PAYLOAD_FORMAT_INDICATOR,
param->prop.payload_format_indicator, buf);
if (err < 0) {
return err;
}
err = encode_uint32_property(MQTT_PROP_MESSAGE_EXPIRY_INTERVAL,
param->prop.message_expiry_interval, buf);
if (err < 0) {
return err;
}
err = encode_uint16_property(MQTT_PROP_TOPIC_ALIAS,
param->prop.topic_alias, buf);
if (err < 0) {
return err;
}
err = encode_string_property(MQTT_PROP_RESPONSE_TOPIC,
&param->prop.response_topic, buf);
if (err < 0) {
return err;
}
err = encode_binary_property(MQTT_PROP_CORRELATION_DATA,
&param->prop.correlation_data, buf);
if (err < 0) {
return err;
}
err = encode_user_properties(param->prop.user_prop, buf);
if (err < 0) {
return err;
}
/* Client does not include Subscription Identifier in any case. */
err = encode_string_property(MQTT_PROP_CONTENT_TYPE,
&param->prop.content_type, buf);
if (err < 0) {
return err;
}
return 0;
}
#else
static int publish_properties_encode(const struct mqtt_publish_param *param,
struct buf_ctx *buf)
{
ARG_UNUSED(param);
ARG_UNUSED(buf);
return -ENOTSUP;
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
int publish_encode(const struct mqtt_client *client,
const struct mqtt_publish_param *param,
struct buf_ctx *buf)
{ {
const uint8_t message_type = MQTT_MESSAGES_OPTIONS( const uint8_t message_type = MQTT_MESSAGES_OPTIONS(
MQTT_PKT_TYPE_PUBLISH, param->dup_flag, MQTT_PKT_TYPE_PUBLISH, param->dup_flag,
@ -891,6 +976,13 @@ int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf)
} }
} }
if (mqtt_is_version_5_0(client)) {
err_code = publish_properties_encode(param, buf);
if (err_code != 0) {
return err_code;
}
}
/* Do not copy payload. We move the buffer pointer to ensure that /* Do not copy payload. We move the buffer pointer to ensure that
* message length in fixed header is encoded correctly. * message length in fixed header is encoded correctly.
*/ */

View file

@ -224,7 +224,9 @@ int connect_request_encode(const struct mqtt_client *client,
* *
* @return 0 if the procedure is successful, an error code otherwise. * @return 0 if the procedure is successful, an error code otherwise.
*/ */
int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf); int publish_encode(const struct mqtt_client *client,
const struct mqtt_publish_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Ack packet. /**@brief Constructs/encodes Publish Ack packet.
* *
@ -352,6 +354,7 @@ int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
/**@brief Decode MQTT Publish packet. /**@brief Decode MQTT Publish packet.
* *
* @param[in] MQTT client for which packet is decoded.
* @param[in] flags Byte containing message type and flags. * @param[in] flags Byte containing message type and flags.
* @param[in] var_length Length of the variable part of the message. * @param[in] var_length Length of the variable part of the message.
* @param[inout] buf A pointer to the buf_ctx structure containing current * @param[inout] buf A pointer to the buf_ctx structure containing current
@ -360,7 +363,8 @@ int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
* *
* @return 0 if the procedure is successful, an error code otherwise. * @return 0 if the procedure is successful, an error code otherwise.
*/ */
int publish_decode(uint8_t flags, uint32_t var_length, struct buf_ctx *buf, int publish_decode(const struct mqtt_client *client, uint8_t flags,
uint32_t var_length, struct buf_ctx *buf,
struct mqtt_publish_param *param); struct mqtt_publish_param *param);
/**@brief Decode MQTT Publish Ack packet. /**@brief Decode MQTT Publish Ack packet.
@ -428,6 +432,20 @@ int subscribe_ack_decode(struct buf_ctx *buf,
int unsubscribe_ack_decode(struct buf_ctx *buf, int unsubscribe_ack_decode(struct buf_ctx *buf,
struct mqtt_unsuback_param *param); struct mqtt_unsuback_param *param);
/**
* @brief Unpacks variable length integer from the buffer from the offset
* requested.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] val Memory where the value is to be unpacked.
*
* @retval Number of bytes parsed if the procedure is successful.
* @retval -EINVAL if the length decoding would use more that 4 bytes.
* @retval -EAGAIN if the buffer would be exceeded during the read.
*/
int unpack_variable_int(struct buf_ctx *buf, uint32_t *val);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View file

@ -60,8 +60,8 @@ static int mqtt_handle_packet(struct mqtt_client *client,
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client); NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
evt.type = MQTT_EVT_PUBLISH; evt.type = MQTT_EVT_PUBLISH;
err_code = publish_decode(type_and_flags, var_length, buf, err_code = publish_decode(client, type_and_flags, var_length,
&evt.param.publish); buf, &evt.param.publish);
evt.result = err_code; evt.result = err_code;
client->internal.remaining_payload = client->internal.remaining_payload =
@ -221,7 +221,42 @@ static int mqtt_read_publish_var_header(struct mqtt_client *client,
variable_header_length += sizeof(uint16_t); variable_header_length += sizeof(uint16_t);
} }
/* Now we can read the whole header. */ if (mqtt_is_version_5_0(client)) {
struct buf_ctx backup;
uint8_t var_len = 1;
uint32_t prop_len = 0;
while (true) {
err_code = mqtt_read_message_chunk(
client, buf, variable_header_length + var_len);
if (err_code < 0) {
return err_code;
}
backup = *buf;
buf->cur += variable_header_length;
/* Try to decode variable integer, in case integer is
* not complete, read more bytes from the stream and retry.
*/
err_code = unpack_variable_int(buf, &prop_len);
if (err_code >= 0) {
break;
}
if (err_code != -EAGAIN) {
return err_code;
}
/* Try again. */
var_len++;
*buf = backup;
}
*buf = backup;
variable_header_length += var_len + prop_len;
}
err_code = mqtt_read_message_chunk(client, buf, err_code = mqtt_read_message_chunk(client, buf,
variable_header_length); variable_header_length);
if (err_code < 0) { if (err_code < 0) {

View file

@ -773,7 +773,7 @@ static int eval_msg_publish(struct mqtt_test *mqtt_test)
buf.cur = client.tx_buf; buf.cur = client.tx_buf;
buf.end = client.tx_buf + client.tx_buf_size; buf.end = client.tx_buf + client.tx_buf_size;
rc = publish_encode(param, &buf); rc = publish_encode(&client, param, &buf);
/* Payload is not copied, copy it manually just after the header.*/ /* Payload is not copied, copy it manually just after the header.*/
memcpy(buf.end, param->message.payload.data, memcpy(buf.end, param->message.payload.data,
@ -791,7 +791,7 @@ static int eval_msg_publish(struct mqtt_test *mqtt_test)
zassert_false(rc, "fixed_header_decode failed"); zassert_false(rc, "fixed_header_decode failed");
rc = publish_decode(type_and_flags, length, &buf, &dec_param); rc = publish_decode(&client, type_and_flags, length, &buf, &dec_param);
/**TESTPOINT: Check publish_decode function*/ /**TESTPOINT: Check publish_decode function*/
zassert_false(rc, "publish_decode failed"); zassert_false(rc, "publish_decode failed");
@ -830,7 +830,7 @@ static int eval_msg_corrupted_publish(struct mqtt_test *mqtt_test)
rc = fixed_header_decode(buf, &type_and_flags, &length); rc = fixed_header_decode(buf, &type_and_flags, &length);
zassert_equal(rc, 0, "fixed_header_decode failed"); zassert_equal(rc, 0, "fixed_header_decode failed");
rc = publish_decode(type_and_flags, length, buf, &dec_param); rc = publish_decode(&client, type_and_flags, length, buf, &dec_param);
zassert_equal(rc, -EINVAL, "publish_decode should fail"); zassert_equal(rc, -EINVAL, "publish_decode should fail");
return TC_PASS; return TC_PASS;