rtio: Add RTIO Work-queues service

Adds ability to process synchronous requests in an asynchronous
fashion, relying on dedicated P4WQ's and pre-allocated work items.

Signed-off-by: Luis Ubieda <luisf@croxel.com>
This commit is contained in:
Luis Ubieda 2024-06-19 17:22:47 -04:00 committed by Anas Nashif
commit 3646b63217
5 changed files with 208 additions and 0 deletions

View file

@ -0,0 +1,80 @@
/*
* Copyright (c) 2024 Croxel Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_INCLUDE_RTIO_WORKQ_H_
#define ZEPHYR_INCLUDE_RTIO_WORKQ_H_
#include <stdint.h>
#include <zephyr/device.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/sys/p4wq.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief Callback API to execute work operation.
*
* @param iodev_sqe Associated SQE operation.
*/
typedef void (*rtio_work_submit_t)(struct rtio_iodev_sqe *iodev_sqe);
/**
* @brief RTIO Work request.
*
* This RTIO Work request to perform a work operation decoupled
* from its submission in the RTIO work-queues.
*/
struct rtio_work_req {
/** Work item used to submit unit of work. */
struct k_p4wq_work work;
/** Handle to IODEV SQE containing the operation.
* This is filled inside @ref rtio_work_req_submit.
*/
struct rtio_iodev_sqe *iodev_sqe;
/** Callback handler where synchronous operation may be executed.
* This is filled inside @ref rtio_work_req_submit.
*/
rtio_work_submit_t handler;
};
/**
* @brief Allocate item to perform an RTIO work request.
*
* @details This allocation utilizes its internal memory slab with
* pre-allocated elements.
*
* @return Pointer to allocated item if successful.
* @return NULL if allocation failed.
*/
struct rtio_work_req *rtio_work_req_alloc(void);
/**
* @brief Submit RTIO work request.
*
* @param req Item to fill with request information.
* @param iodev_sqe RTIO Operation information.
* @param handler Callback to handler where work operation is performed.
*/
void rtio_work_req_submit(struct rtio_work_req *req,
struct rtio_iodev_sqe *iodev_sqe,
rtio_work_submit_t handler);
/**
* @brief Obtain number of currently used items from the pre-allocated pool.
*
* @return Number of used items.
*/
uint32_t rtio_work_req_used_count_get(void);
#ifdef __cplusplus
}
#endif
#endif /* ZEPHYR_INCLUDE_RTIO_WORKQ_H_ */

View file

@ -12,3 +12,5 @@ if(CONFIG_RTIO)
zephyr_library_sources(rtio_init.c)
zephyr_library_sources_ifdef(CONFIG_USERSPACE rtio_handlers.c)
endif()
zephyr_library_sources_ifdef(CONFIG_RTIO_WORKQ rtio_workq.c)

View file

@ -31,6 +31,8 @@ config RTIO_SYS_MEM_BLOCKS
without a pre-allocated memory buffer. Instead the buffer will be taken
from the allocated memory pool associated with the RTIO context.
rsource "Kconfig.workq"
module = RTIO
module-str = RTIO
module-help = Sets log level for RTIO support

34
subsys/rtio/Kconfig.workq Normal file
View file

@ -0,0 +1,34 @@
# Copyright (c) 2024 Croxel Inc.
# SPDX-License-Identifier: Apache-2.0
config RTIO_WORKQ
bool "RTIO Work-queues service to process Sync operations"
select SCHED_DEADLINE
select RTIO_CONSUME_SEM
help
Enable RTIO Work-queues to allow processing synchronous operations
in an asynchronous non-blocking fashion.
if RTIO_WORKQ
config RTIO_WORKQ_PRIO_MED
int "Medium Thread priority of RTIO Work-queues"
default MAIN_THREAD_PRIORITY
config RTIO_WORKQ_STACK_SIZE
int "Thread stack-size of RTIO Workqueues"
default 2048
config RTIO_WORKQ_THREADS_POOL
int "Number of threads to use for processing work-items"
default 1
config RTIO_WORKQ_POOL_ITEMS
int "Pool of work items to use with the RTIO Work-queues"
default 4
help
Configure the Pool of work items appropriately to your
application, the more simultaneous requests you expect
to issue, the bigger this pool should be.
endif # RTIO_WORKQ

90
subsys/rtio/rtio_workq.c Normal file
View file

@ -0,0 +1,90 @@
/*
* Copyright (c) 2024 Croxel Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/rtio/work.h>
#include <zephyr/kernel.h>
#define RTIO_WORKQ_PRIO_MED CONFIG_RTIO_WORKQ_PRIO_MED
#define RTIO_WORKQ_PRIO_HIGH RTIO_WORKQ_PRIO_MED - 1
#define RTIO_WORKQ_PRIO_LOW RTIO_WORKQ_PRIO_MED + 1
K_P4WQ_DEFINE(rtio_workq,
CONFIG_RTIO_WORKQ_THREADS_POOL,
CONFIG_RTIO_WORKQ_STACK_SIZE);
K_MEM_SLAB_DEFINE_STATIC(rtio_work_items_slab,
sizeof(struct rtio_work_req),
CONFIG_RTIO_WORKQ_POOL_ITEMS,
4);
static void rtio_work_handler(struct k_p4wq_work *work)
{
struct rtio_work_req *req = CONTAINER_OF(work,
struct rtio_work_req,
work);
struct rtio_iodev_sqe *iodev_sqe = req->iodev_sqe;
req->handler(iodev_sqe);
k_mem_slab_free(&rtio_work_items_slab, req);
}
struct rtio_work_req *rtio_work_req_alloc(void)
{
struct rtio_work_req *req;
int err;
err = k_mem_slab_alloc(&rtio_work_items_slab, (void **)&req, K_NO_WAIT);
if (err) {
return NULL;
}
(void)k_sem_init(&req->work.done_sem, 1, 1);
return req;
}
void rtio_work_req_submit(struct rtio_work_req *req,
struct rtio_iodev_sqe *iodev_sqe,
rtio_work_submit_t handler)
{
if (!req) {
return;
}
if (!iodev_sqe || !handler) {
k_mem_slab_free(&rtio_work_items_slab, req);
return;
}
struct k_p4wq_work *work = &req->work;
struct rtio_sqe *sqe = &iodev_sqe->sqe;
/** Link the relevant info so that we can get it on the k_p4wq_work work item.
*/
req->iodev_sqe = iodev_sqe;
req->handler = handler;
/** Set the required information to handle the action */
work->handler = rtio_work_handler;
work->deadline = 0;
if (sqe->prio == RTIO_PRIO_LOW) {
work->priority = RTIO_WORKQ_PRIO_LOW;
} else if (sqe->prio == RTIO_PRIO_HIGH) {
work->priority = RTIO_WORKQ_PRIO_HIGH;
} else {
work->priority = RTIO_WORKQ_PRIO_MED;
}
/** Decoupling action: Let the P4WQ execute the action. */
k_p4wq_submit(&rtio_workq, work);
}
uint32_t rtio_work_req_used_count_get(void)
{
return k_mem_slab_num_used_get(&rtio_work_items_slab);
}