rtio: Callback chaining and testing

Callback test coverage was lacking, especially for callbacks used in
chains. It was clear from the code that chained callbacks might not
actually work, and callback ordering was hard to verify. Test that
callbacks chained to transactions work as expected.
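
For illustration only (not part of this commit): a minimal sketch of the
pattern under test, chaining a callback behind a device operation so the
callback fires only after the preceding sqe completes. The context name,
iodev pointer, and pool sizes below are placeholders; NULL checks on
rtio_sqe_acquire() are elided.

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>

RTIO_DEFINE(r_example, 4, 4); /* hypothetical context: 4 sqe / 4 cqe entries */

/* Runs only once the sqe chained ahead of it has completed */
static void chain_done_cb(struct rtio *r, const struct rtio_sqe *sqe, void *arg0)
{
	printk("chain finished, arg0 %p\n", arg0);
}

static void submit_chained_callback(struct rtio_iodev *iodev)
{
	struct rtio_sqe *sqe;

	sqe = rtio_sqe_acquire(&r_example);
	rtio_sqe_prep_nop(sqe, iodev, NULL);
	sqe->flags |= RTIO_SQE_CHAINED; /* next sqe starts after this one */

	sqe = rtio_sqe_acquire(&r_example);
	rtio_sqe_prep_callback(sqe, chain_done_cb, NULL, NULL);

	rtio_submit(&r_example, 2); /* wait for both completions */
}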

The test iodev had built up some cruft over time and, once callback
chaining was fixed, exposed a few bugs of its own. Rework it so that it
now better matches typical iodev implementations.
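
For illustration only (not part of this commit): the shape the test iodev
now follows, which is the shape typical iodevs use. submit() only enqueues
on a lock-free queue and pokes a serialized consumer; the struct and
function names below are hypothetical, and queue initialization and the
actual work started for each request are elided.

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>

struct my_iodev_data {
	struct rtio_mpsc io_q;           /* queue of pending requests */
	struct rtio_iodev_sqe *txn_head; /* currently executing transaction */
	struct k_spinlock lock;
};

/* Serialized single consumer: pick up the next request when idle, or
 * when the previous request just completed.
 */
static void my_iodev_next(struct my_iodev_data *data, bool completion)
{
	k_spinlock_key_t key = k_spin_lock(&data->lock);

	if (completion || data->txn_head == NULL) {
		struct rtio_mpsc_node *node = rtio_mpsc_pop(&data->io_q);

		data->txn_head = (node == NULL)
					 ? NULL
					 : CONTAINER_OF(node, struct rtio_iodev_sqe, q);
		/* ...kick off hardware/timer work for data->txn_head here... */
	}

	k_spin_unlock(&data->lock, key);
}

/* The only safe operation in submit is enqueuing */
static void my_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
{
	struct my_iodev_data *data = iodev_sqe->sqe.iodev->data;

	rtio_mpsc_push(&data->io_q, &iodev_sqe->q);
	my_iodev_next(data, false);
}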

Cancellation testing now includes an added case for cancelling the
second submission in a chain prior to calling submit, noting that no
completion notifications should be given back for the cancelled
submissions.
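
For illustration only (not part of this commit): the new cancellation case
in miniature. A three-sqe chain is prepared, the middle sqe is flagged
canceled before submit, and only the first sqe may produce a completion.
Here r and iodev stand in for a test context and test iodev.

#include <zephyr/ztest.h>
#include <zephyr/rtio/rtio.h>

static void cancel_mid_chain(struct rtio *r, struct rtio_iodev *iodev)
{
	struct rtio_sqe sqe[3];
	struct rtio_sqe *handle;
	struct rtio_cqe cqe;

	rtio_sqe_prep_nop(&sqe[0], iodev, NULL);
	rtio_sqe_prep_nop(&sqe[1], iodev, NULL);
	rtio_sqe_prep_nop(&sqe[2], iodev, NULL);
	sqe[0].flags |= RTIO_SQE_CHAINED;
	sqe[1].flags |= RTIO_SQE_CHAINED | RTIO_SQE_CANCELED;

	rtio_sqe_copy_in_get_handles(r, sqe, &handle, 3);
	rtio_submit(r, 1);

	/* Exactly one completion: the first, non-cancelled sqe... */
	zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15)));
	/* ...and nothing for the two cancelled ones */
	zassert_equal(0, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15)));
}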

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
Authored by Tom Burdick on 2024-05-14 11:39:48 -05:00; committed by Henrik Brix Andersen
commit 054f453ea7
3 changed files with 224 additions and 119 deletions

File: subsys/rtio/rtio_executor.c

@@ -10,24 +10,6 @@
 #include <zephyr/logging/log.h>
 LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);
 
-/**
- * @brief Submit to an iodev a submission to work on
- *
- * Should be called by the executor when it wishes to submit work
- * to an iodev.
- *
- * @param iodev_sqe Submission to work on
- */
-static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
-{
-	if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
-		/* Canceled */
-		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
-		return;
-	}
-
-	iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
-}
 
 /**
  * @brief Executor handled submissions
  */
@@ -45,6 +27,30 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
 	}
 }
 
+/**
+ * @brief Submit to an iodev a submission to work on
+ *
+ * Should be called by the executor when it wishes to submit work
+ * to an iodev.
+ *
+ * @param iodev_sqe Submission to work on
+ */
+static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
+{
+	if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
+		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
+		return;
+	}
+
+	/* No iodev means its an executor specific operation */
+	if (iodev_sqe->sqe.iodev == NULL) {
+		rtio_executor_op(iodev_sqe);
+		return;
+	}
+
+	iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
+}
+
 /**
  * @brief Submit operations in the queue to iodevs
  *
@@ -54,46 +60,52 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
  */
 void rtio_executor_submit(struct rtio *r)
 {
 	const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
 	struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq);
 
 	while (node != NULL) {
 		struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
-		uint16_t canceled_mask = iodev_sqe->sqe.flags & RTIO_SQE_CANCELED;
 
 		/* If this submission was cancelled before submit, then generate no response */
 		if (iodev_sqe->sqe.flags & RTIO_SQE_CANCELED) {
 			iodev_sqe->sqe.flags |= cancel_no_response;
 		}
 		iodev_sqe->r = r;
 
-		if (iodev_sqe->sqe.iodev == NULL) {
-			rtio_executor_op(iodev_sqe);
-		} else {
-			struct rtio_iodev_sqe *curr = iodev_sqe, *next;
+		struct rtio_iodev_sqe *curr = iodev_sqe, *next;
 
-			/* Link up transaction or queue list if needed */
-			while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
+		/* Link up transaction or queue list if needed */
+		while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
 #ifdef CONFIG_ASSERT
-				bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION;
-				bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED;
+			bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION;
+			bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED;
 
-				__ASSERT(transaction != chained,
-					 "Expected chained or transaction flag, not both");
+			__ASSERT(transaction != chained,
+				 "Expected chained or transaction flag, not both");
 #endif
-				node = rtio_mpsc_pop(&iodev_sqe->r->sq);
-				next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
-				next->sqe.flags |= canceled_mask;
-				curr->next = next;
-				curr = next;
-				curr->r = r;
-
-				__ASSERT(
-					curr != NULL,
-					"Expected a valid sqe following transaction or chain flag");
-			}
-
-			curr->next = NULL;
-
-			rtio_iodev_submit(iodev_sqe);
-		}
+			node = rtio_mpsc_pop(&iodev_sqe->r->sq);
+			next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
+
+			__ASSERT(
+				curr != NULL,
+				"Expected a valid sqe following transaction or chain flag");
+
+			/* If the current submission was cancelled before submit,
+			 * then cancel the next one and generate no response
+			 */
+			if (curr->sqe.flags & RTIO_SQE_CANCELED) {
+				next->sqe.flags |= cancel_no_response;
+			}
+			curr->next = next;
+			curr = next;
+			curr->r = r;
+		}
+
+		curr->next = NULL;
+		curr->r = r;
+
+		rtio_iodev_submit(iodev_sqe);
 
 		node = rtio_mpsc_pop(&r->sq);
 	}
 }
@@ -160,7 +172,7 @@ static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result)
 		}
 	} while (sqe_flags & RTIO_SQE_TRANSACTION);
 
-	/* Curr should now be the last sqe in the transaction if that is what completed */
+	/* curr should now be the last sqe in the transaction if that is what completed */
 	if (sqe_flags & RTIO_SQE_CHAINED) {
 		rtio_iodev_submit(curr);
 	}

File: tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h

@@ -16,7 +16,10 @@ struct rtio_iodev_test_data {
 	/* k_timer for an asynchronous task */
 	struct k_timer timer;
 
-	/* Currently executing sqe */
+	/* Queue of requests */
+	struct rtio_mpsc io_q;
+
+	/* Currently executing transaction */
 	struct rtio_iodev_sqe *txn_head;
 	struct rtio_iodev_sqe *txn_curr;
@@ -27,86 +30,76 @@ struct rtio_iodev_test_data {
 	struct k_spinlock lock;
 };
 
-static void rtio_iodev_test_next(struct rtio_iodev *iodev)
+static void rtio_iodev_test_next(struct rtio_iodev_test_data *data, bool completion)
 {
-	struct rtio_iodev_test_data *data = iodev->data;
-
 	/* The next section must be serialized to ensure single consumer semantics */
 	k_spinlock_key_t key = k_spin_lock(&data->lock);
 
-	if (data->txn_head != NULL) {
+	/* Already working on something, bail early */
+	if (!completion && data->txn_head != NULL) {
 		goto out;
 	}
 
-	struct rtio_mpsc_node *next = rtio_mpsc_pop(&iodev->iodev_sq);
+	struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->io_q);
 
-	if (next != NULL) {
-		struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
-
-		TC_PRINT("next task in queue %p\n", (void *)next_sqe);
-
-		data->txn_head = next_sqe;
-		data->txn_curr = next_sqe;
-		k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
-	} else {
-		TC_PRINT("no more tasks in the queue\n");
+	/* Nothing left to do, cleanup */
+	if (next == NULL) {
+		data->txn_head = NULL;
+		data->txn_curr = NULL;
+		goto out;
 	}
 
+	struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
+
+	data->txn_head = next_sqe;
+	data->txn_curr = next_sqe;
+	k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
+
 out:
 	k_spin_unlock(&data->lock, key);
 }
 
-static void rtio_iodev_timer_fn(struct k_timer *tm)
+static void rtio_iodev_test_complete(struct rtio_iodev_test_data *data, int status)
 {
-	static struct rtio_iodev_sqe *last_iodev_sqe;
-	static int consecutive_sqes;
-
-	struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer);
-	struct rtio_iodev_sqe *iodev_sqe = data->txn_curr;
-	struct rtio_iodev *iodev = (struct rtio_iodev *)data->txn_head->sqe.iodev;
-
-	if (iodev_sqe == last_iodev_sqe) {
-		consecutive_sqes++;
-	} else {
-		consecutive_sqes = 0;
-	}
-	last_iodev_sqe = iodev_sqe;
-
-	if (iodev_sqe->sqe.op == RTIO_OP_RX) {
-		uint8_t *buf;
-		uint32_t buf_len;
-
-		int rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len);
-
-		if (rc != 0) {
-			iodev_sqe = data->txn_head;
-			data->txn_head = NULL;
-			data->txn_curr = NULL;
-			rtio_iodev_sqe_err(iodev_sqe, rc);
-			rtio_iodev_test_next(iodev);
-			return;
-		}
-
-		for (int i = 0; i < 16; ++i) {
-			buf[i] = ((uint8_t *)iodev_sqe->sqe.userdata)[i];
-		}
+	if (status < 0) {
+		rtio_iodev_sqe_err(data->txn_head, status);
+		rtio_iodev_test_next(data, true);
 	}
 
-	if (iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION) {
-		data->txn_curr = rtio_txn_next(data->txn_curr);
-		TC_PRINT("iodev_sqe %p marked transaction, next %p\n", iodev_sqe, data->txn_curr);
-		k_timer_start(tm, K_MSEC(10), K_NO_WAIT);
+	data->txn_curr = rtio_txn_next(data->txn_curr);
+	if (data->txn_curr) {
+		k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
 		return;
 	}
 
-	iodev_sqe = data->txn_head;
-	data->txn_head = NULL;
-	data->txn_curr = NULL;
-	rtio_iodev_test_next(iodev);
+	rtio_iodev_sqe_ok(data->txn_head, status);
+	rtio_iodev_test_next(data, true);
+}
 
-	if (consecutive_sqes == 0) {
-		rtio_iodev_sqe_ok(iodev_sqe, 0);
-	} else {
-		rtio_iodev_sqe_err(iodev_sqe, consecutive_sqes);
+static void rtio_iodev_timer_fn(struct k_timer *tm)
+{
+	struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer);
+	struct rtio_iodev_sqe *iodev_sqe = data->txn_curr;
+	uint8_t *buf;
+	uint32_t buf_len;
+	int rc;
+
+	switch (iodev_sqe->sqe.op) {
+	case RTIO_OP_NOP:
+		rtio_iodev_test_complete(data, 0);
+		break;
+	case RTIO_OP_RX:
+		rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len);
+		if (rc != 0) {
+			rtio_iodev_test_complete(data, rc);
+			return;
+		}
+		/* For reads the test device copies from the given userdata */
+		memcpy(buf, ((uint8_t *)iodev_sqe->sqe.userdata), 16);
+		rtio_iodev_test_complete(data, 0);
+		break;
+	default:
+		rtio_iodev_test_complete(data, -ENOTSUP);
 	}
 }
@@ -117,10 +110,10 @@ static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe)
 
 	atomic_inc(&data->submit_count);
 
-	/* The only safe operation is enqueuing */
-	rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q);
+	/* The only safe operation is enqueuing without a lock */
+	rtio_mpsc_push(&data->io_q, &iodev_sqe->q);
 
-	rtio_iodev_test_next(iodev);
+	rtio_iodev_test_next(data, false);
 }
 
 const struct rtio_iodev_api rtio_iodev_test_api = {
@@ -131,7 +124,7 @@ void rtio_iodev_test_init(struct rtio_iodev *test)
 {
 	struct rtio_iodev_test_data *data = test->data;
 
-	rtio_mpsc_init(&test->iodev_sq);
+	rtio_mpsc_init(&data->io_q);
 	data->txn_head = NULL;
 	data->txn_curr = NULL;
 	k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL);

File: tests/subsys/rtio/rtio_api/src/test_rtio_api.c

@@ -365,24 +365,46 @@ static void test_rtio_chain_cancel_(struct rtio *r)
 	struct rtio_sqe *handle;
 
 	/* Prepare the chain */
-	TC_PRINT("1\n");
-	k_msleep(20);
 	rtio_sqe_prep_nop(&sqe[0], (struct rtio_iodev *)&iodev_test_simple, NULL);
 	rtio_sqe_prep_nop(&sqe[1], (struct rtio_iodev *)&iodev_test_simple, NULL);
 	sqe[0].flags |= RTIO_SQE_CHAINED;
 
 	/* Copy the chain */
-	TC_PRINT("2\n");
-	k_msleep(20);
 	rtio_sqe_copy_in_get_handles(r, sqe, &handle, 2);
-	TC_PRINT("3\n");
-	k_msleep(20);
 	rtio_sqe_cancel(handle);
-	TC_PRINT("Submitting 2 to RTIO\n");
-	k_msleep(20);
 	rtio_submit(r, 0);
 
-	/* Check that we don't get a CQE */
+	/* Check that we don't get cancelled completion notifications */
 	zassert_equal(0, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15)));
 
+	/* Check that the SQE pool is empty by filling it all the way */
+	for (int i = 0; i < SQE_POOL_SIZE; ++i) {
+		rtio_sqe_prep_nop(&sqe[i], (struct rtio_iodev *)&iodev_test_simple, NULL);
+	}
+	zassert_ok(rtio_sqe_copy_in(r, sqe, SQE_POOL_SIZE));
+
+	/* Since there's no good way to just reset the RTIO context, wait for the nops to finish */
+	rtio_submit(r, SQE_POOL_SIZE);
+	for (int i = 0; i < SQE_POOL_SIZE; ++i) {
+		zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER));
+	}
+
+	/* Try cancelling the middle sqe in a chain */
+	rtio_sqe_prep_nop(&sqe[0], (struct rtio_iodev *)&iodev_test_simple, NULL);
+	rtio_sqe_prep_nop(&sqe[1], (struct rtio_iodev *)&iodev_test_simple, NULL);
+	rtio_sqe_prep_nop(&sqe[2], (struct rtio_iodev *)&iodev_test_simple, NULL);
+	sqe[0].flags |= RTIO_SQE_CHAINED;
+	sqe[1].flags |= RTIO_SQE_CHAINED | RTIO_SQE_CANCELED;
+
+	/* Copy in the first non cancelled sqe */
+	rtio_sqe_copy_in_get_handles(r, sqe, &handle, 3);
+	rtio_submit(r, 1);
+
+	/* Check that we get one completion, no cancellation notifications */
+	zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15)));
+
+	/* Check that we get no more completions for the cancelled submissions */
+	zassert_equal(0, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15)));
+
 	/* Check that the SQE pool is empty by filling it all the way */
@@ -488,7 +510,7 @@ static inline void test_rtio_simple_multishot_(struct rtio *r, int idx)
 
 	TC_PRINT("Waiting for next cqe\n");
 	zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER));
-	zassert_equal(1, cqe.result, "Result should be ok but got %d", cqe.result);
+	zassert_ok(cqe.result, "Result should be ok but got %d", cqe.result);
 	zassert_equal_ptr(cqe.userdata, mempool_data, "Expected userdata back");
 	rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len);
 	rtio_release_buffer(r, buffer, buffer_len);
@@ -538,9 +560,7 @@ void test_rtio_transaction_(struct rtio *r)
 
 	sqe = rtio_sqe_acquire(r);
 	zassert_not_null(sqe, "Expected a valid sqe");
-	rtio_sqe_prep_nop(sqe, NULL,
-			  &userdata[0]);
+	rtio_sqe_prep_nop(sqe, NULL, &userdata[0]);
 
 	sqe = rtio_sqe_acquire(r);
 	zassert_not_null(sqe, "Expected a valid sqe");
@@ -632,6 +652,86 @@ ZTEST(rtio_api, test_rtio_throughput)
 	_test_rtio_throughput(&r_throughput);
 }
 
+RTIO_DEFINE(r_callback_chaining, SQE_POOL_SIZE, CQE_POOL_SIZE);
+RTIO_IODEV_TEST_DEFINE(iodev_test_callback_chaining0);
+
+/**
+ * Callback for testing with
+ */
+void rtio_callback_chaining_cb(struct rtio *r, const struct rtio_sqe *sqe, void *arg0)
+{
+	TC_PRINT("chaining callback with userdata %p\n", arg0);
+}
+
+/**
+ * @brief Test callback chaining requests
+ *
+ * Ensures that we can setup an RTIO context, enqueue a transaction of requests,
+ * receive completion events, and catch a callback at the end in the correct
+ * order
+ */
+void test_rtio_callback_chaining_(struct rtio *r)
+{
+	int res;
+	int32_t userdata[4] = {0, 1, 2, 3};
+	int32_t ordering[4] = { -1, -1, -1, -1};
+
+	struct rtio_sqe *sqe;
+	struct rtio_cqe *cqe;
+	uintptr_t cq_count = atomic_get(&r->cq_count);
+
+	rtio_iodev_test_init(&iodev_test_callback_chaining0);
+
+	sqe = rtio_sqe_acquire(r);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_callback(sqe, &rtio_callback_chaining_cb, sqe, &userdata[0]);
+	sqe->flags |= RTIO_SQE_CHAINED;
+
+	sqe = rtio_sqe_acquire(r);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_nop(sqe, &iodev_test_callback_chaining0, &userdata[1]);
+	sqe->flags |= RTIO_SQE_TRANSACTION;
+
+	sqe = rtio_sqe_acquire(r);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_nop(sqe, &iodev_test_callback_chaining0, &userdata[2]);
+	sqe->flags |= RTIO_SQE_CHAINED;
+
+	sqe = rtio_sqe_acquire(r);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_callback(sqe, &rtio_callback_chaining_cb, sqe, &userdata[3]);
+
+	TC_PRINT("submitting\n");
+	res = rtio_submit(r, 4);
+	TC_PRINT("checking cq, completions available, count at start %lu, current count %lu\n",
+		 cq_count, atomic_get(&r->cq_count));
+	zassert_ok(res, "Should return ok from rtio_execute");
+	zassert_equal(atomic_get(&r->cq_count) - cq_count, 4, "Should have 4 pending completions");
+
+	for (int i = 0; i < 4; i++) {
+		TC_PRINT("consume %d\n", i);
+		cqe = rtio_cqe_consume(r);
+		zassert_not_null(cqe, "Expected a valid cqe");
+		zassert_ok(cqe->result, "Result should be ok");
+
+		int32_t idx = *(int32_t *)cqe->userdata;
+
+		TC_PRINT("userdata is %p, value %d\n", cqe->userdata, idx);
+		ordering[idx] = i;
+
+		rtio_cqe_release(r, cqe);
+	}
+
+	for (int i = 0; i < 4; i++) {
+		zassert_equal(ordering[i], i,
+			      "Expected ordering of completions to match submissions");
+	}
+}
+
+ZTEST(rtio_api, test_rtio_callback_chaining)
+{
+	test_rtio_callback_chaining_(&r_callback_chaining);
+}
+
 static void *rtio_api_setup(void)
 {