net: mqtt: add mqtt_readall_publish_payload()
This function uses mqtt_read_publish_payload_blocking to perform a blocking read of the specified number of bytes. When reading out a payload, the normal use case is to read the entire payload. This function facilitates that use case. Signed-off-by: Håkon Øye Amundsen <haakon.amundsen@nordicsemi.no>
This commit is contained in:
parent
e1f0b61d23
commit
05cd3420ac
3 changed files with 49 additions and 20 deletions
|
@ -715,6 +715,21 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
|
||||||
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
|
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
|
||||||
size_t length);
|
size_t length);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Blocking version of @ref mqtt_read_publish_payload function which
|
||||||
|
* runs until the required number of bytes are read.
|
||||||
|
*
|
||||||
|
* @param[in] client Client instance for which the procedure is requested.
|
||||||
|
* Shall not be NULL.
|
||||||
|
* @param[out] buffer Buffer where payload should be stored.
|
||||||
|
* @param[in] length Number of bytes to read.
|
||||||
|
*
|
||||||
|
* @return 0 if success, otherwise a negative error code (errno.h) indicating
|
||||||
|
* reason of failure.
|
||||||
|
*/
|
||||||
|
int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer,
|
||||||
|
size_t length);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -647,3 +647,25 @@ int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
|
||||||
{
|
{
|
||||||
return read_publish_payload(client, buffer, length, true);
|
return read_publish_payload(client, buffer, length, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer,
|
||||||
|
size_t length)
|
||||||
|
{
|
||||||
|
u8_t *end = buffer + length;
|
||||||
|
|
||||||
|
while (buffer < end) {
|
||||||
|
int ret = mqtt_read_publish_payload_blocking(client, buffer,
|
||||||
|
end - buffer);
|
||||||
|
|
||||||
|
if (ret < 0) {
|
||||||
|
return ret;
|
||||||
|
} else if (ret == 0) {
|
||||||
|
return -EIO;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer += ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,8 +101,7 @@ void publish_handler(struct mqtt_client *const client,
|
||||||
const struct mqtt_evt *evt)
|
const struct mqtt_evt *evt)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
u8_t buf[16];
|
static u8_t buf[sizeof(payload_long)];
|
||||||
u32_t offset = 0U;
|
|
||||||
|
|
||||||
if (evt->result != 0) {
|
if (evt->result != 0) {
|
||||||
TC_PRINT("MQTT PUBLISH error: %d\n", evt->result);
|
TC_PRINT("MQTT PUBLISH error: %d\n", evt->result);
|
||||||
|
@ -115,25 +114,18 @@ void publish_handler(struct mqtt_client *const client,
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (payload_left > 0) {
|
rc = mqtt_readall_publish_payload(client, buf, payload_left);
|
||||||
rc = mqtt_read_publish_payload_blocking(client, buf,
|
if (rc != 0) {
|
||||||
sizeof(buf));
|
TC_PRINT("Error while reading publish payload\n");
|
||||||
if (rc <= 0) {
|
|
||||||
TC_PRINT("Failed to receive payload, err: %d\n", -rc);
|
|
||||||
if (rc == -EAGAIN) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (memcmp(payload + offset, buf, rc) != 0) {
|
if (memcmp(payload, buf, evt->param.publish.message.payload.len != 0)) {
|
||||||
TC_PRINT("Invalid payload content\n");
|
TC_PRINT("Invalid payload content\n");
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
payload_left -= rc;
|
payload_left = 0;
|
||||||
offset += rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue