net: mqtt: Add MQTT 5.0 support for PUBLISH ACKs

Add support for PUBACK, PUBREC, PUBREL and PUBCOMP specified in MQTT
5.0. As all of these acknowledgment packets have similar format,
introduced a common encoder/decoder to handle ACK packets.

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2024-12-19 16:09:08 +01:00 committed by Benjamin Cabé
commit c21e64251b
7 changed files with 357 additions and 74 deletions

View file

@ -334,28 +334,77 @@ struct mqtt_connack_param {
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/** @brief Common MQTT 5.0 properties shared across all ack-type messages. */
struct mqtt_common_ack_properties {
/** MQTT 5.0, chapter 3.4.2.2.3 User Property. */
struct mqtt_utf8_pair user_prop[CONFIG_MQTT_USER_PROPERTIES_MAX];
/** MQTT 5.0, chapter 3.4.2.2.2 Reason String. */
struct mqtt_utf8 reason_string;
/** Flags indicating whether given property was present in received packet. */
struct {
/** Reason String property was present. */
bool has_reason_string;
/** User Property property was present. */
bool has_user_prop;
} rx;
};
/** @brief Parameters for MQTT publish acknowledgment (PUBACK). */
struct mqtt_puback_param {
/** Message id of the PUBLISH message being acknowledged */
uint16_t message_id;
#if defined(CONFIG_MQTT_VERSION_5_0)
/** MQTT 5.0 reason code. */
uint8_t reason_code;
/** MQTT 5.0 properties. */
struct mqtt_common_ack_properties prop;
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/** @brief Parameters for MQTT publish receive (PUBREC). */
struct mqtt_pubrec_param {
/** Message id of the PUBLISH message being acknowledged */
uint16_t message_id;
#if defined(CONFIG_MQTT_VERSION_5_0)
/** MQTT 5.0 reason code. */
uint8_t reason_code;
/** MQTT 5.0 properties. */
struct mqtt_common_ack_properties prop;
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/** @brief Parameters for MQTT publish release (PUBREL). */
struct mqtt_pubrel_param {
/** Message id of the PUBREC message being acknowledged */
uint16_t message_id;
#if defined(CONFIG_MQTT_VERSION_5_0)
/** MQTT 5.0 reason code. */
uint8_t reason_code;
/** MQTT 5.0 properties. */
struct mqtt_common_ack_properties prop;
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/** @brief Parameters for MQTT publish complete (PUBCOMP). */
struct mqtt_pubcomp_param {
/** Message id of the PUBREL message being acknowledged */
uint16_t message_id;
#if defined(CONFIG_MQTT_VERSION_5_0)
/** MQTT 5.0 reason code. */
uint8_t reason_code;
/** MQTT 5.0 properties. */
struct mqtt_common_ack_properties prop;
#endif /* CONFIG_MQTT_VERSION_5_0 */
};
/** @brief Parameters for MQTT subscription acknowledgment (SUBACK). */

View file

@ -309,7 +309,7 @@ int mqtt_publish_qos1_ack(struct mqtt_client *client,
goto error;
}
err_code = publish_ack_encode(param, &packet);
err_code = publish_ack_encode(client, param, &packet);
if (err_code < 0) {
goto error;
}
@ -346,7 +346,7 @@ int mqtt_publish_qos2_receive(struct mqtt_client *client,
goto error;
}
err_code = publish_receive_encode(param, &packet);
err_code = publish_receive_encode(client, param, &packet);
if (err_code < 0) {
goto error;
}
@ -383,7 +383,7 @@ int mqtt_publish_qos2_release(struct mqtt_client *client,
goto error;
}
err_code = publish_release_encode(param, &packet);
err_code = publish_release_encode(client, param, &packet);
if (err_code < 0) {
goto error;
}
@ -420,7 +420,7 @@ int mqtt_publish_qos2_complete(struct mqtt_client *client,
goto error;
}
err_code = publish_complete_encode(param, &packet);
err_code = publish_complete_encode(client, param, &packet);
if (err_code < 0) {
goto error;
}

View file

@ -810,25 +810,132 @@ int publish_decode(const struct mqtt_client *client, uint8_t flags,
return 0;
}
int publish_ack_decode(struct buf_ctx *buf, struct mqtt_puback_param *param)
#if defined(CONFIG_MQTT_VERSION_5_0)
static int common_ack_properties_decode(struct buf_ctx *buf,
struct mqtt_common_ack_properties *prop)
{
return unpack_uint16(buf, &param->message_id);
struct property_decoder prop_dec[] = {
{
&prop->reason_string,
&prop->rx.has_reason_string,
MQTT_PROP_REASON_STRING
},
{
&prop->user_prop,
&prop->rx.has_user_prop,
MQTT_PROP_USER_PROPERTY
}
};
return properties_decode(prop_dec, ARRAY_SIZE(prop_dec), buf);
}
#else
static int common_ack_properties_decode(struct buf_ctx *buf,
struct mqtt_common_ack_properties *prop)
{
ARG_UNUSED(prop);
ARG_UNUSED(buf);
return -ENOTSUP;
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
static int common_pub_ack_decode(struct buf_ctx *buf, uint16_t *message_id,
uint8_t *reason_code,
struct mqtt_common_ack_properties *prop)
{
size_t remaining_len;
int err;
err = unpack_uint16(buf, message_id);
if (err < 0) {
return err;
}
int publish_receive_decode(struct buf_ctx *buf, struct mqtt_pubrec_param *param)
{
return unpack_uint16(buf, &param->message_id);
remaining_len = buf->end - buf->cur;
/* For MQTT < 5.0 properties are NULL. */
if (prop != NULL && reason_code != NULL) {
if (remaining_len > 0) {
err = unpack_uint8(buf, reason_code);
if (err < 0) {
return err;
}
}
int publish_release_decode(struct buf_ctx *buf, struct mqtt_pubrel_param *param)
{
return unpack_uint16(buf, &param->message_id);
if (remaining_len > 1) {
err = common_ack_properties_decode(buf, prop);
if (err < 0) {
return err;
}
}
}
int publish_complete_decode(struct buf_ctx *buf,
return 0;
}
int publish_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_puback_param *param)
{
struct mqtt_common_ack_properties *prop = NULL;
uint8_t *reason_code = NULL;
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = &param->reason_code;
}
#endif
return common_pub_ack_decode(buf, &param->message_id, reason_code, prop);
}
int publish_receive_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_pubrec_param *param)
{
struct mqtt_common_ack_properties *prop = NULL;
uint8_t *reason_code = NULL;
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = &param->reason_code;
}
#endif
return common_pub_ack_decode(buf, &param->message_id, reason_code, prop);
}
int publish_release_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_pubrel_param *param)
{
struct mqtt_common_ack_properties *prop = NULL;
uint8_t *reason_code = NULL;
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = &param->reason_code;
}
#endif
return common_pub_ack_decode(buf, &param->message_id, reason_code, prop);
}
int publish_complete_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_pubcomp_param *param)
{
return unpack_uint16(buf, &param->message_id);
struct mqtt_common_ack_properties *prop = NULL;
uint8_t *reason_code = NULL;
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = &param->reason_code;
}
#endif
return common_ack_decode(buf, &param->message_id, reason_code, prop);
}
int subscribe_ack_decode(struct buf_ctx *buf, struct mqtt_suback_param *param)

View file

@ -242,40 +242,6 @@ static int zero_len_str_encode(struct buf_ctx *buf)
return pack_uint16(0x0000, buf);
}
/**
* @brief Encodes and sends messages that contain only message id in
* the variable header.
*
* @param[in] message_type Message type and reserved bit fields.
* @param[in] message_id Message id to be encoded in the variable header.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
*
* @retval 0 or an error code indicating a reason for failure.
*/
static int mqtt_message_id_only_enc(uint8_t message_type, uint16_t message_id,
struct buf_ctx *buf)
{
int err_code;
uint8_t *start;
/* Message id zero is not permitted by spec. */
if (message_id == 0U) {
return -EINVAL;
}
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
err_code = pack_uint16(message_id, buf);
if (err_code != 0) {
return err_code;
}
return mqtt_encode_fixed_header(message_type, start, buf);
}
#if defined(CONFIG_MQTT_VERSION_5_0)
/**
* @brief Packs unsigned 32 bit value to the buffer at the offset requested.
@ -998,40 +964,189 @@ int publish_encode(const struct mqtt_client *client,
return 0;
}
int publish_ack_encode(const struct mqtt_puback_param *param,
#if defined(CONFIG_MQTT_VERSION_5_0)
static uint32_t common_ack_properties_length(
const struct mqtt_common_ack_properties *prop)
{
return user_properties_length(prop->user_prop) +
string_property_length(&prop->reason_string);
}
static int common_ack_properties_encode(
const struct mqtt_common_ack_properties *prop,
struct buf_ctx *buf)
{
uint32_t properties_len;
int err;
/* Precalculate total properties length */
properties_len = common_ack_properties_length(prop);
/* Properties length can be omitted if equal to 0. */
if (properties_len == 0) {
return 0;
}
err = pack_variable_int(properties_len, buf);
if (err < 0) {
return err;
}
err = encode_user_properties(prop->user_prop, buf);
if (err < 0) {
return err;
}
err = encode_string_property(MQTT_PROP_REASON_STRING,
&prop->reason_string, buf);
if (err < 0) {
return err;
}
return 0;
}
#else /* CONFIG_MQTT_VERSION_5_0 */
static uint32_t common_ack_properties_length(
const struct mqtt_common_ack_properties *prop)
{
return 0;
}
static int common_ack_properties_encode(
const struct mqtt_common_ack_properties *prop,
struct buf_ctx *buf)
{
ARG_UNUSED(prop);
ARG_UNUSED(buf);
return -ENOTSUP;
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
static int common_ack_encode(
uint8_t message_type, uint16_t message_id, uint8_t reason_code,
const struct mqtt_common_ack_properties *prop,
struct buf_ctx *buf)
{
int err_code;
uint8_t *start;
/* Message id zero is not permitted by spec. */
if (message_id == 0U) {
return -EINVAL;
}
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
err_code = pack_uint16(message_id, buf);
if (err_code != 0) {
return err_code;
}
/* For MQTT < 5.0 properties are NULL. */
if (prop != NULL) {
/* The Reason Code and Property Length can be omitted if the
* Reason Code is 0x00 (Success) and there are no Properties.
*/
if (common_ack_properties_length(prop) == 0 &&
reason_code == 0) {
goto out;
}
err_code = pack_uint8(reason_code, buf);
if (err_code != 0) {
return err_code;
}
err_code = common_ack_properties_encode(prop, buf);
if (err_code != 0) {
return err_code;
}
}
out:
return mqtt_encode_fixed_header(message_type, start, buf);
}
int publish_ack_encode(const struct mqtt_client *client,
const struct mqtt_puback_param *param,
struct buf_ctx *buf)
{
const uint8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBACK, 0, 0, 0);
const struct mqtt_common_ack_properties *prop = NULL;
uint8_t reason_code = 0;
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = param->reason_code;
}
#endif
return common_ack_encode(message_type, param->message_id,
reason_code, prop, buf);
}
int publish_receive_encode(const struct mqtt_pubrec_param *param,
int publish_receive_encode(const struct mqtt_client *client,
const struct mqtt_pubrec_param *param,
struct buf_ctx *buf)
{
const uint8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBREC, 0, 0, 0);
const struct mqtt_common_ack_properties *prop = NULL;
uint8_t reason_code = 0;
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = param->reason_code;
}
#endif
return common_ack_encode(message_type, param->message_id,
reason_code, prop, buf);
}
int publish_release_encode(const struct mqtt_pubrel_param *param,
int publish_release_encode(const struct mqtt_client *client,
const struct mqtt_pubrel_param *param,
struct buf_ctx *buf)
{
const uint8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBREL, 0, 1, 0);
const struct mqtt_common_ack_properties *prop = NULL;
uint8_t reason_code = 0;
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = param->reason_code;
}
#endif
return common_ack_encode(message_type, param->message_id,
reason_code, prop, buf);
}
int publish_complete_encode(const struct mqtt_pubcomp_param *param,
int publish_complete_encode(const struct mqtt_client *client,
const struct mqtt_pubcomp_param *param,
struct buf_ctx *buf)
{
const uint8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBCOMP, 0, 0, 0);
const struct mqtt_common_ack_properties *prop = NULL;
uint8_t reason_code = 0;
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
#if defined(CONFIG_MQTT_VERSION_5_0)
if (mqtt_is_version_5_0(client)) {
prop = &param->prop;
reason_code = param->reason_code;
}
#endif
return common_ack_encode(message_type, param->message_id,
reason_code, prop, buf);
}
int disconnect_encode(struct buf_ctx *buf)

