diff --git a/include/net/mqtt.h b/include/net/mqtt.h index 3fb4c2ae424..28603edc11c 100644 --- a/include/net/mqtt.h +++ b/include/net/mqtt.h @@ -345,7 +345,7 @@ int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx, /** * @brief mqtt_rx_puback Parses and validates the MQTT PUBACK message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -354,7 +354,7 @@ int mqtt_rx_puback(struct mqtt_ctx *ctx, struct net_buf *rx); /** * @brief mqtt_rx_pubcomp Parses and validates the MQTT PUBCOMP message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -363,7 +363,7 @@ int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, struct net_buf *rx); /** * @brief mqtt_rx_pubrec Parses and validates the MQTT PUBREC message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -373,7 +373,7 @@ int mqtt_rx_pubrec(struct mqtt_ctx *ctx, struct net_buf *rx); * @brief mqtt_rx_pubrel Parses and validates the MQTT PUBREL message * @details rx is an RX buffer from the IP stack * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -382,7 +382,7 @@ int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx); /** * @brief mqtt_rx_pingresp Parses the MQTT PINGRESP message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -391,7 +391,7 @@ int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx); /** * @brief mqtt_rx_suback Parses the MQTT SUBACK message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -400,7 +400,7 @@ int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx); /** * @brief mqtt_rx_unsuback Parses the MQTT UNSUBACK message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error */ @@ -409,7 +409,7 @@ int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx); /** * @brief mqtt_rx_publish Parses the MQTT PUBLISH message * @param [in] ctx MQTT context structure - * @param [in] rx RX buffer from the IP stack + * @param [in] rx Data buffer * @return 0 on success * @return -EINVAL on error * @return -ENOMEM if no data buffer is available diff --git a/subsys/net/lib/mqtt/mqtt.c b/subsys/net/lib/mqtt/mqtt.c index 6ac7d2366a4..e4dabecb309 100644 --- a/subsys/net/lib/mqtt/mqtt.c +++ b/subsys/net/lib/mqtt/mqtt.c @@ -31,15 +31,7 @@ */ NET_BUF_POOL_DEFINE(mqtt_msg_pool, MQTT_BUF_CTR, MSG_SIZE, 0, NULL); -int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type) -{ - ctx->app_type = app_type; - - /* So far, only clean session = 1 is supported */ - ctx->clean_session = 1; - - return 0; -} +#define MQTT_PUBLISHER_MIN_MSG_SIZE 2 int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg) { @@ -393,12 +385,10 @@ int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx, int clean_session) uint8_t *data; int rc; - data = net_nbuf_appdata(rx); - len = net_nbuf_appdatalen(rx); + data = rx->data; + len = rx->len; - /* CONNACK is only 4 bytes len, so it is assumed - * that net buf traversing is not required here - */ + /* CONNACK is 4 bytes len */ rc = mqtt_unpack_connack(data, len, &session, &connect_rc); if (rc != 0) { rc = -EINVAL; @@ -479,8 +469,8 @@ int mqtt_rx_pub_msgs(struct mqtt_ctx *ctx, struct net_buf *rx, return -EINVAL; } - data = net_nbuf_appdata(rx); - len = net_nbuf_appdatalen(rx); + data = rx->data; + len = rx->len; /* 4 bytes message */ rc = unpack(data, len, &pkt_id); @@ -540,17 +530,12 @@ int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx) int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx) { - uint8_t *data; - uint16_t len; int rc; ARG_UNUSED(ctx); - data = net_nbuf_appdata(rx); - len = net_nbuf_appdatalen(rx); - /* 2 bytes message */ - rc = mqtt_unpack_pingresp(data, len); + rc = mqtt_unpack_pingresp(rx->data, rx->len); if (rc != 0) { return -EINVAL; @@ -568,8 +553,8 @@ int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx) uint8_t *data; int rc; - data = net_nbuf_appdata(rx); - len = net_nbuf_appdatalen(rx); + data = rx->data; + len = rx->len; rc = mqtt_unpack_suback(data, len, &pkt_id, &items, CONFIG_MQTT_SUBSCRIBE_MAX_TOPICS, suback_qos); @@ -596,8 +581,8 @@ int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx) uint8_t *data; int rc; - data = net_nbuf_appdata(rx); - len = net_nbuf_appdatalen(rx); + data = rx->data; + len = rx->len; /* 4 bytes message */ rc = mqtt_unpack_unsuback(data, len, &pkt_id); @@ -644,9 +629,158 @@ int mqtt_rx_publish(struct mqtt_ctx *ctx, struct net_buf *rx) break; default: rc = -EINVAL; - break; } return rc; } +/** + * @brief mqtt_linearize_buffer Linearize an IP fragmented buffer + * @param [in] ctx MQTT context structure + * @param [in] rx RX IP stack buffer + * @param [in] min_size Min message size allowed. This allows us + * to exit if the rx buffer is shorter + * than the expected msg size + * @return Data buffer + * @return NULL on error + */ +static +struct net_buf *mqtt_linearize_buffer(struct mqtt_ctx *ctx, struct net_buf *rx, + uint16_t min_size) +{ + struct net_buf *data = NULL; + uint16_t data_len; + uint16_t offset; + int rc; + + data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout); + if (data == NULL) { + return NULL; + } + + /* CONFIG_MQTT_MSG_MAX_SIZE is defined via Kconfig. So here it's + * determined if the input buffer could fit our data buffer or if + * it has the expected size. + */ + data_len = net_nbuf_appdatalen(rx); + if (data_len < min_size || data_len > CONFIG_MQTT_MSG_MAX_SIZE) { + goto exit_error; + } + + offset = net_buf_frags_len(rx) - data_len; + rc = net_nbuf_linear_copy(data, rx, offset, data_len); + if (rc != 0) { + goto exit_error; + } + + return data; + +exit_error: + net_nbuf_unref(data); + + return NULL; +} + +/** + * @brief mqtt_publisher_parser Calls the appropriate rx routine for the MQTT + * message contained in rx + * @param ctx MQTT context + * @param rx RX buffer + * @return 0 on success + * @return -EINVAL if an unknown message is received + * @return -ENOMEM if no data buffer is available + * @return mqtt_rx_connack, mqtt_rx_puback, mqtt_rx_pubrec, + * mqtt_rx_pubcomp, and mqtt_rx_pingresp + * return codes + */ +static +int mqtt_publisher_parser(struct mqtt_ctx *ctx, struct net_buf *rx) +{ + uint16_t pkt_type = MQTT_INVALID; + struct net_buf *data = NULL; + int rc = -EINVAL; + + data = mqtt_linearize_buffer(ctx, rx, MQTT_PUBLISHER_MIN_MSG_SIZE); + if (!data) { + rc = -ENOMEM; + goto exit_parser; + } + + pkt_type = MQTT_PACKET_TYPE(data->data[0]); + + switch (pkt_type) { + case MQTT_CONNACK: + if (!ctx->connected) { + rc = mqtt_rx_connack(ctx, data, ctx->clean_session); + } else { + rc = -EINVAL; + } + break; + case MQTT_PUBACK: + rc = mqtt_rx_puback(ctx, data); + break; + case MQTT_PUBREC: + rc = mqtt_rx_pubrec(ctx, data); + break; + case MQTT_PUBCOMP: + rc = mqtt_rx_pubcomp(ctx, data); + break; + case MQTT_PINGRESP: + rc = mqtt_rx_pingresp(ctx, data); + break; + default: + rc = -EINVAL; + break; + } + +exit_parser: + /* TODO: add error handling via a user provided callback */ + + net_nbuf_unref(data); + + return rc; +} + +static +void mqtt_recv(struct net_context *net_ctx, struct net_buf *buf, int status, + void *data) +{ + struct mqtt_ctx *mqtt = (struct mqtt_ctx *)data; + + /* net_ctx is already referenced to by the mqtt_ctx struct */ + ARG_UNUSED(net_ctx); + + if (status != 0) { + return; + } + + mqtt->rcv(mqtt, buf); + net_nbuf_unref(buf); +} + +int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type) +{ + /* So far, only clean session = 1 is supported */ + ctx->clean_session = 1; + ctx->connected = 0; + + /* Install the receiver callback, timeout is set to K_NO_WAIT. + * In this case, no return code is evaluated. + */ + (void)net_context_recv(ctx->net_ctx, mqtt_recv, K_NO_WAIT, ctx); + + ctx->app_type = app_type; + + switch (ctx->app_type) { + case MQTT_APP_PUBLISHER: + ctx->rcv = mqtt_publisher_parser; + break; + case MQTT_APP_SUBSCRIBER: + ctx->rcv = NULL; + break; + default: + return -EINVAL; + } + + return 0; +} diff --git a/subsys/net/lib/mqtt/mqtt_pkt.h b/subsys/net/lib/mqtt/mqtt_pkt.h index d431c0aff14..5fe5b202825 100644 --- a/subsys/net/lib/mqtt/mqtt_pkt.h +++ b/subsys/net/lib/mqtt/mqtt_pkt.h @@ -32,6 +32,8 @@ #include +#define MQTT_PACKET_TYPE(first_byte) (((first_byte) & 0xF0) >> 4) + /** * @brief mqtt_pack_connack Packs the MQTT CONNACK message * @details See MQTT 3.2 CONNACK - Acknowledge connection