From 37563a92d5d85e2b984d3ad5c33322ee4b689549 Mon Sep 17 00:00:00 2001 From: Robert Lubos Date: Wed, 27 Jun 2018 10:41:51 +0200 Subject: [PATCH] net: mqtt: Add BSD socket implementation Add new, socket based MQTT implementation, based on MQTT from Nordic nRF5 SDK, introducing the following features: * transport independent MQTT logic, with support for multiple transports * support for multiple MQTT versions (3.1.0 and 3.1.1 supported) * single event handler - no need to keep callback array in RAM * automatic send of Ping Requests, for connection keep-alive * message/event parameters wrapped into strucutres - easier extension for future MQTT versions * no separate thread needed to run MQTT - application only needs to call mqtt_input and mqtt_live periodically Signed-off-by: Robert Lubos --- include/net/mqtt.h | 648 ++++++++++++++++++ subsys/net/lib/CMakeLists.txt | 1 + subsys/net/lib/Kconfig | 2 + subsys/net/lib/mqtt_sock/CMakeLists.txt | 10 + subsys/net/lib/mqtt_sock/Kconfig | 30 + subsys/net/lib/mqtt_sock/mqtt.c | 639 +++++++++++++++++ subsys/net/lib/mqtt_sock/mqtt_decoder.c | 298 ++++++++ subsys/net/lib/mqtt_sock/mqtt_encoder.c | 568 +++++++++++++++ subsys/net/lib/mqtt_sock/mqtt_internal.h | 399 +++++++++++ subsys/net/lib/mqtt_sock/mqtt_os.h | 94 +++ subsys/net/lib/mqtt_sock/mqtt_rx.c | 279 ++++++++ subsys/net/lib/mqtt_sock/mqtt_transport.c | 52 ++ subsys/net/lib/mqtt_sock/mqtt_transport.h | 100 +++ .../lib/mqtt_sock/mqtt_transport_socket_tcp.c | 123 ++++ 14 files changed, 3243 insertions(+) create mode 100644 include/net/mqtt.h create mode 100644 subsys/net/lib/mqtt_sock/CMakeLists.txt create mode 100644 subsys/net/lib/mqtt_sock/Kconfig create mode 100644 subsys/net/lib/mqtt_sock/mqtt.c create mode 100644 subsys/net/lib/mqtt_sock/mqtt_decoder.c create mode 100644 subsys/net/lib/mqtt_sock/mqtt_encoder.c create mode 100644 subsys/net/lib/mqtt_sock/mqtt_internal.h create mode 100644 subsys/net/lib/mqtt_sock/mqtt_os.h create mode 100644 subsys/net/lib/mqtt_sock/mqtt_rx.c create mode 100644 subsys/net/lib/mqtt_sock/mqtt_transport.c create mode 100644 subsys/net/lib/mqtt_sock/mqtt_transport.h create mode 100644 subsys/net/lib/mqtt_sock/mqtt_transport_socket_tcp.c diff --git a/include/net/mqtt.h b/include/net/mqtt.h new file mode 100644 index 00000000000..b6de19014dd --- /dev/null +++ b/include/net/mqtt.h @@ -0,0 +1,648 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt.h + * + * @defgroup mqtt_socket MQTT Client library + * @ingroup networking + * @{ + * @brief MQTT Client Implementation + * + * @details + * MQTT Client's Application interface is defined in this header. + * + * @note The implementation assumes TCP module is enabled. + * + * @note By default the implementation uses MQTT version 3.1.1. + */ + +#ifndef ZEPHYR_INCLUDE_NET_MQTT_H_ +#define ZEPHYR_INCLUDE_NET_MQTT_H_ + +#include + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief MQTT Asynchronous Events notified to the application from the module + * through the callback registered by the application. + */ +enum mqtt_evt_type { + /** Acknowledgment of connection request. Event result accompanying + * the event indicates whether the connection failed or succeeded. + */ + MQTT_EVT_CONNACK, + + /** Disconnection Event. MQTT Client Reference is no longer valid once + * this event is received for the client. + */ + MQTT_EVT_DISCONNECT, + + /** Publish event received when message is published on a topic client + * is subscribed to. + * + * @note PUBLISH event structure only contains payload size, the payload + * data parameter should be ignored. Payload content has to be + * read manually with @ref mqtt_read_publish_payload function. + */ + MQTT_EVT_PUBLISH, + + /** Acknowledgment for published message with QoS 1. */ + MQTT_EVT_PUBACK, + + /** Reception confirmation for published message with QoS 2. */ + MQTT_EVT_PUBREC, + + /** Release of published message with QoS 2. */ + MQTT_EVT_PUBREL, + + /** Confirmation to a publish release message with QoS 2. */ + MQTT_EVT_PUBCOMP, + + /** Acknowledgment to a subscribe request. */ + MQTT_EVT_SUBACK, + + /** Acknowledgment to a unsubscribe request. */ + MQTT_EVT_UNSUBACK +}; + +/** @brief MQTT version protocol level. */ +enum mqtt_version { + MQTT_VERSION_3_1_0 = 3, /**< Protocol level for 3.1.0. */ + MQTT_VERSION_3_1_1 = 4 /**< Protocol level for 3.1.1. */ +}; + +/** @brief MQTT Quality of Service types. */ +enum mqtt_qos { + /** Lowest Quality of Service, no acknowledgment needed for published + * message. + */ + MQTT_QOS_0_AT_MOST_ONCE = 0x00, + + /** Medium Quality of Service, if acknowledgment expected for published + * message, duplicate messages permitted. + */ + MQTT_QOS_1_AT_LEAST_ONCE = 0x01, + + /** Highest Quality of Service, acknowledgment expected and message + * shall be published only once. Message not published to interested + * parties unless client issues a PUBREL. + */ + MQTT_QOS_2_EXACTLY_ONCE = 0x02 +}; + +/** @brief MQTT CONNACK return codes. */ +enum mqtt_conn_return_code { + /** Connection accepted. */ + MQTT_CONNECTION_ACCEPTED = 0x00, + + /** The Server does not support the level of the MQTT protocol + * requested by the Client. + */ + MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 0x01, + + /** The Client identifier is correct UTF-8 but not allowed by the + * Server. + */ + MQTT_IDENTIFIER_REJECTED = 0x02, + + /** The Network Connection has been made but the MQTT service is + * unavailable. + */ + MQTT_SERVER_UNAVAILABLE = 0x03, + + /** The data in the user name or password is malformed. */ + MQTT_BAD_USER_NAME_OR_PASSWORD = 0x04, + + /** The Client is not authorized to connect. */ + MQTT_NOT_AUTHORIZED = 0x05 +}; + +/** @brief MQTT SUBACK return codes. */ +enum mqtt_suback_return_code { + /** Subscription with QoS 0 succeeded. */ + MQTT_SUBACK_SUCCESS_QoS_0 = 0x00, + + /** Subscription with QoS 1 succeeded. */ + MQTT_SUBACK_SUCCESS_QoS_1 = 0x01, + + /** Subscription with QoS 2 succeeded. */ + MQTT_SUBACK_SUCCESS_QoS_2 = 0x02, + + /** Subscription for a topic failed. */ + MQTT_SUBACK_FAILURE = 0x80 +}; + +/** @brief Abstracts UTF-8 encoded strings. */ +struct mqtt_utf8 { + u8_t *utf8; /**< Pointer to UTF-8 string. */ + u32_t size; /**< Size of UTF string, in bytes. */ +}; + +/** @brief Abstracts binary strings. */ +struct mqtt_binstr { + u8_t *data; /**< Pointer to binary stream. */ + u32_t len; /**< Length of binary stream. */ +}; + +/** @brief Abstracts MQTT UTF-8 encoded topic that can be subscribed + * to or published. + */ +struct mqtt_topic { + /** Topic on to be published or subscribed to. */ + struct mqtt_utf8 topic; + + /** Quality of service requested for the subscription. + * @ref mqtt_qos for details. + */ + u8_t qos; +}; + +/** @brief Parameters for a publish message. */ +struct mqtt_publish_message { + struct mqtt_topic topic; /**< Topic on which data was published. */ + struct mqtt_binstr payload; /**< Payload on the topic published. */ +}; + +/** @brief Parameters for a connection acknowledgment (CONNACK). */ +struct mqtt_connack_param { + /** The Session Present flag enables a Client to establish whether + * the Client and Server have a consistent view about whether there + * is already stored Session state. + */ + u8_t session_present_flag; + + /** The appropriate non-zero Connect return code indicates if the Server + * is unable to process a connection request for some reason. + */ + enum mqtt_conn_return_code return_code; +}; + +/** @brief Parameters for MQTT publish acknowledgment (PUBACK). */ +struct mqtt_puback_param { + u16_t message_id; +}; + +/** @brief Parameters for MQTT publish receive (PUBREC). */ +struct mqtt_pubrec_param { + u16_t message_id; +}; + +/** @brief Parameters for MQTT publish release (PUBREL). */ +struct mqtt_pubrel_param { + u16_t message_id; +}; + +/** @brief Parameters for MQTT publish complete (PUBCOMP). */ +struct mqtt_pubcomp_param { + u16_t message_id; +}; + +/** @brief Parameters for MQTT subscription acknowledgment (SUBACK). */ +struct mqtt_suback_param { + u16_t message_id; + struct mqtt_binstr return_codes; +}; + +/** @brief Parameters for MQTT unsubscribe acknowledgment (UNSUBACK). */ +struct mqtt_unsuback_param { + u16_t message_id; +}; + +/** @brief Parameters for a publish message. */ +struct mqtt_publish_param { + /** Messages including topic, QoS and its payload (if any) + * to be published. + */ + struct mqtt_publish_message message; + + /** Message id used for the publish message. Redundant for QoS 0. */ + u16_t message_id; + + /** Duplicate flag. If 1, it indicates the message is being + * retransmitted. Has no meaning with QoS 0. + */ + u8_t dup_flag : 1; + + /** Retain flag. If 1, the message shall be stored persistently + * by the broker. + */ + u8_t retain_flag : 1; +}; + +/** @brief List of topics in a subscription request. */ +struct mqtt_subscription_list { + /** Array containing topics along with QoS for each. */ + struct mqtt_topic *list; + + /** Number of topics in the subscription list */ + u16_t list_count; + + /** Message id used to identify subscription request. */ + u16_t message_id; +}; + +/** + * @brief Defines event parameters notified along with asynchronous events + * to the application. + */ +union mqtt_evt_param { + /** Parameters accompanying MQTT_EVT_CONNACK event. */ + struct mqtt_connack_param connack; + + /** Parameters accompanying MQTT_EVT_PUBLISH event. + * + * @note PUBLISH event structure only contains payload size, the payload + * data parameter should be ignored. Payload content has to be + * read manually with @ref mqtt_read_publish_payload function. + */ + struct mqtt_publish_param publish; + + /** Parameters accompanying MQTT_EVT_PUBACK event. */ + struct mqtt_puback_param puback; + + /** Parameters accompanying MQTT_EVT_PUBREC event. */ + struct mqtt_pubrec_param pubrec; + + /** Parameters accompanying MQTT_EVT_PUBREL event. */ + struct mqtt_pubrel_param pubrel; + + /** Parameters accompanying MQTT_EVT_PUBCOMP event. */ + struct mqtt_pubcomp_param pubcomp; + + /** Parameters accompanying MQTT_EVT_SUBACK event. */ + struct mqtt_suback_param suback; + + /** Parameters accompanying MQTT_EVT_UNSUBACK event. */ + struct mqtt_unsuback_param unsuback; +}; + +/** @brief Defines MQTT asynchronous event notified to the application. */ +struct mqtt_evt { + /** Identifies the event. */ + enum mqtt_evt_type type; + + /** Contains parameters (if any) accompanying the event. */ + union mqtt_evt_param param; + + /** Event result. 0 or a negative error code (errno.h) indicating + * reason of failure. + */ + int result; +}; + +struct mqtt_client; + +/** + * @brief Asynchronous event notification callback registered by the + * application. + * + * @param[in] client Identifies the client for which the event is notified. + * @param[in] evt Event description along with result and associated + * parameters (if any). + */ +typedef void (*mqtt_evt_cb_t)(struct mqtt_client *client, + const struct mqtt_evt *evt); + +/** @brief MQTT transport type. */ +enum mqtt_transport_type { + /** Use non secure TCP transport for MQTT connection. */ + MQTT_TRANSPORT_NON_SECURE = 0x00, + + /** Use secure TCP transport (TLS) for MQTT connection. */ + MQTT_TRANSPORT_SECURE = 0x01, + + /** Shall not be used as a transport type. + * Indicator of maximum transport types possible. + */ + MQTT_TRANSPORT_NUM = 0x02 +}; + +/** @brief MQTT transport specific data. */ +struct mqtt_transport { + /** Transport type selection for client instance. + * @ref mqtt_transport_type for possible values. MQTT_TRANSPORT_MAX + * is not a valid type. + */ + enum mqtt_transport_type type; + + union { + /* TCP socket transport for MQTT */ + struct { + /** Socket descriptor. */ + int sock; + } tcp; + }; +}; + +/** @brief MQTT internal state. */ +struct mqtt_internal { + /** Internal. Mutex to protect access to the client instance. */ + struct k_mutex mutex; + + /** Internal. Wall clock value (in milliseconds) of the last activity + * that occurred. Needed for periodic PING. + */ + u32_t last_activity; + + /** Internal. Client's state in the connection. */ + u32_t state; + + /** Internal. Packet length read so far. */ + u32_t rx_buf_datalen; + + /** Internal. Remaining payload length to read. */ + u32_t remaining_payload; +}; + +/** + * @brief MQTT Client definition to maintain information relevant to the + * client. + */ +struct mqtt_client { + /** MQTT client internal state. */ + struct mqtt_internal internal; + + /** MQTT transport configuration and data. */ + struct mqtt_transport transport; + + /** Unique client identification to be used for the connection. */ + struct mqtt_utf8 client_id; + + /** Broker details, for example, address, port. Address type should + * be compatible with transport used. + */ + const void *broker; + + /** User name (if any) to be used for the connection. NULL indicates + * no user name. + */ + struct mqtt_utf8 *user_name; + + /** Password (if any) to be used for the connection. Note that if + * password is provided, user name shall also be provided. NULL + * indicates no password. + */ + struct mqtt_utf8 *password; + + /** Will topic and QoS. Can be NULL. */ + struct mqtt_topic *will_topic; + + /** Will message. Can be NULL. Non NULL value valid only if will topic + * is not NULL. + */ + struct mqtt_utf8 *will_message; + + /** Application callback registered with the module to get MQTT events. + */ + mqtt_evt_cb_t evt_cb; + + /** Receive buffer used for MQTT packet reception in RX path. */ + u8_t *rx_buf; + + /** Size of receive buffer. */ + u32_t rx_buf_size; + + /** Transmit buffer used for creating MQTT packet in TX path. */ + u8_t *tx_buf; + + /** Size of transmit buffer. */ + u32_t tx_buf_size; + + /** MQTT protocol version. */ + u8_t protocol_version; + + /** Will retain flag, 1 if will message shall be retained persistently. + */ + u8_t will_retain : 1; + + /** Clean session flag indicating a fresh (1) or a retained session (0). + * Default is 1. + */ + u8_t clean_session : 1; +}; + +/** + * @brief Initializes the client instance. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * + * @note Shall be called to initialize client structure, before setting any + * client parameters and before connecting to broker. + */ +void mqtt_client_init(struct mqtt_client *client); + +/** + * @brief API to request new MQTT client connection. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * + * @note This memory is assumed to be resident until mqtt_disconnect is called. + * @note Any subsequent changes to parameters like broker address, user name, + * device id, etc. have no effect once MQTT connection is established. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + * + * @note Default protocol revision used for connection request is 3.1.1. Please + * set client.protocol_version = MQTT_VERSION_3_1_0 to use protocol 3.1.0. + * @note Please modify :option:`CONFIG_MQTT_KEEPALIVE` time to override default + * of 1 minute. + */ +int mqtt_connect(struct mqtt_client *client); + +/** + * @brief API to publish messages on topics. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * @param[in] param Parameters to be used for the publish message. + * Shall not be NULL. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_publish(struct mqtt_client *client, + const struct mqtt_publish_param *param); + +/** + * @brief API used by client to send acknowledgment on receiving QoS1 publish + * message. Should be called on reception of @ref MQTT_EVT_PUBLISH with + * QoS level @ref MQTT_QOS_1_AT_LEAST_ONCE. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * @param[in] param Identifies message being acknowledged. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_publish_qos1_ack(struct mqtt_client *client, + const struct mqtt_puback_param *param); + +/** + * @brief API used by client to send acknowledgment on receiving QoS2 publish + * message. Should be called on reception of @ref MQTT_EVT_PUBLISH with + * QoS level @ref MQTT_QOS_2_EXACTLY_ONCE. + * + * @param[in] client Identifies client instance for which the procedure is + * requested. Shall not be NULL. + * @param[in] param Identifies message being acknowledged. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_publish_qos2_receive(struct mqtt_client *client, + const struct mqtt_pubrec_param *param); + +/** + * @brief API used by client to request release of QoS2 publish message. + * Should be called on reception of @ref MQTT_EVT_PUBREC. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * @param[in] param Identifies message being released. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_publish_qos2_release(struct mqtt_client *client, + const struct mqtt_pubrel_param *param); + +/** + * @brief API used by client to send acknowledgment on receiving QoS2 publish + * release message. Should be called on reception of + * @ref MQTT_EVT_PUBREL. + * + * @param[in] client Identifies client instance for which the procedure is + * requested. Shall not be NULL. + * @param[in] param Identifies message being completed. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_publish_qos2_complete(struct mqtt_client *client, + const struct mqtt_pubcomp_param *param); + +/** + * @brief API to request subscription of one or more topics on the connection. + * + * @param[in] client Identifies client instance for which the procedure + * is requested. Shall not be NULL. + * @param[in] param Subscription parameters. Shall not be NULL. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_subscribe(struct mqtt_client *client, + const struct mqtt_subscription_list *param); + +/** + * @brief API to request unsubscribtion of one or more topics on the connection. + * + * @param[in] client Identifies client instance for which the procedure is + * requested. Shall not be NULL. + * @param[in] param Parameters describing topics being unsubscribed from. + * Shall not be NULL. + * + * @note QoS included in topic description is unused in this API. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_unsubscribe(struct mqtt_client *client, + const struct mqtt_subscription_list *param); + +/** + * @brief API to send MQTT ping. The use of this API is optional, as the library + * handles the connection keep-alive on it's own, see @ref mqtt_live. + * + * @param[in] client Identifies client instance for which procedure is + * requested. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_ping(struct mqtt_client *client); + +/** + * @brief API to disconnect MQTT connection. + * + * @param[in] client Identifies client instance for which procedure is + * requested. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_disconnect(struct mqtt_client *client); + +/** + * @brief API to abort MQTT connection. This will close the corresponding + * transport without closing the connection gracefully at the MQTT level + * (with disconnect message). + * + * @param[in] client Identifies client instance for which procedure is + * requested. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_abort(struct mqtt_client *client); + +/** + * @brief This API should be called periodically for the client to be able + * to keep the connection alive by sending Ping Requests if need be. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * + * @note Application shall ensure that the periodicity of calling this function + * makes it possible to respect the Keep Alive time agreed with the + * broker on connection. @ref mqtt_connect for details on Keep Alive + * time. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_live(struct mqtt_client *client); + +/** + * @brief Receive an incoming MQTT packet. The registered callback will be + * called with the packet content. + * + * @note In case of PUBLISH message, the payload has to be read separately with + * @ref mqtt_read_publish_payload function. The size of the payload to + * read is provided in the publish event structure. + * + * @note This is a non-blocking call. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * + * @return 0 or a negative error code (errno.h) indicating reason of failure. + */ +int mqtt_input(struct mqtt_client *client); + +/** + * @brief Read the payload of the received PUBLISH message. This function should + * be called within the MQTT event handler, when MQTT PUBLISH message is + * notified. + * + * @note This is a non-blocking call. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * @param[out] buffer Buffer where payload should be stored. + * @param[in] length Length of the buffer, in bytes. + * + * @return Number of bytes read or a negative error code (errno.h) indicating + * reason of failure. + */ +int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, + size_t length); + +#ifdef __cplusplus +} +#endif + +#endif /* ZEPHYR_INCLUDE_NET_MQTT_H_ */ + +/**@} */ diff --git a/subsys/net/lib/CMakeLists.txt b/subsys/net/lib/CMakeLists.txt index c5522492a3c..39a0f40b354 100644 --- a/subsys/net/lib/CMakeLists.txt +++ b/subsys/net/lib/CMakeLists.txt @@ -4,6 +4,7 @@ add_subdirectory_if_kconfig(lwm2m) add_subdirectory_if_kconfig(sntp) add_subdirectory_ifdef(CONFIG_DNS_RESOLVER dns) add_subdirectory_ifdef(CONFIG_MQTT_LEGACY_LIB mqtt) +add_subdirectory_ifdef(CONFIG_MQTT_LIB mqtt_sock) add_subdirectory_ifdef(CONFIG_NET_APP app) add_subdirectory_ifdef(CONFIG_NET_CONFIG_SETTINGS config) add_subdirectory_ifdef(CONFIG_NET_SOCKETS sockets) diff --git a/subsys/net/lib/Kconfig b/subsys/net/lib/Kconfig index 153ce0b2b82..94e4f377e33 100644 --- a/subsys/net/lib/Kconfig +++ b/subsys/net/lib/Kconfig @@ -14,6 +14,8 @@ source "subsys/net/lib/dns/Kconfig" source "subsys/net/lib/mqtt/Kconfig" +source "subsys/net/lib/mqtt_sock/Kconfig" + source "subsys/net/lib/http/Kconfig" source "subsys/net/lib/lwm2m/Kconfig" diff --git a/subsys/net/lib/mqtt_sock/CMakeLists.txt b/subsys/net/lib/mqtt_sock/CMakeLists.txt new file mode 100644 index 00000000000..e5ac7db0d71 --- /dev/null +++ b/subsys/net/lib/mqtt_sock/CMakeLists.txt @@ -0,0 +1,10 @@ +zephyr_library() + +zephyr_library_sources( + mqtt_decoder.c + mqtt_encoder.c + mqtt_rx.c + mqtt_transport_socket_tcp.c + mqtt_transport.c + mqtt.c + ) diff --git a/subsys/net/lib/mqtt_sock/Kconfig b/subsys/net/lib/mqtt_sock/Kconfig new file mode 100644 index 00000000000..3b1342ed184 --- /dev/null +++ b/subsys/net/lib/mqtt_sock/Kconfig @@ -0,0 +1,30 @@ +# Kconfig - Socket MQTT Library for Zephyr +# +# Copyright (c) 2018 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 +# + +config MQTT_LIB + bool "Socket MQTT Library Support" + select NET_SOCKETS + select NET_SOCKETS_POSIX_NAMES + help + Enable the Zephyr MQTT Library + +if MQTT_LIB + +module=MQTT +module-dep=NET_LOG +module-str=Log level for MQTT +module-help=Enables mqtt debug messages. +source "subsys/net/Kconfig.template.log_config.net" + +config MQTT_KEEPALIVE + int "Maximum number of clients Keep alive time for MQTT (in seconds)" + default 60 + help + Keep alive time for MQTT (in seconds). Sending of Ping Requests to + keep the connection alive are governed by this value. + +endif # MQTT_LIB diff --git a/subsys/net/lib/mqtt_sock/mqtt.c b/subsys/net/lib/mqtt_sock/mqtt.c new file mode 100644 index 00000000000..d1d947517d2 --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt.c @@ -0,0 +1,639 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt.c + * + * @brief MQTT Client API Implementation. + */ + +#define LOG_MODULE_NAME net_mqtt +#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL + +#include + +#include "mqtt_transport.h" +#include "mqtt_internal.h" +#include "mqtt_os.h" + +static void client_reset(struct mqtt_client *client) +{ + MQTT_STATE_INIT(client); + + client->internal.last_activity = 0; + client->internal.rx_buf_datalen = 0; + client->internal.remaining_payload = 0; +} + +/** @brief Initialize tx buffer. */ +static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf) +{ + memset(client->tx_buf, 0, client->tx_buf_size); + buf->cur = client->tx_buf; + buf->end = client->tx_buf + client->tx_buf_size; +} + +/**@brief Notifies disconnection event to the application. + * + * @param[in] client Identifies the client for which the procedure is requested. + * @param[in] result Reason for disconnection. + */ +static void disconnect_event_notify(struct mqtt_client *client, int result) +{ + struct mqtt_evt evt; + + /* Determine appropriate event to generate. */ + if (MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED) || + MQTT_HAS_STATE(client, MQTT_STATE_DISCONNECTING)) { + evt.type = MQTT_EVT_DISCONNECT; + evt.result = result; + } else { + evt.type = MQTT_EVT_CONNACK; + evt.result = -ECONNREFUSED; + } + + /* Notify application. */ + event_notify(client, &evt); + + /* Reset internal state. */ + client_reset(client); +} + +void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt) +{ + if (client->evt_cb != NULL) { + mqtt_mutex_unlock(client); + + client->evt_cb(client, evt); + + mqtt_mutex_lock(client); + } +} + +static void client_disconnect(struct mqtt_client *client, int result) +{ + int err_code; + + err_code = mqtt_transport_disconnect(client); + if (err_code < 0) { + MQTT_ERR("Failed to disconnect transport!"); + } + + disconnect_event_notify(client, result); +} + +static int client_connect(struct mqtt_client *client) +{ + int err_code; + struct buf_ctx packet; + + err_code = mqtt_transport_connect(client); + if (err_code < 0) { + return err_code; + } + + tx_buf_init(client, &packet); + MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED); + + err_code = connect_request_encode(client, &packet); + if (err_code < 0) { + goto error; + } + + /* Send MQTT identification message to broker. */ + err_code = mqtt_transport_write(client, packet.cur, + packet.end - packet.cur); + if (err_code < 0) { + goto error; + } + + client->internal.last_activity = mqtt_sys_tick_in_ms_get(); + + MQTT_TRC("Connect completed"); + + return 0; + +error: + client_disconnect(client, err_code); + return err_code; +} + +static int client_read(struct mqtt_client *client) +{ + int err_code; + + if (client->internal.remaining_payload > 0) { + return -EBUSY; + } + + err_code = mqtt_handle_rx(client); + if (err_code < 0) { + client_disconnect(client, err_code); + } + + return err_code; +} + +static int client_write(struct mqtt_client *client, const u8_t *data, + u32_t datalen) +{ + int err_code; + + MQTT_TRC("[%p]: Transport writing %d bytes.", client, datalen); + + err_code = mqtt_transport_write(client, data, datalen); + if (err_code < 0) { + MQTT_TRC("TCP write failed, errno = %d, " + "closing connection", errno); + client_disconnect(client, err_code); + return err_code; + } + + MQTT_TRC("[%p]: Transport write complete.", client); + client->internal.last_activity = mqtt_sys_tick_in_ms_get(); + + return 0; +} + +void mqtt_client_init(struct mqtt_client *client) +{ + NULL_PARAM_CHECK_VOID(client); + + memset(client, 0, sizeof(*client)); + + MQTT_STATE_INIT(client); + mqtt_mutex_init(client); + + client->protocol_version = MQTT_VERSION_3_1_1; + client->clean_session = 1; +} + +int mqtt_connect(struct mqtt_client *client) +{ + int err_code; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(client->client_id.utf8); + + mqtt_mutex_lock(client); + + if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) { + err_code = -ENOMEM; + goto error; + } + + err_code = client_connect(client); + if (err_code < 0) { + err_code = -ECONNREFUSED; + goto error; + } + + return 0; + +error: + client_reset(client); + mqtt_mutex_unlock(client); + return err_code; +} + +static int verify_tx_state(const struct mqtt_client *client) +{ + if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) { + return -ENOTCONN; + } + + return 0; +} + +int mqtt_publish(struct mqtt_client *client, + const struct mqtt_publish_param *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + MQTT_TRC("[CID %p]:[State 0x%02x]: >> Topic size 0x%08x, " + "Data size 0x%08x", client, client->internal.state, + param->message.topic.topic.size, + param->message.payload.len); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = publish_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, param->message.payload.data, + param->message.payload.len); + +error: + MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", + client, client->internal.state, err_code); + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_publish_qos1_ack(struct mqtt_client *client, + const struct mqtt_puback_param *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", + client, client->internal.state, param->message_id); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = publish_ack_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + +error: + MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", + client, client->internal.state, err_code); + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_publish_qos2_receive(struct mqtt_client *client, + const struct mqtt_pubrec_param *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", + client, client->internal.state, param->message_id); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = publish_receive_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + +error: + MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", + client, client->internal.state, err_code); + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_publish_qos2_release(struct mqtt_client *client, + const struct mqtt_pubrel_param *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", + client, client->internal.state, param->message_id); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = publish_release_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + +error: + MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", + client, client->internal.state, err_code); + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_publish_qos2_complete(struct mqtt_client *client, + const struct mqtt_pubcomp_param *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", + client, client->internal.state, param->message_id); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = publish_complete_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + if (err_code < 0) { + goto error; + } + +error: + MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", + client, client->internal.state, err_code); + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_disconnect(struct mqtt_client *client) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = disconnect_encode(&packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + if (err_code < 0) { + goto error; + } + + MQTT_SET_STATE_EXCLUSIVE(client, MQTT_STATE_DISCONNECTING); + +error: + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_subscribe(struct mqtt_client *client, + const struct mqtt_subscription_list *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + MQTT_TRC("[CID %p]:[State 0x%02x]: >> message id 0x%04x " + "topic count 0x%04x", client, client->internal.state, + param->message_id, param->list_count); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = subscribe_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + +error: + MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", + client, client->internal.state, err_code); + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_unsubscribe(struct mqtt_client *client, + const struct mqtt_subscription_list *param) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + NULL_PARAM_CHECK(param); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = unsubscribe_encode(param, &packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + +error: + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_ping(struct mqtt_client *client) +{ + int err_code; + struct buf_ctx packet; + + NULL_PARAM_CHECK(client); + + mqtt_mutex_lock(client); + + tx_buf_init(client, &packet); + + err_code = verify_tx_state(client); + if (err_code < 0) { + goto error; + } + + err_code = ping_request_encode(&packet); + if (err_code < 0) { + goto error; + } + + err_code = client_write(client, packet.cur, packet.end - packet.cur); + +error: + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_abort(struct mqtt_client *client) +{ + mqtt_mutex_lock(client); + + NULL_PARAM_CHECK(client); + + if (client->internal.state != MQTT_STATE_IDLE) { + client_disconnect(client, -ECONNABORTED); + } + + mqtt_mutex_unlock(client); + + return 0; +} + +int mqtt_live(struct mqtt_client *client) +{ + u32_t elapsed_time; + + NULL_PARAM_CHECK(client); + + mqtt_mutex_lock(client); + + if (MQTT_HAS_STATE(client, MQTT_STATE_DISCONNECTING)) { + client_disconnect(client, 0); + } else { + elapsed_time = mqtt_elapsed_time_in_ms_get( + client->internal.last_activity); + + if ((MQTT_KEEPALIVE > 0) && + (elapsed_time >= (MQTT_KEEPALIVE * 1000))) { + (void)mqtt_ping(client); + } + } + + mqtt_mutex_unlock(client); + + return 0; +} + +int mqtt_input(struct mqtt_client *client) +{ + int err_code = 0; + + NULL_PARAM_CHECK(client); + + mqtt_mutex_lock(client); + + MQTT_TRC("state:0x%08x", client->internal.state); + + if (MQTT_HAS_STATE(client, MQTT_STATE_DISCONNECTING)) { + client_disconnect(client, 0); + } else if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) { + err_code = client_read(client); + } else { + err_code = -EACCES; + } + + mqtt_mutex_unlock(client); + + return err_code; +} + +int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, + size_t length) +{ + int ret; + + NULL_PARAM_CHECK(client); + + mqtt_mutex_lock(client); + + if (client->internal.remaining_payload == 0) { + ret = 0; + goto exit; + } + + if (client->internal.remaining_payload < length) { + length = client->internal.remaining_payload; + } + + ret = mqtt_transport_read(client, buffer, length); + if (ret == -EAGAIN) { + goto exit; + } + + if (ret <= 0) { + if (ret == 0) { + ret = -ENOTCONN; + } + + client_disconnect(client, ret); + goto exit; + } + + client->internal.remaining_payload -= ret; + +exit: + mqtt_mutex_unlock(client); + + return ret; +} diff --git a/subsys/net/lib/mqtt_sock/mqtt_decoder.c b/subsys/net/lib/mqtt_sock/mqtt_decoder.c new file mode 100644 index 00000000000..d46a104eefa --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_decoder.c @@ -0,0 +1,298 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_decoder.c + * + * @brief Decoder functions needed for decoding packets received from the + * broker. + */ + +#define LOG_MODULE_NAME net_mqtt_dec +#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL + +#include "mqtt_internal.h" +#include "mqtt_os.h" + +/** + * @brief Unpacks unsigned 8 bit value from the buffer from the offset + * requested. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] val Memory where the value is to be unpacked. + * + * @retval 0 if the procedure is successful. + * @retval -EINVAL if the buffer would be exceeded during the read + */ +static int unpack_uint8(struct buf_ctx *buf, u8_t *val) +{ + MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end); + + if ((buf->end - buf->cur) < sizeof(u8_t)) { + return -EINVAL; + } + + *val = *(buf->cur++); + + MQTT_TRC("<< val:%02x", *val); + + return 0; +} + +/** + * @brief Unpacks unsigned 16 bit value from the buffer from the offset + * requested. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] val Memory where the value is to be unpacked. + * + * @retval 0 if the procedure is successful. + * @retval -EINVAL if the buffer would be exceeded during the read + */ +static int unpack_uint16(struct buf_ctx *buf, u16_t *val) +{ + MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end); + + if ((buf->end - buf->cur) < sizeof(u16_t)) { + return -EINVAL; + } + + *val = *(buf->cur++) << 8; /* MSB */ + *val |= *(buf->cur++); /* LSB */ + + MQTT_TRC("<< val:%04x", *val); + + return 0; +} + +/** + * @brief Unpacks utf8 string from the buffer from the offset requested. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] str Pointer to a string that will hold the string location + * in the buffer. + * + * @retval 0 if the procedure is successful. + * @retval -EINVAL if the buffer would be exceeded during the read + */ +static int unpack_utf8_str(struct buf_ctx *buf, struct mqtt_utf8 *str) +{ + u16_t utf8_strlen; + int err_code; + + MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end); + + err_code = unpack_uint16(buf, &utf8_strlen); + if (err_code != 0) { + return err_code; + } + + if ((buf->end - buf->cur) < utf8_strlen) { + return -EINVAL; + } + + str->size = utf8_strlen; + /* Zero length UTF8 strings permitted. */ + if (utf8_strlen) { + /* Point to right location in buffer. */ + str->utf8 = buf->cur; + buf->cur += utf8_strlen; + } else { + str->utf8 = NULL; + } + + MQTT_TRC("<< str_size:%08x", (u32_t)GET_UT8STR_BUFFER_SIZE(str)); + + return 0; +} + +/** + * @brief Unpacks binary string from the buffer from the offset requested. + * + * @param[in] length Binary string length. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] str Pointer to a binary string that will hold the binary string + * location in the buffer. + * + * @retval 0 if the procedure is successful. + * @retval -EINVAL if the buffer would be exceeded during the read + */ +static int unpack_data(u32_t length, struct buf_ctx *buf, + struct mqtt_binstr *str) +{ + MQTT_TRC(">> cur:%p, end:%p", buf->cur, buf->end); + + if ((buf->end - buf->cur) < length) { + return -EINVAL; + } + + str->len = length; + + /* Zero length binary strings are permitted. */ + if (length > 0) { + str->data = buf->cur; + buf->cur += length; + } else { + str->data = NULL; + } + + MQTT_TRC("<< bin len:%08x", GET_BINSTR_BUFFER_SIZE(str)); + + return 0; +} + +/**@brief Decode MQTT Packet Length in the MQTT fixed header. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] length Length of variable header and payload in the + * MQTT message. + * + * @retval 0 if the procedure is successful. + * @retval -EINVAL if the length decoding would use more that 4 bytes. + * @retval -EAGAIN if the buffer would be exceeded during the read. + */ +int packet_length_decode(struct buf_ctx *buf, u32_t *length) +{ + u8_t shift = 0; + u8_t bytes = 0; + + *length = 0; + do { + if (bytes > MQTT_MAX_LENGTH_BYTES) { + return -EINVAL; + } + + if (buf->cur >= buf->end) { + return -EAGAIN; + } + + *length += ((u32_t)*(buf->cur) & MQTT_LENGTH_VALUE_MASK) + << shift; + shift += MQTT_LENGTH_SHIFT; + bytes++; + } while ((*(buf->cur++) & MQTT_LENGTH_CONTINUATION_BIT) != 0); + + MQTT_TRC("length:0x%08x", *length); + + return 0; +} + +int fixed_header_decode(struct buf_ctx *buf, u8_t *type_and_flags, + u32_t *length) +{ + int err_code; + + err_code = unpack_uint8(buf, type_and_flags); + if (err_code != 0) { + return err_code; + } + + return packet_length_decode(buf, length); +} + +int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf, + struct mqtt_connack_param *param) +{ + int err_code; + u8_t flags, ret_code; + + err_code = unpack_uint8(buf, &flags); + if (err_code != 0) { + return err_code; + } + + err_code = unpack_uint8(buf, &ret_code); + if (err_code != 0) { + return err_code; + } + + if (client->protocol_version == MQTT_VERSION_3_1_1) { + param->session_present_flag = + flags & MQTT_CONNACK_FLAG_SESSION_PRESENT; + + MQTT_TRC("[CID %p]: session_present_flag: %d", client, + param->session_present_flag); + } + + param->return_code = (enum mqtt_conn_return_code)ret_code; + + return 0; +} + +int publish_decode(u8_t flags, u32_t var_length, struct buf_ctx *buf, + struct mqtt_publish_param *param) +{ + int err_code; + u32_t var_header_length; + + param->dup_flag = flags & MQTT_HEADER_DUP_MASK; + param->retain_flag = flags & MQTT_HEADER_RETAIN_MASK; + param->message.topic.qos = ((flags & MQTT_HEADER_QOS_MASK) >> 1); + + err_code = unpack_utf8_str(buf, ¶m->message.topic.topic); + if (err_code != 0) { + return err_code; + } + + var_header_length = param->message.topic.topic.size + sizeof(u16_t); + + if (param->message.topic.qos > MQTT_QOS_0_AT_MOST_ONCE) { + err_code = unpack_uint16(buf, ¶m->message_id); + if (err_code != 0) { + return err_code; + } + + var_header_length += sizeof(u16_t); + } + + param->message.payload.data = NULL; + param->message.payload.len = var_length - var_header_length; + + return 0; +} + +int publish_ack_decode(struct buf_ctx *buf, struct mqtt_puback_param *param) +{ + return unpack_uint16(buf, ¶m->message_id); +} + +int publish_receive_decode(struct buf_ctx *buf, struct mqtt_pubrec_param *param) +{ + return unpack_uint16(buf, ¶m->message_id); +} + +int publish_release_decode(struct buf_ctx *buf, struct mqtt_pubrel_param *param) +{ + return unpack_uint16(buf, ¶m->message_id); +} + +int publish_complete_decode(struct buf_ctx *buf, + struct mqtt_pubcomp_param *param) +{ + return unpack_uint16(buf, ¶m->message_id); +} + +int subscribe_ack_decode(struct buf_ctx *buf, struct mqtt_suback_param *param) +{ + int err_code; + + err_code = unpack_uint16(buf, ¶m->message_id); + if (err_code != 0) { + return err_code; + } + + return unpack_data(buf->end - buf->cur, buf, ¶m->return_codes); +} + +int unsubscribe_ack_decode(struct buf_ctx *buf, + struct mqtt_unsuback_param *param) +{ + return unpack_uint16(buf, ¶m->message_id); +} diff --git a/subsys/net/lib/mqtt_sock/mqtt_encoder.c b/subsys/net/lib/mqtt_sock/mqtt_encoder.c new file mode 100644 index 00000000000..8b5c52dcc50 --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_encoder.c @@ -0,0 +1,568 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_encoder.c + * + * @brief Encoding functions needed to create packet to be sent to the broker. + */ + +#define LOG_MODULE_NAME net_mqtt_enc +#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL + +#include "mqtt_internal.h" +#include "mqtt_os.h" + +#define MQTT_3_1_0_PROTO_DESC_LEN 6 +#define MQTT_3_1_1_PROTO_DESC_LEN 4 + +static const u8_t mqtt_3_1_0_proto_desc_str[MQTT_3_1_0_PROTO_DESC_LEN] = { + 'M', 'Q', 'I', 's', 'd', 'p' +}; +static const u8_t mqtt_3_1_1_proto_desc_str[MQTT_3_1_1_PROTO_DESC_LEN] = { + 'M', 'Q', 'T', 'T' +}; + +static const struct mqtt_utf8 mqtt_3_1_0_proto_desc = { + .utf8 = (u8_t *)mqtt_3_1_0_proto_desc_str, + .size = MQTT_3_1_0_PROTO_DESC_LEN +}; + +static const struct mqtt_utf8 mqtt_3_1_1_proto_desc = { + .utf8 = (u8_t *)mqtt_3_1_1_proto_desc_str, + .size = MQTT_3_1_1_PROTO_DESC_LEN +}; + +/** Never changing ping request, needed for Keep Alive. */ +static const u8_t ping_packet[MQTT_FIXED_HEADER_MIN_SIZE] = { + MQTT_PKT_TYPE_PINGREQ, + 0x00 +}; + +/** Never changing disconnect request. */ +static const u8_t disc_packet[MQTT_FIXED_HEADER_MIN_SIZE] = { + MQTT_PKT_TYPE_DISCONNECT, + 0x00 +}; + +/** + * @brief Packs unsigned 8 bit value to the buffer at the offset requested. + * + * @param[in] val Value to be packed. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * + * @retval 0 if procedure is successful. + * @retval -ENOMEM if there is no place in the buffer to store the value. + */ +static int pack_uint8(u8_t val, struct buf_ctx *buf) +{ + if ((buf->end - buf->cur) < sizeof(u8_t)) { + return -ENOMEM; + } + + MQTT_TRC(">> val:%02x cur:%p, end:%p", val, buf->cur, buf->end); + + /* Pack value. */ + *(buf->cur++) = val; + + return 0; +} + +/** + * @brief Packs unsigned 16 bit value to the buffer at the offset requested. + * + * @param[in] val Value to be packed. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * + * @retval 0 if the procedure is successful. + * @retval -ENOMEM if there is no place in the buffer to store the value. + */ +static int pack_uint16(u16_t val, struct buf_ctx *buf) +{ + if ((buf->end - buf->cur) < sizeof(u16_t)) { + return -ENOMEM; + } + + MQTT_TRC(">> val:%04x cur:%p, end:%p", val, buf->cur, buf->end); + + /* Pack value. */ + *(buf->cur++) = (val >> 8) & 0xFF; + *(buf->cur++) = val & 0xFF; + + return 0; +} + +/** + * @brief Packs utf8 string to the buffer at the offset requested. + * + * @param[in] str UTF-8 string and its length to be packed. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * + * @retval 0 if the procedure is successful. + * @retval -ENOMEM if there is no place in the buffer to store the string. + */ +static int pack_utf8_str(const struct mqtt_utf8 *str, struct buf_ctx *buf) +{ + if ((buf->end - buf->cur) < GET_UT8STR_BUFFER_SIZE(str)) { + return -ENOMEM; + } + + MQTT_TRC(">> str_size:%08x cur:%p, end:%p", + (u32_t)GET_UT8STR_BUFFER_SIZE(str), buf->cur, buf->end); + + /* Pack length followed by string. */ + (void)pack_uint16(str->size, buf); + + memcpy(buf->cur, str->utf8, str->size); + buf->cur += str->size; + + return 0; +} + +/** + * @brief Computes and encodes length for the MQTT fixed header. + * + * @note The remaining length is not packed as a fixed unsigned 32 bit integer. + * Instead it is packed on algorithm below: + * + * @code + * do + * encodedByte = X MOD 128 + * X = X DIV 128 + * // if there are more data to encode, set the top bit of this byte + * if ( X > 0 ) + * encodedByte = encodedByte OR 128 + * endif + * 'output' encodedByte + * while ( X > 0 ) + * @endcode + * + * @param[in] length Length of variable header and payload in the MQTT message. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. May be NULL (in this case function will + * only calculate number of bytes needed). + * + * @return Number of bytes needed to encode length value. + */ +static u8_t packet_length_encode(u32_t length, struct buf_ctx *buf) +{ + u8_t encoded_bytes = 0; + + MQTT_TRC(">> length:0x%08x cur:%p, end:%p", length, buf->cur, buf->end); + + do { + encoded_bytes++; + + if (buf != NULL) { + *(buf->cur) = length & MQTT_LENGTH_VALUE_MASK; + } + + length >>= MQTT_LENGTH_SHIFT; + + if (buf != NULL) { + if (length > 0) { + *(buf->cur) |= MQTT_LENGTH_CONTINUATION_BIT; + } + buf->cur++; + } + } while (length > 0); + + return encoded_bytes; +} + +/** + * @brief Encodes fixed header for the MQTT message and provides pointer to + * start of the header. + * + * @param[in] message_type Message type containing packet type and the flags. + * Use @ref MQTT_MESSAGES_OPTIONS to construct the + * message_type. + * @param[in] start Pointer to the start of the variable header. + * @param[inout] buf Buffer context used to encode the frame. + * The 5 bytes before the start of the message are assumed + * by the routine to be available to pack the fixed header. + * However, since the fixed header length is variable + * length, the pointer to the start of the MQTT message + * along with encoded fixed header is supplied as output + * parameter if the procedure was successful. + * As output, the pointers will point to beginning and the end + * of the frame. + * + * @retval 0 if the procedure is successful. + * @retval -EMSGSIZE if the message is too big for MQTT. + */ +static u32_t mqtt_encode_fixed_header(u8_t message_type, u8_t *start, + struct buf_ctx *buf) +{ + u32_t length = buf->cur - start; + u8_t fixed_header_length; + + if (length > MQTT_MAX_PAYLOAD_SIZE) { + return -EMSGSIZE; + } + + MQTT_TRC("<< msg type:0x%02x length:0x%08x", message_type, length); + + fixed_header_length = packet_length_encode(length, NULL); + fixed_header_length += sizeof(u8_t); + + MQTT_TRC("Fixed header length = %02x", fixed_header_length); + + /* Set the pointer at the start of the frame before encoding. */ + buf->cur = start - fixed_header_length; + + (void)pack_uint8(message_type, buf); + (void)packet_length_encode(length, buf); + + /* Set the cur pointer back at the start of the frame, + * and end pointer to the end of the frame. + */ + buf->cur = buf->cur - fixed_header_length; + buf->end = buf->cur + length + fixed_header_length; + + return 0; +} + +/** + * @brief Encodes a string of a zero length. + * + * @param[in] buffer_len Total size of the buffer on which string will be + * encoded. This shall not be zero. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * + * @retval 0 if the procedure is successful. + * @retval -ENOMEM if there is no place in the buffer to store the binary + * string. + */ +static int zero_len_str_encode(struct buf_ctx *buf) +{ + return pack_uint16(0x0000, buf); +} + +/** + * @brief Encodes and sends messages that contain only message id in + * the variable header. + * + * @param[in] message_type Message type and reserved bit fields. + * @param[in] message_id Message id to be encoded in the variable header. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * + * @retval 0 or an error code indicating a reason for failure. + */ +static int mqtt_message_id_only_enc(u8_t message_type, u16_t message_id, + struct buf_ctx *buf) +{ + int err_code; + u8_t *start; + + /* Message id zero is not permitted by spec. */ + if (message_id == 0) { + return -EINVAL; + } + + /* Reserve space for fixed header. */ + buf->cur += MQTT_FIXED_HEADER_MAX_SIZE; + start = buf->cur; + + err_code = pack_uint16(message_id, buf); + if (err_code != 0) { + return err_code; + } + + return mqtt_encode_fixed_header(message_type, start, buf); +} + +int connect_request_encode(const struct mqtt_client *client, + struct buf_ctx *buf) +{ + u8_t connect_flags = client->clean_session << 1; + const u8_t message_type = + MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_CONNECT, 0, 0, 0); + const struct mqtt_utf8 *mqtt_proto_desc; + u8_t *connect_flags_pos; + int err_code; + u8_t *start; + + if (client->protocol_version == MQTT_VERSION_3_1_1) { + mqtt_proto_desc = &mqtt_3_1_1_proto_desc; + } else { + mqtt_proto_desc = &mqtt_3_1_0_proto_desc; + } + + /* Reserve space for fixed header. */ + buf->cur += MQTT_FIXED_HEADER_MAX_SIZE; + start = buf->cur; + + MQTT_TRC("Encoding Protocol Description. Str:%s Size:%08x.", + mqtt_proto_desc->utf8, mqtt_proto_desc->size); + + err_code = pack_utf8_str(mqtt_proto_desc, buf); + if (err_code != 0) { + return err_code; + } + + MQTT_TRC("Encoding Protocol Version %02x.", client->protocol_version); + err_code = pack_uint8(client->protocol_version, buf); + if (err_code != 0) { + return err_code; + } + + /* Remember position of connect flag and leave one byte for it to + * be packed once we determine its value. + */ + connect_flags_pos = buf->cur; + + err_code = pack_uint8(0, buf); + if (err_code != 0) { + return err_code; + } + + MQTT_TRC("Encoding Keep Alive Time %04x.", MQTT_KEEPALIVE); + err_code = pack_uint16(MQTT_KEEPALIVE, buf); + if (err_code != 0) { + return err_code; + } + + MQTT_TRC("Encoding Client Id. Str:%s Size:%08x.", + client->client_id.utf8, client->client_id.size); + err_code = pack_utf8_str(&client->client_id, buf); + if (err_code != 0) { + return err_code; + } + + /* Pack will topic and QoS */ + if (client->will_topic != NULL) { + connect_flags |= MQTT_CONNECT_FLAG_WILL_TOPIC; + /* QoS is always 1 as of now. */ + connect_flags |= ((client->will_topic->qos & 0x03) << 3); + connect_flags |= client->will_retain << 5; + + MQTT_TRC("Encoding Will Topic. Str:%s Size:%08x.", + client->will_topic->topic.utf8, + client->will_topic->topic.size); + err_code = pack_utf8_str(&client->will_topic->topic, buf); + if (err_code != 0) { + return err_code; + } + + if (client->will_message != NULL) { + MQTT_TRC("Encoding Will Message. Str:%s Size:%08x.", + client->will_message->utf8, + client->will_message->size); + err_code = pack_utf8_str(client->will_message, buf); + if (err_code != 0) { + return err_code; + } + } else { + MQTT_TRC("Encoding Zero Length Will Message."); + err_code = zero_len_str_encode(buf); + if (err_code != 0) { + return err_code; + } + } + } + + /* Pack Username if any. */ + if (client->user_name != NULL) { + connect_flags |= MQTT_CONNECT_FLAG_USERNAME; + + MQTT_TRC("Encoding Username. Str:%s, Size:%08x.", + client->user_name->utf8, client->user_name->size); + err_code = pack_utf8_str(client->user_name, buf); + if (err_code != 0) { + return err_code; + } + } + + /* Pack Password if any. */ + if (client->password != NULL) { + connect_flags |= MQTT_CONNECT_FLAG_PASSWORD; + + MQTT_TRC("Encoding Password. Str:%s Size:%08x.", + client->password->utf8, client->password->size); + err_code = pack_utf8_str(client->password, buf); + if (err_code != 0) { + return err_code; + } + } + + /* Write the flags the connect flags. */ + *connect_flags_pos = connect_flags; + + return mqtt_encode_fixed_header(message_type, start, buf); +} + +int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf) +{ + const u8_t message_type = MQTT_MESSAGES_OPTIONS( + MQTT_PKT_TYPE_PUBLISH, param->dup_flag, + param->message.topic.qos, param->retain_flag); + int err_code; + u8_t *start; + + /* Message id zero is not permitted by spec. */ + if ((param->message.topic.qos) && (param->message_id == 0)) { + return -EINVAL; + } + + /* Reserve space for fixed header. */ + buf->cur += MQTT_FIXED_HEADER_MAX_SIZE; + start = buf->cur; + + err_code = pack_utf8_str(¶m->message.topic.topic, buf); + if (err_code != 0) { + return err_code; + } + + if (param->message.topic.qos) { + err_code = pack_uint16(param->message_id, buf); + if (err_code != 0) { + return err_code; + } + } + + /* Do not copy payload. We move the buffer pointer to ensure that + * message length in fixed header is encoded correctly. + */ + buf->cur += param->message.payload.len; + + err_code = mqtt_encode_fixed_header(message_type, start, buf); + if (err_code != 0) { + return err_code; + } + + buf->end -= param->message.payload.len; + + return 0; +} + +int publish_ack_encode(const struct mqtt_puback_param *param, + struct buf_ctx *buf) +{ + const u8_t message_type = + MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBACK, 0, 0, 0); + + return mqtt_message_id_only_enc(message_type, param->message_id, buf); +} + +int publish_receive_encode(const struct mqtt_pubrec_param *param, + struct buf_ctx *buf) +{ + const u8_t message_type = + MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBREC, 0, 0, 0); + + return mqtt_message_id_only_enc(message_type, param->message_id, buf); +} + +int publish_release_encode(const struct mqtt_pubrel_param *param, + struct buf_ctx *buf) +{ + const u8_t message_type = + MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBREL, 0, 1, 0); + + return mqtt_message_id_only_enc(message_type, param->message_id, buf); +} + +int publish_complete_encode(const struct mqtt_pubcomp_param *param, + struct buf_ctx *buf) +{ + const u8_t message_type = + MQTT_MESSAGES_OPTIONS(MQTT_PKT_TYPE_PUBCOMP, 0, 0, 0); + + return mqtt_message_id_only_enc(message_type, param->message_id, buf); +} + +int disconnect_encode(struct buf_ctx *buf) +{ + if (buf->end - buf->cur < sizeof(disc_packet)) { + return -ENOMEM; + } + + memcpy(buf->cur, disc_packet, sizeof(disc_packet)); + buf->end = buf->cur + sizeof(disc_packet); + + return 0; +} + +int subscribe_encode(const struct mqtt_subscription_list *param, + struct buf_ctx *buf) +{ + const u8_t message_type = MQTT_MESSAGES_OPTIONS( + MQTT_PKT_TYPE_SUBSCRIBE, 0, 1, 0); + int err_code, i; + u8_t *start; + + /* Message id zero is not permitted by spec. */ + if (param->message_id == 0) { + return -EINVAL; + } + + /* Reserve space for fixed header. */ + buf->cur += MQTT_FIXED_HEADER_MAX_SIZE; + start = buf->cur; + + err_code = pack_uint16(param->message_id, buf); + if (err_code != 0) { + return err_code; + } + + for (i = 0; i < param->list_count; i++) { + err_code = pack_utf8_str(¶m->list[i].topic, buf); + if (err_code != 0) { + return err_code; + } + + err_code = pack_uint8(param->list[i].qos, buf); + if (err_code != 0) { + return err_code; + } + } + + return mqtt_encode_fixed_header(message_type, start, buf); +} + +int unsubscribe_encode(const struct mqtt_subscription_list *param, + struct buf_ctx *buf) +{ + const u8_t message_type = MQTT_MESSAGES_OPTIONS( + MQTT_PKT_TYPE_UNSUBSCRIBE, 0, MQTT_QOS_1_AT_LEAST_ONCE, 0); + int err_code, i; + u8_t *start; + + /* Reserve space for fixed header. */ + buf->cur += MQTT_FIXED_HEADER_MAX_SIZE; + start = buf->cur; + + err_code = pack_uint16(param->message_id, buf); + if (err_code != 0) { + return err_code; + } + + for (i = 0; i < param->list_count; i++) { + err_code = pack_utf8_str(¶m->list[i].topic, buf); + if (err_code != 0) { + return err_code; + } + } + + return mqtt_encode_fixed_header(message_type, start, buf); +} + +int ping_request_encode(struct buf_ctx *buf) +{ + if (buf->end - buf->cur < sizeof(ping_packet)) { + return -ENOMEM; + } + + memcpy(buf->cur, ping_packet, sizeof(ping_packet)); + buf->end = buf->cur + sizeof(ping_packet); + + return 0; +} diff --git a/subsys/net/lib/mqtt_sock/mqtt_internal.h b/subsys/net/lib/mqtt_sock/mqtt_internal.h new file mode 100644 index 00000000000..189ad1d9b32 --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_internal.h @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_internal.h + * + * @brief Function and data structures internal to MQTT module. + */ + +#ifndef MQTT_INTERNAL_H_ +#define MQTT_INTERNAL_H_ + +#include +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/**@brief Keep alive time for MQTT (in seconds). Sending of Ping Requests to + * keep the connection alive are governed by this value. + */ +#define MQTT_KEEPALIVE CONFIG_MQTT_KEEPALIVE + +/**@brief Minimum mandatory size of fixed header. */ +#define MQTT_FIXED_HEADER_MIN_SIZE 2 + +/**@brief Maximum size of the fixed header. Remaining length size is 4 in this + * case. + */ +#define MQTT_FIXED_HEADER_MAX_SIZE 5 + +/**@brief MQTT Control Packet Types. */ +#define MQTT_PKT_TYPE_CONNECT 0x10 +#define MQTT_PKT_TYPE_CONNACK 0x20 +#define MQTT_PKT_TYPE_PUBLISH 0x30 +#define MQTT_PKT_TYPE_PUBACK 0x40 +#define MQTT_PKT_TYPE_PUBREC 0x50 +#define MQTT_PKT_TYPE_PUBREL 0x60 +#define MQTT_PKT_TYPE_PUBCOMP 0x70 +#define MQTT_PKT_TYPE_SUBSCRIBE 0x80 +#define MQTT_PKT_TYPE_SUBACK 0x90 +#define MQTT_PKT_TYPE_UNSUBSCRIBE 0xA0 +#define MQTT_PKT_TYPE_UNSUBACK 0xB0 +#define MQTT_PKT_TYPE_PINGREQ 0xC0 +#define MQTT_PKT_TYPE_PINGRSP 0xD0 +#define MQTT_PKT_TYPE_DISCONNECT 0xE0 + +/**@brief Masks for MQTT header flags. */ +#define MQTT_HEADER_DUP_MASK 0x08 +#define MQTT_HEADER_QOS_MASK 0x06 +#define MQTT_HEADER_RETAIN_MASK 0x01 + +/**@brief Masks for MQTT header flags. */ +#define MQTT_CONNECT_FLAG_CLEAN_SESSION 0x02 +#define MQTT_CONNECT_FLAG_WILL_TOPIC 0x04 +#define MQTT_CONNECT_FLAG_WILL_RETAIN 0x20 +#define MQTT_CONNECT_FLAG_PASSWORD 0x40 +#define MQTT_CONNECT_FLAG_USERNAME 0x80 + +#define MQTT_CONNACK_FLAG_SESSION_PRESENT 0x01 + +/**@brief Maximum payload size of MQTT packet. */ +#define MQTT_MAX_PAYLOAD_SIZE 0x0FFFFFFF + +/**@brief Computes total size needed to pack a UTF8 string. */ +#define GET_UT8STR_BUFFER_SIZE(STR) (sizeof(u16_t) + (STR)->size) + +/**@brief Computes total size needed to pack a binary stream. */ +#define GET_BINSTR_BUFFER_SIZE(STR) ((STR)->len) + +/**@brief Sets MQTT Client's state with one indicated in 'STATE'. */ +#define MQTT_SET_STATE(CLIENT, STATE) ((CLIENT)->internal.state |= (STATE)) + +/**@brief Sets MQTT Client's state exclusive to 'STATE'. */ +#define MQTT_SET_STATE_EXCLUSIVE(CLIENT, STATE) \ + ((CLIENT)->internal.state = (STATE)) + +/**@brief Verifies if MQTT Client's state is set with one indicated in 'STATE'. + */ +#define MQTT_HAS_STATE(CLIENT, STATE) ((CLIENT)->internal.state & (STATE)) + +/**@brief Reset 'STATE' in MQTT Client's state. */ +#define MQTT_RESET_STATE(CLIENT, STATE) ((CLIENT)->internal.state &= ~(STATE)) + +/**@brief Initialize MQTT Client's state. */ +#define MQTT_STATE_INIT(CLIENT) ((CLIENT)->internal.state = MQTT_STATE_IDLE) + +/**@brief Computes the first byte of MQTT message header based on message type, + * duplication flag, QoS and the retain flag. + */ +#define MQTT_MESSAGES_OPTIONS(TYPE, DUP, QOS, RETAIN) \ + (((TYPE) & 0xF0) | \ + (((DUP) << 3) & 0x08) | \ + (((QOS) << 1) & 0x06) | \ + ((RETAIN) & 0x01)) + +#define MQTT_MAX_LENGTH_BYTES 4 +#define MQTT_LENGTH_VALUE_MASK 0x7F +#define MQTT_LENGTH_CONTINUATION_BIT 0x80 +#define MQTT_LENGTH_SHIFT 7 + +/**@brief Check if the input pointer is NULL, if so it returns -EINVAL. */ +#define NULL_PARAM_CHECK(param) \ + do { \ + if ((param) == NULL) { \ + return -EINVAL; \ + } \ + } while (0) + +#define NULL_PARAM_CHECK_VOID(param) \ + do { \ + if ((param) == NULL) { \ + return; \ + } \ + } while (0) + +/** Buffer context to iterate over buffer. */ +struct buf_ctx { + u8_t *cur; + u8_t *end; +}; + +/**@brief MQTT States. */ +enum mqtt_state { + /** Idle state, implying the client entry in the table is unused/free. + */ + MQTT_STATE_IDLE = 0x00000000, + + /** TCP Connection has been requested, awaiting result of the request. + */ + MQTT_STATE_TCP_CONNECTING = 0x00000001, + + /** TCP Connection successfully established. */ + MQTT_STATE_TCP_CONNECTED = 0x00000002, + + /** MQTT Connection successful. */ + MQTT_STATE_CONNECTED = 0x00000004, + + /** TCP Disconnect has been requested, awaiting result of the request. + */ + MQTT_STATE_DISCONNECTING = 0x00000008 +}; + +/**@brief Notify application about MQTT event. + * + * @param[in] client Identifies the client for which event occurred. + * @param[in] evt MQTT event. + */ +void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt); + +/**@brief Handles MQTT messages received from the peer. + * + * @param[in] client Identifies the client for which the data was received. + + * @return 0 if the procedure is successful, an error code otherwise. + */ +int mqtt_handle_rx(struct mqtt_client *client); + +/**@brief Constructs/encodes Connect packet. + * + * @param[in] client Identifies the client for which the procedure is requested. + * All information required for creating the packet like + * client id, clean session flag, retain session flag etc are + * assumed to be populated for the client instance when this + * procedure is requested. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int connect_request_encode(const struct mqtt_client *client, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Publish packet. + * + * @param[in] param Publish message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf); + +/**@brief Constructs/encodes Publish Ack packet. + * + * @param[in] param Publish Ack message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_ack_encode(const struct mqtt_puback_param *param, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Publish Receive packet. + * + * @param[in] param Publish Receive message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_receive_encode(const struct mqtt_pubrec_param *param, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Publish Release packet. + * + * @param[in] param Publish Release message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_release_encode(const struct mqtt_pubrel_param *param, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Publish Complete packet. + * + * @param[in] param Publish Complete message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_complete_encode(const struct mqtt_pubcomp_param *param, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Disconnect packet. + * + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int disconnect_encode(struct buf_ctx *buf); + +/**@brief Constructs/encodes Subscribe packet. + * + * @param[in] param Subscribe message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int subscribe_encode(const struct mqtt_subscription_list *param, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Unsubscribe packet. + * + * @param[in] param Unsubscribe message parameters. + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int unsubscribe_encode(const struct mqtt_subscription_list *param, + struct buf_ctx *buf); + +/**@brief Constructs/encodes Ping Request packet. + * + * @param[inout] buf_ctx Pointer to the buffer context structure, + * containing buffer for the encoded message. + * As output points to the beginning and end of + * the frame. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int ping_request_encode(struct buf_ctx *buf); + +/**@brief Decode MQTT Packet Type and Length in the MQTT fixed header. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] type_and_flags Message type and flags. + * @param[out] length Length of variable header and payload in the MQTT message. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int fixed_header_decode(struct buf_ctx *buf, u8_t *type_and_flags, + u32_t *length); + +/**@brief Decode MQTT Connect Ack packet. + * + * @param[in] client MQTT client for which packet is decoded. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Connect Ack parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf, + struct mqtt_connack_param *param); + +/**@brief Decode MQTT Publish packet. + * + * @param[in] flags Byte containing message type and flags. + * @param[in] var_length Length of the variable part of the message. + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Publish parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_decode(u8_t flags, u32_t var_length, struct buf_ctx *buf, + struct mqtt_publish_param *param); + +/**@brief Decode MQTT Publish Ack packet. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Publish Ack parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_ack_decode(struct buf_ctx *buf, struct mqtt_puback_param *param); + +/**@brief Decode MQTT Publish Receive packet. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Publish Receive parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_receive_decode(struct buf_ctx *buf, + struct mqtt_pubrec_param *param); + +/**@brief Decode MQTT Publish Release packet. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Publish Release parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_release_decode(struct buf_ctx *buf, + struct mqtt_pubrel_param *param); + +/**@brief Decode MQTT Publish Complete packet. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Publish Complete parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int publish_complete_decode(struct buf_ctx *buf, + struct mqtt_pubcomp_param *param); + +/**@brief Decode MQTT Subscribe packet. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Subscribe parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int subscribe_ack_decode(struct buf_ctx *buf, + struct mqtt_suback_param *param); + +/**@brief Decode MQTT Unsubscribe packet. + * + * @param[inout] buf A pointer to the buf_ctx structure containing current + * buffer position. + * @param[out] param Pointer to buffer for decoded Unsubscribe parameters. + * + * @return 0 if the procedure is successful, an error code otherwise. + */ +int unsubscribe_ack_decode(struct buf_ctx *buf, + struct mqtt_unsuback_param *param); + +#ifdef __cplusplus +} +#endif + +#endif /* MQTT_INTERNAL_H_ */ diff --git a/subsys/net/lib/mqtt_sock/mqtt_os.h b/subsys/net/lib/mqtt_sock/mqtt_os.h new file mode 100644 index 00000000000..708a9a4967d --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_os.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_os.h + * + * @brief MQTT Client depends on certain OS specific functionality. The needed + * methods are mapped here and should be implemented based on OS in use. + * + * @details Memory management, mutex, logging and wall clock are the needed + * functionality for MQTT module. The needed interfaces are defined + * in the OS. OS specific port of the interface shall be provided. + * + */ + +#ifndef MQTT_OS_H_ +#define MQTT_OS_H_ + +#include +#include + +#include + +#include "mqtt_internal.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/**@brief Method to get trace logs from the module. */ +#define MQTT_TRC(...) NET_DBG(__VA_ARGS__) + +/**@brief Method to error logs from the module. */ +#define MQTT_ERR(...) NET_ERR(__VA_ARGS__) + +/**@brief Initialize the mutex for the module, if any. + * + * @details This method is called during module initialization @ref mqtt_init. + */ +static inline void mqtt_mutex_init(struct mqtt_client *client) +{ + k_mutex_init(&client->internal.mutex); +} + +/**@brief Acquire lock on the module specific mutex, if any. + * + * @details This is assumed to be a blocking method until the acquisition + * of the mutex succeeds. + */ +static inline void mqtt_mutex_lock(struct mqtt_client *client) +{ + (void)k_mutex_lock(&client->internal.mutex, K_FOREVER); +} + +/**@brief Release the lock on the module specific mutex, if any. + */ +static inline void mqtt_mutex_unlock(struct mqtt_client *client) +{ + k_mutex_unlock(&client->internal.mutex); +} + +/**@brief Method to get the sys tick or a wall clock in millisecond resolution. + * + * @retval Current wall clock or sys tick value in milliseconds. + */ +static inline u32_t mqtt_sys_tick_in_ms_get(void) +{ + return k_uptime_get_32(); +} + +/**@brief Method to get elapsed time in milliseconds since the last activity. + * + * @param[in] last_activity The value since elapsed time is requested. + * + * @retval Time elapsed since last_activity time. + */ +static inline u32_t mqtt_elapsed_time_in_ms_get(u32_t last_activity) +{ + s32_t diff = k_uptime_get_32() - last_activity; + + if (diff < 0) { + return 0; + } + + return diff; +} + +#ifdef __cplusplus +} +#endif + +#endif /* MQTT_OS_H_ */ diff --git a/subsys/net/lib/mqtt_sock/mqtt_rx.c b/subsys/net/lib/mqtt_sock/mqtt_rx.c new file mode 100644 index 00000000000..b87c97e0d9d --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_rx.c @@ -0,0 +1,279 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#define LOG_MODULE_NAME net_mqtt_rx +#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL + +#include "mqtt_internal.h" +#include "mqtt_transport.h" +#include "mqtt_os.h" + +/** @file mqtt_rx.c + * + * @brief MQTT Received data handling. + */ + +static int mqtt_handle_packet(struct mqtt_client *client, + u8_t type_and_flags, + u32_t var_length, + struct buf_ctx *buf) +{ + int err_code = 0; + bool notify_event = true; + struct mqtt_evt evt; + + /* Success by default, overwritten in special cases. */ + evt.result = 0; + + switch (type_and_flags & 0xF0) { + case MQTT_PKT_TYPE_CONNACK: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client); + + evt.type = MQTT_EVT_CONNACK; + err_code = connect_ack_decode(client, buf, &evt.param.connack); + if (err_code == 0) { + MQTT_TRC("[CID %p]: return_code: %d", client, + evt.param.connack.return_code); + + if (evt.param.connack.return_code == + MQTT_CONNECTION_ACCEPTED) { + /* Set state. */ + MQTT_SET_STATE(client, MQTT_STATE_CONNECTED); + } + + evt.result = evt.param.connack.return_code; + } else { + evt.result = err_code; + } + + break; + + case MQTT_PKT_TYPE_PUBLISH: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client); + + evt.type = MQTT_EVT_PUBLISH; + err_code = publish_decode(type_and_flags, var_length, buf, + &evt.param.publish); + evt.result = err_code; + + client->internal.remaining_payload = + evt.param.publish.message.payload.len; + + MQTT_TRC("PUB QoS:%02x, message len %08x, topic len %08x", + evt.param.publish.message.topic.qos, + evt.param.publish.message.payload.len, + evt.param.publish.message.topic.topic.size); + + break; + + case MQTT_PKT_TYPE_PUBACK: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client); + + evt.type = MQTT_EVT_PUBACK; + err_code = publish_ack_decode(buf, &evt.param.puback); + evt.result = err_code; + break; + + case MQTT_PKT_TYPE_PUBREC: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client); + + evt.type = MQTT_EVT_PUBREC; + err_code = publish_receive_decode(buf, &evt.param.pubrec); + evt.result = err_code; + break; + + case MQTT_PKT_TYPE_PUBREL: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client); + + evt.type = MQTT_EVT_PUBREL; + err_code = publish_release_decode(buf, &evt.param.pubrel); + evt.result = err_code; + break; + + case MQTT_PKT_TYPE_PUBCOMP: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client); + + evt.type = MQTT_EVT_PUBCOMP; + err_code = publish_complete_decode(buf, &evt.param.pubcomp); + evt.result = err_code; + break; + + case MQTT_PKT_TYPE_SUBACK: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client); + + evt.type = MQTT_EVT_SUBACK; + err_code = subscribe_ack_decode(buf, &evt.param.suback); + evt.result = err_code; + break; + + case MQTT_PKT_TYPE_UNSUBACK: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client); + + evt.type = MQTT_EVT_UNSUBACK; + err_code = unsubscribe_ack_decode(buf, &evt.param.unsuback); + evt.result = err_code; + break; + + case MQTT_PKT_TYPE_PINGRSP: + MQTT_TRC("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client); + + /* No notification of Ping response to application. */ + notify_event = false; + break; + + default: + /* Nothing to notify. */ + notify_event = false; + break; + } + + if (notify_event == true) { + event_notify(client, &evt); + } + + return err_code; +} + +static int mqtt_read_message_chunk(struct mqtt_client *client, + struct buf_ctx *buf, u32_t length) +{ + int remaining; + int len; + + /* Calculate how much data we need to read from the transport, + * given the already buffered data. + */ + remaining = length - (buf->end - buf->cur); + if (remaining <= 0) { + return 0; + } + + /* Check if read does not exceed the buffer. */ + if (buf->end + remaining > client->rx_buf + client->rx_buf_size) { + MQTT_ERR("[CID %p]: Buffer too small to receive the message", + client); + return -ENOMEM; + } + + len = mqtt_transport_read(client, buf->end, remaining); + if (len < 0) { + MQTT_TRC("[CID %p]: Transport read error: %d", client, len); + return len; + } + + if (len == 0) { + MQTT_TRC("[CID %p]: Connection closed.", client); + return -ENOTCONN; + } + + client->internal.rx_buf_datalen += len; + buf->end += len; + + if (len < remaining) { + MQTT_TRC("[CID %p]: Message partially received.", client); + return -EAGAIN; + } + + return 0; +} + +static int mqtt_read_publish_var_header(struct mqtt_client *client, + u8_t type_and_flags, + struct buf_ctx *buf) +{ + u8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1; + int err_code; + u32_t variable_header_length; + + /* Read topic length field. */ + err_code = mqtt_read_message_chunk(client, buf, sizeof(u16_t)); + if (err_code < 0) { + return err_code; + } + + variable_header_length = *buf->cur << 8; /* MSB */ + variable_header_length |= *(buf->cur + 1); /* LSB */ + + /* Add two bytes for topic length field. */ + variable_header_length += sizeof(u16_t); + + /* Add two bytes for message_id, if needed. */ + if (qos > MQTT_QOS_0_AT_MOST_ONCE) { + variable_header_length += sizeof(u16_t); + } + + /* Now we can read the whole header. */ + err_code = mqtt_read_message_chunk(client, buf, + variable_header_length); + if (err_code < 0) { + return err_code; + } + + return 0; +} + +static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client, + u8_t *type_and_flags, + u32_t *var_length, + struct buf_ctx *buf) +{ + /* Read the mandatory part of the fixed header in first iteration. */ + u8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE; + int err_code; + + do { + err_code = mqtt_read_message_chunk(client, buf, chunk_size); + if (err_code < 0) { + return err_code; + } + + /* Reset to pointer to the beginning of the frame. */ + buf->cur = client->rx_buf; + chunk_size = 1; + + err_code = fixed_header_decode(buf, type_and_flags, var_length); + } while (err_code == -EAGAIN); + + return err_code; +} + +int mqtt_handle_rx(struct mqtt_client *client) +{ + int err_code; + u8_t type_and_flags; + u32_t var_length; + struct buf_ctx buf; + + buf.cur = client->rx_buf; + buf.end = client->rx_buf + client->internal.rx_buf_datalen; + + err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags, + &var_length, &buf); + if (err_code < 0) { + return (err_code == -EAGAIN) ? 0 : err_code; + } + + if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) { + err_code = mqtt_read_publish_var_header(client, type_and_flags, + &buf); + } else { + err_code = mqtt_read_message_chunk(client, &buf, var_length); + } + + if (err_code < 0) { + return (err_code == -EAGAIN) ? 0 : err_code; + } + + /* At this point, packet is ready to be passed to the application. */ + err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf); + if (err_code < 0) { + return err_code; + } + + client->internal.rx_buf_datalen = 0; + + return 0; +} diff --git a/subsys/net/lib/mqtt_sock/mqtt_transport.c b/subsys/net/lib/mqtt_sock/mqtt_transport.c new file mode 100644 index 00000000000..dc28f4ba1ff --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_transport.c @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_transport.c + * + * @brief Internal functions to handle transport in MQTT module. + */ + +#include "mqtt_transport.h" + +/* Transport handler functions for TCP socket transport. */ +extern int mqtt_client_tcp_connect(struct mqtt_client *client); +extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, + u32_t datalen); +extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, + u32_t buflen); +extern int mqtt_client_tcp_disconnect(struct mqtt_client *client); + +/**@brief Function pointer array for TCP/TLS transport handlers. */ +const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = { + { + mqtt_client_tcp_connect, + mqtt_client_tcp_write, + mqtt_client_tcp_read, + mqtt_client_tcp_disconnect, + } +}; + +int mqtt_transport_connect(struct mqtt_client *client) +{ + return transport_fn[client->transport.type].connect(client); +} + +int mqtt_transport_write(struct mqtt_client *client, const u8_t *data, + u32_t datalen) +{ + return transport_fn[client->transport.type].write(client, data, + datalen); +} + +int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen) +{ + return transport_fn[client->transport.type].read(client, data, buflen); +} + +int mqtt_transport_disconnect(struct mqtt_client *client) +{ + return transport_fn[client->transport.type].disconnect(client); +} diff --git a/subsys/net/lib/mqtt_sock/mqtt_transport.h b/subsys/net/lib/mqtt_sock/mqtt_transport.h new file mode 100644 index 00000000000..6f295a717af --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_transport.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_transport.h + * + * @brief Internal functions to handle transport in MQTT module. + */ + +#ifndef MQTT_TRANSPORT_H_ +#define MQTT_TRANSPORT_H_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/**@brief Transport for handling transport connect procedure. */ +typedef int (*transport_connect_handler_t)(struct mqtt_client *client); + +/**@brief Transport write handler. */ +typedef int (*transport_write_handler_t)(struct mqtt_client *client, + const u8_t *data, u32_t datalen); + +/**@brief Transport read handler. */ +typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data, + u32_t buflen); + +/**@brief Transport disconnect handler. */ +typedef int (*transport_disconnect_handler_t)(struct mqtt_client *client); + +/**@brief Transport procedure handlers. */ +struct transport_procedure { + /** Transport connect handler. Handles TCP connection callback based on + * type of transport. + */ + transport_connect_handler_t connect; + + /** Transport write handler. Handles transport write based on type of + * transport. + */ + transport_write_handler_t write; + + /** Transport read handler. Handles transport read based on type of + * transport. + */ + transport_read_handler_t read; + + /** Transport disconnect handler. Handles transport disconnection based + * on type of transport. + */ + transport_disconnect_handler_t disconnect; +}; + +/**@brief Handles TCP Connection Complete for configured transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_transport_connect(struct mqtt_client *client); + +/**@brief Handles write requests on configured transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * @param[in] data Data to be written on the transport. + * @param[in] datalen Length of data to be written on the transport. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_transport_write(struct mqtt_client *client, const u8_t *data, + u32_t datalen); + +/**@brief Handles read requests on configured transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * @param[in] data Pointer where read data is to be fetched. + * @param[in] buflen Size of memory provided for the operation. + * + * @retval Number of bytes read or an error code indicating reason for failure. + * 0 if connection was closed. + */ +int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen); + +/**@brief Handles transport disconnection requests on configured transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_transport_disconnect(struct mqtt_client *client); + +#ifdef __cplusplus +} +#endif + +#endif /* MQTT_TRANSPORT_H_ */ diff --git a/subsys/net/lib/mqtt_sock/mqtt_transport_socket_tcp.c b/subsys/net/lib/mqtt_sock/mqtt_transport_socket_tcp.c new file mode 100644 index 00000000000..c51fd8b454a --- /dev/null +++ b/subsys/net/lib/mqtt_sock/mqtt_transport_socket_tcp.c @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2018 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** @file mqtt_transport_socket_tcp.h + * + * @brief Internal functions to handle transport over TCP socket. + */ + +#define LOG_MODULE_NAME net_mqtt_sock_tcp +#define NET_LOG_LEVEL CONFIG_MQTT_LOG_LEVEL + +#include +#include +#include + +#include "mqtt_os.h" + +/**@brief Handles connect request for TCP socket transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_client_tcp_connect(struct mqtt_client *client) +{ + const struct sockaddr *broker = client->broker; + int ret; + + client->transport.tcp.sock = socket(broker->sa_family, SOCK_STREAM, + IPPROTO_TCP); + if (client->transport.tcp.sock < 0) { + return -errno; + } + + MQTT_TRC("Created socket %d", client->transport.tcp.sock); + + size_t peer_addr_size = sizeof(struct sockaddr_in6); + + if (broker->sa_family == AF_INET) { + peer_addr_size = sizeof(struct sockaddr_in); + } + + ret = connect(client->transport.tcp.sock, client->broker, + peer_addr_size); + if (ret < 0) { + (void)close(client->transport.tcp.sock); + return -errno; + } + + MQTT_TRC("Connect completed"); + return 0; +} + +/**@brief Handles write requests on TCP socket transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * @param[in] data Data to be written on the transport. + * @param[in] datalen Length of data to be written on the transport. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, + u32_t datalen) +{ + u32_t offset = 0; + int ret; + + while (offset < datalen) { + ret = send(client->transport.tcp.sock, data + offset, + datalen - offset, 0); + if (ret < 0) { + return -errno; + } + + offset += ret; + } + + return 0; +} + +/**@brief Handles read requests on TCP socket transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * @param[in] data Pointer where read data is to be fetched. + * @param[in] buflen Size of memory provided for the operation. + * + * @retval Number of bytes read or an error code indicating reason for failure. + * 0 if connection was closed. + */ +int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen) +{ + int ret; + + ret = recv(client->transport.tcp.sock, data, buflen, MSG_DONTWAIT); + if (ret < 0) { + return -errno; + } + + return ret; +} + +/**@brief Handles transport disconnection requests on TCP socket transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_client_tcp_disconnect(struct mqtt_client *client) +{ + int ret; + + MQTT_TRC("Closing socket %d", client->transport.tcp.sock); + + ret = close(client->transport.tcp.sock); + if (ret < 0) { + return -errno; + } + + return 0; +}