diff --git a/include/zephyr/rtio/rtio_executor_concurrent.h b/include/zephyr/rtio/rtio_executor_concurrent.h new file mode 100644 index 00000000000..1b9de4b4da0 --- /dev/null +++ b/include/zephyr/rtio/rtio_executor_concurrent.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2022 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ +#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_ +#define ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief RTIO Concurrent Executor + * + * Provides a concurrent executor with one pointer of overhead per task and a + * two-word overhead over the simple executor to track the order of tasks (FIFO). + * + * @defgroup rtio_executor_concurrent RTIO Concurrent Executor + * @ingroup rtio + * @{ + */ + + +/** + * @brief Submit to the concurrent executor + * + * @param r RTIO context to submit + * + * @retval 0 always succeeds + */ +int rtio_concurrent_submit(struct rtio *r); + +/** + * @brief Report a SQE has completed successfully + * + * @param r RTIO context to use + * @param sqe RTIO SQE to report success + * @param result Result of the SQE + */ +void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result); + +/** + * @brief Report a SQE has completed with error + * + * @param r RTIO context to use + * @param sqe RTIO SQE to report error + * @param result Result of the SQE + */ +void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result); + +/** + * @brief Concurrent Executor + * + * Notably all values are effectively owned by each task, with the exception + * of task_in and task_out. + */ +struct rtio_concurrent_executor { + struct rtio_executor ctx; + + /* Lock around the queues */ + struct k_spinlock lock; + + /* Task ring position and count */ + uint16_t task_in, task_out, task_mask; + + /* First pending sqe to start when a task becomes available */ + struct rtio_sqe *pending_sqe; + + /* Last sqe seen from the most recent submit */ + struct rtio_sqe *last_sqe; + + /* Array of task statuses */ + uint8_t *task_status; + + /* Array of struct rtio_sqe *'s, one per task */ + struct rtio_sqe **task_cur; +}; + +/** + * @cond INTERNAL_HIDDEN + */ +static const struct rtio_executor_api z_rtio_concurrent_api = { + .submit = rtio_concurrent_submit, + .ok = rtio_concurrent_ok, + .err = rtio_concurrent_err +}; + +/** + * @endcond INTERNAL_HIDDEN + */ + + +/** + * @brief Statically define and initialize a concurrent executor + * + * @param name Symbol name, must be unique in the context in which it's used + * @param concurrency Allowed concurrency (number of concurrent tasks). Must be a power of 2.
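+ *
+ * For example (illustrative only; the names below are placeholders), a
+ * context allowing up to 4 concurrent task chains can be declared and
+ * paired with an RTIO context, mirroring the usage in the test suite:
+ *
+ * @code{.c}
+ * RTIO_EXECUTOR_CONCURRENT_DEFINE(my_exec, 4);
+ * RTIO_DEFINE(my_rtio, (struct rtio_executor *)&my_exec, 8, 8);
+ * @endcode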
+ */ +#define RTIO_EXECUTOR_CONCURRENT_DEFINE(name, concurrency) \ + static struct rtio_sqe *_task_cur_##name[(concurrency)]; \ + uint8_t _task_status_##name[(concurrency)]; \ + static struct rtio_concurrent_executor name = { \ + .ctx = { .api = &z_rtio_concurrent_api }, \ + .task_in = 0, \ + .task_out = 0, \ + .task_mask = (concurrency)-1, \ + .pending_sqe = NULL, \ + .last_sqe = NULL, \ + .task_status = _task_status_##name, \ + .task_cur = _task_cur_##name, \ + }; + +/** + * @} + */ + +#ifdef __cplusplus +} +#endif + + +#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_ */ diff --git a/include/zephyr/rtio/rtio_spsc.h b/include/zephyr/rtio/rtio_spsc.h index 8714aec36dc..bf6dc0866b9 100644 --- a/include/zephyr/rtio/rtio_spsc.h +++ b/include/zephyr/rtio/rtio_spsc.h @@ -109,9 +109,8 @@ struct rtio_spsc { * @param type Type stored in the spsc * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) */ -#define RTIO_SPSC_DEFINE(name, type, sz) \ - RTIO_SPSC_DECLARE(name, type, sz) name = \ - RTIO_SPSC_INITIALIZER(name, type, sz); +#define RTIO_SPSC_DEFINE(name, type, sz) \ + RTIO_SPSC_DECLARE(name, type, sz) name = RTIO_SPSC_INITIALIZER(name, type, sz); /** * @brief Size of the SPSC queue @@ -127,7 +126,7 @@ struct rtio_spsc { * @param spsc SPSC reference * @param i Value to modulo to the size of the spsc */ -#define z_rtio_spsc_mask(spsc, i) (i & (spsc)->_spsc.mask) +#define z_rtio_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask) /** * @brief Initialize/reset a spsc such that its empty @@ -154,9 +153,8 @@ struct rtio_spsc { */ #define rtio_spsc_acquire(spsc) \ ({ \ - uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire; \ - bool acq = \ - (idx - (uint32_t)atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc); \ + unsigned long idx = atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire; \ + bool acq = (idx - atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc); \ if (acq) { \ (spsc)->_spsc.acquire += 1; \ } \ @@ -170,12 +168,12 @@ struct rtio_spsc { * * @param spsc SPSC to produce the previously acquired element or do nothing */ -#define rtio_spsc_produce(spsc) \ - ({ \ - if ((spsc)->_spsc.acquire > 0) { \ - (spsc)->_spsc.acquire -= 1; \ - atomic_add(&(spsc)->_spsc.in, 1); \ - } \ +#define rtio_spsc_produce(spsc) \ + ({ \ + if ((spsc)->_spsc.acquire > 0) { \ + (spsc)->_spsc.acquire -= 1; \ + atomic_add(&(spsc)->_spsc.in, 1); \ + } \ }) /** @@ -186,27 +184,13 @@ struct rtio_spsc { * * @param spsc SPSC to produce all previously acquired elements or do nothing */ -#define rtio_spsc_produce_all(spsc) \ - ({ \ - if ((spsc)->_spsc.acquire > 0) { \ - unsigned long acquired = (spsc)->_spsc.acquire; \ - (spsc)->_spsc.acquire = 0; \ - atomic_add(&(spsc)->_spsc.in, acquired); \ - } \ - }) - -/** - * @brief Peek at an element from the spsc - * - * @param spsc Spsc to peek into - * - * @return Pointer to element or null if no consumable elements left - */ -#define rtio_spsc_peek(spsc) \ +#define rtio_spsc_produce_all(spsc) \ ({ \ - uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \ - bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \ - has_consumable ? 
&((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ + if ((spsc)->_spsc.acquire > 0) { \ + unsigned long acquired = (spsc)->_spsc.acquire; \ + (spsc)->_spsc.acquire = 0; \ + atomic_add(&(spsc)->_spsc.in, acquired); \ + } \ }) /** @@ -216,14 +200,14 @@ struct rtio_spsc { * * @return Pointer to element or null if no consumable elements left */ -#define rtio_spsc_consume(spsc) \ - ({ \ - uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \ - bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \ - if (has_consumable) { \ - (spsc)->_spsc.consume += 1; \ - } \ - has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ +#define rtio_spsc_consume(spsc) \ + ({ \ + unsigned long idx = atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \ + bool has_consumable = (idx != atomic_get(&(spsc)->_spsc.in)); \ + if (has_consumable) { \ + (spsc)->_spsc.consume += 1; \ + } \ + has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ }) /** @@ -231,12 +215,12 @@ struct rtio_spsc { * * @param spsc SPSC to release consumed element or do nothing */ -#define rtio_spsc_release(spsc) \ - ({ \ - if ((spsc)->_spsc.consume > 0) { \ - (spsc)->_spsc.consume -= 1; \ - atomic_add(&(spsc)->_spsc.out, 1); \ - } \ +#define rtio_spsc_release(spsc) \ + ({ \ + if ((spsc)->_spsc.consume > 0) { \ + (spsc)->_spsc.consume -= 1; \ + atomic_add(&(spsc)->_spsc.out, 1); \ + } \ }) /** @@ -244,12 +228,54 @@ struct rtio_spsc { * * @param spsc SPSC to get item count for */ -#define rtio_spsc_consumable(spsc) \ - ({ \ - (spsc)->_spsc.in \ - - (spsc)->_spsc.out \ - - (spsc)->_spsc.consume; \ - }) \ +#define rtio_spsc_consumable(spsc) \ + ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; }) + +/** + * @brief Peek at the first available item in queue + * + * @param spsc Spsc to peek into + * + * @return Pointer to element or null if no consumable elements left + */ +#define rtio_spsc_peek(spsc) \ + ({ \ + unsigned long idx = atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \ + bool has_consumable = (idx != atomic_get(&(spsc)->_spsc.in)); \ + has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ + }) + +/** + * @brief Peek at the next item in the queue from a given one + * + * @param spsc SPSC to peek at + * @param item Pointer to an item in the queue + * + * @return Pointer to element or null if none left + */ +#define rtio_spsc_next(spsc, item) \ + ({ \ + unsigned long idx = ((item) - (spsc)->buffer); \ + bool has_next = z_rtio_spsc_mask(spsc, (idx + 1)) != \ + (z_rtio_spsc_mask(spsc, atomic_get(&(spsc)->_spsc.in))); \ + has_next ? &((spsc)->buffer[z_rtio_spsc_mask((spsc), idx + 1)]) : NULL; \ + }) + +/** + * @brief Get the previous item in the queue from a given one + * + * @param spsc SPSC to peek at + * @param item Pointer to an item in the queue + * + * @return Pointer to element or null if none left + */ +#define rtio_spsc_prev(spsc, item) \ + ({ \ + unsigned long idx = ((item) - (spsc)->buffer); \ + bool has_prev = idx != z_rtio_spsc_mask(spsc, atomic_get(&(spsc)->_spsc.out)); \ + has_prev ?
&((spsc)->buffer[z_rtio_spsc_mask(spsc, idx - 1)]) : NULL; \ + }) /** * @} diff --git a/subsys/rtio/CMakeLists.txt b/subsys/rtio/CMakeLists.txt index d2732c1ecb5..e7494a3f65f 100644 --- a/subsys/rtio/CMakeLists.txt +++ b/subsys/rtio/CMakeLists.txt @@ -11,4 +11,9 @@ if(CONFIG_RTIO) rtio_executor_simple.c ) + zephyr_library_sources_ifdef( + CONFIG_RTIO_EXECUTOR_CONCURRENT + rtio_executor_concurrent.c + ) + endif() diff --git a/subsys/rtio/Kconfig b/subsys/rtio/Kconfig index 7795128e411..589d6672b5e 100644 --- a/subsys/rtio/Kconfig +++ b/subsys/rtio/Kconfig @@ -12,7 +12,14 @@ config RTIO_EXECUTOR_SIMPLE help An simple RTIO executor that will execute a queue of requested I/O operations as if they are a single chain of submission queue entries. This - does not support concurrent chains. + does not support concurrent chains or submissions. + +config RTIO_EXECUTOR_CONCURRENT + bool "A low cost concurrent executor for RTIO" + default y + help + A low memory cost RTIO executor that will execute a queue of requested I/O + with a fixed amount of concurrency using minimal memory overhead. config RTIO_SUBMIT_SEM bool "Use a semaphore when waiting for completions in rtio_submit" diff --git a/subsys/rtio/rtio_executor_concurrent.c b/subsys/rtio/rtio_executor_concurrent.c new file mode 100644 index 00000000000..d17de11dc88 --- /dev/null +++ b/subsys/rtio/rtio_executor_concurrent.c @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2022 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "spinlock.h" +#include +#include +#include + +#include +LOG_MODULE_REGISTER(rtio_executor_concurrent, CONFIG_RTIO_LOG_LEVEL); + +#define CONEX_TASK_COMPLETE BIT(0) +#define CONEX_TASK_SUSPENDED BIT(1) + + +/** + * @file + * @brief Concurrent RTIO Executor + * + * The concurrent executor provides fixed amounts of concurrency + * using minimal overhead but assumes a small number of concurrent tasks. + * + * Many of the task lookup and management functions in here are O(N) over N + * tasks. This is fine when the task set is *small*. Task lookup could be + * improved in the future with a binary search at the expense of code size. + * + * The assumption here is that perhaps only 8-16 concurrent tasks are likely + * such that simple short for loops over task array are reasonably fast. + * + * A maximum of 65K submissions queue entries are possible. 
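+ *
+ * Task bookkeeping sketch (illustrative, based on the fields of
+ * struct rtio_concurrent_executor): task_in and task_out are free-running
+ * 16-bit counters, in line with the 65K note above, and a task's slot in
+ * the task_cur[] and task_status[] arrays is obtained by masking:
+ *
+ * @code{.c}
+ * uint16_t slot = task_id & exc->task_mask; /* task_mask == concurrency - 1 */
+ * @endcode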
+ */ + +/** + * check if there is a free task available + */ +static bool conex_task_free(struct rtio_concurrent_executor *exc) +{ + return (exc->task_in - exc->task_out) < (exc->task_mask + 1); +} + +/** + * get the next free available task index + */ +static uint16_t conex_task_next(struct rtio_concurrent_executor *exc) +{ + uint16_t task_id = exc->task_in; + + exc->task_in++; + return task_id; +} + +static uint16_t conex_task_id(struct rtio_concurrent_executor *exc, + const struct rtio_sqe *sqe) +{ + uint16_t task_id = exc->task_out; + + for (; task_id < exc->task_in; task_id++) { + if (exc->task_cur[task_id & exc->task_mask] == sqe) { + break; + } + } + return task_id; +} + +static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc) +{ + struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); + + while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) { + rtio_spsc_release(r->sq); + sqe = rtio_spsc_consume(r->sq); + } + + rtio_spsc_release(r->sq); +} + +static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc) +{ + /* In order sweep up */ + for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { + if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) { + LOG_INF("sweeping oldest task %d", task_id); + conex_sweep_task(r, exc); + exc->task_out++; + } else { + break; + } + } +} + +static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc) +{ + /* In order resume tasks */ + for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { + if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) { + LOG_INF("resuming suspended task %d", task_id); + exc->task_status[task_id] &= ~CONEX_TASK_SUSPENDED; + rtio_iodev_submit(exc->task_cur[task_id], r); + } + } +} + +static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc) +{ + conex_sweep(r, exc); + conex_resume(r, exc); +} + +/** + * @brief Submit submissions to concurrent executor + * + * @param r RTIO context + * + * @retval 0 Always succeeds + */ +int rtio_concurrent_submit(struct rtio *r) +{ + + LOG_INF("submit"); + + struct rtio_concurrent_executor *exc = + (struct rtio_concurrent_executor *)r->executor; + struct rtio_sqe *sqe; + struct rtio_sqe *last_sqe; + k_spinlock_key_t key; + + key = k_spin_lock(&exc->lock); + + /* If never submitted before peek at the first item + * otherwise start back up where the last submit call + * left off + */ + if (exc->last_sqe == NULL) { + sqe = rtio_spsc_peek(r->sq); + } else { + /* Pickup from last submit call */ + sqe = rtio_spsc_next(r->sq, exc->last_sqe); + } + + last_sqe = sqe; + while (sqe != NULL && conex_task_free(exc)) { + LOG_INF("head SQE in chain %p", sqe); + + /* Get the next task id if one exists */ + uint16_t task_idx = conex_task_next(exc); + + LOG_INF("setting up task %d", task_idx); + + /* Setup task (yes this is it) */ + exc->task_cur[task_idx] = sqe; + exc->task_status[task_idx] = CONEX_TASK_SUSPENDED; + + LOG_INF("submitted sqe %p", sqe); + /* Go to the next sqe not in the current chain */ + while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) { + sqe = rtio_spsc_next(r->sq, sqe); + } + + LOG_INF("tail SQE in chain %p", sqe); + + last_sqe = sqe; + + /* SQE is the end of the previous chain */ + sqe = rtio_spsc_next(r->sq, sqe); + } + + /* Out of available pointers, wait til others complete, note the + * first pending submission queue. May be NULL if nothing is pending. 
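+	 *
+	 * For illustration (hypothetical submission, not tied to any test): with a
+	 * concurrency of 2, submitting A -> B (A chained to B), then C, then D
+	 * assigns task 0 = A (B follows via RTIO_SQE_CHAINED) and task 1 = C,
+	 * while D is recorded in pending_sqe and last_sqe ends up at D so the
+	 * next submit call resumes from there.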
+ */ + exc->pending_sqe = sqe; + + /** + * Run through the queue until the last item + * and take note of it + */ + while (sqe != NULL) { + last_sqe = sqe; + sqe = rtio_spsc_next(r->sq, sqe); + } + + /* Note the last sqe for the next submit call */ + exc->last_sqe = last_sqe; + + /* Resume all suspended tasks */ + conex_resume(r, exc); + + k_spin_unlock(&exc->lock, key); + + return 0; +} + +/** + * @brief Callback from an iodev describing success + */ +void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) +{ + struct rtio_sqe *next_sqe; + k_spinlock_key_t key; + struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; + + /* An interrupt may occur in spsc_acquire, breaking the contract, + * so spin around it, effectively preventing another interrupt on + * this core, and another core trying to concurrently work in here. + * + * This can and should be broken up into a few sections with a try + * lock around the sweep and resume. + */ + key = k_spin_lock(&exc->lock); + + rtio_cqe_submit(r, result, sqe->userdata); + + /* Determine the task id: O(n) */ + uint16_t task_id = conex_task_id(exc, sqe); + + if (sqe->flags & RTIO_SQE_CHAINED) { + next_sqe = rtio_spsc_next(r->sq, sqe); + + rtio_iodev_submit(next_sqe, r); + + exc->task_cur[task_id] = next_sqe; + } else { + exc->task_status[task_id] |= CONEX_TASK_COMPLETE; + } + + + /* Sweep up unused SQEs and tasks, retry suspended tasks */ + /* TODO Use a try lock here and don't bother doing it if we are already + * doing it elsewhere + */ + conex_sweep_resume(r, exc); + + k_spin_unlock(&exc->lock, key); +} + +/** + * @brief Callback from an iodev describing error + */ +void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result) +{ + struct rtio_sqe *nsqe; + k_spinlock_key_t key; + struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; + + /* Another interrupt (and sqe completion) may occur in spsc_acquire, + * breaking the contract, so spin around it, effectively preventing another + * interrupt on this core, and another core trying to concurrently work + * in here. + * + * This can and should be broken up into a few sections with a try + * lock around the sweep and resume.
+ */ + key = k_spin_lock(&exc->lock); + + rtio_cqe_submit(r, result, sqe->userdata); + + /* Determine the task id : O(n) */ + uint16_t task_id = conex_task_id(exc, sqe); + + + /* Fail the remaining sqe's in the chain */ + if (sqe->flags & RTIO_SQE_CHAINED) { + nsqe = rtio_spsc_next(r->sq, sqe); + while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) { + rtio_cqe_submit(r, -ECANCELED, nsqe->userdata); + nsqe = rtio_spsc_next(r->sq, nsqe); + } + } + + /* Task is complete (failed) */ + exc->task_status[task_id] |= CONEX_TASK_COMPLETE; + + conex_sweep_resume(r, exc); + + k_spin_unlock(&exc->lock, key); +} diff --git a/subsys/rtio/rtio_executor_simple.c b/subsys/rtio/rtio_executor_simple.c index c06d09d95a9..926a63cdcf0 100644 --- a/subsys/rtio/rtio_executor_simple.c +++ b/subsys/rtio/rtio_executor_simple.c @@ -30,7 +30,6 @@ int rtio_simple_submit(struct rtio *r) struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); if (sqe != NULL) { - LOG_DBG("Calling iodev submit"); rtio_iodev_submit(sqe, r); } diff --git a/tests/subsys/rtio/rtio_api/src/main.c b/tests/subsys/rtio/rtio_api/src/main.c index 9f9977afec0..a532fe9521d 100644 --- a/tests/subsys/rtio/rtio_api/src/main.c +++ b/tests/subsys/rtio/rtio_api/src/main.c @@ -11,6 +11,7 @@ #include #include #include +#include #include "rtio_iodev_test.h" @@ -231,8 +232,12 @@ void test_spsc_threaded(void) } -RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec); -RTIO_DEFINE(r_simple, (struct rtio_executor *)&simple_exec, 4, 4); +RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp); +RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1); +RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4); + struct rtio_iodev_test iodev_test_simple; /** @@ -241,7 +246,7 @@ struct rtio_iodev_test iodev_test_simple; * Ensures that we can setup an RTIO context, enqueue a request, and receive * a completion event. 
*/ -void test_rtio_simple(void) +void test_rtio_simple_(struct rtio *r) { int res; uintptr_t userdata[2] = {0, 1}; @@ -251,23 +256,35 @@ void test_rtio_simple(void) rtio_iodev_test_init(&iodev_test_simple); TC_PRINT("setting up single no-op\n"); - sqe = rtio_spsc_acquire(r_simple.sq); + sqe = rtio_spsc_acquire(r->sq); zassert_not_null(sqe, "Expected a valid sqe"); rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]); TC_PRINT("submit with wait\n"); - res = rtio_submit(&r_simple, 1); + res = rtio_submit(r, 1); zassert_ok(res, "Should return ok from rtio_execute"); - cqe = rtio_spsc_consume(r_simple.cq); + cqe = rtio_spsc_consume(r->cq); zassert_not_null(cqe, "Expected a valid cqe"); zassert_ok(cqe->result, "Result should be ok"); zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back"); - rtio_spsc_release(r_simple.cq); + rtio_spsc_release(r->cq); } -RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec); -RTIO_DEFINE(r_chain, (struct rtio_executor *)&chain_exec, 4, 4); +void test_rtio_simple(void) +{ + TC_PRINT("rtio simple simple\n"); + test_rtio_simple_(&r_simple_simp); + TC_PRINT("rtio simple concurrent\n"); + test_rtio_simple_(&r_simple_con); +} + +RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp); +RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1); +RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4); + struct rtio_iodev_test iodev_test_chain[2]; /** @@ -277,60 +294,72 @@ struct rtio_iodev_test iodev_test_chain[2]; * and receive completion events in the correct order given the chained * flag and multiple devices where serialization isn't guaranteed. */ -void test_rtio_chain(void) +void test_rtio_chain_(struct rtio *r) { int res; uintptr_t userdata[4] = {0, 1, 2, 3}; struct rtio_sqe *sqe; struct rtio_cqe *cqe; - for (int i = 0; i < 2; i++) { - rtio_iodev_test_init(&iodev_test_chain[i]); - } - for (int i = 0; i < 4; i++) { - sqe = rtio_spsc_acquire(r_chain.sq); + sqe = rtio_spsc_acquire(r->sq); zassert_not_null(sqe, "Expected a valid sqe"); rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_chain[i % 2], &userdata[i]); sqe->flags |= RTIO_SQE_CHAINED; } - res = rtio_submit(&r_chain, 4); + /* Clear the last one */ + sqe->flags = 0; + + res = rtio_submit(r, 4); zassert_ok(res, "Should return ok from rtio_execute"); - zassert_equal(rtio_spsc_consumable(r_chain.cq), 4, "Should have 4 pending completions"); + zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); for (int i = 0; i < 4; i++) { TC_PRINT("consume %d\n", i); - cqe = rtio_spsc_consume(r_chain.cq); + cqe = rtio_spsc_consume(r->cq); zassert_not_null(cqe, "Expected a valid cqe"); zassert_ok(cqe->result, "Result should be ok"); zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions"); - rtio_spsc_release(r_chain.cq); + rtio_spsc_release(r->cq); } } -RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec); -RTIO_DEFINE(r_multi, (struct rtio_executor *)&multi_exec, 4, 4); +void test_rtio_chain(void) +{ + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(&iodev_test_chain[i]); + } + + TC_PRINT("rtio chain simple\n"); + test_rtio_chain_(&r_chain_simp); + TC_PRINT("rtio chain concurrent\n"); + test_rtio_chain_(&r_chain_con); +} + + +RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp); +RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2); +RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4); 
+ struct rtio_iodev_test iodev_test_multi[2]; /** * @brief Test multiple asynchronous chains against one iodev */ -void test_rtio_multiple_chains(void) +void test_rtio_multiple_chains_(struct rtio *r) { int res; uintptr_t userdata[4] = {0, 1, 2, 3}; struct rtio_sqe *sqe; struct rtio_cqe *cqe; - for (int i = 0; i < 2; i++) { - rtio_iodev_test_init(&iodev_test_multi[i]); - } - for (int i = 0; i < 2; i++) { for (int j = 0; j < 2; j++) { - sqe = rtio_spsc_acquire(r_multi.sq); + sqe = rtio_spsc_acquire(r->sq); zassert_not_null(sqe, "Expected a valid sqe"); rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_multi[i], (void *)userdata[i*2 + j]); @@ -343,7 +372,7 @@ void test_rtio_multiple_chains(void) } TC_PRINT("calling submit from test case\n"); - res = rtio_submit(&r_multi, 0); + res = rtio_submit(r, 0); zassert_ok(res, "Should return ok from rtio_execute"); bool seen[4] = { 0 }; @@ -351,11 +380,11 @@ TC_PRINT("waiting for 4 completions\n"); for (int i = 0; i < 4; i++) { TC_PRINT("waiting on completion %d\n", i); - cqe = rtio_spsc_consume(r_multi.cq); + cqe = rtio_spsc_consume(r->cq); while (cqe == NULL) { k_sleep(K_MSEC(1)); - cqe = rtio_spsc_consume(r_multi.cq); + cqe = rtio_spsc_consume(r->cq); } zassert_not_null(cqe, "Expected a valid cqe"); @@ -369,12 +398,24 @@ if (seen[3]) { zassert_true(seen[2], "Should see 2 before 3"); } - rtio_spsc_release(r_multi.cq); + rtio_spsc_release(r->cq); } } +void test_rtio_multiple_chains(void) +{ + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(&iodev_test_multi[i]); + } + + TC_PRINT("rtio multiple simple\n"); + test_rtio_multiple_chains_(&r_multi_simp); + TC_PRINT("rtio multiple concurrent\n"); + test_rtio_multiple_chains_(&r_multi_con); +} + void test_main(void) { ztest_test_suite(rtio_spsc_test, ztest_1cpu_unit_test(test_produce_consume_size1), ztest_1cpu_unit_test(test_produce_consume_wrap_around),