ipc: icmsg: send nocopy feature

Add a nocopy feature for sending message through the icmsg IPC library.

Also eliminate data races if multiple threads are using the
same ipc instance and are trying to send a message simultaneously.

Signed-off-by: Hubert Miś <hubert.mis@gmail.com>
This commit is contained in:
Hubert Miś 2022-11-11 16:09:46 +01:00 committed by Carles Cufí
commit 3a1c9bce6e
3 changed files with 264 additions and 12 deletions

View file

@ -31,6 +31,7 @@ struct icmsg_data_t {
/* Tx/Rx buffers. */
struct spsc_pbuf *tx_ib;
struct spsc_pbuf *rx_ib;
atomic_t send_buffer_reserved;
/* Callbacks for an endpoint. */
const struct ipc_service_cb *cb;
@ -134,12 +135,124 @@ int icmsg_close(const struct icmsg_config_t *conf,
* instance.
* @retval -ENODATA when the requested data to send is empty.
* @retval -EBADMSG when the requested data to send is too big.
* @retval -ENOBUFS when there are no TX buffers available.
* @retval other errno codes from dependent modules.
*/
int icmsg_send(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
const void *msg, size_t len);
/** @brief Get an empty TX buffer to be sent using @ref icmsg_send_nocopy
*
* This function can be called to get an empty TX buffer so that the
* application can directly put its data into the sending buffer avoiding copy
* performed by the icmsg library.
*
* It is the application responsibility to correctly fill the allocated TX
* buffer with data and passing correct parameters to @ref
* icmsg_send_nocopy function to perform data no-copy-send mechanism.
*
* The size parameter can be used to request a buffer with a certain size:
* - if the size can be accommodated the function returns no errors and the
* buffer is allocated
* - if the requested size is too big, the function returns -ENOMEM and the
* the buffer is not allocated.
* - if the requested size is '0' the buffer is allocated with the maximum
* allowed size.
*
* In all the cases on return the size parameter contains the maximum size for
* the returned buffer.
*
* When the function returns no errors, the buffer is intended as allocated
* and it is released under one of two conditions: (1) when sending the buffer
* using @ref icmsg_send_nocopy (and in this case the buffer is automatically
* released by the backend), (2) when using @ref icmsg_drop_tx_buffer on a
* buffer not sent.
*
* @param[in] conf Structure containing configuration parameters for the icmsg
* instance being created.
* @param[inout] dev_data Structure containing run-time data used by the icmsg
* instance. The structure is initialized with
* @ref icmsg_init and its content must be preserved
* while the icmsg instance is active.
* @param[out] data Pointer to the empty TX buffer.
* @param[inout] size Pointer to store the requested TX buffer size. If the
* function returns -ENOMEM, this parameter returns the
* maximum allowed size.
*
* @retval -ENOBUFS when there are no TX buffers available.
* @retval -EALREADY when a buffer was already claimed and not yet released.
* @retval -ENOMEM when the requested size is too big (and the size parameter
* contains the maximum allowed size).
*
* @retval 0 on success.
*/
int icmsg_get_tx_buffer(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
void **data, size_t *size);
/** @brief Drop and release a TX buffer
*
* Drop and release a TX buffer. It is possible to drop only TX buffers
* obtained by using @ref icmsg_get_tx_buffer.
*
* @param[in] conf Structure containing configuration parameters for the icmsg
* instance being created.
* @param[inout] dev_data Structure containing run-time data used by the icmsg
* instance. The structure is initialized with
* @ref icmsg_init and its content must be preserved
* while the icmsg instance is active.
* @param[in] data Pointer to the TX buffer.
*
* @retval -EALREADY when the buffer was already dropped.
* @retval -ENXIO when the buffer was not obtained using @ref
* ipc_service_get_tx_buffer
*
* @retval 0 on success.
*/
int icmsg_drop_tx_buffer(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
const void *data);
/** @brief Send a message from a buffer obtained by @ref icmsg_get_tx_buffer
* to the remote icmsg instance.
*
* This is equivalent to @ref icmsg_send but in this case the TX buffer must
* have been obtained by using @ref icmsg_get_tx_buffer.
*
* The API user has to take the responsibility for getting the TX buffer using
* @ref icmsg_get_tx_buffer and filling the TX buffer with the data.
*
* After the @ref icmsg_send_nocopy function is issued the TX buffer is no
* more owned by the sending task and must not be touched anymore unless the
* function fails and returns an error.
*
* If this function returns an error, @ref icmsg_drop_tx_buffer can be used
* to drop the TX buffer.
*
* @param[in] conf Structure containing configuration parameters for the icmsg
* instance being created.
* @param[inout] dev_data Structure containing run-time data used by the icmsg
* instance. The structure is initialized with
* @ref icmsg_init and its content must be preserved
* while the icmsg instance is active.
* @param[in] msg Pointer to a buffer containing data to send.
* @param[in] len Size of data in the @p msg buffer.
*
*
* @retval 0 on success.
* @retval -EBUSY when the instance has not finished handshake with the remote
* instance.
* @retval -ENODATA when the requested data to send is empty.
* @retval -EBADMSG when the requested data to send is too big.
* @retval -ENXIO when the buffer was not obtained using @ref
* ipc_service_get_tx_buffer
* @retval other errno codes from dependent modules.
*/
int icmsg_send_nocopy(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
const void *msg, size_t len);
#ifdef CONFIG_IPC_SERVICE_ICMSG_NOCOPY_RX
/** @brief Hold RX buffer to be used outside of the received callback.
*

View file

@ -15,8 +15,8 @@ config IPC_SERVICE_ICMSG_NOCOPY_RX
bool
depends on IPC_SERVICE_ICMSG
help
Enable nocopy feature for the icmsg library that might be used by
backends based on icmsg.
Enable nocopy feature for receiving path of the icmsg library that
might be used by backends based on icmsg.
# The Icmsg library in its simplicity requires the system workqueue to execute
# at a cooperative priority.

View file

@ -13,8 +13,11 @@
#define RX_BUF_SIZE CONFIG_IPC_SERVICE_ICMSG_CB_BUF_SIZE
#define BOND_NOTIFY_REPEAT_TO K_MSEC(CONFIG_IPC_SERVICE_ICMSG_BOND_NOTIFY_REPEAT_TO_MS)
#define BUFFER_RELEASED 0
#define BUFFER_HELD 1
#define RX_BUFFER_RELEASED 0
#define RX_BUFFER_HELD 1
#define SEND_BUFFER_UNUSED 0
#define SEND_BUFFER_RESERVED 1
static const uint8_t magic[] = {0x45, 0x6d, 0x31, 0x6c, 0x31, 0x4b,
0x30, 0x72, 0x6e, 0x33, 0x6c, 0x69, 0x34};
@ -66,10 +69,35 @@ static bool is_endpoint_ready(struct icmsg_data_t *dev_data)
return atomic_get(&dev_data->state) == ICMSG_STATE_READY;
}
static bool is_send_buffer_reserved(struct icmsg_data_t *dev_data)
{
return atomic_get(&dev_data->send_buffer_reserved) ==
SEND_BUFFER_RESERVED;
}
static int reserve_send_buffer_if_unused(struct icmsg_data_t *dev_data)
{
bool was_unused;
was_unused = atomic_cas(&dev_data->send_buffer_reserved,
SEND_BUFFER_UNUSED, SEND_BUFFER_RESERVED);
return was_unused ? 0 : -EALREADY;
}
static int release_send_buffer(struct icmsg_data_t *dev_data)
{
bool was_reserved;
was_reserved = atomic_cas(&dev_data->send_buffer_reserved,
SEND_BUFFER_RESERVED, SEND_BUFFER_UNUSED);
return was_reserved ? 0 : -EALREADY;
}
static bool is_rx_buffer_free(struct icmsg_data_t *dev_data)
{
#ifdef CONFIG_IPC_SERVICE_ICMSG_NOCOPY_RX
return atomic_get(&dev_data->rx_buffer_held) == BUFFER_RELEASED;
return atomic_get(&dev_data->rx_buffer_held) == RX_BUFFER_RELEASED;
#else
return true;
#endif
@ -256,6 +284,8 @@ int icmsg_send(const struct icmsg_config_t *conf,
const void *msg, size_t len)
{
int ret;
int write_ret;
int release_ret;
int sent_bytes;
if (!is_endpoint_ready(dev_data)) {
@ -267,13 +297,122 @@ int icmsg_send(const struct icmsg_config_t *conf,
return -ENODATA;
}
ret = spsc_pbuf_write(dev_data->tx_ib, msg, len);
if (ret < 0) {
return ret;
} else if (ret < len) {
ret = reserve_send_buffer_if_unused(dev_data);
if (ret) {
return -ENOBUFS;
}
write_ret = spsc_pbuf_write(dev_data->tx_ib, msg, len);
release_ret = release_send_buffer(dev_data);
__ASSERT_NO_MSG(!release_ret);
if (write_ret < 0) {
return write_ret;
} else if (write_ret < len) {
return -EBADMSG;
}
sent_bytes = ret;
sent_bytes = write_ret;
__ASSERT_NO_MSG(conf->mbox_tx.dev != NULL);
ret = mbox_send(&conf->mbox_tx, NULL);
if (ret) {
return ret;
}
return sent_bytes;
}
int icmsg_get_tx_buffer(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
void **data, size_t *size)
{
int ret;
int release_ret;
uint16_t requested_size;
int allocated_len;
char *allocated_buf;
if (*size == 0) {
/* Requested allocation of maximal size.
* Try to allocate maximal buffer size from spsc,
* potentially after wrapping marker.
*/
requested_size = SPSC_PBUF_MAX_LEN - 1;
} else {
requested_size = *size;
}
ret = reserve_send_buffer_if_unused(dev_data);
if (ret) {
return -ENOBUFS;
}
ret = spsc_pbuf_alloc(dev_data->tx_ib, requested_size, &allocated_buf);
if (ret < 0) {
release_ret = release_send_buffer(dev_data);
__ASSERT_NO_MSG(!release_ret);
return ret;
}
allocated_len = ret;
if (*size == 0) {
/* Requested allocation of maximal size.
* Pass the buffer that was allocated.
*/
*size = allocated_len;
*data = allocated_buf;
return 0;
}
if (*size == allocated_len) {
/* Allocated buffer is of requested size. */
*data = allocated_buf;
return 0;
}
/* Allocated smaller buffer than requested.
* Silently stop using the allocated buffer what is allowed by SPSC API
*/
release_send_buffer(dev_data);
*size = allocated_len;
return -ENOMEM;
}
int icmsg_drop_tx_buffer(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
const void *data)
{
/* Silently stop using the allocated buffer what is allowed by SPSC API
*/
return release_send_buffer(dev_data);
}
int icmsg_send_nocopy(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data,
const void *msg, size_t len)
{
int ret;
int sent_bytes;
if (!is_endpoint_ready(dev_data)) {
return -EBUSY;
}
/* Empty message is not allowed */
if (len == 0) {
return -ENODATA;
}
if (!is_send_buffer_reserved(dev_data)) {
return -ENXIO;
}
spsc_pbuf_commit(dev_data->tx_ib, len);
sent_bytes = len;
ret = release_send_buffer(dev_data);
__ASSERT_NO_MSG(!ret);
__ASSERT_NO_MSG(conf->mbox_tx.dev != NULL);
@ -301,7 +440,7 @@ int icmsg_hold_rx_buffer(const struct icmsg_config_t *conf,
}
was_released = atomic_cas(&dev_data->rx_buffer_held,
BUFFER_RELEASED, BUFFER_HELD);
RX_BUFFER_RELEASED, RX_BUFFER_HELD);
if (!was_released) {
return -EALREADY;
}
@ -324,7 +463,7 @@ int icmsg_release_rx_buffer(const struct icmsg_config_t *conf,
}
was_held = atomic_cas(&dev_data->rx_buffer_held,
BUFFER_HELD, BUFFER_RELEASED);
RX_BUFFER_HELD, RX_BUFFER_RELEASED);
if (!was_held) {
return -EALREADY;
}