net: mqtt: Add BSD socket implementation

Add new, socket based MQTT implementation, based on MQTT from Nordic
nRF5 SDK, introducing the following features:

* transport independent MQTT logic, with support for multiple transports
* support for multiple MQTT versions (3.1.0 and 3.1.1 supported)
* single event handler - no need to keep callback array in RAM
* automatic send of Ping Requests, for connection keep-alive
* message/event parameters wrapped into strucutres - easier extension
  for future MQTT versions
* no separate thread needed to run MQTT - application only needs to call
  mqtt_input and mqtt_live periodically

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2018-06-27 10:41:51 +02:00 committed by Anas Nashif
commit 37563a92d5
14 changed files with 3243 additions and 0 deletions

648
include/net/mqtt.h Normal file
View file

@ -0,0 +1,648 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt.h
*
* @defgroup mqtt_socket MQTT Client library
* @ingroup networking
* @{
* @brief MQTT Client Implementation
*
* @details
* MQTT Client's Application interface is defined in this header.
*
* @note The implementation assumes TCP module is enabled.
*
* @note By default the implementation uses MQTT version 3.1.1.
*/
#ifndef ZEPHYR_INCLUDE_NET_MQTT_H_
#define ZEPHYR_INCLUDE_NET_MQTT_H_
#include <stddef.h>
#include <zephyr.h>
#include <zephyr/types.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief MQTT Asynchronous Events notified to the application from the module
* through the callback registered by the application.
*/
enum mqtt_evt_type {
/** Acknowledgment of connection request. Event result accompanying
* the event indicates whether the connection failed or succeeded.
*/
MQTT_EVT_CONNACK,
/** Disconnection Event. MQTT Client Reference is no longer valid once
* this event is received for the client.
*/
MQTT_EVT_DISCONNECT,
/** Publish event received when message is published on a topic client
* is subscribed to.
*
* @note PUBLISH event structure only contains payload size, the payload
* data parameter should be ignored. Payload content has to be
* read manually with @ref mqtt_read_publish_payload function.
*/
MQTT_EVT_PUBLISH,
/** Acknowledgment for published message with QoS 1. */
MQTT_EVT_PUBACK,
/** Reception confirmation for published message with QoS 2. */
MQTT_EVT_PUBREC,
/** Release of published message with QoS 2. */
MQTT_EVT_PUBREL,
/** Confirmation to a publish release message with QoS 2. */
MQTT_EVT_PUBCOMP,
/** Acknowledgment to a subscribe request. */
MQTT_EVT_SUBACK,
/** Acknowledgment to a unsubscribe request. */
MQTT_EVT_UNSUBACK
};
/** @brief MQTT version protocol level. */
enum mqtt_version {
MQTT_VERSION_3_1_0 = 3, /**< Protocol level for 3.1.0. */
MQTT_VERSION_3_1_1 = 4 /**< Protocol level for 3.1.1. */
};
/** @brief MQTT Quality of Service types. */
enum mqtt_qos {
/** Lowest Quality of Service, no acknowledgment needed for published
* message.
*/
MQTT_QOS_0_AT_MOST_ONCE = 0x00,
/** Medium Quality of Service, if acknowledgment expected for published
* message, duplicate messages permitted.
*/
MQTT_QOS_1_AT_LEAST_ONCE = 0x01,
/** Highest Quality of Service, acknowledgment expected and message
* shall be published only once. Message not published to interested
* parties unless client issues a PUBREL.
*/
MQTT_QOS_2_EXACTLY_ONCE = 0x02
};
/** @brief MQTT CONNACK return codes. */
enum mqtt_conn_return_code {
/** Connection accepted. */
MQTT_CONNECTION_ACCEPTED = 0x00,
/** The Server does not support the level of the MQTT protocol
* requested by the Client.
*/
MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 0x01,
/** The Client identifier is correct UTF-8 but not allowed by the
* Server.
*/
MQTT_IDENTIFIER_REJECTED = 0x02,
/** The Network Connection has been made but the MQTT service is
* unavailable.
*/
MQTT_SERVER_UNAVAILABLE = 0x03,
/** The data in the user name or password is malformed. */
MQTT_BAD_USER_NAME_OR_PASSWORD = 0x04,
/** The Client is not authorized to connect. */
MQTT_NOT_AUTHORIZED = 0x05
};
/** @brief MQTT SUBACK return codes. */
enum mqtt_suback_return_code {
/** Subscription with QoS 0 succeeded. */
MQTT_SUBACK_SUCCESS_QoS_0 = 0x00,
/** Subscription with QoS 1 succeeded. */
MQTT_SUBACK_SUCCESS_QoS_1 = 0x01,
/** Subscription with QoS 2 succeeded. */
MQTT_SUBACK_SUCCESS_QoS_2 = 0x02,
/** Subscription for a topic failed. */
MQTT_SUBACK_FAILURE = 0x80
};
/** @brief Abstracts UTF-8 encoded strings. */
struct mqtt_utf8 {
u8_t *utf8; /**< Pointer to UTF-8 string. */
u32_t size; /**< Size of UTF string, in bytes. */
};
/** @brief Abstracts binary strings. */
struct mqtt_binstr {
u8_t *data; /**< Pointer to binary stream. */
u32_t len; /**< Length of binary stream. */
};
/** @brief Abstracts MQTT UTF-8 encoded topic that can be subscribed
* to or published.
*/
struct mqtt_topic {
/** Topic on to be published or subscribed to. */
struct mqtt_utf8 topic;
/** Quality of service requested for the subscription.
* @ref mqtt_qos for details.
*/
u8_t qos;
};
/** @brief Parameters for a publish message. */
struct mqtt_publish_message {
struct mqtt_topic topic; /**< Topic on which data was published. */
struct mqtt_binstr payload; /**< Payload on the topic published. */
};
/** @brief Parameters for a connection acknowledgment (CONNACK). */
struct mqtt_connack_param {
/** The Session Present flag enables a Client to establish whether
* the Client and Server have a consistent view about whether there
* is already stored Session state.
*/
u8_t session_present_flag;
/** The appropriate non-zero Connect return code indicates if the Server
* is unable to process a connection request for some reason.
*/
enum mqtt_conn_return_code return_code;
};
/** @brief Parameters for MQTT publish acknowledgment (PUBACK). */
struct mqtt_puback_param {
u16_t message_id;
};
/** @brief Parameters for MQTT publish receive (PUBREC). */
struct mqtt_pubrec_param {
u16_t message_id;
};
/** @brief Parameters for MQTT publish release (PUBREL). */
struct mqtt_pubrel_param {
u16_t message_id;
};
/** @brief Parameters for MQTT publish complete (PUBCOMP). */
struct mqtt_pubcomp_param {
u16_t message_id;
};
/** @brief Parameters for MQTT subscription acknowledgment (SUBACK). */
struct mqtt_suback_param {
u16_t message_id;
struct mqtt_binstr return_codes;
};
/** @brief Parameters for MQTT unsubscribe acknowledgment (UNSUBACK). */
struct mqtt_unsuback_param {
u16_t message_id;
};
/** @brief Parameters for a publish message. */
struct mqtt_publish_param {
/** Messages including topic, QoS and its payload (if any)
* to be published.
*/
struct mqtt_publish_message message;
/** Message id used for the publish message. Redundant for QoS 0. */
u16_t message_id;
/** Duplicate flag. If 1, it indicates the message is being
* retransmitted. Has no meaning with QoS 0.
*/
u8_t dup_flag : 1;
/** Retain flag. If 1, the message shall be stored persistently
* by the broker.
*/
u8_t retain_flag : 1;
};
/** @brief List of topics in a subscription request. */
struct mqtt_subscription_list {
/** Array containing topics along with QoS for each. */
struct mqtt_topic *list;
/** Number of topics in the subscription list */
u16_t list_count;
/** Message id used to identify subscription request. */
u16_t message_id;
};
/**
* @brief Defines event parameters notified along with asynchronous events
* to the application.
*/
union mqtt_evt_param {
/** Parameters accompanying MQTT_EVT_CONNACK event. */
struct mqtt_connack_param connack;
/** Parameters accompanying MQTT_EVT_PUBLISH event.
*
* @note PUBLISH event structure only contains payload size, the payload
* data parameter should be ignored. Payload content has to be
* read manually with @ref mqtt_read_publish_payload function.
*/
struct mqtt_publish_param publish;
/** Parameters accompanying MQTT_EVT_PUBACK event. */
struct mqtt_puback_param puback;
/** Parameters accompanying MQTT_EVT_PUBREC event. */
struct mqtt_pubrec_param pubrec;
/** Parameters accompanying MQTT_EVT_PUBREL event. */
struct mqtt_pubrel_param pubrel;
/** Parameters accompanying MQTT_EVT_PUBCOMP event. */
struct mqtt_pubcomp_param pubcomp;
/** Parameters accompanying MQTT_EVT_SUBACK event. */
struct mqtt_suback_param suback;
/** Parameters accompanying MQTT_EVT_UNSUBACK event. */
struct mqtt_unsuback_param unsuback;
};
/** @brief Defines MQTT asynchronous event notified to the application. */
struct mqtt_evt {
/** Identifies the event. */
enum mqtt_evt_type type;
/** Contains parameters (if any) accompanying the event. */
union mqtt_evt_param param;
/** Event result. 0 or a negative error code (errno.h) indicating
* reason of failure.
*/
int result;
};
struct mqtt_client;
/**
* @brief Asynchronous event notification callback registered by the
* application.
*
* @param[in] client Identifies the client for which the event is notified.
* @param[in] evt Event description along with result and associated
* parameters (if any).
*/
typedef void (*mqtt_evt_cb_t)(struct mqtt_client *client,
const struct mqtt_evt *evt);
/** @brief MQTT transport type. */
enum mqtt_transport_type {
/** Use non secure TCP transport for MQTT connection. */
MQTT_TRANSPORT_NON_SECURE = 0x00,
/** Use secure TCP transport (TLS) for MQTT connection. */
MQTT_TRANSPORT_SECURE = 0x01,
/** Shall not be used as a transport type.
* Indicator of maximum transport types possible.
*/
MQTT_TRANSPORT_NUM = 0x02
};
/** @brief MQTT transport specific data. */
struct mqtt_transport {
/** Transport type selection for client instance.
* @ref mqtt_transport_type for possible values. MQTT_TRANSPORT_MAX
* is not a valid type.
*/
enum mqtt_transport_type type;
union {
/* TCP socket transport for MQTT */
struct {
/** Socket descriptor. */
int sock;
} tcp;
};
};
/** @brief MQTT internal state. */
struct mqtt_internal {
/** Internal. Mutex to protect access to the client instance. */
struct k_mutex mutex;
/** Internal. Wall clock value (in milliseconds) of the last activity
* that occurred. Needed for periodic PING.
*/
u32_t last_activity;
/** Internal. Client's state in the connection. */
u32_t state;
/** Internal. Packet length read so far. */
u32_t rx_buf_datalen;
/** Internal. Remaining payload length to read. */
u32_t remaining_payload;
};
/**
* @brief MQTT Client definition to maintain information relevant to the
* client.
*/
struct mqtt_client {
/** MQTT client internal state. */
struct mqtt_internal internal;
/** MQTT transport configuration and data. */
struct mqtt_transport transport;
/** Unique client identification to be used for the connection. */
struct mqtt_utf8 client_id;
/** Broker details, for example, address, port. Address type should
* be compatible with transport used.
*/
const void *broker;
/** User name (if any) to be used for the connection. NULL indicates
* no user name.
*/
struct mqtt_utf8 *user_name;
/** Password (if any) to be used for the connection. Note that if
* password is provided, user name shall also be provided. NULL
* indicates no password.
*/
struct mqtt_utf8 *password;
/** Will topic and QoS. Can be NULL. */
struct mqtt_topic *will_topic;
/** Will message. Can be NULL. Non NULL value valid only if will topic
* is not NULL.
*/
struct mqtt_utf8 *will_message;
/** Application callback registered with the module to get MQTT events.
*/
mqtt_evt_cb_t evt_cb;
/** Receive buffer used for MQTT packet reception in RX path. */
u8_t *rx_buf;
/** Size of receive buffer. */
u32_t rx_buf_size;
/** Transmit buffer used for creating MQTT packet in TX path. */
u8_t *tx_buf;
/** Size of transmit buffer. */
u32_t tx_buf_size;
/** MQTT protocol version. */
u8_t protocol_version;
/** Will retain flag, 1 if will message shall be retained persistently.
*/
u8_t will_retain : 1;
/** Clean session flag indicating a fresh (1) or a retained session (0).
* Default is 1.
*/
u8_t clean_session : 1;
};
/**
* @brief Initializes the client instance.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
*
* @note Shall be called to initialize client structure, before setting any
* client parameters and before connecting to broker.
*/
void mqtt_client_init(struct mqtt_client *client);
/**
* @brief API to request new MQTT client connection.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
*
* @note This memory is assumed to be resident until mqtt_disconnect is called.
* @note Any subsequent changes to parameters like broker address, user name,
* device id, etc. have no effect once MQTT connection is established.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*
* @note Default protocol revision used for connection request is 3.1.1. Please
* set client.protocol_version = MQTT_VERSION_3_1_0 to use protocol 3.1.0.
* @note Please modify :option:`CONFIG_MQTT_KEEPALIVE` time to override default
* of 1 minute.
*/
int mqtt_connect(struct mqtt_client *client);
/**
* @brief API to publish messages on topics.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[in] param Parameters to be used for the publish message.
* Shall not be NULL.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_publish(struct mqtt_client *client,
const struct mqtt_publish_param *param);
/**
* @brief API used by client to send acknowledgment on receiving QoS1 publish
* message. Should be called on reception of @ref MQTT_EVT_PUBLISH with
* QoS level @ref MQTT_QOS_1_AT_LEAST_ONCE.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[in] param Identifies message being acknowledged.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_publish_qos1_ack(struct mqtt_client *client,
const struct mqtt_puback_param *param);
/**
* @brief API used by client to send acknowledgment on receiving QoS2 publish
* message. Should be called on reception of @ref MQTT_EVT_PUBLISH with
* QoS level @ref MQTT_QOS_2_EXACTLY_ONCE.
*
* @param[in] client Identifies client instance for which the procedure is
* requested. Shall not be NULL.
* @param[in] param Identifies message being acknowledged.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_publish_qos2_receive(struct mqtt_client *client,
const struct mqtt_pubrec_param *param);
/**
* @brief API used by client to request release of QoS2 publish message.
* Should be called on reception of @ref MQTT_EVT_PUBREC.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[in] param Identifies message being released.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_publish_qos2_release(struct mqtt_client *client,
const struct mqtt_pubrel_param *param);
/**
* @brief API used by client to send acknowledgment on receiving QoS2 publish
* release message. Should be called on reception of
* @ref MQTT_EVT_PUBREL.
*
* @param[in] client Identifies client instance for which the procedure is
* requested. Shall not be NULL.
* @param[in] param Identifies message being completed.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_publish_qos2_complete(struct mqtt_client *client,
const struct mqtt_pubcomp_param *param);
/**
* @brief API to request subscription of one or more topics on the connection.
*
* @param[in] client Identifies client instance for which the procedure
* is requested. Shall not be NULL.
* @param[in] param Subscription parameters. Shall not be NULL.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_subscribe(struct mqtt_client *client,
const struct mqtt_subscription_list *param);
/**
* @brief API to request unsubscribtion of one or more topics on the connection.
*
* @param[in] client Identifies client instance for which the procedure is
* requested. Shall not be NULL.
* @param[in] param Parameters describing topics being unsubscribed from.
* Shall not be NULL.
*
* @note QoS included in topic description is unused in this API.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_unsubscribe(struct mqtt_client *client,
const struct mqtt_subscription_list *param);
/**
* @brief API to send MQTT ping. The use of this API is optional, as the library
* handles the connection keep-alive on it's own, see @ref mqtt_live.
*
* @param[in] client Identifies client instance for which procedure is
* requested.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_ping(struct mqtt_client *client);
/**
* @brief API to disconnect MQTT connection.
*
* @param[in] client Identifies client instance for which procedure is
* requested.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_disconnect(struct mqtt_client *client);
/**
* @brief API to abort MQTT connection. This will close the corresponding
* transport without closing the connection gracefully at the MQTT level
* (with disconnect message).
*
* @param[in] client Identifies client instance for which procedure is
* requested.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_abort(struct mqtt_client *client);
/**
* @brief This API should be called periodically for the client to be able
* to keep the connection alive by sending Ping Requests if need be.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
*
* @note Application shall ensure that the periodicity of calling this function
* makes it possible to respect the Keep Alive time agreed with the
* broker on connection. @ref mqtt_connect for details on Keep Alive
* time.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_live(struct mqtt_client *client);
/**
* @brief Receive an incoming MQTT packet. The registered callback will be
* called with the packet content.
*
* @note In case of PUBLISH message, the payload has to be read separately with
* @ref mqtt_read_publish_payload function. The size of the payload to
* read is provided in the publish event structure.
*
* @note This is a non-blocking call.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_input(struct mqtt_client *client);
/**
* @brief Read the payload of the received PUBLISH message. This function should
* be called within the MQTT event handler, when MQTT PUBLISH message is
* notified.
*
* @note This is a non-blocking call.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[out] buffer Buffer where payload should be stored.
* @param[in] length Length of the buffer, in bytes.
*
* @return Number of bytes read or a negative error code (errno.h) indicating
* reason of failure.
*/
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length);
#ifdef __cplusplus
}
#endif
#endif /* ZEPHYR_INCLUDE_NET_MQTT_H_ */
/**@} */

