iot/mqtt: Add the MQTT high-level API

This commit adds the MQTT high-level API with Quality-of-Service
support. The following MQTT messages are covered by this commit:

CONNECT (tx), DISCONNECT (tx), PUBACK (tx, rx), PUBCOMP (tx, rx),
PUBREC (tx, rx), PUBREL (tx, rx), PUBLISH (tx), PINGREQ (tx),
SUBSCRIBE (tx), UNSUBSCRIBE (tx), CONNACK (rx), PINGRESP (rx),
SUBACK (rx) and UNSUBACK (rx).

Where 'tx' stands for transmission: routines that create and send
messages. 'rx' stands for reception: routines that receive an RX
buffer from the IP stack and parse the MQTT mesage contained in
that buffer.

Jira: ZEP-365
Jira: ZEP-591
Jira: ZEP-856

Change-Id: Ibee701a298127eb713aa3fde5aaf7d089ecd1b9d
Signed-off-by: Flavio Santes <flavio.santes@intel.com>
This commit is contained in:
Flavio Santes 2016-11-13 22:24:45 -06:00 committed by Jukka Rissanen
commit d37b9fa9b5
4 changed files with 1022 additions and 0 deletions

389
include/iot/mqtt.h Normal file
View file

@ -0,0 +1,389 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _MQTT_H_
#define _MQTT_H_
#include <iot/mqtt_types.h>
#include <net/net_context.h>
/**
* @brief mqtt_app MQTT application type
*/
enum mqtt_app {
/** Publisher only application */
MQTT_APP_PUBLISHER,
/** Subscriber only application */
MQTT_APP_SUBSCRIBER,
/** Publisher and Subscriber application */
MQTT_APP_PUBLISHER_SUBSCRIBER,
/** MQTT Server */
MQTT_APP_SERVER
};
/**
* @brief struct mqtt_ctx MQTT context structure
* @details
* Context structure for the MQTT high-level API with support for QoS.
*
* This API is designed for asynchronous operation, so callbacks are
* executed when some events must be addressed outside the MQTT routines.
* Those events are triggered by the reception or transmission of MQTT messages
* and are defined by the MQTT v3.1.1 spec, see:
*
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
*
* For example, assume that Zephyr is operating as a MQTT_APP_SUBSCRIBER, so it
* may receive the MQTT PUBLISH and MQTT PUBREL (QoS2) messages. Once the
* messages are parsed and validated, the #publish_rx callback is executed.
*
* Internally, the #publish_rx callback must store the #mqtt_publish_msg message
* when a MQTT PUBLISH msg is received. When a MQTT PUBREL message is received,
* the application must evaluate if the PUBREL's Packet Identifier matches a
* previous received MQTT PUBLISH message. In this case, the #publish_rx_data
* may be used to refer to a collection of #mqtt_publish_msg structs (array of
* structs).
*
* <b>NOTE: The application (and not the API) is in charge of keeping track of
* the state of the received and sent messages.</b>
*/
struct mqtt_ctx {
/** IP stack context structure */
struct net_context *net_ctx;
/** Network timeout for tx and rx routines */
uint32_t net_timeout;
/** Callback executed when a MQTT CONNACK msg is received and validated.
* If this function pointer is not used, must be set to NULL.
* The argument for the routine pointed to by this pointer
* is represented by #connect_data.
*/
void (*connect)(void *data);
/** Data passed to the #connect callback. */
void *connect_data;
/** Callback executed when a MQTT DISCONNECT msg is sent.
* If this function pointer is not used, must be set to NULL.
* The argument for the routine pointed to by this pointer
* is represented by #disconnect_data.
*/
void (*disconnect)(void *data);
/** Data passed to the #disconnect callback. */
void *disconnect_data;
/** Callback executed when a #MQTT_APP_PUBLISHER application receives
* a MQTT PUBxxxx msg.
*
* <b>Note: this callback must be not NULL</b>
*
* @param [in] data User provided data, represented by
* #publish_tx_data
* @param [in] pkt_id Packet Identifier for the input MQTT msg
* @param [in] type Packet type
* @return If this callback returns 0, the caller will
* continue.
*
* @return If type is MQTT_PUBACK, MQTT_PUBCOMP or
* MQTT_PUBREC, this callback must return 0 if
* pkt_id matches the packet id of a previously
* received MQTT_PUBxxx message.
*
* @return <b>Note: the application must discard all the
* messages already processed</b>
*
* @return Any other value will stop the QoS handshake
* and the caller will return -EINVAL
*/
int (*publish_tx)(void *data, uint16_t pkt_id, enum mqtt_packet type);
/** Data passed to the #publish_tx callback */
void *publish_tx_data;
/** Callback executed when a MQTT_APP_SUBSCRIBER,
* MQTT_APP_PUBLISHER_SUBSCRIBER or MQTT_APP_SERVER application receive
* a MQTT PUBxxxx msg.
*
* <b>Note: this callback must be not NULL</b>
*
* @param [in] data User provided data, represented by
* #publish_rx_data
* @param [in] msg Publish message, this parameter is only used
* when the type is MQTT_PUBLISH
* @param [in] pkt_id Packet Identifier for the input msg
* @param [in] type Packet type
* @return If this callback returns 0, the caller will
* continue.
*
* @return If type is MQTT_PUBREL this callback must return
* 0 if pkt_id matches the packet id of a
* previously received MQTT_PUBxxx message.
*
* @return <b>Note: the application must discard all the
* messages already processed</b>
*
* @return Any other value will stop the QoS handshake
* and the caller will return -EINVAL
*/
int (*publish_rx)(void *data, struct mqtt_publish_msg *msg,
uint16_t pkt_id, enum mqtt_packet type);
/** Data passed to the #publish_rx callback */
void *publish_rx_data;
/** Callback executed when a MQTT_APP_SUBSCRIBER or
* MQTT_APP_PUBLISHER_SUBSCRIBER receives the MQTT SUBACK message
*
* <b>Note: this callback must be not NULL</b>
*
* @param [in] data User provided data, represented by
* #subscribe_data
* @param [in] pkt_id Packet Identifier for the MQTT SUBACK msg
* @param [in] items Number of elements in the qos array
* @param [in] qos Array of QoS values
* @return If this callback returns 0, the caller will
* continue
* @return Any other value will make the caller return
* -EINVAL
*/
int (*subscribe)(void *data, uint16_t pkt_id,
uint8_t items, enum mqtt_qos qos[]);
/** Data passed to the #subscribe callback */
void *subscribe_data;
/** Callback executed when a MQTT_APP_SUBSCRIBER or
* MQTT_APP_PUBLISHER_SUBSCRIBER receives the MQTT UNSUBACK message
*
* <b>Note: this callback must be not NULL</b>
* @param [in] data User provided data, represented by
* #unsubscribe_data
* @param [in] pkt_id Packet Identifier for the MQTT SUBACK msg
* @return If this callback returns 0, the caller will
* continue
* @return Any other value will make the caller return
* -EINVAL
*/
int (*unsubscribe)(void *data, uint16_t pkt_id);
/** Data passed to the #unsubscribe callback */
void *unsubscribe_data;
/* Clean session is also part of the MQTT CONNECT msg, however app
* behavior is influenced by this parameter, so we keep a copy here
*/
/** MQTT Clean Session parameter */
uint8_t clean_session:1;
/** 1 if the MQTT application is connected and 0 otherwise */
uint8_t connected:1;
/** Application type */
enum mqtt_app app_type;
};
/**
* @brief mqtt_tx_connect Sends the MQTT CONNECT message
* @param [in] ctx MQTT context structure
* @param [in] msg MQTT CONNECT msg
* @return 0 on success
* @return -EIO on network error
* @return -ENOMEM if no data/tx buffer is available
* @return -EINVAL if invalid data was passed to this
* routine
*/
int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg);
/**
* @brief mqtt_tx_disconnect Send the MQTT DISCONNECT message
* @param [in] ctx MQTT context structure
* @return 0 on success
* @return -EIO on network error
* @return -ENOMEM if no data/tx buffer is available
* @return -EINVAL if invalid data was passed to this
* routine
*/
int mqtt_tx_disconnect(struct mqtt_ctx *ctx);
/**
* @brief mqtt_tx_puback Sends the MQTT PUBACK message with the given
* packet id
* @param [in] ctx MQTT context structure
* @param [in] id MQTT Packet Identifier
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_puback(struct mqtt_ctx *ctx, uint16_t id);
/**
* @brief mqtt_tx_pubcomp Sends the MQTT PUBCOMP message with the given
* packet id
* @param [in] ctx MQTT context structure
* @param [in] id MQTT Packet Identifier
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_pubcomp(struct mqtt_ctx *ctx, uint16_t id);
/**
* @brief mqtt_tx_pubrec Sends the MQTT PUBREC message with the given
* packet id
* @param [in] ctx MQTT context structure
* @param [in] id MQTT Packet Identifier
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_pubrec(struct mqtt_ctx *ctx, uint16_t id);
/**
* @brief mqtt_tx_pubrel Sends the MQTT PUBREL message with the given
* packet id
* @param [in] ctx MQTT context structure
* @param [in] id MQTT Packet Identifier
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_pubrel(struct mqtt_ctx *ctx, uint16_t id);
/**
* @brief mqtt_tx_publish Sends the MQTT PUBLISH message
* @param [in] ctx MQTT context structure
* @param [in] msg MQTT PUBLISH msg
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg);
/**
* @brief mqtt_tx_pingreq Sends the MQTT PINGREQ message
* @param [in] ctx MQTT context structure
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_pingreq(struct mqtt_ctx *ctx);
/**
* @brief mqtt_tx_subscribe Sends the MQTT SUBSCRIBE message
* @param [in] ctx MQTT context structure
* @param [in] pkt_id Packet identifier for the MQTT SUBSCRIBE msg
* @param [in] items Number of elements in 'topics' and 'qos' arrays
* @param [in] topics Array of 'items' elements containing C strings.
* For example: {"sensors", "lights", "doors"}
* @param [in] qos Array of 'items' elements containing MQTT QoS
* values: MQTT_QoS0, MQTT_QoS1, MQTT_QoS2. For
* example for the 'topics' array above the
* following QoS may be used:
* {MQTT_QoS0, MQTT_QoS2, MQTT_QoS1}, indicating
* that the subscription to 'lights' must be done
* with MQTT_QoS2
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
int mqtt_tx_subscribe(struct mqtt_ctx *ctx, uint16_t pkt_id, int items,
const char *topics[], enum mqtt_qos qos[]);
/**
* @brief mqtt_tx_unsubscribe Sends the MQTT UNSUBSCRIBE message
* @param [in] ctx MQTT context structure
* @param [in] pkt_id Packet identifier for the MQTT UNSUBSCRIBE msg
* @param [in] items Number of elements in the 'topics' array
* @param [in] topics Array of 'items' elements containing C strings
* @return
*/
int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, uint16_t pkt_id, int items,
const char *topics[]);
int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx,
int clean_session);
/**
* @brief mqtt_rx_puback Parses and validates the MQTT PUBACK message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @return 0 on success
* @return -EINVAL on error
*/
int mqtt_rx_puback(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pubcomp Parses and validates the MQTT PUBCOMP message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @return 0 on success
* @return -EINVAL on error
*/
int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pubrec Parses and validates the MQTT PUBREC message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @return 0 on success
* @return -EINVAL on error
*/
int mqtt_rx_pubrec(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pubrel Parses and validates the MQTT PUBREL message
* @details rx is an RX buffer from the IP stack
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @return 0 on success
* @return -EINVAL on error
*/
int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pingresp Parses the MQTT PINGRESP message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @return 0 on success
* @return -EINVAL on error
*/
int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_suback Parses the MQTT SUBACK message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @return 0 on success
* @return -EINVAL on error
*/
int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx);
#endif

View file

@ -21,3 +21,33 @@ config MQTT_LIB
default n default n
help help
Enable the Zephyr MQTT Library Enable the Zephyr MQTT Library
config MQTT_MSG_MAX_SIZE
int
prompt "Max size of a MQTT message"
depends on MQTT_LIB
default 128
range 128 1024
help
Set the maximum size of the MQTT message. So, no messages
longer than CONFIG_MQTT_MSG_SIZE will be processed.
config MQTT_ADDITIONAL_BUFFER_CTR
int
prompt "Additional buffers available for the MQTT application"
depends on MQTT_LIB
default 0
help
Set some additional buffers. When two or more concurrent contexts are
used in the same application, additional buffers may help to have a 1:1
relation between application contexts and internal buffers.
config MQTT_SUBSCRIBE_MAX_TOPICS
int
prompt "Max number of topics to subscribe to"
depends on MQTT_LIB
default 1
range 1 8
help
Set the maximum number of topics handled by the SUBSCRIBE/SUBACK
messages during reception.

View file

@ -1 +1,2 @@
obj-y := mqtt_pkt.o obj-y := mqtt_pkt.o
obj-y += mqtt.o

602
lib/iot/mqtt/mqtt.c Normal file
View file

@ -0,0 +1,602 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iot/mqtt.h>
#include "mqtt_pkt.h"
#include <net/net_ip.h>
#include <net/nbuf.h>
#include <net/buf.h>
#include <errno.h>
#define MSG_SIZE CONFIG_MQTT_MSG_MAX_SIZE
#define MQTT_BUF_CTR (1 + CONFIG_MQTT_ADDITIONAL_BUFFER_CTR)
static struct nano_fifo mqtt_msg_fifo;
/* Memory pool internally used to handle messages that may exceed the size of
* system defined network buffer. By using this memory pool, routines don't deal
* with fragmentation, so algorithms are more easy to implement.
*/
static NET_BUF_POOL(mqtt_msg_pool, MQTT_BUF_CTR, MSG_SIZE, &mqtt_msg_fifo,
NULL, 0);
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type)
{
ctx->app_type = app_type;
/* So far, only clean session = 1 is supported */
ctx->clean_session = 1;
net_buf_pool_init(mqtt_msg_pool);
return 0;
}
int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg)
{
struct net_buf *data;
struct net_buf *tx;
int rc;
data = net_buf_get_timeout(&mqtt_msg_fifo, 0, ctx->net_timeout);
if (data == NULL) {
rc = -ENOMEM;
goto exit_connect;
}
ctx->clean_session = msg->clean_session ? 1 : 0;
rc = mqtt_pack_connect(data->data, &data->len, MSG_SIZE, msg);
if (rc != 0) {
net_nbuf_unref(data);
rc = -EINVAL;
goto exit_connect;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_connect;
}
net_buf_frag_add(tx, data);
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_connect;
}
rc = 0;
exit_connect:
return rc;
}
int mqtt_tx_disconnect(struct mqtt_ctx *ctx)
{
struct net_buf *tx;
/* DISCONNECT is a zero length message: 2 bytes required, no payload */
uint8_t msg[2];
uint16_t len;
int rc;
rc = mqtt_pack_disconnect(msg, &len, sizeof(msg));
if (rc != 0) {
rc = -EINVAL;
goto exit_disconnect;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_disconnect;
}
rc = net_nbuf_append(tx, len, msg);
if (rc != true) {
rc = -ENOMEM;
goto exit_disconnect;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_disconnect;
}
ctx->connected = 0;
rc = 0;
if (ctx->disconnect) {
ctx->disconnect(ctx->disconnect_data);
}
exit_disconnect:
return rc;
}
/**
* @brief mqtt_tx_pub_msgs Writes the MQTT PUBxxx msg indicated by pkt_type
* with identifier 'id'
* @param [in] ctx MQTT context
* @param [in] id MQTT packet identifier
* @param [in] pkt_type MQTT packet type
* @return 0 on success
* @return -EINVAL if an invalid parameter was passed to
* this routine
* @return -ENOMEM if a tx buffer is not available
* @return -EIO on network error
*/
static
int mqtt_tx_pub_msgs(struct mqtt_ctx *ctx, uint16_t id,
enum mqtt_packet pkt_type)
{
struct net_buf *tx;
uint8_t msg[4];
uint16_t len;
int rc;
switch (pkt_type) {
case MQTT_PUBACK:
rc = mqtt_pack_puback(msg, &len, sizeof(msg), id);
break;
case MQTT_PUBCOMP:
rc = mqtt_pack_pubcomp(msg, &len, sizeof(msg), id);
break;
case MQTT_PUBREC:
rc = mqtt_pack_pubrec(msg, &len, sizeof(msg), id);
break;
case MQTT_PUBREL:
rc = mqtt_pack_pubrel(msg, &len, sizeof(msg), id);
break;
default:
return -EINVAL;
}
if (rc != 0) {
return -EINVAL;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_send;
}
rc = net_nbuf_append(tx, len, msg);
if (rc != true) {
rc = -ENOMEM;
goto exit_send;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_send;
}
rc = 0;
exit_send:
return rc;
}
int mqtt_tx_puback(struct mqtt_ctx *ctx, uint16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBACK);
}
int mqtt_tx_pubcomp(struct mqtt_ctx *ctx, uint16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBCOMP);
}
int mqtt_tx_pubrec(struct mqtt_ctx *ctx, uint16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBREC);
}
int mqtt_tx_pubrel(struct mqtt_ctx *ctx, uint16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBREL);
}
int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg)
{
struct net_buf *data;
struct net_buf *tx;
int rc;
data = net_buf_get_timeout(&mqtt_msg_fifo, 0, ctx->net_timeout);
if (data == NULL) {
rc = -ENOMEM;
goto exit_publish;
}
rc = mqtt_pack_publish(data->data, &data->len, data->size, msg);
if (rc != 0) {
net_nbuf_unref(data);
rc = -EINVAL;
goto exit_publish;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_publish;
}
net_buf_frag_add(tx, data);
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_publish;
}
rc = 0;
exit_publish:
return rc;
}
int mqtt_tx_pingreq(struct mqtt_ctx *ctx)
{
struct net_buf *tx = NULL;
uint8_t msg[2];
uint16_t len;
int rc;
rc = mqtt_pack_pingreq(msg, &len, sizeof(msg));
if (rc != 0) {
rc = -EINVAL;
goto exit_pingreq;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_pingreq;
}
net_nbuf_append(tx, len, msg);
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_pingreq;
}
rc = 0;
exit_pingreq:
return rc;
}
int mqtt_tx_subscribe(struct mqtt_ctx *ctx, uint16_t pkt_id, int items,
const char *topics[], enum mqtt_qos qos[])
{
struct net_buf *data;
struct net_buf *tx;
int rc;
data = net_buf_get_timeout(&mqtt_msg_fifo, 0, ctx->net_timeout);
if (data == NULL) {
rc = -ENOMEM;
goto exit_subs;
}
rc = mqtt_pack_subscribe(data->data, &data->len, data->size,
pkt_id, items, topics, qos);
if (rc != 0) {
net_nbuf_unref(data);
rc = -EINVAL;
goto exit_subs;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_subs;
}
net_buf_frag_add(tx, data);
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_subs;
}
rc = 0;
exit_subs:
return rc;
}
int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, uint16_t pkt_id, int items,
const char *topics[])
{
struct net_buf *data;
struct net_buf *tx;
int rc;
data = net_buf_get_timeout(&mqtt_msg_fifo, 0, ctx->net_timeout);
if (data == NULL) {
rc = -ENOMEM;
goto exit_unsub;
}
rc = mqtt_pack_unsubscribe(data->data, &data->len, data->size, pkt_id,
items, topics);
if (rc != 0) {
net_buf_unref(data);
rc = -EINVAL;
goto exit_unsub;
}
tx = net_nbuf_get_tx(ctx->net_ctx);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_unsub;
}
net_buf_frag_add(tx, data);
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
rc = -EIO;
goto exit_unsub;
}
rc = 0;
exit_unsub:
return rc;
}
int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx, int clean_session)
{
uint16_t len;
uint8_t connect_rc;
uint8_t session;
uint8_t *data;
int rc;
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
/* CONNACK is only 4 bytes len, so it is assumed
* that net buf traversing is not required here
*/
rc = mqtt_unpack_connack(data, len, &session, &connect_rc);
if (rc != 0) {
rc = -EINVAL;
goto exit_connect;
}
switch (clean_session) {
/* new session */
case 1:
/* server acks there is no previous session
* and server connection return code is OK
*/
if (session == 0 && connect_rc == 0) {
rc = 0;
} else {
rc = -EINVAL;
}
break;
/* previous session */
case 0:
/* TODO */
/* FALLTHROUGH */
default:
rc = -EINVAL;
break;
}
ctx->connected = 1;
if (ctx->connect) {
ctx->connect(ctx->connect_data);
}
exit_connect:
return rc;
}
/**
* @brief mqtt_rx_pub_msgs Parses and validates the MQTT PUBxxxx message
* contained in the rx buffer. It validates against
* message structure and Packet Identifier.
* @details For the MQTT PUBREC and PUBREL messages, this
* function writes the corresponding MQTT PUB msg.
* @param ctx MQTT context
* @param rx RX buffer
* @param type MQTT Packet type
* @return 0 on success
* @return -EINVAL on error
*/
static
int mqtt_rx_pub_msgs(struct mqtt_ctx *ctx, struct net_buf *rx,
enum mqtt_packet type)
{
int (*unpack)(uint8_t *, uint16_t, uint16_t *) = NULL;
int (*response)(struct mqtt_ctx *, uint16_t) = NULL;
uint16_t pkt_id;
uint16_t len;
uint8_t *data;
int rc;
switch (type) {
case MQTT_PUBACK:
unpack = mqtt_unpack_puback;
break;
case MQTT_PUBCOMP:
unpack = mqtt_unpack_pubcomp;
break;
case MQTT_PUBREC:
unpack = mqtt_unpack_pubrec;
response = mqtt_tx_pubrel;
break;
case MQTT_PUBREL:
unpack = mqtt_unpack_pubrel;
response = mqtt_tx_pubcomp;
break;
default:
return -EINVAL;
}
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
/* 4 bytes message */
rc = unpack(data, len, &pkt_id);
if (rc != 0) {
return -EINVAL;
}
/* Only MQTT_APP_SUBSCRIBER, MQTT_APP_PUBLISHER_SUBSCRIBER and
* MQTT_APP_SERVER apps must receive the MQTT_PUBREL msg.
*/
if (type == MQTT_PUBREL) {
if (ctx->app_type != MQTT_APP_PUBLISHER) {
rc = ctx->publish_rx(ctx->publish_rx_data, NULL, pkt_id,
MQTT_PUBREL);
} else {
rc = -EINVAL;
}
} else {
rc = ctx->publish_tx(ctx->publish_tx_data, pkt_id, type);
}
if (rc != 0) {
return -EINVAL;
}
if (!response) {
return 0;
}
rc = response(ctx, pkt_id);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_puback(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBACK);
}
int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBCOMP);
}
int mqtt_rx_pubrec(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBREC);
}
int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBREL);
}
int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx)
{
uint8_t *data;
uint16_t len;
int rc;
ARG_UNUSED(ctx);
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
/* 2 bytes message */
rc = mqtt_unpack_pingresp(data, len);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx)
{
enum mqtt_qos suback_qos[CONFIG_MQTT_SUBSCRIBE_MAX_TOPICS];
uint16_t pkt_id;
uint16_t len;
uint8_t items;
uint8_t *data;
int rc;
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
rc = mqtt_unpack_suback(data, len, &pkt_id, &items,
CONFIG_MQTT_SUBSCRIBE_MAX_TOPICS, suback_qos);
if (rc != 0) {
return -EINVAL;
}
if (!ctx->subscribe) {
return -EINVAL;
}
rc = ctx->subscribe(ctx->subscribe_data, pkt_id, items, suback_qos);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx)
{
uint16_t pkt_id;
uint16_t len;
uint8_t *data;
int rc;
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
/* 4 bytes message */
rc = mqtt_unpack_unsuback(data, len, &pkt_id);
if (rc != 0) {
return -EINVAL;
}
if (!ctx->unsubscribe) {
return -EINVAL;
}
rc = ctx->unsubscribe(ctx->subscribe_data, pkt_id);
if (rc != 0) {
return -EINVAL;
}
return 0;
}