diff --git a/subsys/net/lib/mqtt/mqtt.c b/subsys/net/lib/mqtt/mqtt.c index 7204aa2b8eb..7f609226f2b 100644 --- a/subsys/net/lib/mqtt/mqtt.c +++ b/subsys/net/lib/mqtt/mqtt.c @@ -159,6 +159,27 @@ static int client_write(struct mqtt_client *client, const u8_t *data, return 0; } +static int client_write_msg(struct mqtt_client *client, + const struct msghdr *message) +{ + int err_code; + + MQTT_TRC("[%p]: Transport writing message.", client); + + err_code = mqtt_transport_write_msg(client, message); + if (err_code < 0) { + MQTT_TRC("Transport write failed, err_code = %d, " + "closing connection", err_code); + client_disconnect(client, err_code); + return err_code; + } + + MQTT_TRC("[%p]: Transport write complete.", client); + client->internal.last_activity = mqtt_sys_tick_in_ms_get(); + + return 0; +} + void mqtt_client_init(struct mqtt_client *client) { NULL_PARAM_CHECK_VOID(client); @@ -233,6 +254,8 @@ int mqtt_publish(struct mqtt_client *client, { int err_code; struct buf_ctx packet; + struct iovec io_vector[2]; + struct msghdr msg; NULL_PARAM_CHECK(client); NULL_PARAM_CHECK(param); @@ -256,13 +279,17 @@ int mqtt_publish(struct mqtt_client *client, goto error; } - err_code = client_write(client, packet.cur, packet.end - packet.cur); - if (err_code < 0) { - goto error; - } + io_vector[0].iov_base = packet.cur; + io_vector[0].iov_len = packet.end - packet.cur; + io_vector[1].iov_base = param->message.payload.data; + io_vector[1].iov_len = param->message.payload.len; - err_code = client_write(client, param->message.payload.data, - param->message.payload.len); + memset(&msg, 0, sizeof(msg)); + + msg.msg_iov = io_vector; + msg.msg_iovlen = ARRAY_SIZE(io_vector); + + err_code = client_write_msg(client, &msg); error: MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", diff --git a/subsys/net/lib/mqtt/mqtt_transport.c b/subsys/net/lib/mqtt/mqtt_transport.c index 2f6746cd029..d0bcdaf4c3a 100644 --- a/subsys/net/lib/mqtt/mqtt_transport.c +++ b/subsys/net/lib/mqtt/mqtt_transport.c @@ -16,6 +16,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = { { mqtt_client_tcp_connect, mqtt_client_tcp_write, + mqtt_client_tcp_write_msg, mqtt_client_tcp_read, mqtt_client_tcp_disconnect, }, @@ -23,6 +24,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = { { mqtt_client_tls_connect, mqtt_client_tls_write, + mqtt_client_tls_write_msg, mqtt_client_tls_read, mqtt_client_tls_disconnect, }, @@ -31,6 +33,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = { { mqtt_client_websocket_connect, mqtt_client_websocket_write, + mqtt_client_websocket_write_msg, mqtt_client_websocket_read, mqtt_client_websocket_disconnect, }, @@ -38,6 +41,7 @@ const struct transport_procedure transport_fn[MQTT_TRANSPORT_NUM] = { { mqtt_client_websocket_connect, mqtt_client_websocket_write, + mqtt_client_websocket_write_msg, mqtt_client_websocket_read, mqtt_client_websocket_disconnect, }, @@ -57,6 +61,12 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data, datalen); } +int mqtt_transport_write_msg(struct mqtt_client *client, + const struct msghdr *message) +{ + return transport_fn[client->transport.type].write_msg(client, message); +} + int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block) { diff --git a/subsys/net/lib/mqtt/mqtt_transport.h b/subsys/net/lib/mqtt/mqtt_transport.h index e2e5a258e7b..df7e3265e5a 100644 --- a/subsys/net/lib/mqtt/mqtt_transport.h +++ b/subsys/net/lib/mqtt/mqtt_transport.h @@ -25,6 +25,10 @@ typedef int (*transport_connect_handler_t)(struct mqtt_client *client); typedef int (*transport_write_handler_t)(struct mqtt_client *client, const u8_t *data, u32_t datalen); +/**@brief Transport write message handler, similar to POSIX sendmsg function. */ +typedef int (*transport_write_msg_handler_t)(struct mqtt_client *client, + const struct msghdr *message); + /**@brief Transport read handler. */ typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block); @@ -44,6 +48,11 @@ struct transport_procedure { */ transport_write_handler_t write; + /** Transport write message handler. Handles transport write based + * on type of transport. + */ + transport_write_msg_handler_t write_msg; + /** Transport read handler. Handles transport read based on type of * transport. */ @@ -74,6 +83,17 @@ int mqtt_transport_connect(struct mqtt_client *client); int mqtt_transport_write(struct mqtt_client *client, const u8_t *data, u32_t datalen); +/**@brief Handles write message requests on configured transport. + * + * @param[in] client Identifies the client on which the procedure is requested. + * @param[in] message Pointer to the `struct msghdr` structure, containing data + * to be written on the transport. + * + * @retval 0 or an error code indicating reason for failure. + */ +int mqtt_transport_write_msg(struct mqtt_client *client, + const struct msghdr *message); + /**@brief Handles read requests on configured transport. * * @param[in] client Identifies the client on which the procedure is requested. @@ -99,6 +119,8 @@ int mqtt_transport_disconnect(struct mqtt_client *client); int mqtt_client_tcp_connect(struct mqtt_client *client); int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, u32_t datalen); +int mqtt_client_tcp_write_msg(struct mqtt_client *client, + const struct msghdr *message); int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block); int mqtt_client_tcp_disconnect(struct mqtt_client *client); @@ -108,6 +130,8 @@ int mqtt_client_tcp_disconnect(struct mqtt_client *client); int mqtt_client_tls_connect(struct mqtt_client *client); int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data, u32_t datalen); +int mqtt_client_tls_write_msg(struct mqtt_client *client, + const struct msghdr *message); int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block); int mqtt_client_tls_disconnect(struct mqtt_client *client); @@ -117,6 +141,8 @@ int mqtt_client_tls_disconnect(struct mqtt_client *client); int mqtt_client_websocket_connect(struct mqtt_client *client); int mqtt_client_websocket_write(struct mqtt_client *client, const u8_t *data, u32_t datalen); +int mqtt_client_websocket_write_msg(struct mqtt_client *client, + const struct msghdr *message); int mqtt_client_websocket_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block); int mqtt_client_websocket_disconnect(struct mqtt_client *client); diff --git a/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c b/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c index c6702b11f07..0e4c64fc202 100644 --- a/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c +++ b/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c @@ -79,6 +79,20 @@ int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, return 0; } +int mqtt_client_tcp_write_msg(struct mqtt_client *client, + const struct msghdr *message) + +{ + int ret; + + ret = sendmsg(client->transport.tcp.sock, message, 0); + if (ret < 0) { + return -errno; + } + + return 0; +} + int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block) { diff --git a/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c b/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c index 0997130ec58..f005ea8ffde 100644 --- a/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c +++ b/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c @@ -117,6 +117,19 @@ int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data, return 0; } +int mqtt_client_tls_write_msg(struct mqtt_client *client, + const struct msghdr *message) +{ + int ret; + + ret = sendmsg(client->transport.tls.sock, message, 0); + if (ret < 0) { + return -errno; + } + + return 0; +} + int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block) { diff --git a/subsys/net/lib/mqtt/mqtt_transport_websocket.c b/subsys/net/lib/mqtt/mqtt_transport_websocket.c index af8e98d6d7c..fb4cd10c3f2 100644 --- a/subsys/net/lib/mqtt/mqtt_transport_websocket.c +++ b/subsys/net/lib/mqtt/mqtt_transport_websocket.c @@ -107,6 +107,36 @@ int mqtt_client_websocket_write(struct mqtt_client *client, const u8_t *data, return 0; } +int mqtt_client_websocket_write_msg(struct mqtt_client *client, + const struct msghdr *message) +{ + enum websocket_opcode opcode = WEBSOCKET_OPCODE_DATA_BINARY; + bool final = false; + ssize_t len; + ssize_t ret; + int i; + + len = 0; + for (i = 0; i < message->msg_iovlen; i++) { + if (i == message->msg_iovlen - 1) { + final = true; + } + + ret = websocket_send_msg(client->transport.websocket.sock, + message->msg_iov[i].iov_base, + message->msg_iov[i].iov_len, opcode, + true, final, K_FOREVER); + if (ret < 0) { + return ret; + } + + opcode = WEBSOCKET_OPCODE_CONTINUE; + len += ret; + } + + return len; +} + int mqtt_client_websocket_read(struct mqtt_client *client, u8_t *data, u32_t buflen, bool shall_block) {