net: lib: mqtt: Enable blocking PUBLISH payload readout

It is convenient to have a blocking version of
`mqtt_read_publish_payload` function, for cases when it is called from
the event handler. Therefore, extend the 'mqtt_read_publish_payload'
argument list with information whether the call should block or not.

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2019-02-05 12:52:40 +01:00 committed by Anas Nashif
commit b8494d9a51
8 changed files with 61 additions and 17 deletions

View file

@ -698,6 +698,20 @@ int mqtt_input(struct mqtt_client *client);
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length); size_t length);
/**
* @brief Blocking version of @ref mqtt_read_publish_payload function.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[out] buffer Buffer where payload should be stored.
* @param[in] length Length of the buffer, in bytes.
*
* @return Number of bytes read or a negative error code (errno.h) indicating
* reason of failure.
*/
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
size_t length);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View file

@ -596,8 +596,8 @@ int mqtt_input(struct mqtt_client *client)
return err_code; return err_code;
} }
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, static int read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length) size_t length, bool shall_block)
{ {
int ret; int ret;
@ -614,8 +614,8 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
length = client->internal.remaining_payload; length = client->internal.remaining_payload;
} }
ret = mqtt_transport_read(client, buffer, length); ret = mqtt_transport_read(client, buffer, length, shall_block);
if (ret == -EAGAIN) { if (!shall_block && ret == -EAGAIN) {
goto exit; goto exit;
} }
@ -635,3 +635,15 @@ exit:
return ret; return ret;
} }
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length)
{
return read_publish_payload(client, buffer, length, false);
}
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
size_t length)
{
return read_publish_payload(client, buffer, length, true);
}

View file

@ -158,7 +158,7 @@ static int mqtt_read_message_chunk(struct mqtt_client *client,
return -ENOMEM; return -ENOMEM;
} }
len = mqtt_transport_read(client, buf->end, remaining); len = mqtt_transport_read(client, buf->end, remaining, false);
if (len < 0) { if (len < 0) {
MQTT_TRC("[CID %p]: Transport read error: %d", client, len); MQTT_TRC("[CID %p]: Transport read error: %d", client, len);
return len; return len;

View file

@ -16,7 +16,7 @@ extern int mqtt_client_tcp_connect(struct mqtt_client *client);
extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen); u32_t datalen);
extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data,
u32_t buflen); u32_t buflen, bool shall_block);
extern int mqtt_client_tcp_disconnect(struct mqtt_client *client); extern int mqtt_client_tcp_disconnect(struct mqtt_client *client);
#if defined(CONFIG_MQTT_LIB_TLS) #if defined(CONFIG_MQTT_LIB_TLS)
@ -25,7 +25,7 @@ extern int mqtt_client_tls_connect(struct mqtt_client *client);
extern int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data, extern int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen); u32_t datalen);
extern int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, extern int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data,
u32_t buflen); u32_t buflen, bool shall_block);
extern int mqtt_client_tls_disconnect(struct mqtt_client *client); extern int mqtt_client_tls_disconnect(struct mqtt_client *client);
#endif /* CONFIG_MQTT_LIB_TLS */ #endif /* CONFIG_MQTT_LIB_TLS */
@ -72,9 +72,11 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
datalen); datalen);
} }
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen) int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{ {
return transport_fn[client->transport.type].read(client, data, buflen); return transport_fn[client->transport.type].read(client, data, buflen,
shall_block);
} }
int mqtt_transport_disconnect(struct mqtt_client *client) int mqtt_transport_disconnect(struct mqtt_client *client)

View file

@ -27,7 +27,7 @@ typedef int (*transport_write_handler_t)(struct mqtt_client *client,
/**@brief Transport read handler. */ /**@brief Transport read handler. */
typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data, typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data,
u32_t buflen); u32_t buflen, bool shall_block);
/**@brief Transport disconnect handler. */ /**@brief Transport disconnect handler. */
typedef int (*transport_disconnect_handler_t)(struct mqtt_client *client); typedef int (*transport_disconnect_handler_t)(struct mqtt_client *client);
@ -79,11 +79,13 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
* @param[in] client Identifies the client on which the procedure is requested. * @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched. * @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation. * @param[in] buflen Size of memory provided for the operation.
* @param[in] shall_block Information whether the call should block or not.
* *
* @retval Number of bytes read or an error code indicating reason for failure. * @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed. * 0 if connection was closed.
*/ */
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen); int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block);
/**@brief Handles transport disconnection requests on configured transport. /**@brief Handles transport disconnection requests on configured transport.
* *

View file

@ -86,15 +86,22 @@ int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
* @param[in] client Identifies the client on which the procedure is requested. * @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched. * @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation. * @param[in] buflen Size of memory provided for the operation.
* @param[in] shall_block Information whether the call should block or not.
* *
* @retval Number of bytes read or an error code indicating reason for failure. * @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed. * 0 if connection was closed.
*/ */
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen) int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{ {
int flags = 0;
int ret; int ret;
ret = recv(client->transport.tcp.sock, data, buflen, MSG_DONTWAIT); if (!shall_block) {
flags |= MSG_DONTWAIT;
}
ret = recv(client->transport.tcp.sock, data, buflen, flags);
if (ret < 0) { if (ret < 0) {
return -errno; return -errno;
} }

View file

@ -125,15 +125,22 @@ int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
* @param[in] client Identifies the client on which the procedure is requested. * @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched. * @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation. * @param[in] buflen Size of memory provided for the operation.
* @param[in] shall_block Information whether the call should block or not.
* *
* @retval Number of bytes read or an error code indicating reason for failure. * @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed. * 0 if connection was closed.
*/ */
int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen) int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{ {
int flags = 0;
int ret; int ret;
ret = recv(client->transport.tls.sock, data, buflen, MSG_DONTWAIT); if (!shall_block) {
flags |= MSG_DONTWAIT;
}
ret = recv(client->transport.tls.sock, data, buflen, flags);
if (ret < 0) { if (ret < 0) {
return -errno; return -errno;
} }

View file

@ -116,8 +116,8 @@ void publish_handler(struct mqtt_client *const client,
} }
while (payload_left > 0) { while (payload_left > 0) {
wait(APP_SLEEP_MSECS); rc = mqtt_read_publish_payload_blocking(client, buf,
rc = mqtt_read_publish_payload(client, buf, sizeof(buf)); sizeof(buf));
if (rc <= 0) { if (rc <= 0) {
TC_PRINT("Failed to receive payload, err: %d\n", -rc); TC_PRINT("Failed to receive payload, err: %d\n", -rc);
if (rc == -EAGAIN) { if (rc == -EAGAIN) {