View file

@ -4,6 +4,7 @@ add_subdirectory_if_kconfig(lwm2m)
add_subdirectory_if_kconfig(sntp) add_subdirectory_if_kconfig(sntp)
add_subdirectory_ifdef(CONFIG_DNS_RESOLVER dns) add_subdirectory_ifdef(CONFIG_DNS_RESOLVER dns)
add_subdirectory_ifdef(CONFIG_MQTT_LEGACY_LIB mqtt) add_subdirectory_ifdef(CONFIG_MQTT_LEGACY_LIB mqtt)
add_subdirectory_ifdef(CONFIG_MQTT_LIB mqtt_sock)
add_subdirectory_ifdef(CONFIG_NET_APP app) add_subdirectory_ifdef(CONFIG_NET_APP app)
add_subdirectory_ifdef(CONFIG_NET_CONFIG_SETTINGS config) add_subdirectory_ifdef(CONFIG_NET_CONFIG_SETTINGS config)
add_subdirectory_ifdef(CONFIG_NET_SOCKETS sockets) add_subdirectory_ifdef(CONFIG_NET_SOCKETS sockets)

View file

@ -14,6 +14,8 @@ source "subsys/net/lib/dns/Kconfig"
source "subsys/net/lib/mqtt/Kconfig" source "subsys/net/lib/mqtt/Kconfig"
source "subsys/net/lib/mqtt_sock/Kconfig"
source "subsys/net/lib/http/Kconfig" source "subsys/net/lib/http/Kconfig"
source "subsys/net/lib/lwm2m/Kconfig" source "subsys/net/lib/lwm2m/Kconfig"

View file

@ -0,0 +1,10 @@
zephyr_library()
zephyr_library_sources(
mqtt_decoder.c
mqtt_encoder.c
mqtt_rx.c
mqtt_transport_socket_tcp.c
mqtt_transport.c
mqtt.c
)

View file

@ -0,0 +1,30 @@
# Kconfig - Socket MQTT Library for Zephyr
#
# Copyright (c) 2018 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
#
config MQTT_LIB
bool "Socket MQTT Library Support"
select NET_SOCKETS
select NET_SOCKETS_POSIX_NAMES
help
Enable the Zephyr MQTT Library
if MQTT_LIB
module=MQTT
module-dep=NET_LOG
module-str=Log level for MQTT
module-help=Enables mqtt debug messages.
source "subsys/net/Kconfig.template.log_config.net"
config MQTT_KEEPALIVE
int "Maximum number of clients Keep alive time for MQTT (in seconds)"
default 60
help
Keep alive time for MQTT (in seconds). Sending of Ping Requests to
keep the connection alive are governed by this value.
endif # MQTT_LIB

View file

@ -0,0 +1,639 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt.c
*
* @brief MQTT Client API Implementation.
*/
#define LOG_MODULE_NAME net_mqtt
#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL
#include <net/mqtt.h>
#include "mqtt_transport.h"
#include "mqtt_internal.h"
#include "mqtt_os.h"
static void client_reset(struct mqtt_client *client)
{
MQTT_STATE_INIT(client);
client->internal.last_activity = 0;
client->internal.rx_buf_datalen = 0;
client->internal.remaining_payload = 0;
}
/** @brief Initialize tx buffer. */
static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf)
{
memset(client->tx_buf, 0, client->tx_buf_size);
buf->cur = client->tx_buf;
buf->end = client->tx_buf + client->tx_buf_size;
}
/**@brief Notifies disconnection event to the application.
*
* @param[in] client Identifies the client for which the procedure is requested.
* @param[in] result Reason for disconnection.
*/
static void disconnect_event_notify(struct mqtt_client *client, int result)
{
struct mqtt_evt evt;
/* Determine appropriate event to generate. */
if (MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED) ||
MQTT_HAS_STATE(client, MQTT_STATE_DISCONNECTING)) {
evt.type = MQTT_EVT_DISCONNECT;
evt.result = result;
} else {
evt.type = MQTT_EVT_CONNACK;
evt.result = -ECONNREFUSED;
}
/* Notify application. */
event_notify(client, &evt);
/* Reset internal state. */
client_reset(client);
}
void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt)
{
if (client->evt_cb != NULL) {
mqtt_mutex_unlock(client);
client->evt_cb(client, evt);
mqtt_mutex_lock(client);
}
}
static void client_disconnect(struct mqtt_client *client, int result)
{
int err_code;
err_code = mqtt_transport_disconnect(client);
if (err_code < 0) {
MQTT_ERR("Failed to disconnect transport!");
}
disconnect_event_notify(client, result);
}
static int client_connect(struct mqtt_client *client)
{
int err_code;
struct buf_ctx packet;
err_code = mqtt_transport_connect(client);
if (err_code < 0) {
return err_code;
}
tx_buf_init(client, &packet);
MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED);
err_code = connect_request_encode(client, &packet);
if (err_code < 0) {
goto error;
}
/* Send MQTT identification message to broker. */
err_code = mqtt_transport_write(client, packet.cur,
packet.end - packet.cur);
if (err_code < 0) {
goto error;
}
client->internal.last_activity = mqtt_sys_tick_in_ms_get();
MQTT_TRC("Connect completed");
return 0;
error:
client_disconnect(client, err_code);
return err_code;
}
static int client_read(struct mqtt_client *client)
{
int err_code;
if (client->internal.remaining_payload > 0) {
return -EBUSY;
}
err_code = mqtt_handle_rx(client);
if (err_code < 0) {
client_disconnect(client, err_code);
}
return err_code;
}
static int client_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen)
{
int err_code;
MQTT_TRC("[%p]: Transport writing %d bytes.", client, datalen);
err_code = mqtt_transport_write(client, data, datalen);
if (err_code < 0) {
MQTT_TRC("TCP write failed, errno = %d, "
"closing connection", errno);
client_disconnect(client, err_code);
return err_code;
}
MQTT_TRC("[%p]: Transport write complete.", client);
client->internal.last_activity = mqtt_sys_tick_in_ms_get();
return 0;
}
void mqtt_client_init(struct mqtt_client *client)
{
NULL_PARAM_CHECK_VOID(client);
memset(client, 0, sizeof(*client));
MQTT_STATE_INIT(client);
mqtt_mutex_init(client);
client->protocol_version = MQTT_VERSION_3_1_1;
client->clean_session = 1;
}
int mqtt_connect(struct mqtt_client *client)
{
int err_code;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(client->client_id.utf8);
mqtt_mutex_lock(client);
if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) {
err_code = -ENOMEM;
goto error;
}
err_code = client_connect(client);
if (err_code < 0) {
err_code = -ECONNREFUSED;
goto error;
}
return 0;
error:
client_reset(client);
mqtt_mutex_unlock(client);
return err_code;
}
static int verify_tx_state(const struct mqtt_client *client)
{
if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) {
return -ENOTCONN;
}
return 0;
}
int mqtt_publish(struct mqtt_client *client,
const struct mqtt_publish_param *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
MQTT_TRC("[CID %p]:[State 0x%02x]: >> Topic size 0x%08x, "
"Data size 0x%08x", client, client->internal.state,
param->message.topic.topic.size,
param->message.payload.len);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = publish_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, param->message.payload.data,
param->message.payload.len);
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",
client, client->internal.state, err_code);
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_publish_qos1_ack(struct mqtt_client *client,
const struct mqtt_puback_param *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
client, client->internal.state, param->message_id);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = publish_ack_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",
client, client->internal.state, err_code);
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_publish_qos2_receive(struct mqtt_client *client,
const struct mqtt_pubrec_param *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
client, client->internal.state, param->message_id);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = publish_receive_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",
client, client->internal.state, err_code);
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_publish_qos2_release(struct mqtt_client *client,
const struct mqtt_pubrel_param *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
client, client->internal.state, param->message_id);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = publish_release_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",
client, client->internal.state, err_code);
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_publish_qos2_complete(struct mqtt_client *client,
const struct mqtt_pubcomp_param *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
client, client->internal.state, param->message_id);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = publish_complete_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
if (err_code < 0) {
goto error;
}
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",
client, client->internal.state, err_code);
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_disconnect(struct mqtt_client *client)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = disconnect_encode(&packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
if (err_code < 0) {
goto error;
}
MQTT_SET_STATE_EXCLUSIVE(client, MQTT_STATE_DISCONNECTING);
error:
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_subscribe(struct mqtt_client *client,
const struct mqtt_subscription_list *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
MQTT_TRC("[CID %p]:[State 0x%02x]: >> message id 0x%04x "
"topic count 0x%04x", client, client->internal.state,
param->message_id, param->list_count);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = subscribe_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
error:
MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x",
client, client->internal.state, err_code);
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_unsubscribe(struct mqtt_client *client,
const struct mqtt_subscription_list *param)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
NULL_PARAM_CHECK(param);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = unsubscribe_encode(param, &packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
error:
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_ping(struct mqtt_client *client)
{
int err_code;
struct buf_ctx packet;
NULL_PARAM_CHECK(client);
mqtt_mutex_lock(client);
tx_buf_init(client, &packet);
err_code = verify_tx_state(client);
if (err_code < 0) {
goto error;
}
err_code = ping_request_encode(&packet);
if (err_code < 0) {
goto error;
}
err_code = client_write(client, packet.cur, packet.end - packet.cur);
error:
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_abort(struct mqtt_client *client)
{
mqtt_mutex_lock(client);
NULL_PARAM_CHECK(client);
if (client->internal.state != MQTT_STATE_IDLE) {
client_disconnect(client, -ECONNABORTED);
}
mqtt_mutex_unlock(client);
return 0;
}
int mqtt_live(struct mqtt_client *client)
{
u32_t elapsed_time;
NULL_PARAM_CHECK(client);
mqtt_mutex_lock(client);
if (MQTT_HAS_STATE(client, MQTT_STATE_DISCONNECTING)) {
client_disconnect(client, 0);
} else {
elapsed_time = mqtt_elapsed_time_in_ms_get(
client->internal.last_activity);
if ((MQTT_KEEPALIVE > 0) &&
(elapsed_time >= (MQTT_KEEPALIVE * 1000))) {
(void)mqtt_ping(client);
}
}
mqtt_mutex_unlock(client);
return 0;
}
int mqtt_input(struct mqtt_client *client)
{
int err_code = 0;
NULL_PARAM_CHECK(client);
mqtt_mutex_lock(client);
MQTT_TRC("state:0x%08x", client->internal.state);
if (MQTT_HAS_STATE(client, MQTT_STATE_DISCONNECTING)) {
client_disconnect(client, 0);
} else if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) {
err_code = client_read(client);
} else {
err_code = -EACCES;
}
mqtt_mutex_unlock(client);
return err_code;
}
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length)
{
int ret;
NULL_PARAM_CHECK(client);
mqtt_mutex_lock(client);
if (client->internal.remaining_payload == 0) {
ret = 0;
goto exit;
}
if (client->internal.remaining_payload < length) {
length = client->internal.remaining_payload;
}
ret = mqtt_transport_read(client, buffer, length);
if (ret == -EAGAIN) {
goto exit;
}
if (ret <= 0) {
if (ret == 0) {
ret = -ENOTCONN;
}
client_disconnect(client, ret);
goto exit;
}
client->internal.remaining_payload -= ret;
exit:
mqtt_mutex_unlock(client);
return ret;
}

View file

@ -0,0 +1,298 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_decoder.c
*
* @brief Decoder functions needed for decoding packets received from the
* broker.
*/
#define LOG_MODULE_NAME net_mqtt_dec
#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL
#include "mqtt_internal.h"
#include "mqtt_os.h"
/**
* @brief Unpacks unsigned 8 bit value from the buffer from the offset
* requested.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] val Memory where the value is to be unpacked.
*
* @retval 0 if the procedure is successful.
* @retval -EINVAL if the buffer would be exceeded during the read
*/
static int unpack_uint8(struct buf_ctx *buf, u8_t *val)
{
MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end);
if ((buf->end - buf->cur) < sizeof(u8_t)) {
return -EINVAL;
}
*val = *(buf->cur++);
MQTT_TRC("<< val:%02x", *val);
return 0;
}
/**
* @brief Unpacks unsigned 16 bit value from the buffer from the offset
* requested.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] val Memory where the value is to be unpacked.
*
* @retval 0 if the procedure is successful.
* @retval -EINVAL if the buffer would be exceeded during the read
*/
static int unpack_uint16(struct buf_ctx *buf, u16_t *val)
{
MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end);
if ((buf->end - buf->cur) < sizeof(u16_t)) {
return -EINVAL;
}
*val = *(buf->cur++) << 8; /* MSB */
*val |= *(buf->cur++); /* LSB */
MQTT_TRC("<< val:%04x", *val);
return 0;
}
/**
* @brief Unpacks utf8 string from the buffer from the offset requested.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] str Pointer to a string that will hold the string location
* in the buffer.
*
* @retval 0 if the procedure is successful.
* @retval -EINVAL if the buffer would be exceeded during the read
*/
static int unpack_utf8_str(struct buf_ctx *buf, struct mqtt_utf8 *str)
{
u16_t utf8_strlen;
int err_code;
MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end);
err_code = unpack_uint16(buf, &utf8_strlen);
if (err_code != 0) {
return err_code;
}
if ((buf->end - buf->cur) < utf8_strlen) {
return -EINVAL;
}
str->size = utf8_strlen;
/* Zero length UTF8 strings permitted. */
if (utf8_strlen) {
/* Point to right location in buffer. */
str->utf8 = buf->cur;
buf->cur += utf8_strlen;
} else {
str->utf8 = NULL;
}
MQTT_TRC("<< str_size:%08x", (u32_t)GET_UT8STR_BUFFER_SIZE(str));
return 0;
}
/**
* @brief Unpacks binary string from the buffer from the offset requested.
*
* @param[in] length Binary string length.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] str Pointer to a binary string that will hold the binary string
* location in the buffer.
*
* @retval 0 if the procedure is successful.
* @retval -EINVAL if the buffer would be exceeded during the read
*/
static int unpack_data(u32_t length, struct buf_ctx *buf,
struct mqtt_binstr *str)
{
MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end);
if ((buf->end - buf->cur) < length) {
return -EINVAL;
}
str->len = length;
/* Zero length binary strings are permitted. */
if (length > 0) {
str->data = buf->cur;
buf->cur += length;
} else {
str->data = NULL;
}
MQTT_TRC("<< bin len:%08x", GET_BINSTR_BUFFER_SIZE(str));
return 0;
}
/**@brief Decode MQTT Packet Length in the MQTT fixed header.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] length Length of variable header and payload in the
* MQTT message.
*
* @retval 0 if the procedure is successful.
* @retval -EINVAL if the length decoding would use more that 4 bytes.
* @retval -EAGAIN if the buffer would be exceeded during the read.
*/
int packet_length_decode(struct buf_ctx *buf, u32_t *length)
{
u8_t shift = 0;
u8_t bytes = 0;
*length = 0;
do {
if (bytes > MQTT_MAX_LENGTH_BYTES) {
return -EINVAL;
}
if (buf->cur >= buf->end) {
return -EAGAIN;
}
*length += ((u32_t)*(buf->cur) & MQTT_LENGTH_VALUE_MASK)
<< shift;
shift += MQTT_LENGTH_SHIFT;
bytes++;
} while ((*(buf->cur++) & MQTT_LENGTH_CONTINUATION_BIT) != 0);
MQTT_TRC("length:0x%08x", *length);
return 0;
}
int fixed_header_decode(struct buf_ctx *buf, u8_t *type_and_flags,
u32_t *length)
{
int err_code;
err_code = unpack_uint8(buf, type_and_flags);
if (err_code != 0) {
return err_code;
}
return packet_length_decode(buf, length);
}
int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_connack_param *param)
{
int err_code;
u8_t flags, ret_code;
err_code = unpack_uint8(buf, &flags);
if (err_code != 0) {
return err_code;
}
err_code = unpack_uint8(buf, &ret_code);
if (err_code != 0) {
return err_code;
}
if (client->protocol_version == MQTT_VERSION_3_1_1) {
param->session_present_flag =
flags & MQTT_CONNACK_FLAG_SESSION_PRESENT;
MQTT_TRC("[CID %p]: session_present_flag: %d", client,
param->session_present_flag);
}
param->return_code = (enum mqtt_conn_return_code)ret_code;
return 0;
}
int publish_decode(u8_t flags, u32_t var_length, struct buf_ctx *buf,
struct mqtt_publish_param *param)
{
int err_code;
u32_t var_header_length;
param->dup_flag = flags & MQTT_HEADER_DUP_MASK;
param->retain_flag = flags & MQTT_HEADER_RETAIN_MASK;
param->message.topic.qos = ((flags & MQTT_HEADER_QOS_MASK) >> 1);
err_code = unpack_utf8_str(buf, &param->message.topic.topic);
if (err_code != 0) {
return err_code;
}
var_header_length = param->message.topic.topic.size + sizeof(u16_t);
if (param->message.topic.qos > MQTT_QOS_0_AT_MOST_ONCE) {
err_code = unpack_uint16(buf, &param->message_id);
if (err_code != 0) {
return err_code;
}
var_header_length += sizeof(u16_t);
}
param->message.payload.data = NULL;
param->message.payload.len = var_length - var_header_length;
return 0;
}
int publish_ack_decode(struct buf_ctx *buf, struct mqtt_puback_param *param)
{
return unpack_uint16(buf, &param->message_id);
}
int publish_receive_decode(struct buf_ctx *buf, struct mqtt_pubrec_param *param)
{
return unpack_uint16(buf, &param->message_id);
}
int publish_release_decode(struct buf_ctx *buf, struct mqtt_pubrel_param *param)
{
return unpack_uint16(buf, &param->message_id);
}
int publish_complete_decode(struct buf_ctx *buf,
struct mqtt_pubcomp_param *param)
{
return unpack_uint16(buf, &param->message_id);
}
int subscribe_ack_decode(struct buf_ctx *buf, struct mqtt_suback_param *param)
{
int err_code;
err_code = unpack_uint16(buf, &param->message_id);
if (err_code != 0) {
return err_code;
}
return unpack_data(buf->end - buf->cur, buf, &param->return_codes);
}
int unsubscribe_ack_decode(struct buf_ctx *buf,
struct mqtt_unsuback_param *param)
{
return unpack_uint16(buf, &param->message_id);
}

View file

@ -0,0 +1,568 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_encoder.c
*
* @brief Encoding functions needed to create packet to be sent to the broker.
*/
#define LOG_MODULE_NAME net_mqtt_enc
#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL
#include "mqtt_internal.h"
#include "mqtt_os.h"
#define MQTT_3_1_0_PROTO_DESC_LEN 6
#define MQTT_3_1_1_PROTO_DESC_LEN 4
static const u8_t mqtt_3_1_0_proto_desc_str[MQTT_3_1_0_PROTO_DESC_LEN] = {
'M', 'Q', 'I', 's', 'd', 'p'
};
static const u8_t mqtt_3_1_1_proto_desc_str[MQTT_3_1_1_PROTO_DESC_LEN] = {
'M', 'Q', 'T', 'T'
};
static const struct mqtt_utf8 mqtt_3_1_0_proto_desc = {
.utf8 = (u8_t *)mqtt_3_1_0_proto_desc_str,
.size = MQTT_3_1_0_PROTO_DESC_LEN
};
static const struct mqtt_utf8 mqtt_3_1_1_proto_desc = {
.utf8 = (u8_t *)mqtt_3_1_1_proto_desc_str,
.size = MQTT_3_1_1_PROTO_DESC_LEN
};
/** Never changing ping request, needed for Keep Alive. */
static const u8_t ping_packet[MQTT_FIXED_HEADER_MIN_SIZE] = {
MQTT_PKT_TYPE_PINGREQ,
0x00
};
/** Never changing disconnect request. */
static const u8_t disc_packet[MQTT_FIXED_HEADER_MIN_SIZE] = {
MQTT_PKT_TYPE_DISCONNECT,
0x00
};
/**
* @brief Packs unsigned 8 bit value to the buffer at the offset requested.
*
* @param[in] val Value to be packed.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
*
* @retval 0 if procedure is successful.
* @retval -ENOMEM if there is no place in the buffer to store the value.
*/
static int pack_uint8(u8_t val, struct buf_ctx *buf)
{
if ((buf->end - buf->cur) < sizeof(u8_t)) {
return -ENOMEM;
}
MQTT_TRC(">> val:%02x cur:%p, end:%p", val, buf->cur, buf->end);
/* Pack value. */
*(buf->cur++) = val;
return 0;
}
/**
* @brief Packs unsigned 16 bit value to the buffer at the offset requested.
*
* @param[in] val Value to be packed.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
*
* @retval 0 if the procedure is successful.
* @retval -ENOMEM if there is no place in the buffer to store the value.
*/
static int pack_uint16(u16_t val, struct buf_ctx *buf)
{
if ((buf->end - buf->cur) < sizeof(u16_t)) {
return -ENOMEM;
}
MQTT_TRC(">> val:%04x cur:%p, end:%p", val, buf->cur, buf->end);
/* Pack value. */
*(buf->cur++) = (val >> 8) & 0xFF;
*(buf->cur++) = val & 0xFF;
return 0;
}
/**
* @brief Packs utf8 string to the buffer at the offset requested.
*
* @param[in] str UTF-8 string and its length to be packed.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
*
* @retval 0 if the procedure is successful.
* @retval -ENOMEM if there is no place in the buffer to store the string.
*/
static int pack_utf8_str(const struct mqtt_utf8 *str, struct buf_ctx *buf)
{
if ((buf->end - buf->cur) < GET_UT8STR_BUFFER_SIZE(str)) {
return -ENOMEM;
}
MQTT_TRC(">> str_size:%08x cur:%p, end:%p",
(u32_t)GET_UT8STR_BUFFER_SIZE(str), buf->cur, buf->end);
/* Pack length followed by string. */
(void)pack_uint16(str->size, buf);
memcpy(buf->cur, str->utf8, str->size);
buf->cur += str->size;
return 0;
}
/**
* @brief Computes and encodes length for the MQTT fixed header.
*
* @note The remaining length is not packed as a fixed unsigned 32 bit integer.
* Instead it is packed on algorithm below:
*
* @code
* do
* encodedByte = X MOD 128
* X = X DIV 128
* // if there are more data to encode, set the top bit of this byte
* if ( X > 0 )
* encodedByte = encodedByte OR 128
* endif
* 'output' encodedByte
* while ( X > 0 )
* @endcode
*
* @param[in] length Length of variable header and payload in the MQTT message.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position. May be NULL (in this case function will
* only calculate number of bytes needed).
*
* @return Number of bytes needed to encode length value.
*/
static u8_t packet_length_encode(u32_t length, struct buf_ctx *buf)
{
u8_t encoded_bytes = 0;
MQTT_TRC(">> length:0x%08x cur:%p, end:%p", length, buf->cur, buf->end);
do {
encoded_bytes++;
if (buf != NULL) {
*(buf->cur) = length & MQTT_LENGTH_VALUE_MASK;
}
length >>= MQTT_LENGTH_SHIFT;
if (buf != NULL) {
if (length > 0) {
*(buf->cur) |= MQTT_LENGTH_CONTINUATION_BIT;
}
buf->cur++;
}
} while (length > 0);
return encoded_bytes;
}
/**
* @brief Encodes fixed header for the MQTT message and provides pointer to
* start of the header.
*
* @param[in] message_type Message type containing packet type and the flags.
* Use @ref MQTT_MESSAGES_OPTIONS to construct the
* message_type.
* @param[in] start Pointer to the start of the variable header.
* @param[inout] buf Buffer context used to encode the frame.
* The 5 bytes before the start of the message are assumed
* by the routine to be available to pack the fixed header.
* However, since the fixed header length is variable
* length, the pointer to the start of the MQTT message
* along with encoded fixed header is supplied as output
* parameter if the procedure was successful.
* As output, the pointers will point to beginning and the end
* of the frame.
*
* @retval 0 if the procedure is successful.
* @retval -EMSGSIZE if the message is too big for MQTT.
*/
static u32_t mqtt_encode_fixed_header(u8_t message_type, u8_t *start,
struct buf_ctx *buf)
{
u32_t length = buf->cur - start;
u8_t fixed_header_length;
if (length > MQTT_MAX_PAYLOAD_SIZE) {
return -EMSGSIZE;
}
MQTT_TRC("<< msg type:0x%02x length:0x%08x", message_type, length);
fixed_header_length = packet_length_encode(length, NULL);
fixed_header_length += sizeof(u8_t);
MQTT_TRC("Fixed header length = %02x", fixed_header_length);
/* Set the pointer at the start of the frame before encoding. */
buf->cur = start - fixed_header_length;
(void)pack_uint8(message_type, buf);
(void)packet_length_encode(length, buf);
/* Set the cur pointer back at the start of the frame,
* and end pointer to the end of the frame.
*/
buf->cur = buf->cur - fixed_header_length;
buf->end = buf->cur + length + fixed_header_length;
return 0;
}
/**
* @brief Encodes a string of a zero length.
*
* @param[in] buffer_len Total size of the buffer on which string will be
* encoded. This shall not be zero.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
*
* @retval 0 if the procedure is successful.
* @retval -ENOMEM if there is no place in the buffer to store the binary
* string.
*/
static int zero_len_str_encode(struct buf_ctx *buf)
{
return pack_uint16(0x0000, buf);
}
/**
* @brief Encodes and sends messages that contain only message id in
* the variable header.
*
* @param[in] message_type Message type and reserved bit fields.
* @param[in] message_id Message id to be encoded in the variable header.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
*
* @retval 0 or an error code indicating a reason for failure.
*/
static int mqtt_message_id_only_enc(u8_t message_type, u16_t message_id,
struct buf_ctx *buf)
{
int err_code;
u8_t *start;
/* Message id zero is not permitted by spec. */
if (message_id == 0) {
return -EINVAL;
}
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
err_code = pack_uint16(message_id, buf);
if (err_code != 0) {
return err_code;
}
return mqtt_encode_fixed_header(message_type, start, buf);
}
int connect_request_encode(const struct mqtt_client *client,
struct buf_ctx *buf)
{
u8_t connect_flags = client->clean_session << 1;
const u8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_CONNECT, 0, 0, 0);
const struct mqtt_utf8 *mqtt_proto_desc;
u8_t *connect_flags_pos;
int err_code;
u8_t *start;
if (client->protocol_version == MQTT_VERSION_3_1_1) {
mqtt_proto_desc = &mqtt_3_1_1_proto_desc;
} else {
mqtt_proto_desc = &mqtt_3_1_0_proto_desc;
}
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
MQTT_TRC("Encoding Protocol Description. Str:%s Size:%08x.",
mqtt_proto_desc->utf8, mqtt_proto_desc->size);
err_code = pack_utf8_str(mqtt_proto_desc, buf);
if (err_code != 0) {
return err_code;
}
MQTT_TRC("Encoding Protocol Version %02x.", client->protocol_version);
err_code = pack_uint8(client->protocol_version, buf);
if (err_code != 0) {
return err_code;
}
/* Remember position of connect flag and leave one byte for it to
* be packed once we determine its value.
*/
connect_flags_pos = buf->cur;
err_code = pack_uint8(0, buf);
if (err_code != 0) {
return err_code;
}
MQTT_TRC("Encoding Keep Alive Time %04x.", MQTT_KEEPALIVE);
err_code = pack_uint16(MQTT_KEEPALIVE, buf);
if (err_code != 0) {
return err_code;
}
MQTT_TRC("Encoding Client Id. Str:%s Size:%08x.",
client->client_id.utf8, client->client_id.size);
err_code = pack_utf8_str(&client->client_id, buf);
if (err_code != 0) {
return err_code;
}
/* Pack will topic and QoS */
if (client->will_topic != NULL) {
connect_flags |= MQTT_CONNECT_FLAG_WILL_TOPIC;
/* QoS is always 1 as of now. */
connect_flags |= ((client->will_topic->qos & 0x03) << 3);
connect_flags |= client->will_retain << 5;
MQTT_TRC("Encoding Will Topic. Str:%s Size:%08x.",
client->will_topic->topic.utf8,
client->will_topic->topic.size);
err_code = pack_utf8_str(&client->will_topic->topic, buf);
if (err_code != 0) {
return err_code;
}
if (client->will_message != NULL) {
MQTT_TRC("Encoding Will Message. Str:%s Size:%08x.",
client->will_message->utf8,
client->will_message->size);
err_code = pack_utf8_str(client->will_message, buf);
if (err_code != 0) {
return err_code;
}
} else {
MQTT_TRC("Encoding Zero Length Will Message.");
err_code = zero_len_str_encode(buf);
if (err_code != 0) {
return err_code;
}
}
}
/* Pack Username if any. */
if (client->user_name != NULL) {
connect_flags |= MQTT_CONNECT_FLAG_USERNAME;
MQTT_TRC("Encoding Username. Str:%s, Size:%08x.",
client->user_name->utf8, client->user_name->size);
err_code = pack_utf8_str(client->user_name, buf);
if (err_code != 0) {
return err_code;
}
}
/* Pack Password if any. */
if (client->password != NULL) {
connect_flags |= MQTT_CONNECT_FLAG_PASSWORD;
MQTT_TRC("Encoding Password. Str:%s Size:%08x.",
client->password->utf8, client->password->size);
err_code = pack_utf8_str(client->password, buf);
if (err_code != 0) {
return err_code;
}
}
/* Write the flags the connect flags. */
*connect_flags_pos = connect_flags;
return mqtt_encode_fixed_header(message_type, start, buf);
}
int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf)
{
const u8_t message_type = MQTT_MESSAGES_OPTIONS(
MQTT_PKT_TYPE_PUBLISH, param->dup_flag,
param->message.topic.qos, param->retain_flag);
int err_code;
u8_t *start;
/* Message id zero is not permitted by spec. */
if ((param->message.topic.qos) && (param->message_id == 0)) {
return -EINVAL;
}
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
err_code = pack_utf8_str(&param->message.topic.topic, buf);
if (err_code != 0) {
return err_code;
}
if (param->message.topic.qos) {
err_code = pack_uint16(param->message_id, buf);
if (err_code != 0) {
return err_code;
}
}
/* Do not copy payload. We move the buffer pointer to ensure that
* message length in fixed header is encoded correctly.
*/
buf->cur += param->message.payload.len;
err_code = mqtt_encode_fixed_header(message_type, start, buf);
if (err_code != 0) {
return err_code;
}
buf->end -= param->message.payload.len;
return 0;
}
int publish_ack_encode(const struct mqtt_puback_param *param,
struct buf_ctx *buf)
{
const u8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBACK, 0, 0, 0);
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
}
int publish_receive_encode(const struct mqtt_pubrec_param *param,
struct buf_ctx *buf)
{
const u8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBREC, 0, 0, 0);
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
}
int publish_release_encode(const struct mqtt_pubrel_param *param,
struct buf_ctx *buf)
{
const u8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBREL, 0, 1, 0);
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
}
int publish_complete_encode(const struct mqtt_pubcomp_param *param,
struct buf_ctx *buf)
{
const u8_t message_type =
MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBCOMP, 0, 0, 0);
return mqtt_message_id_only_enc(message_type, param->message_id, buf);
}
int disconnect_encode(struct buf_ctx *buf)
{
if (buf->end - buf->cur < sizeof(disc_packet)) {
return -ENOMEM;
}
memcpy(buf->cur, disc_packet, sizeof(disc_packet));
buf->end = buf->cur + sizeof(disc_packet);
return 0;
}
int subscribe_encode(const struct mqtt_subscription_list *param,
struct buf_ctx *buf)
{
const u8_t message_type = MQTT_MESSAGES_OPTIONS(
MQTT_PKT_TYPE_SUBSCRIBE, 0, 1, 0);
int err_code, i;
u8_t *start;
/* Message id zero is not permitted by spec. */
if (param->message_id == 0) {
return -EINVAL;
}
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
err_code = pack_uint16(param->message_id, buf);
if (err_code != 0) {
return err_code;
}
for (i = 0; i < param->list_count; i++) {
err_code = pack_utf8_str(&param->list[i].topic, buf);
if (err_code != 0) {
return err_code;
}
err_code = pack_uint8(param->list[i].qos, buf);
if (err_code != 0) {
return err_code;
}
}
return mqtt_encode_fixed_header(message_type, start, buf);
}
int unsubscribe_encode(const struct mqtt_subscription_list *param,
struct buf_ctx *buf)
{
const u8_t message_type = MQTT_MESSAGES_OPTIONS(
MQTT_PKT_TYPE_UNSUBSCRIBE, 0, MQTT_QOS_1_AT_LEAST_ONCE, 0);
int err_code, i;
u8_t *start;
/* Reserve space for fixed header. */
buf->cur += MQTT_FIXED_HEADER_MAX_SIZE;
start = buf->cur;
err_code = pack_uint16(param->message_id, buf);
if (err_code != 0) {
return err_code;
}
for (i = 0; i < param->list_count; i++) {
err_code = pack_utf8_str(&param->list[i].topic, buf);
if (err_code != 0) {
return err_code;
}
}
return mqtt_encode_fixed_header(message_type, start, buf);
}
int ping_request_encode(struct buf_ctx *buf)
{
if (buf->end - buf->cur < sizeof(ping_packet)) {
return -ENOMEM;
}
memcpy(buf->cur, ping_packet, sizeof(ping_packet));
buf->end = buf->cur + sizeof(ping_packet);
return 0;
}

View file

@ -0,0 +1,399 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_internal.h
*
* @brief Function and data structures internal to MQTT module.
*/
#ifndef MQTT_INTERNAL_H_
#define MQTT_INTERNAL_H_
#include <stdint.h>
#include <string.h>
#include <net/mqtt.h>
#ifdef __cplusplus
extern "C" {
#endif
/**@brief Keep alive time for MQTT (in seconds). Sending of Ping Requests to
* keep the connection alive are governed by this value.
*/
#define MQTT_KEEPALIVE CONFIG_MQTT_KEEPALIVE
/**@brief Minimum mandatory size of fixed header. */
#define MQTT_FIXED_HEADER_MIN_SIZE 2
/**@brief Maximum size of the fixed header. Remaining length size is 4 in this
* case.
*/
#define MQTT_FIXED_HEADER_MAX_SIZE 5
/**@brief MQTT Control Packet Types. */
#define MQTT_PKT_TYPE_CONNECT 0x10
#define MQTT_PKT_TYPE_CONNACK 0x20
#define MQTT_PKT_TYPE_PUBLISH 0x30
#define MQTT_PKT_TYPE_PUBACK 0x40
#define MQTT_PKT_TYPE_PUBREC 0x50
#define MQTT_PKT_TYPE_PUBREL 0x60
#define MQTT_PKT_TYPE_PUBCOMP 0x70
#define MQTT_PKT_TYPE_SUBSCRIBE 0x80
#define MQTT_PKT_TYPE_SUBACK 0x90
#define MQTT_PKT_TYPE_UNSUBSCRIBE 0xA0
#define MQTT_PKT_TYPE_UNSUBACK 0xB0
#define MQTT_PKT_TYPE_PINGREQ 0xC0
#define MQTT_PKT_TYPE_PINGRSP 0xD0
#define MQTT_PKT_TYPE_DISCONNECT 0xE0
/**@brief Masks for MQTT header flags. */
#define MQTT_HEADER_DUP_MASK 0x08
#define MQTT_HEADER_QOS_MASK 0x06
#define MQTT_HEADER_RETAIN_MASK 0x01
/**@brief Masks for MQTT header flags. */
#define MQTT_CONNECT_FLAG_CLEAN_SESSION 0x02
#define MQTT_CONNECT_FLAG_WILL_TOPIC 0x04
#define MQTT_CONNECT_FLAG_WILL_RETAIN 0x20
#define MQTT_CONNECT_FLAG_PASSWORD 0x40
#define MQTT_CONNECT_FLAG_USERNAME 0x80
#define MQTT_CONNACK_FLAG_SESSION_PRESENT 0x01
/**@brief Maximum payload size of MQTT packet. */
#define MQTT_MAX_PAYLOAD_SIZE 0x0FFFFFFF
/**@brief Computes total size needed to pack a UTF8 string. */
#define GET_UT8STR_BUFFER_SIZE(STR) (sizeof(u16_t) + (STR)->size)
/**@brief Computes total size needed to pack a binary stream. */
#define GET_BINSTR_BUFFER_SIZE(STR) ((STR)->len)
/**@brief Sets MQTT Client's state with one indicated in 'STATE'. */
#define MQTT_SET_STATE(CLIENT, STATE) ((CLIENT)->internal.state |= (STATE))
/**@brief Sets MQTT Client's state exclusive to 'STATE'. */
#define MQTT_SET_STATE_EXCLUSIVE(CLIENT, STATE) \
((CLIENT)->internal.state = (STATE))
/**@brief Verifies if MQTT Client's state is set with one indicated in 'STATE'.
*/
#define MQTT_HAS_STATE(CLIENT, STATE) ((CLIENT)->internal.state & (STATE))
/**@brief Reset 'STATE' in MQTT Client's state. */
#define MQTT_RESET_STATE(CLIENT, STATE) ((CLIENT)->internal.state &= ~(STATE))
/**@brief Initialize MQTT Client's state. */
#define MQTT_STATE_INIT(CLIENT) ((CLIENT)->internal.state = MQTT_STATE_IDLE)
/**@brief Computes the first byte of MQTT message header based on message type,
* duplication flag, QoS and the retain flag.
*/
#define MQTT_MESSAGES_OPTIONS(TYPE, DUP, QOS, RETAIN) \
(((TYPE) & 0xF0) | \
(((DUP) << 3) & 0x08) | \
(((QOS) << 1) & 0x06) | \
((RETAIN) & 0x01))
#define MQTT_MAX_LENGTH_BYTES 4
#define MQTT_LENGTH_VALUE_MASK 0x7F
#define MQTT_LENGTH_CONTINUATION_BIT 0x80
#define MQTT_LENGTH_SHIFT 7
/**@brief Check if the input pointer is NULL, if so it returns -EINVAL. */
#define NULL_PARAM_CHECK(param) \
do { \
if ((param) == NULL) { \
return -EINVAL; \
} \
} while (0)
#define NULL_PARAM_CHECK_VOID(param) \
do { \
if ((param) == NULL) { \
return; \
} \
} while (0)
/** Buffer context to iterate over buffer. */
struct buf_ctx {
u8_t *cur;
u8_t *end;
};
/**@brief MQTT States. */
enum mqtt_state {
/** Idle state, implying the client entry in the table is unused/free.
*/
MQTT_STATE_IDLE = 0x00000000,
/** TCP Connection has been requested, awaiting result of the request.
*/
MQTT_STATE_TCP_CONNECTING = 0x00000001,
/** TCP Connection successfully established. */
MQTT_STATE_TCP_CONNECTED = 0x00000002,
/** MQTT Connection successful. */
MQTT_STATE_CONNECTED = 0x00000004,
/** TCP Disconnect has been requested, awaiting result of the request.
*/
MQTT_STATE_DISCONNECTING = 0x00000008
};
/**@brief Notify application about MQTT event.
*
* @param[in] client Identifies the client for which event occurred.
* @param[in] evt MQTT event.
*/
void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt);
/**@brief Handles MQTT messages received from the peer.
*
* @param[in] client Identifies the client for which the data was received.
* @return 0 if the procedure is successful, an error code otherwise.
*/
int mqtt_handle_rx(struct mqtt_client *client);
/**@brief Constructs/encodes Connect packet.
*
* @param[in] client Identifies the client for which the procedure is requested.
* All information required for creating the packet like
* client id, clean session flag, retain session flag etc are
* assumed to be populated for the client instance when this
* procedure is requested.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int connect_request_encode(const struct mqtt_client *client,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish packet.
*
* @param[in] param Publish message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Ack packet.
*
* @param[in] param Publish Ack message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_ack_encode(const struct mqtt_puback_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Receive packet.
*
* @param[in] param Publish Receive message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_receive_encode(const struct mqtt_pubrec_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Release packet.
*
* @param[in] param Publish Release message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_release_encode(const struct mqtt_pubrel_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Publish Complete packet.
*
* @param[in] param Publish Complete message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_complete_encode(const struct mqtt_pubcomp_param *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Disconnect packet.
*
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int disconnect_encode(struct buf_ctx *buf);
/**@brief Constructs/encodes Subscribe packet.
*
* @param[in] param Subscribe message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int subscribe_encode(const struct mqtt_subscription_list *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Unsubscribe packet.
*
* @param[in] param Unsubscribe message parameters.
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int unsubscribe_encode(const struct mqtt_subscription_list *param,
struct buf_ctx *buf);
/**@brief Constructs/encodes Ping Request packet.
*
* @param[inout] buf_ctx Pointer to the buffer context structure,
* containing buffer for the encoded message.
* As output points to the beginning and end of
* the frame.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int ping_request_encode(struct buf_ctx *buf);
/**@brief Decode MQTT Packet Type and Length in the MQTT fixed header.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] type_and_flags Message type and flags.
* @param[out] length Length of variable header and payload in the MQTT message.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int fixed_header_decode(struct buf_ctx *buf, u8_t *type_and_flags,
u32_t *length);
/**@brief Decode MQTT Connect Ack packet.
*
* @param[in] client MQTT client for which packet is decoded.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Connect Ack parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
struct mqtt_connack_param *param);
/**@brief Decode MQTT Publish packet.
*
* @param[in] flags Byte containing message type and flags.
* @param[in] var_length Length of the variable part of the message.
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_decode(u8_t flags, u32_t var_length, struct buf_ctx *buf,
struct mqtt_publish_param *param);
/**@brief Decode MQTT Publish Ack packet.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Ack parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_ack_decode(struct buf_ctx *buf, struct mqtt_puback_param *param);
/**@brief Decode MQTT Publish Receive packet.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Receive parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_receive_decode(struct buf_ctx *buf,
struct mqtt_pubrec_param *param);
/**@brief Decode MQTT Publish Release packet.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Release parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_release_decode(struct buf_ctx *buf,
struct mqtt_pubrel_param *param);
/**@brief Decode MQTT Publish Complete packet.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Publish Complete parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int publish_complete_decode(struct buf_ctx *buf,
struct mqtt_pubcomp_param *param);
/**@brief Decode MQTT Subscribe packet.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Subscribe parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int subscribe_ack_decode(struct buf_ctx *buf,
struct mqtt_suback_param *param);
/**@brief Decode MQTT Unsubscribe packet.
*
* @param[inout] buf A pointer to the buf_ctx structure containing current
* buffer position.
* @param[out] param Pointer to buffer for decoded Unsubscribe parameters.
*
* @return 0 if the procedure is successful, an error code otherwise.
*/
int unsubscribe_ack_decode(struct buf_ctx *buf,
struct mqtt_unsuback_param *param);
#ifdef __cplusplus
}
#endif
#endif /* MQTT_INTERNAL_H_ */

View file

@ -0,0 +1,94 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_os.h
*
* @brief MQTT Client depends on certain OS specific functionality. The needed
* methods are mapped here and should be implemented based on OS in use.
*
* @details Memory management, mutex, logging and wall clock are the needed
* functionality for MQTT module. The needed interfaces are defined
* in the OS. OS specific port of the interface shall be provided.
*
*/
#ifndef MQTT_OS_H_
#define MQTT_OS_H_
#include <stddef.h>
#include <kernel.h>
#include <net/net_core.h>
#include "mqtt_internal.h"
#ifdef __cplusplus
extern "C" {
#endif
/**@brief Method to get trace logs from the module. */
#define MQTT_TRC(...) NET_DBG(__VA_ARGS__)
/**@brief Method to error logs from the module. */
#define MQTT_ERR(...) NET_ERR(__VA_ARGS__)
/**@brief Initialize the mutex for the module, if any.
*
* @details This method is called during module initialization @ref mqtt_init.
*/
static inline void mqtt_mutex_init(struct mqtt_client *client)
{
k_mutex_init(&client->internal.mutex);
}
/**@brief Acquire lock on the module specific mutex, if any.
*
* @details This is assumed to be a blocking method until the acquisition
* of the mutex succeeds.
*/
static inline void mqtt_mutex_lock(struct mqtt_client *client)
{
(void)k_mutex_lock(&client->internal.mutex, K_FOREVER);
}
/**@brief Release the lock on the module specific mutex, if any.
*/
static inline void mqtt_mutex_unlock(struct mqtt_client *client)
{
k_mutex_unlock(&client->internal.mutex);
}
/**@brief Method to get the sys tick or a wall clock in millisecond resolution.
*
* @retval Current wall clock or sys tick value in milliseconds.
*/
static inline u32_t mqtt_sys_tick_in_ms_get(void)
{
return k_uptime_get_32();
}
/**@brief Method to get elapsed time in milliseconds since the last activity.
*
* @param[in] last_activity The value since elapsed time is requested.
*
* @retval Time elapsed since last_activity time.
*/
static inline u32_t mqtt_elapsed_time_in_ms_get(u32_t last_activity)
{
s32_t diff = k_uptime_get_32() - last_activity;
if (diff < 0) {
return 0;
}
return diff;
}
#ifdef __cplusplus
}
#endif
#endif /* MQTT_OS_H_ */

View file

@ -0,0 +1,279 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
#define LOG_MODULE_NAME net_mqtt_rx
#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL
#include "mqtt_internal.h"
#include "mqtt_transport.h"
#include "mqtt_os.h"
/** @file mqtt_rx.c
*
* @brief MQTT Received data handling.
*/
static int mqtt_handle_packet(struct mqtt_client *client,
u8_t type_and_flags,
u32_t var_length,
struct buf_ctx *buf)
{
int err_code = 0;
bool notify_event = true;
struct mqtt_evt evt;
/* Success by default, overwritten in special cases. */
evt.result = 0;
switch (type_and_flags & 0xF0) {
case MQTT_PKT_TYPE_CONNACK:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client);
evt.type = MQTT_EVT_CONNACK;
err_code = connect_ack_decode(client, buf, &evt.param.connack);
if (err_code == 0) {
MQTT_TRC("[CID %p]: return_code: %d", client,
evt.param.connack.return_code);
if (evt.param.connack.return_code ==
MQTT_CONNECTION_ACCEPTED) {
/* Set state. */
MQTT_SET_STATE(client, MQTT_STATE_CONNECTED);
}
evt.result = evt.param.connack.return_code;
} else {
evt.result = err_code;
}
break;
case MQTT_PKT_TYPE_PUBLISH:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
evt.type = MQTT_EVT_PUBLISH;
err_code = publish_decode(type_and_flags, var_length, buf,
&evt.param.publish);
evt.result = err_code;
client->internal.remaining_payload =
evt.param.publish.message.payload.len;
MQTT_TRC("PUB QoS:%02x, message len %08x, topic len %08x",
evt.param.publish.message.topic.qos,
evt.param.publish.message.payload.len,
evt.param.publish.message.topic.topic.size);
break;
case MQTT_PKT_TYPE_PUBACK:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
evt.type = MQTT_EVT_PUBACK;
err_code = publish_ack_decode(buf, &evt.param.puback);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PUBREC:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
evt.type = MQTT_EVT_PUBREC;
err_code = publish_receive_decode(buf, &evt.param.pubrec);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PUBREL:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
evt.type = MQTT_EVT_PUBREL;
err_code = publish_release_decode(buf, &evt.param.pubrel);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PUBCOMP:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
evt.type = MQTT_EVT_PUBCOMP;
err_code = publish_complete_decode(buf, &evt.param.pubcomp);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_SUBACK:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client);
evt.type = MQTT_EVT_SUBACK;
err_code = subscribe_ack_decode(buf, &evt.param.suback);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_UNSUBACK:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client);
evt.type = MQTT_EVT_UNSUBACK;
err_code = unsubscribe_ack_decode(buf, &evt.param.unsuback);
evt.result = err_code;
break;
case MQTT_PKT_TYPE_PINGRSP:
MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client);
/* No notification of Ping response to application. */
notify_event = false;
break;
default:
/* Nothing to notify. */
notify_event = false;
break;
}
if (notify_event == true) {
event_notify(client, &evt);
}
return err_code;
}
static int mqtt_read_message_chunk(struct mqtt_client *client,
struct buf_ctx *buf, u32_t length)
{
int remaining;
int len;
/* Calculate how much data we need to read from the transport,
* given the already buffered data.
*/
remaining = length - (buf->end - buf->cur);
if (remaining <= 0) {
return 0;
}
/* Check if read does not exceed the buffer. */
if (buf->end + remaining > client->rx_buf + client->rx_buf_size) {
MQTT_ERR("[CID %p]: Buffer too small to receive the message",
client);
return -ENOMEM;
}
len = mqtt_transport_read(client, buf->end, remaining);
if (len < 0) {
MQTT_TRC("[CID %p]: Transport read error: %d", client, len);
return len;
}
if (len == 0) {
MQTT_TRC("[CID %p]: Connection closed.", client);
return -ENOTCONN;
}
client->internal.rx_buf_datalen += len;
buf->end += len;
if (len < remaining) {
MQTT_TRC("[CID %p]: Message partially received.", client);
return -EAGAIN;
}
return 0;
}
static int mqtt_read_publish_var_header(struct mqtt_client *client,
u8_t type_and_flags,
struct buf_ctx *buf)
{
u8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1;
int err_code;
u32_t variable_header_length;
/* Read topic length field. */
err_code = mqtt_read_message_chunk(client, buf, sizeof(u16_t));
if (err_code < 0) {
return err_code;
}
variable_header_length = *buf->cur << 8; /* MSB */
variable_header_length |= *(buf->cur + 1); /* LSB */
/* Add two bytes for topic length field. */
variable_header_length += sizeof(u16_t);
/* Add two bytes for message_id, if needed. */
if (qos > MQTT_QOS_0_AT_MOST_ONCE) {
variable_header_length += sizeof(u16_t);
}
/* Now we can read the whole header. */
err_code = mqtt_read_message_chunk(client, buf,
variable_header_length);
if (err_code < 0) {
return err_code;
}
return 0;
}
static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client,
u8_t *type_and_flags,
u32_t *var_length,
struct buf_ctx *buf)
{
/* Read the mandatory part of the fixed header in first iteration. */
u8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE;
int err_code;
do {
err_code = mqtt_read_message_chunk(client, buf, chunk_size);
if (err_code < 0) {
return err_code;
}
/* Reset to pointer to the beginning of the frame. */
buf->cur = client->rx_buf;
chunk_size = 1;
err_code = fixed_header_decode(buf, type_and_flags, var_length);
} while (err_code == -EAGAIN);
return err_code;
}
int mqtt_handle_rx(struct mqtt_client *client)
{
int err_code;
u8_t type_and_flags;
u32_t var_length;
struct buf_ctx buf;
buf.cur = client->rx_buf;
buf.end = client->rx_buf + client->internal.rx_buf_datalen;
err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
&var_length, &buf);
if (err_code < 0) {
return (err_code == -EAGAIN) ? 0 : err_code;
}
if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
err_code = mqtt_read_publish_var_header(client, type_and_flags,
&buf);
} else {
err_code = mqtt_read_message_chunk(client, &buf, var_length);
}
if (err_code < 0) {
return (err_code == -EAGAIN) ? 0 : err_code;
}
/* At this point, packet is ready to be passed to the application. */
err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
if (err_code < 0) {
return err_code;
}
client->internal.rx_buf_datalen = 0;
return 0;
}

View file

@ -0,0 +1,52 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_transport.c
*
* @brief Internal functions to handle transport in MQTT module.
*/
#include "mqtt_transport.h"
/* Transport handler functions for TCP socket transport. */
extern int mqtt_client_tcp_connect(struct mqtt_client *client);
extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data,
u32_t buflen);
extern int mqtt_client_tcp_disconnect(struct mqtt_client *client);
/**@brief Function pointer array for TCP/TLS transport handlers. */
const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
{
mqtt_client_tcp_connect,
mqtt_client_tcp_write,
mqtt_client_tcp_read,
mqtt_client_tcp_disconnect,
}
};
int mqtt_transport_connect(struct mqtt_client *client)
{
return transport_fn[client->transport.type].connect(client);
}
int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen)
{
return transport_fn[client->transport.type].write(client, data,
datalen);
}
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen)
{
return transport_fn[client->transport.type].read(client, data, buflen);
}
int mqtt_transport_disconnect(struct mqtt_client *client)
{
return transport_fn[client->transport.type].disconnect(client);
}

View file

@ -0,0 +1,100 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_transport.h
*
* @brief Internal functions to handle transport in MQTT module.
*/
#ifndef MQTT_TRANSPORT_H_
#define MQTT_TRANSPORT_H_
#include <net/mqtt.h>
#ifdef __cplusplus
extern "C" {
#endif
/**@brief Transport for handling transport connect procedure. */
typedef int (*transport_connect_handler_t)(struct mqtt_client *client);
/**@brief Transport write handler. */
typedef int (*transport_write_handler_t)(struct mqtt_client *client,
const u8_t *data, u32_t datalen);
/**@brief Transport read handler. */
typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data,
u32_t buflen);
/**@brief Transport disconnect handler. */
typedef int (*transport_disconnect_handler_t)(struct mqtt_client *client);
/**@brief Transport procedure handlers. */
struct transport_procedure {
/** Transport connect handler. Handles TCP connection callback based on
* type of transport.
*/
transport_connect_handler_t connect;
/** Transport write handler. Handles transport write based on type of
* transport.
*/
transport_write_handler_t write;
/** Transport read handler. Handles transport read based on type of
* transport.
*/
transport_read_handler_t read;
/** Transport disconnect handler. Handles transport disconnection based
* on type of transport.
*/
transport_disconnect_handler_t disconnect;
};
/**@brief Handles TCP Connection Complete for configured transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_transport_connect(struct mqtt_client *client);
/**@brief Handles write requests on configured transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Data to be written on the transport.
* @param[in] datalen Length of data to be written on the transport.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
/**@brief Handles read requests on configured transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation.
*
* @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed.
*/
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen);
/**@brief Handles transport disconnection requests on configured transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_transport_disconnect(struct mqtt_client *client);
#ifdef __cplusplus
}
#endif
#endif /* MQTT_TRANSPORT_H_ */

View file

@ -0,0 +1,123 @@
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_transport_socket_tcp.h
*
* @brief Internal functions to handle transport over TCP socket.
*/
#define LOG_MODULE_NAME net_mqtt_sock_tcp
#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL
#include <errno.h>
#include <net/socket.h>
#include <net/mqtt.h>
#include "mqtt_os.h"
/**@brief Handles connect request for TCP socket transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_client_tcp_connect(struct mqtt_client *client)
{
const struct sockaddr *broker = client->broker;
int ret;
client->transport.tcp.sock = socket(broker->sa_family, SOCK_STREAM,
IPPROTO_TCP);
if (client->transport.tcp.sock < 0) {
return -errno;
}
MQTT_TRC("Created socket %d", client->transport.tcp.sock);
size_t peer_addr_size = sizeof(struct sockaddr_in6);
if (broker->sa_family == AF_INET) {
peer_addr_size = sizeof(struct sockaddr_in);
}
ret = connect(client->transport.tcp.sock, client->broker,
peer_addr_size);
if (ret < 0) {
(void)close(client->transport.tcp.sock);
return -errno;
}
MQTT_TRC("Connect completed");
return 0;
}
/**@brief Handles write requests on TCP socket transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Data to be written on the transport.
* @param[in] datalen Length of data to be written on the transport.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen)
{
u32_t offset = 0;
int ret;
while (offset < datalen) {
ret = send(client->transport.tcp.sock, data + offset,
datalen - offset, 0);
if (ret < 0) {
return -errno;
}
offset += ret;
}
return 0;
}
/**@brief Handles read requests on TCP socket transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation.
*
* @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed.
*/
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen)
{
int ret;
ret = recv(client->transport.tcp.sock, data, buflen, MSG_DONTWAIT);
if (ret < 0) {
return -errno;
}
return ret;
}
/**@brief Handles transport disconnection requests on TCP socket transport.
*
* @param[in] client Identifies the client on which the procedure is requested.
*
* @retval 0 or an error code indicating reason for failure.
*/
int mqtt_client_tcp_disconnect(struct mqtt_client *client)
{
int ret;
MQTT_TRC("Closing socket %d", client->transport.tcp.sock);
ret = close(client->transport.tcp.sock);
if (ret < 0) {
return -errno;
}
return 0;
}