net/mqtt: Allow an MQTT publisher app to receive msgs

Changes applied by this patch:

- Add the mqtt_publisher_parser routine
- Add the MQTT_PACKET_TYPE macro to get the MQTT msg packet type
  (required by mqtt_publisher_parser)
- Add the mqtt_linearize_buffer (required by mqtt_publisher_parser)
- Add the mqtt_recv callback for reception
- Modify the mqtt_init routine to install the reception callback

The mqtt_publisher_parser routine is a callback used internally
to execute the appropriate mqtt_rx routine. Only the following
messages are handled by this routine:

MQTT_CONNACK, MQTT_PUBACK, MQTT_PUBREC, MQTT_PUBCOMP and MQTT_PINGRESP.

On error, it executes the ctx->malformed cb, if defined.

This commit also introduces the mqtt_linearize_buffer routine that
will be used to linearize an IP stack fragmented buffer. This patch
makes use of the net_nbuf_linear_copy routine to linearize the
incoming buffer. mqtt_rx_xxxx routines are also updated to handle
linear buffers (no fragmentation).

Currently, all the network protocol routines assume that the input
buffer is not fragmented. Future versions will remove that assumption
and the mqtt_linearize_buffer routine will be removed as well.

Public MQTT API is not affected by this patch.

Change-Id: I02fece67052ffbc7cb393d5ca545c503da463c4b
Signed-off-by: Flavio Santes <flavio.santes@intel.com>
This commit is contained in:
Flavio Santes 2016-12-17 10:36:14 -06:00 committed by Tomasz Bursztyka
commit 76e479974a
3 changed files with 171 additions and 35 deletions

View file

@ -345,7 +345,7 @@ int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx,
/**
* @brief mqtt_rx_puback Parses and validates the MQTT PUBACK message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -354,7 +354,7 @@ int mqtt_rx_puback(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pubcomp Parses and validates the MQTT PUBCOMP message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -363,7 +363,7 @@ int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pubrec Parses and validates the MQTT PUBREC message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -373,7 +373,7 @@ int mqtt_rx_pubrec(struct mqtt_ctx *ctx, struct net_buf *rx);
* @brief mqtt_rx_pubrel Parses and validates the MQTT PUBREL message
* @details rx is an RX buffer from the IP stack
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -382,7 +382,7 @@ int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_pingresp Parses the MQTT PINGRESP message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -391,7 +391,7 @@ int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_suback Parses the MQTT SUBACK message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -400,7 +400,7 @@ int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_unsuback Parses the MQTT UNSUBACK message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
*/
@ -409,7 +409,7 @@ int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx);
/**
* @brief mqtt_rx_publish Parses the MQTT PUBLISH message
* @param [in] ctx MQTT context structure
* @param [in] rx RX buffer from the IP stack
* @param [in] rx Data buffer
* @return 0 on success
* @return -EINVAL on error
* @return -ENOMEM if no data buffer is available

View file

@ -31,15 +31,7 @@
*/
NET_BUF_POOL_DEFINE(mqtt_msg_pool, MQTT_BUF_CTR, MSG_SIZE, 0, NULL);
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type)
{
ctx->app_type = app_type;
/* So far, only clean session = 1 is supported */
ctx->clean_session = 1;
return 0;
}
#define MQTT_PUBLISHER_MIN_MSG_SIZE 2
int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg)
{
@ -393,12 +385,10 @@ int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx, int clean_session)
uint8_t *data;
int rc;
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
data = rx->data;
len = rx->len;
/* CONNACK is only 4 bytes len, so it is assumed
* that net buf traversing is not required here
*/
/* CONNACK is 4 bytes len */
rc = mqtt_unpack_connack(data, len, &session, &connect_rc);
if (rc != 0) {
rc = -EINVAL;
@ -479,8 +469,8 @@ int mqtt_rx_pub_msgs(struct mqtt_ctx *ctx, struct net_buf *rx,
return -EINVAL;
}
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
data = rx->data;
len = rx->len;
/* 4 bytes message */
rc = unpack(data, len, &pkt_id);
@ -540,17 +530,12 @@ int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx)
int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx)
{
uint8_t *data;
uint16_t len;
int rc;
ARG_UNUSED(ctx);
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
/* 2 bytes message */
rc = mqtt_unpack_pingresp(data, len);
rc = mqtt_unpack_pingresp(rx->data, rx->len);
if (rc != 0) {
return -EINVAL;
@ -568,8 +553,8 @@ int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx)
uint8_t *data;
int rc;
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
data = rx->data;
len = rx->len;
rc = mqtt_unpack_suback(data, len, &pkt_id, &items,
CONFIG_MQTT_SUBSCRIBE_MAX_TOPICS, suback_qos);
@ -596,8 +581,8 @@ int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx)
uint8_t *data;
int rc;
data = net_nbuf_appdata(rx);
len = net_nbuf_appdatalen(rx);
data = rx->data;
len = rx->len;
/* 4 bytes message */
rc = mqtt_unpack_unsuback(data, len, &pkt_id);
@ -644,9 +629,158 @@ int mqtt_rx_publish(struct mqtt_ctx *ctx, struct net_buf *rx)
break;
default:
rc = -EINVAL;
break;
}
return rc;
}
/**
* @brief mqtt_linearize_buffer Linearize an IP fragmented buffer
* @param [in] ctx MQTT context structure
* @param [in] rx RX IP stack buffer
* @param [in] min_size Min message size allowed. This allows us
* to exit if the rx buffer is shorter
* than the expected msg size
* @return Data buffer
* @return NULL on error
*/
static
struct net_buf *mqtt_linearize_buffer(struct mqtt_ctx *ctx, struct net_buf *rx,
uint16_t min_size)
{
struct net_buf *data = NULL;
uint16_t data_len;
uint16_t offset;
int rc;
data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout);
if (data == NULL) {
return NULL;
}
/* CONFIG_MQTT_MSG_MAX_SIZE is defined via Kconfig. So here it's
* determined if the input buffer could fit our data buffer or if
* it has the expected size.
*/
data_len = net_nbuf_appdatalen(rx);
if (data_len < min_size || data_len > CONFIG_MQTT_MSG_MAX_SIZE) {
goto exit_error;
}
offset = net_buf_frags_len(rx) - data_len;
rc = net_nbuf_linear_copy(data, rx, offset, data_len);
if (rc != 0) {
goto exit_error;
}
return data;
exit_error:
net_nbuf_unref(data);
return NULL;
}
/**
* @brief mqtt_publisher_parser Calls the appropriate rx routine for the MQTT
* message contained in rx
* @param ctx MQTT context
* @param rx RX buffer
* @return 0 on success
* @return -EINVAL if an unknown message is received
* @return -ENOMEM if no data buffer is available
* @return mqtt_rx_connack, mqtt_rx_puback, mqtt_rx_pubrec,
* mqtt_rx_pubcomp, and mqtt_rx_pingresp
* return codes
*/
static
int mqtt_publisher_parser(struct mqtt_ctx *ctx, struct net_buf *rx)
{
uint16_t pkt_type = MQTT_INVALID;
struct net_buf *data = NULL;
int rc = -EINVAL;
data = mqtt_linearize_buffer(ctx, rx, MQTT_PUBLISHER_MIN_MSG_SIZE);
if (!data) {
rc = -ENOMEM;
goto exit_parser;
}
pkt_type = MQTT_PACKET_TYPE(data->data[0]);
switch (pkt_type) {
case MQTT_CONNACK:
if (!ctx->connected) {
rc = mqtt_rx_connack(ctx, data, ctx->clean_session);
} else {
rc = -EINVAL;
}
break;
case MQTT_PUBACK:
rc = mqtt_rx_puback(ctx, data);
break;
case MQTT_PUBREC:
rc = mqtt_rx_pubrec(ctx, data);
break;
case MQTT_PUBCOMP:
rc = mqtt_rx_pubcomp(ctx, data);
break;
case MQTT_PINGRESP:
rc = mqtt_rx_pingresp(ctx, data);
break;
default:
rc = -EINVAL;
break;
}
exit_parser:
/* TODO: add error handling via a user provided callback */
net_nbuf_unref(data);
return rc;
}
static
void mqtt_recv(struct net_context *net_ctx, struct net_buf *buf, int status,
void *data)
{
struct mqtt_ctx *mqtt = (struct mqtt_ctx *)data;
/* net_ctx is already referenced to by the mqtt_ctx struct */
ARG_UNUSED(net_ctx);
if (status != 0) {
return;
}
mqtt->rcv(mqtt, buf);
net_nbuf_unref(buf);
}
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type)
{
/* So far, only clean session = 1 is supported */
ctx->clean_session = 1;
ctx->connected = 0;
/* Install the receiver callback, timeout is set to K_NO_WAIT.
* In this case, no return code is evaluated.
*/
(void)net_context_recv(ctx->net_ctx, mqtt_recv, K_NO_WAIT, ctx);
ctx->app_type = app_type;
switch (ctx->app_type) {
case MQTT_APP_PUBLISHER:
ctx->rcv = mqtt_publisher_parser;
break;
case MQTT_APP_SUBSCRIBER:
ctx->rcv = NULL;
break;
default:
return -EINVAL;
}
return 0;
}

View file

@ -32,6 +32,8 @@
#include <net/mqtt_types.h>
#define MQTT_PACKET_TYPE(first_byte) (((first_byte) & 0xF0) >> 4)
/**
* @brief mqtt_pack_connack Packs the MQTT CONNACK message
* @details See MQTT 3.2 CONNACK - Acknowledge connection