View file

@ -238,7 +238,8 @@ int publish_encode(const struct mqtt_client *client,
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_ack_encode(const struct mqtt_puback_param *param,
int publish_ack_encode(const struct mqtt_client *client,
const struct mqtt_puback_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Receive packet.
@ -251,7 +252,8 @@ int publish_ack_encode(const struct mqtt_puback_param *param,
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_receive_encode(const struct mqtt_pubrec_param *param,
int publish_receive_encode(const struct mqtt_client *client,
const struct mqtt_pubrec_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Release packet.
@ -264,7 +266,8 @@ int publish_receive_encode(const struct mqtt_pubrec_param *param,
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_release_encode(const struct mqtt_pubrel_param *param,
int publish_release_encode(const struct mqtt_client *client,
const struct mqtt_pubrel_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Complete packet.
@ -277,7 +280,8 @@ int publish_release_encode(const struct mqtt_pubrel_param *param,
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_complete_encode(const struct mqtt_pubcomp_param *param,
int publish_complete_encode(const struct mqtt_client *client,
const struct mqtt_pubcomp_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Disconnect packet.
@ -369,45 +373,50 @@ int publish_decode(const struct mqtt_client *client, uint8_t flags,
/**@brief Decode MQTT Publish Ack packet.
*
* @param[in] MQTT client for which packet is decoded.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Ack parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_ack_decode(struct buf_ctx *buf, struct mqtt_puback_param *param);
int publish_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_puback_param *param);
/**@brief Decode MQTT Publish Receive packet.
*
* @param[in] MQTT client for which packet is decoded.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Receive parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_receive_decode(struct buf_ctx *buf,
int publish_receive_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_pubrec_param *param);
/**@brief Decode MQTT Publish Release packet.
*
* @param[in] MQTT client for which packet is decoded.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Release parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_release_decode(struct buf_ctx *buf,
int publish_release_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_pubrel_param *param);
/**@brief Decode MQTT Publish Complete packet.
*
* @param[in] MQTT client for which packet is decoded.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Complete parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_complete_decode(struct buf_ctx *buf,
int publish_complete_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_pubcomp_param *param);
/**@brief Decode MQTT Subscribe packet.

View file

@ -78,7 +78,7 @@ static int mqtt_handle_packet(struct mqtt_client *client,
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
evt.type = MQTT_EVT_PUBACK;
err_code = publish_ack_decode(buf, &evt.param.puback);
err_code = publish_ack_decode(client, buf, &evt.param.puback);
evt.result = err_code;
break;
@ -86,7 +86,8 @@ static int mqtt_handle_packet(struct mqtt_client *client,
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
evt.type = MQTT_EVT_PUBREC;
err_code = publish_receive_decode(buf, &evt.param.pubrec);
err_code = publish_receive_decode(client, buf,
&evt.param.pubrec);
evt.result = err_code;
break;
@ -94,7 +95,8 @@ static int mqtt_handle_packet(struct mqtt_client *client,
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
evt.type = MQTT_EVT_PUBREL;
err_code = publish_release_decode(buf, &evt.param.pubrel);
err_code = publish_release_decode(client, buf,
&evt.param.pubrel);
evt.result = err_code;
break;
@ -102,7 +104,8 @@ static int mqtt_handle_packet(struct mqtt_client *client,
NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
evt.type = MQTT_EVT_PUBCOMP;
err_code = publish_complete_decode(buf, &evt.param.pubcomp);
err_code = publish_complete_decode(client, buf,
&evt.param.pubcomp);
evt.result = err_code;
break;

View file

@ -927,7 +927,7 @@ static int eval_msg_puback(struct mqtt_test *mqtt_test)
buf.cur = client.tx_buf;
buf.end = client.tx_buf + client.tx_buf_size;
rc = publish_ack_encode(param, &buf);
rc = publish_ack_encode(&client, param, &buf);
/**TESTPOINTS: Check publish_ack_encode functions*/
zassert_false(rc, "publish_ack_encode failed");
@ -940,7 +940,7 @@ static int eval_msg_puback(struct mqtt_test *mqtt_test)
zassert_false(rc, "fixed_header_decode failed");
rc = publish_ack_decode(&buf, &dec_param);
rc = publish_ack_decode(&client, &buf, &dec_param);
zassert_false(rc, "publish_ack_decode failed");
@ -965,7 +965,7 @@ static int eval_msg_pubcomp(struct mqtt_test *mqtt_test)
buf.cur = client.tx_buf;
buf.end = client.tx_buf + client.tx_buf_size;
rc = publish_complete_encode(param, &buf);
rc = publish_complete_encode(&client, param, &buf);
/**TESTPOINTS: Check publish_complete_encode functions*/
zassert_false(rc, "publish_complete_encode failed");
@ -978,7 +978,7 @@ static int eval_msg_pubcomp(struct mqtt_test *mqtt_test)
zassert_false(rc, "fixed_header_decode failed");
rc = publish_complete_decode(&buf, &dec_param);
rc = publish_complete_decode(&client, &buf, &dec_param);
zassert_false(rc, "publish_complete_decode failed");
@ -1003,7 +1003,7 @@ static int eval_msg_pubrec(struct mqtt_test *mqtt_test)
buf.cur = client.tx_buf;
buf.end = client.tx_buf + client.tx_buf_size;
rc = publish_receive_encode(param, &buf);
rc = publish_receive_encode(&client, param, &buf);
/**TESTPOINTS: Check publish_receive_encode functions*/
zassert_false(rc, "publish_receive_encode failed");
@ -1016,7 +1016,7 @@ static int eval_msg_pubrec(struct mqtt_test *mqtt_test)
zassert_false(rc, "fixed_header_decode failed");
rc = publish_receive_decode(&buf, &dec_param);
rc = publish_receive_decode(&client, &buf, &dec_param);
zassert_false(rc, "publish_receive_decode failed");
@ -1041,7 +1041,7 @@ static int eval_msg_pubrel(struct mqtt_test *mqtt_test)
buf.cur = client.tx_buf;
buf.end = client.tx_buf + client.tx_buf_size;
rc = publish_release_encode(param, &buf);
rc = publish_release_encode(&client, param, &buf);
/**TESTPOINTS: Check publish_release_encode functions*/
zassert_false(rc, "publish_release_encode failed");
@ -1054,7 +1054,7 @@ static int eval_msg_pubrel(struct mqtt_test *mqtt_test)
zassert_false(rc, "fixed_header_decode failed");
rc = publish_release_decode(&buf, &dec_param);
rc = publish_release_decode(&client, &buf, &dec_param);
zassert_false(rc, "publish_release_decode failed");