diff --git a/subsys/net/lib/mqtt/mqtt.c b/subsys/net/lib/mqtt/mqtt.c index e4dabecb309..fc975ca180f 100644 --- a/subsys/net/lib/mqtt/mqtt.c +++ b/subsys/net/lib/mqtt/mqtt.c @@ -741,6 +741,70 @@ exit_parser: return rc; } + +/** + * @brief mqtt_subscriber_parser Calls the appropriate rx routine for the MQTT + * message contained in rx + * @details On error, this routine will execute the + * 'ctx->malformed' callback (if defined) + * @param ctx MQTT context + * @param rx RX buffer + * @return 0 on success + * @return -EINVAL if an unknown message is received + * @return -ENOMEM if no data buffer is available + * @return mqtt_rx_publish, mqtt_rx_pubrel, mqtt_rx_pubrel, + * mqtt_rx_suback + * return codes + */ +static +int mqtt_subscriber_parser(struct mqtt_ctx *ctx, struct net_buf *rx) +{ + uint16_t pkt_type = MQTT_INVALID; + struct net_buf *data = NULL; + int rc = 0; + + data = mqtt_linearize_buffer(ctx, rx, MQTT_PUBLISHER_MIN_MSG_SIZE); + if (!data) { + rc = -EINVAL; + goto exit_parser; + } + + pkt_type = MQTT_PACKET_TYPE(data->data[0]); + + switch (pkt_type) { + case MQTT_CONNACK: + if (!ctx->connected) { + rc = mqtt_rx_connack(ctx, data, ctx->clean_session); + } else { + rc = -EINVAL; + } + + break; + case MQTT_PUBLISH: + rc = mqtt_rx_publish(ctx, data); + break; + case MQTT_PUBREL: + rc = mqtt_rx_pubrel(ctx, data); + break; + case MQTT_PINGRESP: + rc = mqtt_rx_pubrel(ctx, data); + break; + case MQTT_SUBACK: + rc = mqtt_rx_suback(ctx, data); + break; + default: + rc = -EINVAL; + break; + } + +exit_parser: + /* TODO: add error handling via a user provided callback */ + + net_nbuf_unref(data); + + return rc; +} + static void mqtt_recv(struct net_context *net_ctx, struct net_buf *buf, int status, void *data) @@ -776,7 +840,7 @@ int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type) ctx->rcv = mqtt_publisher_parser; break; case MQTT_APP_SUBSCRIBER: - ctx->rcv = NULL; + ctx->rcv = mqtt_subscriber_parser; break; default: return -EINVAL;