diff --git a/include/net/mqtt.h b/include/net/mqtt.h index 1757aa2216f..963849e2c96 100644 --- a/include/net/mqtt.h +++ b/include/net/mqtt.h @@ -715,6 +715,21 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, size_t length); +/** + * @brief Blocking version of @ref mqtt_read_publish_payload function which + * runs until the required number of bytes are read. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * @param[out] buffer Buffer where payload should be stored. + * @param[in] length Number of bytes to read. + * + * @return 0 if success, otherwise a negative error code (errno.h) indicating + * reason of failure. + */ +int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer, + size_t length); + #ifdef __cplusplus } #endif diff --git a/subsys/net/lib/mqtt/mqtt.c b/subsys/net/lib/mqtt/mqtt.c index 77653d2a66e..f3ab439e502 100644 --- a/subsys/net/lib/mqtt/mqtt.c +++ b/subsys/net/lib/mqtt/mqtt.c @@ -647,3 +647,25 @@ int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, { return read_publish_payload(client, buffer, length, true); } + +int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer, + size_t length) +{ + u8_t *end = buffer + length; + + while (buffer < end) { + int ret = mqtt_read_publish_payload_blocking(client, buffer, + end - buffer); + + if (ret < 0) { + return ret; + } else if (ret == 0) { + return -EIO; + } + + buffer += ret; + } + + return 0; +} + diff --git a/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c b/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c index 5d06b8e3802..01a9bbc71a3 100644 --- a/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c +++ b/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c @@ -101,8 +101,7 @@ void publish_handler(struct mqtt_client *const client, const struct mqtt_evt *evt) { int rc; - u8_t buf[16]; - u32_t offset = 0U; + static u8_t buf[sizeof(payload_long)]; if (evt->result != 0) { TC_PRINT("MQTT PUBLISH error: %d\n", evt->result); @@ -115,26 +114,19 @@ void publish_handler(struct mqtt_client *const client, goto error; } - while (payload_left > 0) { - rc = mqtt_read_publish_payload_blocking(client, buf, - sizeof(buf)); - if (rc <= 0) { - TC_PRINT("Failed to receive payload, err: %d\n", -rc); - if (rc == -EAGAIN) { - continue; - } - goto error; - } - - if (memcmp(payload + offset, buf, rc) != 0) { - TC_PRINT("Invalid payload content\n"); - goto error; - } - - payload_left -= rc; - offset += rc; + rc = mqtt_readall_publish_payload(client, buf, payload_left); + if (rc != 0) { + TC_PRINT("Error while reading publish payload\n"); + goto error; } + if (memcmp(payload, buf, evt->param.publish.message.payload.len != 0)) { + TC_PRINT("Invalid payload content\n"); + goto error; + } + + payload_left = 0; + return; error: