net: mqtt: Add support for running MQTT over Websocket
Initial support for running MQTT over Websocket. Fixes #19539 Signed-off-by: Jukka Rissanen <jukka.rissanen@linux.intel.com>
This commit is contained in:
parent
82ed1681f0
commit
23ca8899fa
6 changed files with 204 additions and 18 deletions
|
@ -29,6 +29,7 @@
|
||||||
#include <net/tls_credentials.h>
|
#include <net/tls_credentials.h>
|
||||||
#include <net/net_ip.h>
|
#include <net/net_ip.h>
|
||||||
#include <sys/mutex.h>
|
#include <sys/mutex.h>
|
||||||
|
#include <net/websocket.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -350,6 +351,15 @@ enum mqtt_transport_type {
|
||||||
MQTT_TRANSPORT_SECURE,
|
MQTT_TRANSPORT_SECURE,
|
||||||
#endif /* CONFIG_MQTT_LIB_TLS */
|
#endif /* CONFIG_MQTT_LIB_TLS */
|
||||||
|
|
||||||
|
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
|
||||||
|
/** Use non secure Websocket transport for MQTT connection. */
|
||||||
|
MQTT_TRANSPORT_NON_SECURE_WEBSOCKET,
|
||||||
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
||||||
|
/** Use secure Websocket transport (TLS) for MQTT connection. */
|
||||||
|
MQTT_TRANSPORT_SECURE_WEBSOCKET,
|
||||||
|
#endif
|
||||||
|
#endif /* CONFIG_MQTT_LIB_WEBSOCKET */
|
||||||
|
|
||||||
/** Shall not be used as a transport type.
|
/** Shall not be used as a transport type.
|
||||||
* Indicator of maximum transport types possible.
|
* Indicator of maximum transport types possible.
|
||||||
*/
|
*/
|
||||||
|
@ -385,6 +395,20 @@ struct mqtt_transport {
|
||||||
#endif /* CONFIG_MQTT_LIB_TLS */
|
#endif /* CONFIG_MQTT_LIB_TLS */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
|
||||||
|
/** Websocket transport for MQTT */
|
||||||
|
struct {
|
||||||
|
/** Websocket configuration. */
|
||||||
|
struct websocket_request config;
|
||||||
|
|
||||||
|
/** Socket descriptor */
|
||||||
|
int sock;
|
||||||
|
|
||||||
|
/** Websocket timeout */
|
||||||
|
s32_t timeout;
|
||||||
|
} websocket;
|
||||||
|
#endif
|
||||||
|
|
||||||
#if defined(CONFIG_SOCKS)
|
#if defined(CONFIG_SOCKS)
|
||||||
struct {
|
struct {
|
||||||
struct sockaddr addr;
|
struct sockaddr addr;
|
||||||
|
|
|
@ -14,3 +14,7 @@ zephyr_library_sources(
|
||||||
zephyr_library_sources_ifdef(CONFIG_MQTT_LIB_TLS
|
zephyr_library_sources_ifdef(CONFIG_MQTT_LIB_TLS
|
||||||
mqtt_transport_socket_tls.c
|
mqtt_transport_socket_tls.c
|
||||||
)
|
)
|
||||||
|
|
||||||
|
zephyr_library_sources_ifdef(CONFIG_MQTT_LIB_WEBSOCKET
|
||||||
|
mqtt_transport_websocket.c
|
||||||
|
)
|
||||||
|
|
|
@ -32,4 +32,9 @@ config MQTT_LIB_TLS
|
||||||
help
|
help
|
||||||
Enable TLS support for socket MQTT Library
|
Enable TLS support for socket MQTT Library
|
||||||
|
|
||||||
|
config MQTT_LIB_WEBSOCKET
|
||||||
|
bool "Websocket support for socket MQTT Library"
|
||||||
|
help
|
||||||
|
Enable Websocket support for socket MQTT Library.
|
||||||
|
|
||||||
endif # MQTT_LIB
|
endif # MQTT_LIB
|
||||||
|
|
|
@ -11,24 +11,6 @@
|
||||||
|
|
||||||
#include "mqtt_transport.h"
|
#include "mqtt_transport.h"
|
||||||
|
|
||||||
/* Transport handler functions for TCP socket transport. */
|
|
||||||
extern int mqtt_client_tcp_connect(struct mqtt_client *client);
|
|
||||||
extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
|
|
||||||
u32_t datalen);
|
|
||||||
extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data,
|
|
||||||
u32_t buflen, bool shall_block);
|
|
||||||
extern int mqtt_client_tcp_disconnect(struct mqtt_client *client);
|
|
||||||
|
|
||||||
#if defined(CONFIG_MQTT_LIB_TLS)
|
|
||||||
/* Transport handler functions for TLS socket transport. */
|
|
||||||
extern int mqtt_client_tls_connect(struct mqtt_client *client);
|
|
||||||
extern int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
|
|
||||||
u32_t datalen);
|
|
||||||
extern int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data,
|
|
||||||
u32_t buflen, bool shall_block);
|
|
||||||
extern int mqtt_client_tls_disconnect(struct mqtt_client *client);
|
|
||||||
#endif /* CONFIG_MQTT_LIB_TLS */
|
|
||||||
|
|
||||||
/**@brief Function pointer array for TCP/TLS transport handlers. */
|
/**@brief Function pointer array for TCP/TLS transport handlers. */
|
||||||
const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
|
const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
|
||||||
{
|
{
|
||||||
|
@ -45,6 +27,22 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = {
|
||||||
mqtt_client_tls_disconnect,
|
mqtt_client_tls_disconnect,
|
||||||
},
|
},
|
||||||
#endif /* CONFIG_MQTT_LIB_TLS */
|
#endif /* CONFIG_MQTT_LIB_TLS */
|
||||||
|
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
|
||||||
|
{
|
||||||
|
mqtt_client_websocket_connect,
|
||||||
|
mqtt_client_websocket_write,
|
||||||
|
mqtt_client_websocket_read,
|
||||||
|
mqtt_client_websocket_disconnect,
|
||||||
|
},
|
||||||
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
||||||
|
{
|
||||||
|
mqtt_client_websocket_connect,
|
||||||
|
mqtt_client_websocket_write,
|
||||||
|
mqtt_client_websocket_read,
|
||||||
|
mqtt_client_websocket_disconnect,
|
||||||
|
},
|
||||||
|
#endif /* CONFIG_MQTT_LIB_TLS */
|
||||||
|
#endif /* CONFIG_MQTT_LIB_WEBSOCKET */
|
||||||
};
|
};
|
||||||
|
|
||||||
int mqtt_transport_connect(struct mqtt_client *client)
|
int mqtt_transport_connect(struct mqtt_client *client)
|
||||||
|
|
|
@ -95,6 +95,33 @@ int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
|
||||||
*/
|
*/
|
||||||
int mqtt_transport_disconnect(struct mqtt_client *client);
|
int mqtt_transport_disconnect(struct mqtt_client *client);
|
||||||
|
|
||||||
|
/* Transport handler functions for TCP socket transport. */
|
||||||
|
int mqtt_client_tcp_connect(struct mqtt_client *client);
|
||||||
|
int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
|
||||||
|
u32_t datalen);
|
||||||
|
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data,
|
||||||
|
u32_t buflen, bool shall_block);
|
||||||
|
int mqtt_client_tcp_disconnect(struct mqtt_client *client);
|
||||||
|
|
||||||
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
||||||
|
/* Transport handler functions for TLS socket transport. */
|
||||||
|
int mqtt_client_tls_connect(struct mqtt_client *client);
|
||||||
|
int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
|
||||||
|
u32_t datalen);
|
||||||
|
int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data,
|
||||||
|
u32_t buflen, bool shall_block);
|
||||||
|
int mqtt_client_tls_disconnect(struct mqtt_client *client);
|
||||||
|
#endif /* CONFIG_MQTT_LIB_TLS */
|
||||||
|
|
||||||
|
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
|
||||||
|
int mqtt_client_websocket_connect(struct mqtt_client *client);
|
||||||
|
int mqtt_client_websocket_write(struct mqtt_client *client, const u8_t *data,
|
||||||
|
u32_t datalen);
|
||||||
|
int mqtt_client_websocket_read(struct mqtt_client *client, u8_t *data,
|
||||||
|
u32_t buflen, bool shall_block);
|
||||||
|
int mqtt_client_websocket_disconnect(struct mqtt_client *client);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
128
subsys/net/lib/mqtt/mqtt_transport_websocket.c
Normal file
128
subsys/net/lib/mqtt/mqtt_transport_websocket.c
Normal file
|
@ -0,0 +1,128 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2018 Nordic Semiconductor ASA
|
||||||
|
* Copyright (c) 2019 Intel Corporation
|
||||||
|
*
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** @file mqtt_transport_websocket.c
|
||||||
|
*
|
||||||
|
* @brief Internal functions to handle transport over Websocket.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <logging/log.h>
|
||||||
|
LOG_MODULE_REGISTER(net_mqtt_websocket, CONFIG_MQTT_LOG_LEVEL);
|
||||||
|
|
||||||
|
#include <errno.h>
|
||||||
|
#include <net/socket.h>
|
||||||
|
#include <net/mqtt.h>
|
||||||
|
#include <net/websocket.h>
|
||||||
|
|
||||||
|
#include "mqtt_os.h"
|
||||||
|
#include "mqtt_transport.h"
|
||||||
|
|
||||||
|
int mqtt_client_websocket_connect(struct mqtt_client *client)
|
||||||
|
{
|
||||||
|
const char *extra_headers[] = {
|
||||||
|
"Sec-WebSocket-Protocol: mqtt\r\n",
|
||||||
|
NULL
|
||||||
|
};
|
||||||
|
int transport_sock;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE_WEBSOCKET) {
|
||||||
|
ret = mqtt_client_tcp_connect(client);
|
||||||
|
if (ret < 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
transport_sock = client->transport.tcp.sock;
|
||||||
|
}
|
||||||
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
||||||
|
else if (client->transport.type == MQTT_TRANSPORT_SECURE_WEBSOCKET) {
|
||||||
|
ret = mqtt_client_tls_connect(client);
|
||||||
|
if (ret < 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
transport_sock = client->transport.tls.sock;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
else {
|
||||||
|
return -EINVAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (client->transport.websocket.config.url == NULL) {
|
||||||
|
client->transport.websocket.config.url = "/mqtt";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (client->transport.websocket.config.host == NULL) {
|
||||||
|
client->transport.websocket.config.host = "localhost";
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If application needs to set some extra header options, then
|
||||||
|
* it can set the optional_headers_cb. In this case the app
|
||||||
|
* will need to also send "Sec-WebSocket-Protocol: mqtt\r\n"
|
||||||
|
* field as the optional_headers field is ignored if the callback
|
||||||
|
* is set.
|
||||||
|
*/
|
||||||
|
client->transport.websocket.config.optional_headers = extra_headers;
|
||||||
|
|
||||||
|
client->transport.websocket.sock = websocket_connect(
|
||||||
|
transport_sock,
|
||||||
|
&client->transport.websocket.config,
|
||||||
|
client->transport.websocket.timeout,
|
||||||
|
NULL);
|
||||||
|
if (client->transport.websocket.sock < 0) {
|
||||||
|
MQTT_TRC("Websocket connect failed (%d)",
|
||||||
|
client->transport.websocket.sock);
|
||||||
|
|
||||||
|
(void)close(transport_sock);
|
||||||
|
return client->transport.websocket.sock;
|
||||||
|
}
|
||||||
|
|
||||||
|
MQTT_TRC("Connect completed");
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mqtt_client_websocket_write(struct mqtt_client *client, const u8_t *data,
|
||||||
|
u32_t datalen)
|
||||||
|
{
|
||||||
|
u32_t offset = 0U;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
while (offset < datalen) {
|
||||||
|
ret = websocket_send_msg(client->transport.websocket.sock,
|
||||||
|
data + offset, datalen - offset,
|
||||||
|
WEBSOCKET_OPCODE_DATA_BINARY,
|
||||||
|
true, true, K_FOREVER);
|
||||||
|
if (ret < 0) {
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mqtt_client_websocket_read(struct mqtt_client *client, u8_t *data,
|
||||||
|
u32_t buflen, bool shall_block)
|
||||||
|
{
|
||||||
|
s32_t timeout = K_FOREVER;
|
||||||
|
|
||||||
|
if (!shall_block) {
|
||||||
|
timeout = K_NO_WAIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return websocket_recv_msg(client->transport.websocket.sock,
|
||||||
|
data, buflen, NULL, NULL, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
int mqtt_client_websocket_disconnect(struct mqtt_client *client)
|
||||||
|
{
|
||||||
|
MQTT_TRC("Closing socket %d", client->transport.websocket.sock);
|
||||||
|
|
||||||
|
return websocket_disconnect(client->transport.websocket.sock);
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue