ipc: icmsg: Dedicated workqueue for ICMSG backend

The ICMSG backend now has a dedicated workqueue to process incoming IPC
messages. The system workqueue is no longer utilized for that purpose.

Testing shows that in certain scenarios substituting a RPMsg backend
with ICMsg results in deadlocks.
The deadlocks were a symptom of running a synchronous RPC protocol from
the context of the system workqueue and transpired as follows:

1. The RPC protocol sends a request over the ICMsg backend on the system
   workqueue thread.
2. The RPC protocol puts the thread to sleep until response is received.
   This puts the system workqueue thread to sleep.
3. The response to the request arrives over ICMsg backend.
4. The backend signals a work item to the system workqueue.
5. The system workqueue is unable to process the response due to being
   previously pended on the RPC request.

The deadlock was initially observed with the nrf-802154 driver in
conjuntion with the IPv6 stack.

To prevent this condition from occurring, the approach was selected to
give ICMsg a dedicated workqueue thread.

Added a Kconfig option that enables the dedicated workqueue by default.
The config can be disabled, if the user wants to preserve RAM capacity
and is certain that the deadlock condition is not encountered.

Signed-off-by: Rafał Kuźnia <rafal.kuznia@nordicsemi.no>
This commit is contained in:
Rafał Kuźnia 2023-08-07 13:51:53 +02:00 committed by Carles Cufí
commit 5b12b6ab55
2 changed files with 74 additions and 8 deletions

View file

@ -27,11 +27,6 @@ config IPC_SERVICE_ICMSG_SHMEM_ACCESS_TO_MS
Maximum time to wait, in milliseconds, for access to send data with Maximum time to wait, in milliseconds, for access to send data with
backends basing on icmsg library. This time should be relatively low. backends basing on icmsg library. This time should be relatively low.
# The Icmsg library in its simplicity requires the system workqueue to execute
# at a cooperative priority.
config SYSTEM_WORKQUEUE_PRIORITY
range -256 -1
config IPC_SERVICE_ICMSG_BOND_NOTIFY_REPEAT_TO_MS config IPC_SERVICE_ICMSG_BOND_NOTIFY_REPEAT_TO_MS
int "Bond notification timeout in miliseconds" int "Bond notification timeout in miliseconds"
range 1 100 range 1 100
@ -39,3 +34,46 @@ config IPC_SERVICE_ICMSG_BOND_NOTIFY_REPEAT_TO_MS
help help
Time to wait for remote bonding notification before the Time to wait for remote bonding notification before the
notification is repeated. notification is repeated.
config IPC_SERVICE_BACKEND_ICMSG_WQ_ENABLE
bool "Use dedicated workqueue"
default y
help
Enable dedicated workqueue thread for the ICMsg backend.
Disabling this configuration will cause the ICMsg backend to
process incoming data through the system workqueue context, and
therefore reduces the RAM footprint of the backend.
Disabling this config may result in deadlocks in certain usage
scenarios, such as when synchronous IPC is executed from the system
workqueue context.
The callbacks coming from the backend are executed from the workqueue
context.
When the option is disabled, the user must obey the restrictions
imposed by the system workqueue, such as never performing blocking
operations from within the callback.
if IPC_SERVICE_BACKEND_ICMSG_WQ_ENABLE
config IPC_SERVICE_BACKEND_ICMSG_WQ_STACK_SIZE
int "Size of RX work queue stack"
default 1024
help
Size of stack used by work queue RX thread. This work queue is
created to prevent notifying service users about received data
from the system work queue. The queue is shared among instances.
config IPC_SERVICE_BACKEND_ICMSG_WQ_PRIORITY
int "Priority of RX work queue thread"
default -1
range -256 -1
help
Priority of the ICMSG RX work queue thread.
The ICMSG library in its simplicity requires the workqueue to execute
at a cooperative priority.
endif
# The Icmsg library in its simplicity requires the system workqueue to execute
# at a cooperative priority.
config SYSTEM_WORKQUEUE_PRIORITY
range -256 -1 if !IPC_SERVICE_BACKEND_ICMSG_WQ_ENABLE

View file

