diff --git a/subsys/rtio/rtio_executor.c b/subsys/rtio/rtio_executor.c index 0f84d5aabf4..13fd6a7e4dd 100644 --- a/subsys/rtio/rtio_executor.c +++ b/subsys/rtio/rtio_executor.c @@ -10,24 +10,6 @@ #include LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL); -/** - * @brief Submit to an iodev a submission to work on - * - * Should be called by the executor when it wishes to submit work - * to an iodev. - * - * @param iodev_sqe Submission to work on - */ -static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe) -{ - if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) { - /* Canceled */ - rtio_iodev_sqe_err(iodev_sqe, -ECANCELED); - return; - } - iodev_sqe->sqe.iodev->api->submit(iodev_sqe); -} - /** * @brief Executor handled submissions */ @@ -45,6 +27,30 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe) } } +/** + * @brief Submit to an iodev a submission to work on + * + * Should be called by the executor when it wishes to submit work + * to an iodev. + * + * @param iodev_sqe Submission to work on + */ +static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe) +{ + if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) { + rtio_iodev_sqe_err(iodev_sqe, -ECANCELED); + return; + } + + /* No iodev means its an executor specific operation */ + if (iodev_sqe->sqe.iodev == NULL) { + rtio_executor_op(iodev_sqe); + return; + } + + iodev_sqe->sqe.iodev->api->submit(iodev_sqe); +} + /** * @brief Submit operations in the queue to iodevs * @@ -54,46 +60,52 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe) */ void rtio_executor_submit(struct rtio *r) { + const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE); struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq); while (node != NULL) { struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); - uint16_t canceled_mask = iodev_sqe->sqe.flags & RTIO_SQE_CANCELED; + /* If this submission was cancelled before submit, then generate no response */ + if (iodev_sqe->sqe.flags & RTIO_SQE_CANCELED) { + iodev_sqe->sqe.flags |= cancel_no_response; + } iodev_sqe->r = r; - if (iodev_sqe->sqe.iodev == NULL) { - rtio_executor_op(iodev_sqe); - } else { - struct rtio_iodev_sqe *curr = iodev_sqe, *next; + struct rtio_iodev_sqe *curr = iodev_sqe, *next; - /* Link up transaction or queue list if needed */ - while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) { + /* Link up transaction or queue list if needed */ + while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) { #ifdef CONFIG_ASSERT - bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION; - bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED; + bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION; + bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED; - __ASSERT(transaction != chained, - "Expected chained or transaction flag, not both"); + __ASSERT(transaction != chained, + "Expected chained or transaction flag, not both"); #endif - node = rtio_mpsc_pop(&iodev_sqe->r->sq); - next = CONTAINER_OF(node, struct rtio_iodev_sqe, q); - next->sqe.flags |= canceled_mask; - curr->next = next; - curr = next; - curr->r = r; + node = rtio_mpsc_pop(&iodev_sqe->r->sq); + next = CONTAINER_OF(node, struct rtio_iodev_sqe, q); - __ASSERT( - curr != NULL, - "Expected a valid sqe following transaction or chain flag"); + /* If the current submission was cancelled before submit, + * then cancel the next one and generate no response + */ + if (curr->sqe.flags & RTIO_SQE_CANCELED) { + next->sqe.flags |= cancel_no_response; } - - curr->next = NULL; + curr->next = next; + curr = next; curr->r = r; - rtio_iodev_submit(iodev_sqe); + __ASSERT( + curr != NULL, + "Expected a valid sqe following transaction or chain flag"); } + curr->next = NULL; + curr->r = r; + + rtio_iodev_submit(iodev_sqe); + node = rtio_mpsc_pop(&r->sq); } } @@ -160,7 +172,7 @@ static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int resu } } while (sqe_flags & RTIO_SQE_TRANSACTION); - /* Curr should now be the last sqe in the transaction if that is what completed */ + /* curr should now be the last sqe in the transaction if that is what completed */ if (sqe_flags & RTIO_SQE_CHAINED) { rtio_iodev_submit(curr); } diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h index 67a412d819e..f478ec695cc 100644 --- a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h +++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h @@ -16,7 +16,10 @@ struct rtio_iodev_test_data { /* k_timer for an asynchronous task */ struct k_timer timer; - /* Currently executing sqe */ + /* Queue of requests */ + struct rtio_mpsc io_q; + + /* Currently executing transaction */ struct rtio_iodev_sqe *txn_head; struct rtio_iodev_sqe *txn_curr; @@ -27,86 +30,76 @@ struct rtio_iodev_test_data { struct k_spinlock lock; }; -static void rtio_iodev_test_next(struct rtio_iodev *iodev) +static void rtio_iodev_test_next(struct rtio_iodev_test_data *data, bool completion) { - struct rtio_iodev_test_data *data = iodev->data; - /* The next section must be serialized to ensure single consumer semantics */ k_spinlock_key_t key = k_spin_lock(&data->lock); - if (data->txn_head != NULL) { + /* Already working on something, bail early */ + if (!completion && data->txn_head != NULL) { goto out; } - struct rtio_mpsc_node *next = rtio_mpsc_pop(&iodev->iodev_sq); + struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->io_q); - if (next != NULL) { - struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); - - TC_PRINT("next task in queue %p\n", (void *)next_sqe); - - data->txn_head = next_sqe; - data->txn_curr = next_sqe; - k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); - } else { - TC_PRINT("no more tasks in the queue\n"); + /* Nothing left to do, cleanup */ + if (next == NULL) { + data->txn_head = NULL; + data->txn_curr = NULL; + goto out; } + struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); + + data->txn_head = next_sqe; + data->txn_curr = next_sqe; + k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); + out: k_spin_unlock(&data->lock, key); } -static void rtio_iodev_timer_fn(struct k_timer *tm) +static void rtio_iodev_test_complete(struct rtio_iodev_test_data *data, int status) { - static struct rtio_iodev_sqe *last_iodev_sqe; - static int consecutive_sqes; - - struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); - struct rtio_iodev_sqe *iodev_sqe = data->txn_curr; - struct rtio_iodev *iodev = (struct rtio_iodev *)data->txn_head->sqe.iodev; - - if (iodev_sqe == last_iodev_sqe) { - consecutive_sqes++; - } else { - consecutive_sqes = 0; - } - last_iodev_sqe = iodev_sqe; - - if (iodev_sqe->sqe.op == RTIO_OP_RX) { - uint8_t *buf; - uint32_t buf_len; - - int rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len); - - if (rc != 0) { - iodev_sqe = data->txn_head; - data->txn_head = NULL; - data->txn_curr = NULL; - rtio_iodev_sqe_err(iodev_sqe, rc); - rtio_iodev_test_next(iodev); - return; - } - - for (int i = 0; i < 16; ++i) { - buf[i] = ((uint8_t *)iodev_sqe->sqe.userdata)[i]; - } + if (status < 0) { + rtio_iodev_sqe_err(data->txn_head, status); + rtio_iodev_test_next(data, true); } - if (iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION) { - data->txn_curr = rtio_txn_next(data->txn_curr); - TC_PRINT("iodev_sqe %p marked transaction, next %p\n", iodev_sqe, data->txn_curr); - k_timer_start(tm, K_MSEC(10), K_NO_WAIT); + data->txn_curr = rtio_txn_next(data->txn_curr); + if (data->txn_curr) { + k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); return; } - iodev_sqe = data->txn_head; - data->txn_head = NULL; - data->txn_curr = NULL; - rtio_iodev_test_next(iodev); - if (consecutive_sqes == 0) { - rtio_iodev_sqe_ok(iodev_sqe, 0); - } else { - rtio_iodev_sqe_err(iodev_sqe, consecutive_sqes); + rtio_iodev_sqe_ok(data->txn_head, status); + rtio_iodev_test_next(data, true); +} + +static void rtio_iodev_timer_fn(struct k_timer *tm) +{ + struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); + struct rtio_iodev_sqe *iodev_sqe = data->txn_curr; + uint8_t *buf; + uint32_t buf_len; + int rc; + + switch (iodev_sqe->sqe.op) { + case RTIO_OP_NOP: + rtio_iodev_test_complete(data, 0); + break; + case RTIO_OP_RX: + rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len); + if (rc != 0) { + rtio_iodev_test_complete(data, rc); + return; + } + /* For reads the test device copies from the given userdata */ + memcpy(buf, ((uint8_t *)iodev_sqe->sqe.userdata), 16); + rtio_iodev_test_complete(data, 0); + break; + default: + rtio_iodev_test_complete(data, -ENOTSUP); } } @@ -117,10 +110,10 @@ static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) atomic_inc(&data->submit_count); - /* The only safe operation is enqueuing */ - rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q); + /* The only safe operation is enqueuing without a lock */ + rtio_mpsc_push(&data->io_q, &iodev_sqe->q); - rtio_iodev_test_next(iodev); + rtio_iodev_test_next(data, false); } const struct rtio_iodev_api rtio_iodev_test_api = { @@ -131,7 +124,7 @@ void rtio_iodev_test_init(struct rtio_iodev *test) { struct rtio_iodev_test_data *data = test->data; - rtio_mpsc_init(&test->iodev_sq); + rtio_mpsc_init(&data->io_q); data->txn_head = NULL; data->txn_curr = NULL; k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c index 05fcb724db1..7a92859eba1 100644 --- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c @@ -365,24 +365,46 @@ static void test_rtio_chain_cancel_(struct rtio *r) struct rtio_sqe *handle; /* Prepare the chain */ - TC_PRINT("1\n"); - k_msleep(20); rtio_sqe_prep_nop(&sqe[0], (struct rtio_iodev *)&iodev_test_simple, NULL); rtio_sqe_prep_nop(&sqe[1], (struct rtio_iodev *)&iodev_test_simple, NULL); sqe[0].flags |= RTIO_SQE_CHAINED; /* Copy the chain */ - TC_PRINT("2\n"); - k_msleep(20); rtio_sqe_copy_in_get_handles(r, sqe, &handle, 2); - TC_PRINT("3\n"); - k_msleep(20); rtio_sqe_cancel(handle); - TC_PRINT("Submitting 2 to RTIO\n"); k_msleep(20); rtio_submit(r, 0); - /* Check that we don't get a CQE */ + /* Check that we don't get cancelled completion notifications */ + zassert_equal(0, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15))); + + /* Check that the SQE pool is empty by filling it all the way */ + for (int i = 0; i < SQE_POOL_SIZE; ++i) { + rtio_sqe_prep_nop(&sqe[i], (struct rtio_iodev *)&iodev_test_simple, NULL); + } + zassert_ok(rtio_sqe_copy_in(r, sqe, SQE_POOL_SIZE)); + + /* Since there's no good way to just reset the RTIO context, wait for the nops to finish */ + rtio_submit(r, SQE_POOL_SIZE); + for (int i = 0; i < SQE_POOL_SIZE; ++i) { + zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER)); + } + + /* Try cancelling the middle sqe in a chain */ + rtio_sqe_prep_nop(&sqe[0], (struct rtio_iodev *)&iodev_test_simple, NULL); + rtio_sqe_prep_nop(&sqe[1], (struct rtio_iodev *)&iodev_test_simple, NULL); + rtio_sqe_prep_nop(&sqe[2], (struct rtio_iodev *)&iodev_test_simple, NULL); + sqe[0].flags |= RTIO_SQE_CHAINED; + sqe[1].flags |= RTIO_SQE_CHAINED | RTIO_SQE_CANCELED; + + /* Copy in the first non cancelled sqe */ + rtio_sqe_copy_in_get_handles(r, sqe, &handle, 3); + rtio_submit(r, 1); + + /* Check that we get one completion no cancellation notifications */ + zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15))); + + /* Check that we get no more completions for the cancelled submissions */ zassert_equal(0, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(15))); /* Check that the SQE pool is empty by filling it all the way */ @@ -488,7 +510,7 @@ static inline void test_rtio_simple_multishot_(struct rtio *r, int idx) TC_PRINT("Waiting for next cqe\n"); zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER)); - zassert_equal(1, cqe.result, "Result should be ok but got %d", cqe.result); + zassert_ok(cqe.result, "Result should be ok but got %d", cqe.result); zassert_equal_ptr(cqe.userdata, mempool_data, "Expected userdata back"); rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len); rtio_release_buffer(r, buffer, buffer_len); @@ -538,9 +560,7 @@ void test_rtio_transaction_(struct rtio *r) sqe = rtio_sqe_acquire(r); zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, NULL, - &userdata[0]); - + rtio_sqe_prep_nop(sqe, NULL, &userdata[0]); sqe = rtio_sqe_acquire(r); zassert_not_null(sqe, "Expected a valid sqe"); @@ -632,6 +652,86 @@ ZTEST(rtio_api, test_rtio_throughput) _test_rtio_throughput(&r_throughput); } +RTIO_DEFINE(r_callback_chaining, SQE_POOL_SIZE, CQE_POOL_SIZE); +RTIO_IODEV_TEST_DEFINE(iodev_test_callback_chaining0); + +/** + * Callback for testing with + */ +void rtio_callback_chaining_cb(struct rtio *r, const struct rtio_sqe *sqe, void *arg0) +{ + TC_PRINT("chaining callback with userdata %p\n", arg0); +} + +/** + * @brief Test callback chaining requests + * + * Ensures that we can setup an RTIO context, enqueue a transaction of requests, + * receive completion events, and catch a callback at the end in the correct + * order + */ +void test_rtio_callback_chaining_(struct rtio *r) +{ + + int res; + int32_t userdata[4] = {0, 1, 2, 3}; + int32_t ordering[4] = { -1, -1, -1, -1}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + uintptr_t cq_count = atomic_get(&r->cq_count); + + rtio_iodev_test_init(&iodev_test_callback_chaining0); + + sqe = rtio_sqe_acquire(r); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_callback(sqe, &rtio_callback_chaining_cb, sqe, &userdata[0]); + sqe->flags |= RTIO_SQE_CHAINED; + + sqe = rtio_sqe_acquire(r); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, &iodev_test_callback_chaining0, &userdata[1]); + sqe->flags |= RTIO_SQE_TRANSACTION; + + sqe = rtio_sqe_acquire(r); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, &iodev_test_callback_chaining0, &userdata[2]); + sqe->flags |= RTIO_SQE_CHAINED; + + sqe = rtio_sqe_acquire(r); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_callback(sqe, &rtio_callback_chaining_cb, sqe, &userdata[3]); + + TC_PRINT("submitting\n"); + res = rtio_submit(r, 4); + TC_PRINT("checking cq, completions available, count at start %lu, current count %lu\n", + cq_count, atomic_get(&r->cq_count)); + zassert_ok(res, "Should return ok from rtio_execute"); + zassert_equal(atomic_get(&r->cq_count) - cq_count, 4, "Should have 4 pending completions"); + + for (int i = 0; i < 4; i++) { + TC_PRINT("consume %d\n", i); + cqe = rtio_cqe_consume(r); + zassert_not_null(cqe, "Expected a valid cqe"); + zassert_ok(cqe->result, "Result should be ok"); + + int32_t idx = *(int32_t *)cqe->userdata; + + TC_PRINT("userdata is %p, value %d\n", cqe->userdata, idx); + ordering[idx] = i; + + rtio_cqe_release(r, cqe); + } + + for (int i = 0; i < 4; i++) { + zassert_equal(ordering[i], i, + "Execpted ordering of completions to match submissions"); + } +} + +ZTEST(rtio_api, test_rtio_callback_chaining) +{ + test_rtio_callback_chaining_(&r_callback_chaining); +} static void *rtio_api_setup(void) {