rtio: Low (Memory) Cost Concurrent Scheduler

Schedules I/O chains in the order they arrive, providing a fixed
amount of concurrency. The low memory cost comes at the price of some
computational overhead, which is likely to be acceptable at small
amounts of concurrency.

The code cost is about 4x that of the simple linear executor,
which isn't entirely unexpected, as the logic required is quite a bit
more involved than doing the next thing in the queue.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
Tom Burdick, 2022-05-12 12:00:21 -05:00, committed by Anas Nashif
commit 121462b129
7 changed files with 569 additions and 86 deletions

@@ -0,0 +1,126 @@
/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_
#define ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief RTIO Concurrent Executor
*
* Provides a concurrent executor with one pointer of overhead per task and a
* two-word overhead over the simple executor to track the order of tasks (FIFO).
*
* @defgroup rtio_executor_concurrent RTIO concurrent Executor
* @ingroup rtio
* @{
*/
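/* Rough footprint sketch (an editorial estimate read off the struct below,
* not a measured figure): with a concurrency of 8 on a 32-bit target this is
* 8 task pointers (32 bytes), 8 status bytes, three uint16_t ring indices,
* two bookkeeping SQE pointers, and a spinlock, i.e. on the order of 64 bytes.
*/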
/**
* @brief Submit to the concurrent executor
*
* @param r RTIO context to submit
*
* @retval 0 always succeeds
*/
int rtio_concurrent_submit(struct rtio *r);
/**
* @brief Report a SQE has completed successfully
*
* @param r RTIO context to use
* @param sqe RTIO SQE to report success
* @param result Result of the SQE
*/
void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result);
/**
* @brief Report a SQE has completed with error
*
* @param r RTIO context to use
* @param sqe RTIO SQE to report as failed
* @param result Result of the SQE
*/
void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result);
/**
* @brief Concurrent Executor
*
* Notably, all values are effectively owned by each task, with the exception
* of task_in and task_out.
*/
struct rtio_concurrent_executor {
struct rtio_executor ctx;
/* Lock around the queues */
struct k_spinlock lock;
/* Task ring position and count */
uint16_t task_in, task_out, task_mask;
/* First pending sqe to start when a task becomes available */
struct rtio_sqe *pending_sqe;
/* Last sqe seen from the most recent submit */
struct rtio_sqe *last_sqe;
/* Array of task statuses */
uint8_t *task_status;
/* Array of struct rtio_sqe pointers, one per task */
struct rtio_sqe **task_cur;
};
/**
* @cond INTERNAL_HIDDEN
*/
static const struct rtio_executor_api z_rtio_concurrent_api = {
.submit = rtio_concurrent_submit,
.ok = rtio_concurrent_ok,
.err = rtio_concurrent_err
};
/**
* @endcond INTERNAL_HIDDEN
*/
/**
* @brief Statically define and initialize a concurrent executor
*
* @param name Symbol name, must be unique in the context in which it's used
* @param concurrency Allowed concurrency (number of concurrent tasks); must be a power of 2
*/
#define RTIO_EXECUTOR_CONCURRENT_DEFINE(name, concurrency) \
static struct rtio_sqe *_task_cur_##name[(concurrency)]; \
static uint8_t _task_status_##name[(concurrency)]; \
static struct rtio_concurrent_executor name = { \
.ctx = { .api = &z_rtio_concurrent_api }, \
.task_in = 0, \
.task_out = 0, \
.task_mask = (concurrency)-1, \
.pending_sqe = NULL, \
.last_sqe = NULL, \
.task_status = _task_status_##name, \
.task_cur = _task_cur_##name, \
};
/**
* @}
*/
#ifdef __cplusplus
}
#endif
#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_ */
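
A usage sketch (hypothetical names, mirroring the test code later in this
commit): define the executor with its allowed concurrency, then hand it to an
RTIO context.

RTIO_EXECUTOR_CONCURRENT_DEFINE(my_exec, 4); /* up to 4 chains in flight, power of 2 */
RTIO_DEFINE(my_rtio, (struct rtio_executor *)&my_exec, 8, 8); /* 8-deep SQ and CQ */

Submission then works as with any executor: acquire SQEs from my_rtio.sq,
prep them, and call rtio_submit(&my_rtio, n).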

@@ -109,9 +109,8 @@ struct rtio_spsc {
* @param type Type stored in the spsc
* @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
*/
#define RTIO_SPSC_DEFINE(name, type, sz) \
RTIO_SPSC_DECLARE(name, type, sz) name = \
RTIO_SPSC_INITIALIZER(name, type, sz);
#define RTIO_SPSC_DEFINE(name, type, sz) \
RTIO_SPSC_DECLARE(name, type, sz) name = RTIO_SPSC_INITIALIZER(name, type, sz);
/**
* @brief Size of the SPSC queue
@@ -127,7 +126,7 @@ struct rtio_spsc {
* @param spsc SPSC reference
* @param i Value to modulo to the size of the spsc
*/
#define z_rtio_spsc_mask(spsc, i) (i & (spsc)->_spsc.mask)
#define z_rtio_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask)
/**
* @brief Initialize/reset a spsc such that its empty
@@ -154,9 +153,8 @@ struct rtio_spsc {
*/
#define rtio_spsc_acquire(spsc) \
({ \
uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire; \
bool acq = \
(idx - (uint32_t)atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc); \
unsigned long idx = atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire; \
bool acq = (idx - atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc); \
if (acq) { \
(spsc)->_spsc.acquire += 1; \
} \
@@ -170,12 +168,12 @@ struct rtio_spsc {
*
* @param spsc SPSC to produce the previously acquired element or do nothing
*/
#define rtio_spsc_produce(spsc) \
({ \
if ((spsc)->_spsc.acquire > 0) { \
(spsc)->_spsc.acquire -= 1; \
atomic_add(&(spsc)->_spsc.in, 1); \
} \
#define rtio_spsc_produce(spsc) \
({ \
if ((spsc)->_spsc.acquire > 0) { \
(spsc)->_spsc.acquire -= 1; \
atomic_add(&(spsc)->_spsc.in, 1); \
} \
})
/**
@@ -186,27 +184,13 @@ struct rtio_spsc {
*
* @param spsc SPSC to produce all previously acquired elements or do nothing
*/
#define rtio_spsc_produce_all(spsc) \
({ \
if ((spsc)->_spsc.acquire > 0) { \
unsigned long acquired = (spsc)->_spsc.acquire; \
(spsc)->_spsc.acquire = 0; \
atomic_add(&(spsc)->_spsc.in, acquired); \
} \
})
/**
* @brief Peek at an element from the spsc
*
* @param spsc Spsc to peek into
*
* @return Pointer to element or null if no consumable elements left
*/
#define rtio_spsc_peek(spsc) \
({ \
uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})
#define rtio_spsc_produce_all(spsc) \
({ \
if ((spsc)->_spsc.acquire > 0) { \
unsigned long acquired = (spsc)->_spsc.acquire; \
(spsc)->_spsc.acquire = 0; \
atomic_add(&(spsc)->_spsc.in, acquired); \
} \
})
/**
@@ -216,14 +200,14 @@ struct rtio_spsc {
*
* @return Pointer to element or null if no consumable elements left
*/
#define rtio_spsc_consume(spsc) \
({ \
uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \
if (has_consumable) { \
(spsc)->_spsc.consume += 1; \
} \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
#define rtio_spsc_consume(spsc) \
({ \
unsigned long idx = atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != atomic_get(&(spsc)->_spsc.in)); \
if (has_consumable) { \
(spsc)->_spsc.consume += 1; \
} \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})
/**
@@ -231,12 +215,12 @@ struct rtio_spsc {
*
* @param spsc SPSC to release consumed element or do nothing
*/
#define rtio_spsc_release(spsc) \
({ \
if ((spsc)->_spsc.consume > 0) { \
(spsc)->_spsc.consume -= 1; \
atomic_add(&(spsc)->_spsc.out, 1); \
} \
#define rtio_spsc_release(spsc) \
({ \
if ((spsc)->_spsc.consume > 0) { \
(spsc)->_spsc.consume -= 1; \
atomic_add(&(spsc)->_spsc.out, 1); \
} \
})
/**
@@ -244,12 +228,54 @@ struct rtio_spsc {
*
* @param spsc SPSC to get item count for
*/
#define rtio_spsc_consumable(spsc) \
({ \
(spsc)->_spsc.in \
- (spsc)->_spsc.out \
- (spsc)->_spsc.consume; \
}) \
#define rtio_spsc_consumable(spsc) \
({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })
/**
* @brief Peek at the first available item in queue
*
* @param spsc Spsc to peek into
*
* @return Pointer to element or null if no consumable elements left
*/
#define rtio_spsc_peek(spsc) \
({ \
unsigned long idx = atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != atomic_get(&(spsc)->_spsc.in)); \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})
/**
* @brief Peek at the next item in the queue from a given one
*
* @param spsc SPSC to peek at
* @param item Pointer to an item in the queue
*
* @return Pointer to element or null if none left
*/
#define rtio_spsc_next(spsc, item) \
({ \
unsigned long idx = ((item) - (spsc)->buffer); \
bool has_next = z_rtio_spsc_mask(spsc, (idx + 1)) != \
(z_rtio_spsc_mask(spsc, atomic_get(&(spsc)->_spsc.in))); \
has_next ? &((spsc)->buffer[z_rtio_spsc_mask((spsc), idx + 1)]) : NULL; \
})
/**
* @brief Get the previous item in the queue from a given one
*
* @param spsc SPSC to peek at
* @param item Pointer to an item in the queue
*
* @return Pointer to element or null if none left
*/
#define rtio_spsc_prev(spsc, item) \
({ \
unsigned long idx = ((item) - (spsc)->buffer); \
bool has_prev = idx != z_rtio_spsc_mask(spsc, atomic_get(&(spsc)->_spsc.out)); \
has_prev ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx - 1)]) : NULL; \
})
/**
* @}
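
A sketch of the new peek/next pair (assuming a hypothetical queue defined as
RTIO_SPSC_DEFINE(my_q, uint32_t, 8)): walk every consumable element without
consuming or releasing anything, which is the same pattern the concurrent
executor uses to traverse pending submissions.

uint32_t *item = rtio_spsc_peek(&my_q);
while (item != NULL) {
	/* inspect *item; the queue state is left untouched */
	item = rtio_spsc_next(&my_q, item);
}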

@@ -11,4 +11,9 @@ if(CONFIG_RTIO)
rtio_executor_simple.c
)
zephyr_library_sources_ifdef(
CONFIG_RTIO_EXECUTOR_CONCURRENT
rtio_executor_concurrent.c
)
endif()

@@ -12,7 +12,14 @@ config RTIO_EXECUTOR_SIMPLE
help
A simple RTIO executor that will execute a queue of requested I/O
operations as if they are a single chain of submission queue entries. This
does not support concurrent chains.
does not support concurrent chains or submissions.
config RTIO_EXECUTOR_CONCURRENT
bool "A low cost concurrent executor for RTIO"
default y
help
A low memory cost RTIO executor that will execute a queue of requested I/O
with a fixed amount of concurrency.
config RTIO_SUBMIT_SEM
bool "Use a semaphore when waiting for completions in rtio_submit"

@@ -0,0 +1,278 @@
/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include "spinlock.h"
#include <rtio/rtio_executor_concurrent.h>
#include <rtio/rtio.h>
#include <zephyr/kernel.h>
#include <logging/log.h>
LOG_MODULE_REGISTER(rtio_executor_concurrent, CONFIG_RTIO_LOG_LEVEL);
#define CONEX_TASK_COMPLETE BIT(0)
#define CONEX_TASK_SUSPENDED BIT(1)
/**
* @file
* @brief Concurrent RTIO Executor
*
* The concurrent executor provides fixed amounts of concurrency
* using minimal overhead but assumes a small number of concurrent tasks.
*
* Many of the task lookup and management functions in here are O(N) over N
* tasks. This is fine when the task set is *small*. Task lookup could be
* improved in the future with a binary search at the expense of code size.
*
* The assumption here is that perhaps only 8-16 concurrent tasks are likely,
* such that simple short for loops over the task array are reasonably fast.
*
* A maximum of 65K submission queue entries is possible.
*/
/**
* check if there is a free task available
*/
static bool conex_task_free(struct rtio_concurrent_executor *exc)
{
return (exc->task_in - exc->task_out) < (exc->task_mask + 1);
}
/**
* get the next free available task index
*/
static uint16_t conex_task_next(struct rtio_concurrent_executor *exc)
{
uint16_t task_id = exc->task_in;
exc->task_in++;
return task_id;
}
static uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
const struct rtio_sqe *sqe)
{
uint16_t task_id = exc->task_out;
for (; task_id < exc->task_in; task_id++) {
if (exc->task_cur[task_id & exc->task_mask] == sqe) {
break;
}
}
return task_id;
}
static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc)
{
struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);
while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) {
rtio_spsc_release(r->sq);
sqe = rtio_spsc_consume(r->sq);
}
rtio_spsc_release(r->sq);
}
static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
{
/* In order sweep up */
for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) {
LOG_INF("sweeping oldest task %d", task_id);
conex_sweep_task(r, exc);
exc->task_out++;
} else {
break;
}
}
}
static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
{
/* In order resume tasks */
for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) {
LOG_INF("resuming suspended task %d", task_id);
exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
rtio_iodev_submit(exc->task_cur[task_id & exc->task_mask], r);
}
}
}
static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
{
conex_sweep(r, exc);
conex_resume(r, exc);
}
/**
* @brief Submit submissions to concurrent executor
*
* @param r RTIO context
*
* @retval 0 Always succeeds
*/
int rtio_concurrent_submit(struct rtio *r)
{
LOG_INF("submit");
struct rtio_concurrent_executor *exc =
(struct rtio_concurrent_executor *)r->executor;
struct rtio_sqe *sqe;
struct rtio_sqe *last_sqe;
k_spinlock_key_t key;
key = k_spin_lock(&exc->lock);
/* If never submitted before peek at the first item
* otherwise start back up where the last submit call
* left off
*/
if (exc->last_sqe == NULL) {
sqe = rtio_spsc_peek(r->sq);
} else {
/* Pickup from last submit call */
sqe = rtio_spsc_next(r->sq, exc->last_sqe);
}
last_sqe = sqe;
while (sqe != NULL && conex_task_free(exc)) {
LOG_INF("head SQE in chain %p", sqe);
/* Get the next task id if one exists */
uint16_t task_idx = conex_task_next(exc);
LOG_INF("setting up task %d", task_idx);
/* Setup task (yes this is it) */
exc->task_cur[task_idx] = sqe;
exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
LOG_INF("submitted sqe %p", sqe);
/* Go to the next sqe not in the current chain */
while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) {
sqe = rtio_spsc_next(r->sq, sqe);
}
LOG_INF("tail SQE in chain %p", sqe);
last_sqe = sqe;
/* SQE is the end of the previous chain */
sqe = rtio_spsc_next(r->sq, sqe);
}
/* Out of available tasks, wait until others complete, and note the
* first pending submission queue entry. May be NULL if nothing is pending.
*/
exc->pending_sqe = sqe;
/* Run through the remainder of the queue
* and take note of the last item
*/
while (sqe != NULL) {
last_sqe = sqe;
sqe = rtio_spsc_next(r->sq, sqe);
}
/* Note the last sqe for the next submit call */
exc->last_sqe = last_sqe;
/* Resume all suspended tasks */
conex_resume(r, exc);
k_spin_unlock(&exc->lock, key);
return 0;
}
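/* Worked trace (editorial, not part of the commit): with a concurrency of 2
* and a fresh context, submitting chain A -> B (A has RTIO_SQE_CHAINED) and
* then a lone SQE C assigns task 0 = A and task 1 = C, both marked
* CONEX_TASK_SUSPENDED. pending_sqe ends up NULL, last_sqe = C, and
* conex_resume() starts A and C. B is only started from rtio_concurrent_ok()
* once A completes.
*/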
/**
* @brief Callback from an iodev describing success
*/
void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
{
struct rtio_sqe *next_sqe;
k_spinlock_key_t key;
struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
/* Interrupt may occur in spsc_acquire, breaking the contract
* so spin around it effectively preventing another interrupt on
* this core, and another core trying to concurrently work in here.
*
* This can and should be broken up into a few sections with a try
* lock around the sweep and resume.
*/
key = k_spin_lock(&exc->lock);
rtio_cqe_submit(r, result, sqe->userdata);
/* Determine the task id : O(n) */
uint16_t task_id = conex_task_id(exc, sqe);
if (sqe->flags & RTIO_SQE_CHAINED) {
next_sqe = rtio_spsc_next(r->sq, sqe);
rtio_iodev_submit(next_sqe, r);
exc->task_cur[task_id & exc->task_mask] = next_sqe;
} else {
exc->task_status[task_id & exc->task_mask] |= CONEX_TASK_COMPLETE;
}
/* Sweep up unused SQEs and tasks, retry suspended tasks */
/* TODO Use a try lock here and don't bother doing it if we are already
* doing it elsewhere
*/
conex_sweep_resume(r, exc);
k_spin_unlock(&exc->lock, key);
}
/**
* @brief Callback from an iodev describing error
*/
void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
{
struct rtio_sqe *nsqe;
k_spinlock_key_t key;
struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
/* Another interrupt (and sqe complete) may occur in spsc_acquire,
* breaking the contract so spin around it effectively preventing another
* interrupt on this core, and another core trying to concurrently work
* in here.
*
* This can and should be broken up into a few sections with a try
* lock around the sweep and resume.
*/
key = k_spin_lock(&exc->lock);
rtio_cqe_submit(r, result, sqe->userdata);
/* Determine the task id : O(n) */
uint16_t task_id = conex_task_id(exc, sqe);
/* Fail the remaining SQEs in the chain */
if (sqe->flags & RTIO_SQE_CHAINED) {
nsqe = rtio_spsc_next(r->sq, sqe);
while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
nsqe = rtio_spsc_next(r->sq, nsqe);
}
}
/* Task is complete (failed) */
exc->task_status[task_id & exc->task_mask] |= CONEX_TASK_COMPLETE;
conex_sweep_resume(r, exc);
k_spin_unlock(&exc->lock, key);
}

@@ -30,7 +30,6 @@ int rtio_simple_submit(struct rtio *r)
struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);
if (sqe != NULL) {
LOG_DBG("Calling iodev submit");
rtio_iodev_submit(sqe, r);
}

@@ -11,6 +11,7 @@
#include <zephyr/rtio/rtio_spsc.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_executor_simple.h>
#include <zephyr/rtio/rtio_executor_concurrent.h>
#include "rtio_iodev_test.h"
@@ -231,8 +232,12 @@ void test_spsc_threaded(void)
}
RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec);
RTIO_DEFINE(r_simple, (struct rtio_executor *)&simple_exec, 4, 4);
RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp);
RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4);
RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1);
RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4);
struct rtio_iodev_test iodev_test_simple;
/**
@@ -241,7 +246,7 @@ struct rtio_iodev_test iodev_test_simple;
* Ensures that we can setup an RTIO context, enqueue a request, and receive
* a completion event.
*/
void test_rtio_simple(void)
void test_rtio_simple_(struct rtio *r)
{
int res;
uintptr_t userdata[2] = {0, 1};
@@ -251,23 +256,35 @@ void test_rtio_simple(void)
rtio_iodev_test_init(&iodev_test_simple);
TC_PRINT("setting up single no-op\n");
sqe = rtio_spsc_acquire(r_simple.sq);
sqe = rtio_spsc_acquire(r->sq);
zassert_not_null(sqe, "Expected a valid sqe");
rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]);
TC_PRINT("submit with wait\n");
res = rtio_submit(&r_simple, 1);
res = rtio_submit(r, 1);
zassert_ok(res, "Should return ok from rtio_execute");
cqe = rtio_spsc_consume(r_simple.cq);
cqe = rtio_spsc_consume(r->cq);
zassert_not_null(cqe, "Expected a valid cqe");
zassert_ok(cqe->result, "Result should be ok");
zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back");
rtio_spsc_release(r_simple.cq);
rtio_spsc_release(r->cq);
}
RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec);
RTIO_DEFINE(r_chain, (struct rtio_executor *)&chain_exec, 4, 4);
void test_rtio_simple(void)
{
TC_PRINT("rtio simple simple\n");
test_rtio_simple_(&r_simple_simp);
TC_PRINT("rtio simple concurrent\n");
test_rtio_simple_(&r_simple_con);
}
RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp);
RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4);
RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1);
RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4);
struct rtio_iodev_test iodev_test_chain[2];
/**
@@ -277,60 +294,72 @@ struct rtio_iodev_test iodev_test_chain[2];
* and receive completion events in the correct order given the chained
* flag and multiple devices where serialization isn't guaranteed.
*/
void test_rtio_chain(void)
void test_rtio_chain_(struct rtio *r)
{
int res;
uintptr_t userdata[4] = {0, 1, 2, 3};
struct rtio_sqe *sqe;
struct rtio_cqe *cqe;
for (int i = 0; i < 2; i++) {
rtio_iodev_test_init(&iodev_test_chain[i]);
}
for (int i = 0; i < 4; i++) {
sqe = rtio_spsc_acquire(r_chain.sq);
sqe = rtio_spsc_acquire(r->sq);
zassert_not_null(sqe, "Expected a valid sqe");
rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_chain[i % 2],
&userdata[i]);
sqe->flags |= RTIO_SQE_CHAINED;
}
res = rtio_submit(&r_chain, 4);
/* Clear the last one */
sqe->flags = 0;
res = rtio_submit(r, 4);
zassert_ok(res, "Should return ok from rtio_execute");
zassert_equal(rtio_spsc_consumable(r_chain.cq), 4, "Should have 4 pending completions");
zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions");
for (int i = 0; i < 4; i++) {
TC_PRINT("consume %d\n", i);
cqe = rtio_spsc_consume(r_chain.cq);
cqe = rtio_spsc_consume(r->cq);
zassert_not_null(cqe, "Expected a valid cqe");
zassert_ok(cqe->result, "Result should be ok");
zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions");
rtio_spsc_release(r_chain.cq);
rtio_spsc_release(r->cq);
}
}
RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec);
RTIO_DEFINE(r_multi, (struct rtio_executor *)&multi_exec, 4, 4);
void test_rtio_chain(void)
{
for (int i = 0; i < 2; i++) {
rtio_iodev_test_init(&iodev_test_chain[i]);
}
TC_PRINT("rtio chain simple\n");
test_rtio_chain_(&r_chain_simp);
TC_PRINT("rtio chain concurrent\n");
test_rtio_chain_(&r_chain_con);
}
RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp);
RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4);
RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2);
RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4);
struct rtio_iodev_test iodev_test_multi[2];
/**
* @brief Test multiple asynchronous chains against one iodev
*/
void test_rtio_multiple_chains(void)
void test_rtio_multiple_chains_(struct rtio *r)
{
int res;
uintptr_t userdata[4] = {0, 1, 2, 3};
struct rtio_sqe *sqe;
struct rtio_cqe *cqe;
for (int i = 0; i < 2; i++) {
rtio_iodev_test_init(&iodev_test_multi[i]);
}
for (int i = 0; i < 2; i++) {
for (int j = 0; j < 2; j++) {
sqe = rtio_spsc_acquire(r_multi.sq);
sqe = rtio_spsc_acquire(r->sq);
zassert_not_null(sqe, "Expected a valid sqe");
rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_multi[i],
(void *)userdata[i*2 + j]);
@@ -343,7 +372,7 @@ void test_rtio_multiple_chains(void)
}
TC_PRINT("calling submit from test case\n");
res = rtio_submit(&r_multi, 0);
res = rtio_submit(r, 0);
zassert_ok(res, "Should return ok from rtio_execute");
bool seen[4] = { 0 };
@@ -351,11 +380,11 @@ void test_rtio_multiple_chains(void)
TC_PRINT("waiting for 4 completions\n");
for (int i = 0; i < 4; i++) {
TC_PRINT("waiting on completion %d\n", i);
cqe = rtio_spsc_consume(r_multi.cq);
cqe = rtio_spsc_consume(r->cq);
while (cqe == NULL) {
k_sleep(K_MSEC(1));
cqe = rtio_spsc_consume(r_multi.cq);
cqe = rtio_spsc_consume(r->cq);
}
zassert_not_null(cqe, "Expected a valid cqe");
@@ -369,12 +398,25 @@ void test_rtio_multiple_chains(void)
if (seen[3]) {
zassert_true(seen[2], "Should see 2 before 3");
}
rtio_spsc_release(r_multi.cq);
rtio_spsc_release(r->cq);
}
}
void test_rtio_multiple_chains(void)
{
for (int i = 0; i < 2; i++) {
rtio_iodev_test_init(&iodev_test_multi[i]);
}
TC_PRINT("rtio multiple simple\n");
test_rtio_multiple_chains_(&r_multi_simp);
TC_PRINT("rtio_multiple concurrent\n");
test_rtio_multiple_chains_(&r_multi_con);
}
void test_main(void)
{
TC_PRINT("imxrt1010 RTIO\n");
ztest_test_suite(rtio_spsc_test,
ztest_1cpu_unit_test(test_produce_consume_size1),
ztest_1cpu_unit_test(test_produce_consume_wrap_around),