@ -10,6 +10,7 @@
#include <zephyr/drivers/mbox.h> #include <zephyr/drivers/mbox.h>
#include <zephyr/sys/atomic.h> #include <zephyr/sys/atomic.h>
#include <zephyr/sys/spsc_pbuf.h> #include <zephyr/sys/spsc_pbuf.h>
#include <zephyr/init.h>
#define BOND_NOTIFY_REPEAT_TO K_MSEC(CONFIG_IPC_SERVICE_ICMSG_BOND_NOTIFY_REPEAT_TO_MS) #define BOND_NOTIFY_REPEAT_TO K_MSEC(CONFIG_IPC_SERVICE_ICMSG_BOND_NOTIFY_REPEAT_TO_MS)
#define SHMEM_ACCESS_TO K_MSEC(CONFIG_IPC_SERVICE_ICMSG_SHMEM_ACCESS_TO_MS) #define SHMEM_ACCESS_TO K_MSEC(CONFIG_IPC_SERVICE_ICMSG_SHMEM_ACCESS_TO_MS)
@ -28,6 +29,14 @@ enum tx_buffer_state {
static const uint8_t magic[] = {0x45, 0x6d, 0x31, 0x6c, 0x31, 0x4b, static const uint8_t magic[] = {0x45, 0x6d, 0x31, 0x6c, 0x31, 0x4b,
0x30, 0x72, 0x6e, 0x33, 0x6c, 0x69, 0x34}; 0x30, 0x72, 0x6e, 0x33, 0x6c, 0x69, 0x34};
#if IS_ENABLED(CONFIG_IPC_SERVICE_BACKEND_ICMSG_WQ_ENABLE)
static K_THREAD_STACK_DEFINE(icmsg_stack, CONFIG_IPC_SERVICE_BACKEND_ICMSG_WQ_STACK_SIZE);
static struct k_work_q icmsg_workq;
static struct k_work_q *const workq = &icmsg_workq;
#else
static struct k_work_q *const workq = &k_sys_work_q;
#endif
static int mbox_deinit(const struct icmsg_config_t *conf, static int mbox_deinit(const struct icmsg_config_t *conf,
struct icmsg_data_t *dev_data) struct icmsg_data_t *dev_data)
{ {
@ -62,7 +71,7 @@ static void notify_process(struct k_work *item)
if (state != ICMSG_STATE_READY) { if (state != ICMSG_STATE_READY) {
int ret; int ret;
ret = k_work_reschedule(dwork, BOND_NOTIFY_REPEAT_TO); ret = k_work_reschedule_for_queue(workq, dwork, BOND_NOTIFY_REPEAT_TO);
__ASSERT_NO_MSG(ret >= 0); __ASSERT_NO_MSG(ret >= 0);
(void)ret; (void)ret;
} }
@ -138,7 +147,7 @@ static bool is_rx_data_available(struct icmsg_data_t *dev_data)
static void submit_mbox_work(struct icmsg_data_t *dev_data) static void submit_mbox_work(struct icmsg_data_t *dev_data)
{ {
if (k_work_submit(&dev_data->mbox_work) < 0) { if (k_work_submit_to_queue(workq, &dev_data->mbox_work) < 0) {
/* The mbox processing work is never canceled. /* The mbox processing work is never canceled.
* The negative error code should never be seen. * The negative error code should never be seen.
*/ */
@ -290,7 +299,7 @@ int icmsg_open(const struct icmsg_config_t *conf,
return ret; return ret;
} }
ret = k_work_schedule(&dev_data->notify_work, K_NO_WAIT); ret = k_work_schedule_for_queue(workq, &dev_data->notify_work, K_NO_WAIT);
if (ret < 0) { if (ret < 0) {
return ret; return ret;
} }
@ -519,3 +528,22 @@ int icmsg_release_rx_buffer(const struct icmsg_config_t *conf,
return 0; return 0;
} }
#endif /* CONFIG_IPC_SERVICE_ICMSG_NOCOPY_RX */ #endif /* CONFIG_IPC_SERVICE_ICMSG_NOCOPY_RX */
#if IS_ENABLED(CONFIG_IPC_SERVICE_BACKEND_ICMSG_WQ_ENABLE)
static int work_q_init(void)
{
struct k_work_queue_config cfg = {
.name = "icmsg_workq",
};
k_work_queue_start(&icmsg_workq,
icmsg_stack,
K_KERNEL_STACK_SIZEOF(icmsg_stack),
CONFIG_IPC_SERVICE_BACKEND_ICMSG_WQ_PRIORITY, &cfg);
return 0;
}
SYS_INIT(work_q_init, POST_KERNEL, CONFIG_KERNEL_INIT_PRIORITY_DEFAULT);
#endif